pkg/client: fix directory upload deadlock, clean up HTTP handling

Unexports pkg/client.Client.TransportForConfig in the process.

Fixes camlistore/camlistore#662

Change-Id: Ifc2d6b0fee70d5db86654897d2d377b343b6d38b
This commit is contained in:
Brad Fitzpatrick 2015-12-29 15:41:12 -08:00
parent 4086789731
commit 66f0ddac0e
12 changed files with 112 additions and 97 deletions

View File

@ -23,7 +23,6 @@ import (
"fmt"
"io"
"log"
"net/http"
"os"
"path/filepath"
@ -32,7 +31,6 @@ import (
"camlistore.org/pkg/cacher"
"camlistore.org/pkg/client"
"camlistore.org/pkg/cmdmain"
"camlistore.org/pkg/httputil"
"camlistore.org/pkg/index"
"camlistore.org/pkg/osutil"
"camlistore.org/pkg/schema"
@ -77,6 +75,10 @@ func main() {
var cl *client.Client
var items []blob.Ref
optTransportConfig := client.OptionTransportConfig(&client.TransportConfig{
Verbose: *flagHTTP,
})
if *flagShared != "" {
if client.ExplicitServer() != "" {
log.Fatal("Can't use --shared with an explicit blobserver; blobserver is implicit from the --shared URL.")
@ -86,7 +88,9 @@ func main() {
}
cl1, target, err := client.NewFromShareRoot(*flagShared,
client.OptionInsecure(*flagInsecureTLS),
client.OptionTrustedCert(*flagTrustedCert))
client.OptionTrustedCert(*flagTrustedCert),
optTransportConfig,
)
if err != nil {
log.Fatal(err)
}
@ -96,7 +100,7 @@ func main() {
if *flagTrustedCert != "" {
log.Fatal("Can't use --cert without --shared.")
}
cl = client.NewOrFail(client.OptionInsecure(*flagInsecureTLS))
cl = client.NewOrFail(client.OptionInsecure(*flagInsecureTLS), optTransportConfig)
for n := 0; n < flag.NArg(); n++ {
arg := flag.Arg(n)
br, ok := blob.Parse(arg)
@ -107,11 +111,7 @@ func main() {
}
}
tr := cl.TransportForConfig(&client.TransportConfig{
Verbose: *flagHTTP,
})
httpStats, _ := tr.(*httputil.StatsTransport)
cl.SetHTTPClient(&http.Client{Transport: tr})
httpStats := cl.HTTPStats()
diskCacheFetcher, err := cacher.NewDiskCache(cl)
if err != nil {

View File

@ -23,7 +23,6 @@ import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"os/exec"
"os/signal"
@ -114,7 +113,6 @@ func main() {
}
} else {
cl = client.NewOrFail() // automatic from flags
cl.SetHTTPClient(&http.Client{Transport: cl.TransportForConfig(nil)})
var ok bool
root, ok = blob.Parse(rootArg)
@ -137,7 +135,6 @@ func main() {
}
} else {
cl = client.NewOrFail() // automatic from flags
cl.SetHTTPClient(&http.Client{Transport: cl.TransportForConfig(nil)})
}
diskCacheFetcher, err := cacher.NewDiskCache(cl)

View File

@ -67,9 +67,9 @@ func init() {
stats := up.Stats()
if *cmdmain.FlagVerbose {
log.Printf("Client stats: %s", stats.String())
if up.transport != nil {
log.Printf(" #HTTP reqs: %d", up.transport.Requests())
h1, h2 := up.transport.ProtoVersions()
if up.stats != nil {
log.Printf(" #HTTP reqs: %d", up.stats.Requests())
h1, h2 := up.stats.ProtoVersions()
log.Printf(" responses: %d (h1), %d (h2)\n", h1, h2)
}
}
@ -153,18 +153,15 @@ func newUploader() *Uploader {
}
cc = client.NewStorageClient(ss)
} else {
cc = client.NewOrFail()
proxy := http.ProxyFromEnvironment
var proxy func(*http.Request) (*url.URL, error)
if flagProxyLocal {
proxy = proxyFromEnvironment
}
tr := cc.TransportForConfig(
&client.TransportConfig{
Proxy: proxy,
Verbose: *flagHTTP,
})
httpStats, _ = tr.(*httputil.StatsTransport)
cc.SetHTTPClient(&http.Client{Transport: tr})
cc = client.NewOrFail(client.OptionTransportConfig(&client.TransportConfig{
Proxy: proxy,
Verbose: *flagHTTP,
}))
httpStats = cc.HTTPStats()
}
if *cmdmain.FlagVerbose {
cc.SetLogger(log.New(cmdmain.Stderr, "", log.LstdFlags))
@ -178,10 +175,10 @@ func newUploader() *Uploader {
}
return &Uploader{
Client: cc,
transport: httpStats,
pwd: pwd,
fdGate: syncutil.NewGate(100), // gate things that waste fds, assuming a low system limit
Client: cc,
stats: httpStats,
pwd: pwd,
fdGate: syncutil.NewGate(100), // gate things that waste fds, assuming a low system limit
}
}

View File

@ -189,7 +189,6 @@ func TestUploadDirectories(t *testing.T) {
// uploaded in, so the directory comes before the file. This
// was the old deadlock.
defer setAndRestore(&uploadWorkers, 1)()
defer setAndRestore(&dirUploadWorkers, 1)()
defer setAndRestore(&statCacheWorkers, 5)()
e := &env{

View File

@ -71,7 +71,6 @@ var flagUseSQLiteChildCache bool // Use sqlite for the statcache and havecache.
var (
uploadWorkers = 5 // concurrent upload workers (negative means unbounded: memory hog)
dirUploadWorkers = 3 // concurrent directory uploading workers
statCacheWorkers = 5 // concurrent statcache workers
)
@ -105,7 +104,6 @@ func init() {
flags.BoolVar(&cmd.argsFromInput, "stdinargs", false, "If true, filenames to upload are sent one-per-line on stdin. EOF means to quit the process with exit status 0.")
// limit number of goroutines to limit memory
uploadWorkers = 2
dirUploadWorkers = 2
statCacheWorkers = 2
}
flagCacheLog = flags.Bool("logcache", false, "log caching details")
@ -843,7 +841,7 @@ func (n *node) SetPutResult(res *client.PutResult, err error) {
panic("SetPutResult called twice on node " + n.fullPath)
}
n.res, n.err = res, err
n.cond.Signal()
n.cond.Broadcast()
}
func (n *node) PutResult() (*client.PutResult, error) {
@ -1031,7 +1029,10 @@ func (t *TreeUpload) run() {
}
})
} else {
dirUpload := chanworker.NewWorker(dirUploadWorkers, func(el interface{}, ok bool) {
// dirUpload is unbounded because directories can depend on directories.
// We bound the number of HTTP requests in flight instead.
// TODO(bradfitz): remove this chanworker stuff?
dirUpload := chanworker.NewWorker(-1, func(el interface{}, ok bool) {
if !ok {
log.Printf("done uploading directories - done with all uploads.")
uploadsdonec <- true
@ -1050,6 +1051,7 @@ func (t *TreeUpload) run() {
if !ok {
log.Printf("done with all uploads.")
close(dirUpload)
log.Printf("closed dirUpload")
return
}
n := el.(*node)

View File

@ -22,7 +22,6 @@ import (
"fmt"
"io/ioutil"
"log"
"net/http"
"os"
"path/filepath"
"strings"
@ -165,14 +164,13 @@ func (c *initCmd) clientConfigFromServer() (*clientconfig.Config, error) {
cl := client.NewFromParams(server,
auth.NewBasicAuth(fields[0], fields[1]),
client.OptionInsecure(c.insecureTLS))
cl.SetHTTPClient(&http.Client{Transport: cl.TransportForConfig(nil)})
var cc clientconfig.Config
helpRoot, err := cl.HelpRoot()
if err != nil {
return nil, err
}
var cc clientconfig.Config
if err := cl.GetJSON(helpRoot+"?clientConfig=true", &cc); err != nil {
return nil, err
}

View File

@ -42,7 +42,7 @@ type Uploader struct {
// of a Client?
altStatReceiver blobserver.StatReceiver
transport *httputil.StatsTransport // for HTTP statistics
stats *httputil.StatsTransport // if non-nil, HTTP statistics
pwd string
statCache UploadCache
haveCache HaveCache

View File

@ -18,7 +18,6 @@ package main
import (
"log"
"net/http"
"camlistore.org/pkg/client"
"camlistore.org/pkg/cmdmain"
@ -37,17 +36,12 @@ const serverFlagHelp = "Format is is either a URL prefix (with optional path), a
// * host:port
// * https?://host[:port][/path]
func newClient(server string, opts ...client.ClientOption) *client.Client {
var cl *client.Client
if server == "" {
cl = client.NewOrFail(opts...)
} else {
cl = client.New(server, opts...)
if err := cl.SetupAuth(); err != nil {
log.Fatalf("Could not setup auth for connecting to %v: %v", server, err)
}
return client.NewOrFail(opts...)
}
cl := client.New(server, opts...)
if err := cl.SetupAuth(); err != nil {
log.Fatalf("Could not setup auth for connecting to %v: %v", server, err)
}
cl.SetHTTPClient(&http.Client{
Transport: cl.TransportForConfig(nil),
})
return cl
}

View File

@ -185,12 +185,9 @@ func (c *syncCmd) storageFromParam(which storageType, val string) (blobserver.St
return disk, nil
}
cl := client.New(val, client.OptionInsecure(c.insecureTLS))
if httpClient == nil {
httpClient = &http.Client{
Transport: cl.TransportForConfig(nil),
}
if httpClient != nil {
cl.SetHTTPClient(httpClient)
}
cl.SetHTTPClient(httpClient)
if err := cl.SetupAuth(); err != nil {
return nil, fmt.Errorf("could not setup auth for connecting to %v: %v", val, err)
}
@ -245,17 +242,11 @@ func (c *syncCmd) syncAll() error {
for _, sh := range syncHandlers {
from := client.New(sh.From, client.OptionInsecure(c.insecureTLS))
from.SetLogger(c.logger)
from.SetHTTPClient(&http.Client{
Transport: from.TransportForConfig(nil),
})
if err := from.SetupAuth(); err != nil {
return fmt.Errorf("could not setup auth for connecting to %v: %v", sh.From, err)
}
to := client.New(sh.To, client.OptionInsecure(c.insecureTLS))
to.SetLogger(c.logger)
to.SetHTTPClient(&http.Client{
Transport: to.TransportForConfig(nil),
})
if err := to.SetupAuth(); err != nil {
return fmt.Errorf("could not setup auth for connecting to %v: %v", sh.To, err)
}

View File

@ -21,7 +21,6 @@ package app
import (
"errors"
"fmt"
"net/http"
"os"
"strings"
@ -46,9 +45,6 @@ func Client() (*client.Client, error) {
return nil, fmt.Errorf("invalid auth string syntax. got %q, want \"username:password\"", authString)
}
cl := client.NewFromParams(server, auth.NewBasicAuth(userpass[0], userpass[1]))
cl.SetHTTPClient(&http.Client{
Transport: cl.TransportForConfig(nil),
})
return cl, nil
}

View File

@ -127,13 +127,15 @@ type Client struct {
// a share.
via map[string]string // target => via (target is referenced from via)
log *log.Logger // not nil
httpGate *syncutil.Gate
log *log.Logger // not nil
httpGate *syncutil.Gate
transportConfig *TransportConfig // or nil
paramsOnly bool // config file and env vars are ignored.
}
const maxParallelHTTP = 5
const maxParallelHTTP_h1 = 5
const maxParallelHTTP_h2 = 50
// New returns a new Camlistore Client.
// The provided server is either "host:port" (assumed http, not https) or a URL prefix, with or without a path, or a server alias from the client configuration file. A server alias should not be confused with a hostname, therefore it cannot contain any colon or period.
@ -147,7 +149,7 @@ func New(server string, opts ...ClientOption) *Client {
}
server = serverConf.Server
}
return newFromParams(server, auth.None{}, opts...)
return newClient(server, auth.None{}, opts...)
}
func NewOrFail(opts ...ClientOption) *Client {
@ -195,27 +197,37 @@ type TransportConfig struct {
Verbose bool // Verbose enables verbose logging of HTTP requests.
}
// TransportForConfig returns a transport for the client, setting the correct
func (c *Client) useHTTP2(tc *TransportConfig) bool {
if !c.useTLS() {
return false
}
if android.IsChild() {
// No particular reason; just untested so far.
return false
}
if os.Getenv("HTTPS_PROXY") != "" || os.Getenv("https_proxy") != "" ||
(tc != nil && tc.Proxy != nil) {
// Also just untested. Which proxies support h2 anyway?
return false
}
return true
}
// transportForConfig returns a transport for the client, setting the correct
// Proxy, Dial, and TLSClientConfig if needed. It does not mutate c.
// It is the caller's responsibility to then use that transport to set
// the client's httpClient with SetHTTPClient.
func (c *Client) TransportForConfig(tc *TransportConfig) http.RoundTripper {
func (c *Client) transportForConfig(tc *TransportConfig) http.RoundTripper {
if c == nil {
return nil
}
var transport http.RoundTripper
useH2 := c.useTLS() && !android.IsChild()
proxy := http.ProxyFromEnvironment
if tc != nil && tc.Proxy != nil {
proxy = tc.Proxy
useH2 = false
} else {
if os.Getenv("HTTPS_PROXY") != "" || os.Getenv("https_proxy") != "" {
useH2 = false
}
}
if useH2 {
if c.useHTTP2(tc) {
transport = &http2.Transport{
DialTLS: c.http2DialTLSFunc(),
}
@ -224,7 +236,7 @@ func (c *Client) TransportForConfig(tc *TransportConfig) http.RoundTripper {
DialTLS: c.DialTLSFunc(),
Dial: c.DialFunc(),
Proxy: proxy,
MaxIdleConnsPerHost: maxParallelHTTP,
MaxIdleConnsPerHost: maxParallelHTTP_h1,
}
}
httpStats := &httputil.StatsTransport{
@ -240,10 +252,29 @@ func (c *Client) TransportForConfig(tc *TransportConfig) http.RoundTripper {
return transport
}
// HTTPStats returns the client's underlying httputil.StatsTransport, if in use.
// If another transport is being used, nil is returned.
func (c *Client) HTTPStats() *httputil.StatsTransport {
st, _ := c.httpClient.Transport.(*httputil.StatsTransport)
return st
}
type ClientOption interface {
modifyClient(*Client)
}
func OptionTransportConfig(tc *TransportConfig) ClientOption {
return optionTransportConfig{tc}
}
type optionTransportConfig struct {
tc *TransportConfig
}
func (o optionTransportConfig) modifyClient(c *Client) {
c.transportConfig = o.tc
}
func OptionInsecure(v bool) ClientOption {
return optionInsecure(v)
}
@ -281,7 +312,7 @@ func NewFromShareRoot(shareBlobURL string, opts ...ClientOption) (c *Client, tar
if m == nil {
return nil, blob.Ref{}, fmt.Errorf("Unkown share URL base")
}
c = New(m[1])
c = New(m[1], opts...)
c.discoOnce.Do(noop)
c.prefixOnce.Do(noop)
c.prefixv = m[1]
@ -290,11 +321,6 @@ func NewFromShareRoot(shareBlobURL string, opts ...ClientOption) (c *Client, tar
c.via = make(map[string]string)
root = m[2]
for _, v := range opts {
v.modifyClient(c)
}
c.SetHTTPClient(&http.Client{Transport: c.TransportForConfig(nil)})
req := c.newRequest("GET", shareBlobURL, nil)
res, err := c.expect2XX(req)
if err != nil {
@ -1139,27 +1165,42 @@ func (c *Client) Close() error {
// and auth but does not use any on-disk config files or environment variables
// for its configuration. It may still use the disk for caches.
func NewFromParams(server string, mode auth.AuthMode, opts ...ClientOption) *Client {
cl := newFromParams(server, mode, opts...)
cl := newClient(server, mode, opts...)
cl.paramsOnly = true
return cl
}
func newFromParams(server string, mode auth.AuthMode, opts ...ClientOption) *Client {
httpClient := &http.Client{
Transport: &http.Transport{
MaxIdleConnsPerHost: maxParallelHTTP,
},
}
// TODO(bradfitz): move auth mode into a ClientOption? And
// OptionNoDiskConfig to delete NewFromParams, etc, and just have New?
func newClient(server string, mode auth.AuthMode, opts ...ClientOption) *Client {
c := &Client{
server: server,
httpClient: httpClient,
httpGate: syncutil.NewGate(maxParallelHTTP),
haveCache: noHaveCache{},
log: log.New(os.Stderr, "", log.Ldate|log.Ltime),
authMode: mode,
server: server,
haveCache: noHaveCache{},
log: log.New(os.Stderr, "", log.Ldate|log.Ltime),
authMode: mode,
}
for _, v := range opts {
v.modifyClient(c)
}
if c.httpClient == nil {
c.httpClient = &http.Client{
Transport: c.transportForConfig(c.transportConfig),
}
}
c.httpGate = syncutil.NewGate(httpGateSize(c.httpClient.Transport))
return c
}
func httpGateSize(rt http.RoundTripper) int {
switch v := rt.(type) {
case *httputil.StatsTransport:
return httpGateSize(v.Transport)
case *http.Transport:
return maxParallelHTTP_h1
case *http2.Transport:
return maxParallelHTTP_h2
default:
return maxParallelHTTP_h1 // conservative default
}
}

View File

@ -158,7 +158,7 @@ func TestTransportSetup(t *testing.T) {
android.OnAndroidHook = func() bool {
return tt.onAndroid
}
rt := cl.TransportForConfig(nil)
rt := cl.transportForConfig(nil)
var tr *http.Transport
if tt.onAndroid {
tr = rt.(*android.StatsTransport).Rt.(*httputil.StatsTransport).Transport.(*http.Transport)