diff --git a/pkg/blobserver/s3/s3.go b/pkg/blobserver/s3/s3.go index 5a30e59c9..32122e613 100644 --- a/pkg/blobserver/s3/s3.go +++ b/pkg/blobserver/s3/s3.go @@ -45,6 +45,7 @@ import ( "go4.org/fault" "go4.org/jsonconfig" + "go4.org/syncutil" ) var ( @@ -59,6 +60,8 @@ var ( faultGet = fault.NewInjector("s3_get") ) +const maxParallelHTTP = 5 + type s3Storage struct { s3Client *s3.Client bucket string @@ -88,6 +91,7 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora SecretAccessKey: config.RequiredString("aws_secret_access_key"), Hostname: hostname, }, + PutGate: syncutil.NewGate(maxParallelHTTP), } bucket := config.RequiredString("bucket") var dirPrefix string diff --git a/pkg/blobserver/s3/s3_test.go b/pkg/blobserver/s3/s3_test.go index b86909aaf..4152ea426 100644 --- a/pkg/blobserver/s3/s3_test.go +++ b/pkg/blobserver/s3/s3_test.go @@ -80,6 +80,7 @@ func TestS3WriteFiles(t *testing.T) { if _, err := schema.WriteFileFromReaderWithModTime(sto, name, time.Now(), f); err != nil { t.Fatalf("Error while writing %v to S3: %v", name, err) } + t.Logf("Wrote %v successfully to S3", name) } } diff --git a/pkg/misc/amazon/s3/client.go b/pkg/misc/amazon/s3/client.go index 2c0d66280..6396a71c3 100644 --- a/pkg/misc/amazon/s3/client.go +++ b/pkg/misc/amazon/s3/client.go @@ -37,6 +37,8 @@ import ( "time" "camlistore.org/pkg/blob" + + "go4.org/syncutil" ) const maxList = 1000 @@ -45,6 +47,10 @@ const maxList = 1000 type Client struct { *Auth Transport http.RoundTripper // or nil for the default + // PutGate limits the number of concurrent PutObject calls, because + // apparently S3 throttles us if there are too many. No limit if nil. + // Default in S3 blobserver is 5. + PutGate *syncutil.Gate } type Bucket struct { @@ -129,6 +135,10 @@ func (c *Client) Stat(key, bucket string) (size int64, reterr error) { } func (c *Client) PutObject(key, bucket string, md5 hash.Hash, size int64, body io.Reader) error { + if c.PutGate != nil { + c.PutGate.Start() + defer c.PutGate.Done() + } req := newReq(c.keyURL(bucket, key)) req.Method = "PUT" req.ContentLength = size