From 66f0ddac0e2f8311a60a21628489e0eafee3fdcf Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 29 Dec 2015 15:41:12 -0800 Subject: [PATCH] pkg/client: fix directory upload deadlock, clean up HTTP handling Unexports pkg/client.Client.TransportForConfig in the process. Fixes camlistore/camlistore#662 Change-Id: Ifc2d6b0fee70d5db86654897d2d377b343b6d38b --- cmd/camget/camget.go | 18 +++--- cmd/cammount/cammount.go | 3 - cmd/camput/camput.go | 29 +++++----- cmd/camput/camput_test.go | 1 - cmd/camput/files.go | 10 ++-- cmd/camput/init.go | 4 +- cmd/camput/uploader.go | 2 +- cmd/camtool/camtool.go | 16 ++---- cmd/camtool/sync.go | 13 +---- pkg/app/app.go | 4 -- pkg/client/client.go | 107 ++++++++++++++++++++++++----------- pkg/client/transport_test.go | 2 +- 12 files changed, 112 insertions(+), 97 deletions(-) diff --git a/cmd/camget/camget.go b/cmd/camget/camget.go index 17c36f945..6f4aca58a 100644 --- a/cmd/camget/camget.go +++ b/cmd/camget/camget.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "log" - "net/http" "os" "path/filepath" @@ -32,7 +31,6 @@ import ( "camlistore.org/pkg/cacher" "camlistore.org/pkg/client" "camlistore.org/pkg/cmdmain" - "camlistore.org/pkg/httputil" "camlistore.org/pkg/index" "camlistore.org/pkg/osutil" "camlistore.org/pkg/schema" @@ -77,6 +75,10 @@ func main() { var cl *client.Client var items []blob.Ref + optTransportConfig := client.OptionTransportConfig(&client.TransportConfig{ + Verbose: *flagHTTP, + }) + if *flagShared != "" { if client.ExplicitServer() != "" { log.Fatal("Can't use --shared with an explicit blobserver; blobserver is implicit from the --shared URL.") @@ -86,7 +88,9 @@ func main() { } cl1, target, err := client.NewFromShareRoot(*flagShared, client.OptionInsecure(*flagInsecureTLS), - client.OptionTrustedCert(*flagTrustedCert)) + client.OptionTrustedCert(*flagTrustedCert), + optTransportConfig, + ) if err != nil { log.Fatal(err) } @@ -96,7 +100,7 @@ func main() { if *flagTrustedCert != "" { log.Fatal("Can't use --cert without --shared.") } - cl = client.NewOrFail(client.OptionInsecure(*flagInsecureTLS)) + cl = client.NewOrFail(client.OptionInsecure(*flagInsecureTLS), optTransportConfig) for n := 0; n < flag.NArg(); n++ { arg := flag.Arg(n) br, ok := blob.Parse(arg) @@ -107,11 +111,7 @@ func main() { } } - tr := cl.TransportForConfig(&client.TransportConfig{ - Verbose: *flagHTTP, - }) - httpStats, _ := tr.(*httputil.StatsTransport) - cl.SetHTTPClient(&http.Client{Transport: tr}) + httpStats := cl.HTTPStats() diskCacheFetcher, err := cacher.NewDiskCache(cl) if err != nil { diff --git a/cmd/cammount/cammount.go b/cmd/cammount/cammount.go index 773f3d101..cc8933787 100644 --- a/cmd/cammount/cammount.go +++ b/cmd/cammount/cammount.go @@ -23,7 +23,6 @@ import ( "fmt" "io/ioutil" "log" - "net/http" "os" "os/exec" "os/signal" @@ -114,7 +113,6 @@ func main() { } } else { cl = client.NewOrFail() // automatic from flags - cl.SetHTTPClient(&http.Client{Transport: cl.TransportForConfig(nil)}) var ok bool root, ok = blob.Parse(rootArg) @@ -137,7 +135,6 @@ func main() { } } else { cl = client.NewOrFail() // automatic from flags - cl.SetHTTPClient(&http.Client{Transport: cl.TransportForConfig(nil)}) } diskCacheFetcher, err := cacher.NewDiskCache(cl) diff --git a/cmd/camput/camput.go b/cmd/camput/camput.go index a6f70d950..0375fc922 100644 --- a/cmd/camput/camput.go +++ b/cmd/camput/camput.go @@ -67,9 +67,9 @@ func init() { stats := up.Stats() if *cmdmain.FlagVerbose { log.Printf("Client stats: %s", stats.String()) - if up.transport != nil { - log.Printf(" #HTTP reqs: %d", up.transport.Requests()) - h1, h2 := up.transport.ProtoVersions() + if up.stats != nil { + log.Printf(" #HTTP reqs: %d", up.stats.Requests()) + h1, h2 := up.stats.ProtoVersions() log.Printf(" responses: %d (h1), %d (h2)\n", h1, h2) } } @@ -153,18 +153,15 @@ func newUploader() *Uploader { } cc = client.NewStorageClient(ss) } else { - cc = client.NewOrFail() - proxy := http.ProxyFromEnvironment + var proxy func(*http.Request) (*url.URL, error) if flagProxyLocal { proxy = proxyFromEnvironment } - tr := cc.TransportForConfig( - &client.TransportConfig{ - Proxy: proxy, - Verbose: *flagHTTP, - }) - httpStats, _ = tr.(*httputil.StatsTransport) - cc.SetHTTPClient(&http.Client{Transport: tr}) + cc = client.NewOrFail(client.OptionTransportConfig(&client.TransportConfig{ + Proxy: proxy, + Verbose: *flagHTTP, + })) + httpStats = cc.HTTPStats() } if *cmdmain.FlagVerbose { cc.SetLogger(log.New(cmdmain.Stderr, "", log.LstdFlags)) @@ -178,10 +175,10 @@ func newUploader() *Uploader { } return &Uploader{ - Client: cc, - transport: httpStats, - pwd: pwd, - fdGate: syncutil.NewGate(100), // gate things that waste fds, assuming a low system limit + Client: cc, + stats: httpStats, + pwd: pwd, + fdGate: syncutil.NewGate(100), // gate things that waste fds, assuming a low system limit } } diff --git a/cmd/camput/camput_test.go b/cmd/camput/camput_test.go index 2af08960b..1c69360cc 100644 --- a/cmd/camput/camput_test.go +++ b/cmd/camput/camput_test.go @@ -189,7 +189,6 @@ func TestUploadDirectories(t *testing.T) { // uploaded in, so the directory comes before the file. This // was the old deadlock. defer setAndRestore(&uploadWorkers, 1)() - defer setAndRestore(&dirUploadWorkers, 1)() defer setAndRestore(&statCacheWorkers, 5)() e := &env{ diff --git a/cmd/camput/files.go b/cmd/camput/files.go index 5290136d9..0a0f6d5b4 100644 --- a/cmd/camput/files.go +++ b/cmd/camput/files.go @@ -71,7 +71,6 @@ var flagUseSQLiteChildCache bool // Use sqlite for the statcache and havecache. var ( uploadWorkers = 5 // concurrent upload workers (negative means unbounded: memory hog) - dirUploadWorkers = 3 // concurrent directory uploading workers statCacheWorkers = 5 // concurrent statcache workers ) @@ -105,7 +104,6 @@ func init() { flags.BoolVar(&cmd.argsFromInput, "stdinargs", false, "If true, filenames to upload are sent one-per-line on stdin. EOF means to quit the process with exit status 0.") // limit number of goroutines to limit memory uploadWorkers = 2 - dirUploadWorkers = 2 statCacheWorkers = 2 } flagCacheLog = flags.Bool("logcache", false, "log caching details") @@ -843,7 +841,7 @@ func (n *node) SetPutResult(res *client.PutResult, err error) { panic("SetPutResult called twice on node " + n.fullPath) } n.res, n.err = res, err - n.cond.Signal() + n.cond.Broadcast() } func (n *node) PutResult() (*client.PutResult, error) { @@ -1031,7 +1029,10 @@ func (t *TreeUpload) run() { } }) } else { - dirUpload := chanworker.NewWorker(dirUploadWorkers, func(el interface{}, ok bool) { + // dirUpload is unbounded because directories can depend on directories. + // We bound the number of HTTP requests in flight instead. + // TODO(bradfitz): remove this chanworker stuff? + dirUpload := chanworker.NewWorker(-1, func(el interface{}, ok bool) { if !ok { log.Printf("done uploading directories - done with all uploads.") uploadsdonec <- true @@ -1050,6 +1051,7 @@ func (t *TreeUpload) run() { if !ok { log.Printf("done with all uploads.") close(dirUpload) + log.Printf("closed dirUpload") return } n := el.(*node) diff --git a/cmd/camput/init.go b/cmd/camput/init.go index 1733837cb..35478dab0 100644 --- a/cmd/camput/init.go +++ b/cmd/camput/init.go @@ -22,7 +22,6 @@ import ( "fmt" "io/ioutil" "log" - "net/http" "os" "path/filepath" "strings" @@ -165,14 +164,13 @@ func (c *initCmd) clientConfigFromServer() (*clientconfig.Config, error) { cl := client.NewFromParams(server, auth.NewBasicAuth(fields[0], fields[1]), client.OptionInsecure(c.insecureTLS)) - cl.SetHTTPClient(&http.Client{Transport: cl.TransportForConfig(nil)}) - var cc clientconfig.Config helpRoot, err := cl.HelpRoot() if err != nil { return nil, err } + var cc clientconfig.Config if err := cl.GetJSON(helpRoot+"?clientConfig=true", &cc); err != nil { return nil, err } diff --git a/cmd/camput/uploader.go b/cmd/camput/uploader.go index c1b107b85..efab9b72a 100644 --- a/cmd/camput/uploader.go +++ b/cmd/camput/uploader.go @@ -42,7 +42,7 @@ type Uploader struct { // of a Client? altStatReceiver blobserver.StatReceiver - transport *httputil.StatsTransport // for HTTP statistics + stats *httputil.StatsTransport // if non-nil, HTTP statistics pwd string statCache UploadCache haveCache HaveCache diff --git a/cmd/camtool/camtool.go b/cmd/camtool/camtool.go index a34708341..d95004f1e 100644 --- a/cmd/camtool/camtool.go +++ b/cmd/camtool/camtool.go @@ -18,7 +18,6 @@ package main import ( "log" - "net/http" "camlistore.org/pkg/client" "camlistore.org/pkg/cmdmain" @@ -37,17 +36,12 @@ const serverFlagHelp = "Format is is either a URL prefix (with optional path), a // * host:port // * https?://host[:port][/path] func newClient(server string, opts ...client.ClientOption) *client.Client { - var cl *client.Client if server == "" { - cl = client.NewOrFail(opts...) - } else { - cl = client.New(server, opts...) - if err := cl.SetupAuth(); err != nil { - log.Fatalf("Could not setup auth for connecting to %v: %v", server, err) - } + return client.NewOrFail(opts...) + } + cl := client.New(server, opts...) + if err := cl.SetupAuth(); err != nil { + log.Fatalf("Could not setup auth for connecting to %v: %v", server, err) } - cl.SetHTTPClient(&http.Client{ - Transport: cl.TransportForConfig(nil), - }) return cl } diff --git a/cmd/camtool/sync.go b/cmd/camtool/sync.go index 5be8a823d..3be8e9323 100644 --- a/cmd/camtool/sync.go +++ b/cmd/camtool/sync.go @@ -185,12 +185,9 @@ func (c *syncCmd) storageFromParam(which storageType, val string) (blobserver.St return disk, nil } cl := client.New(val, client.OptionInsecure(c.insecureTLS)) - if httpClient == nil { - httpClient = &http.Client{ - Transport: cl.TransportForConfig(nil), - } + if httpClient != nil { + cl.SetHTTPClient(httpClient) } - cl.SetHTTPClient(httpClient) if err := cl.SetupAuth(); err != nil { return nil, fmt.Errorf("could not setup auth for connecting to %v: %v", val, err) } @@ -245,17 +242,11 @@ func (c *syncCmd) syncAll() error { for _, sh := range syncHandlers { from := client.New(sh.From, client.OptionInsecure(c.insecureTLS)) from.SetLogger(c.logger) - from.SetHTTPClient(&http.Client{ - Transport: from.TransportForConfig(nil), - }) if err := from.SetupAuth(); err != nil { return fmt.Errorf("could not setup auth for connecting to %v: %v", sh.From, err) } to := client.New(sh.To, client.OptionInsecure(c.insecureTLS)) to.SetLogger(c.logger) - to.SetHTTPClient(&http.Client{ - Transport: to.TransportForConfig(nil), - }) if err := to.SetupAuth(); err != nil { return fmt.Errorf("could not setup auth for connecting to %v: %v", sh.To, err) } diff --git a/pkg/app/app.go b/pkg/app/app.go index 66dc20eab..59c864999 100644 --- a/pkg/app/app.go +++ b/pkg/app/app.go @@ -21,7 +21,6 @@ package app import ( "errors" "fmt" - "net/http" "os" "strings" @@ -46,9 +45,6 @@ func Client() (*client.Client, error) { return nil, fmt.Errorf("invalid auth string syntax. got %q, want \"username:password\"", authString) } cl := client.NewFromParams(server, auth.NewBasicAuth(userpass[0], userpass[1])) - cl.SetHTTPClient(&http.Client{ - Transport: cl.TransportForConfig(nil), - }) return cl, nil } diff --git a/pkg/client/client.go b/pkg/client/client.go index 9b7252aa4..12f92d9c8 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -127,13 +127,15 @@ type Client struct { // a share. via map[string]string // target => via (target is referenced from via) - log *log.Logger // not nil - httpGate *syncutil.Gate + log *log.Logger // not nil + httpGate *syncutil.Gate + transportConfig *TransportConfig // or nil paramsOnly bool // config file and env vars are ignored. } -const maxParallelHTTP = 5 +const maxParallelHTTP_h1 = 5 +const maxParallelHTTP_h2 = 50 // New returns a new Camlistore Client. // The provided server is either "host:port" (assumed http, not https) or a URL prefix, with or without a path, or a server alias from the client configuration file. A server alias should not be confused with a hostname, therefore it cannot contain any colon or period. @@ -147,7 +149,7 @@ func New(server string, opts ...ClientOption) *Client { } server = serverConf.Server } - return newFromParams(server, auth.None{}, opts...) + return newClient(server, auth.None{}, opts...) } func NewOrFail(opts ...ClientOption) *Client { @@ -195,27 +197,37 @@ type TransportConfig struct { Verbose bool // Verbose enables verbose logging of HTTP requests. } -// TransportForConfig returns a transport for the client, setting the correct +func (c *Client) useHTTP2(tc *TransportConfig) bool { + if !c.useTLS() { + return false + } + if android.IsChild() { + // No particular reason; just untested so far. + return false + } + if os.Getenv("HTTPS_PROXY") != "" || os.Getenv("https_proxy") != "" || + (tc != nil && tc.Proxy != nil) { + // Also just untested. Which proxies support h2 anyway? + return false + } + return true +} + +// transportForConfig returns a transport for the client, setting the correct // Proxy, Dial, and TLSClientConfig if needed. It does not mutate c. // It is the caller's responsibility to then use that transport to set // the client's httpClient with SetHTTPClient. -func (c *Client) TransportForConfig(tc *TransportConfig) http.RoundTripper { +func (c *Client) transportForConfig(tc *TransportConfig) http.RoundTripper { if c == nil { return nil } var transport http.RoundTripper - useH2 := c.useTLS() && !android.IsChild() proxy := http.ProxyFromEnvironment if tc != nil && tc.Proxy != nil { proxy = tc.Proxy - useH2 = false - } else { - if os.Getenv("HTTPS_PROXY") != "" || os.Getenv("https_proxy") != "" { - useH2 = false - } } - if useH2 { + if c.useHTTP2(tc) { transport = &http2.Transport{ DialTLS: c.http2DialTLSFunc(), } @@ -224,7 +236,7 @@ func (c *Client) TransportForConfig(tc *TransportConfig) http.RoundTripper { DialTLS: c.DialTLSFunc(), Dial: c.DialFunc(), Proxy: proxy, - MaxIdleConnsPerHost: maxParallelHTTP, + MaxIdleConnsPerHost: maxParallelHTTP_h1, } } httpStats := &httputil.StatsTransport{ @@ -240,10 +252,29 @@ func (c *Client) TransportForConfig(tc *TransportConfig) http.RoundTripper { return transport } +// HTTPStats returns the client's underlying httputil.StatsTransport, if in use. +// If another transport is being used, nil is returned. +func (c *Client) HTTPStats() *httputil.StatsTransport { + st, _ := c.httpClient.Transport.(*httputil.StatsTransport) + return st +} + type ClientOption interface { modifyClient(*Client) } +func OptionTransportConfig(tc *TransportConfig) ClientOption { + return optionTransportConfig{tc} +} + +type optionTransportConfig struct { + tc *TransportConfig +} + +func (o optionTransportConfig) modifyClient(c *Client) { + c.transportConfig = o.tc +} + func OptionInsecure(v bool) ClientOption { return optionInsecure(v) } @@ -281,7 +312,7 @@ func NewFromShareRoot(shareBlobURL string, opts ...ClientOption) (c *Client, tar if m == nil { return nil, blob.Ref{}, fmt.Errorf("Unkown share URL base") } - c = New(m[1]) + c = New(m[1], opts...) c.discoOnce.Do(noop) c.prefixOnce.Do(noop) c.prefixv = m[1] @@ -290,11 +321,6 @@ func NewFromShareRoot(shareBlobURL string, opts ...ClientOption) (c *Client, tar c.via = make(map[string]string) root = m[2] - for _, v := range opts { - v.modifyClient(c) - } - c.SetHTTPClient(&http.Client{Transport: c.TransportForConfig(nil)}) - req := c.newRequest("GET", shareBlobURL, nil) res, err := c.expect2XX(req) if err != nil { @@ -1139,27 +1165,42 @@ func (c *Client) Close() error { // and auth but does not use any on-disk config files or environment variables // for its configuration. It may still use the disk for caches. func NewFromParams(server string, mode auth.AuthMode, opts ...ClientOption) *Client { - cl := newFromParams(server, mode, opts...) + cl := newClient(server, mode, opts...) cl.paramsOnly = true return cl } -func newFromParams(server string, mode auth.AuthMode, opts ...ClientOption) *Client { - httpClient := &http.Client{ - Transport: &http.Transport{ - MaxIdleConnsPerHost: maxParallelHTTP, - }, - } +// TODO(bradfitz): move auth mode into a ClientOption? And +// OptionNoDiskConfig to delete NewFromParams, etc, and just have New? + +func newClient(server string, mode auth.AuthMode, opts ...ClientOption) *Client { c := &Client{ - server: server, - httpClient: httpClient, - httpGate: syncutil.NewGate(maxParallelHTTP), - haveCache: noHaveCache{}, - log: log.New(os.Stderr, "", log.Ldate|log.Ltime), - authMode: mode, + server: server, + haveCache: noHaveCache{}, + log: log.New(os.Stderr, "", log.Ldate|log.Ltime), + authMode: mode, } for _, v := range opts { v.modifyClient(c) } + if c.httpClient == nil { + c.httpClient = &http.Client{ + Transport: c.transportForConfig(c.transportConfig), + } + } + c.httpGate = syncutil.NewGate(httpGateSize(c.httpClient.Transport)) return c } + +func httpGateSize(rt http.RoundTripper) int { + switch v := rt.(type) { + case *httputil.StatsTransport: + return httpGateSize(v.Transport) + case *http.Transport: + return maxParallelHTTP_h1 + case *http2.Transport: + return maxParallelHTTP_h2 + default: + return maxParallelHTTP_h1 // conservative default + } +} diff --git a/pkg/client/transport_test.go b/pkg/client/transport_test.go index 7b803bb60..b41ceb588 100644 --- a/pkg/client/transport_test.go +++ b/pkg/client/transport_test.go @@ -158,7 +158,7 @@ func TestTransportSetup(t *testing.T) { android.OnAndroidHook = func() bool { return tt.onAndroid } - rt := cl.TransportForConfig(nil) + rt := cl.transportForConfig(nil) var tr *http.Transport if tt.onAndroid { tr = rt.(*android.StatsTransport).Rt.(*httputil.StatsTransport).Transport.(*http.Transport)