blobserver/s3: limit concurrent PutObject calls

Fixes issue #675

Since camput does not seem affected by the above issue, and that camput
(pkg/client really) main difference in how it uploads is that it does
gated requests, it seemed to follow that gating the requests might fix
the problem.

With this change, I don't see the problem anymore either in
TestS3WriteFiles or by manually uploading with the web UI.

I suppose this indicates that S3 throttles incoming connections pretty
strictly, but I have no formal evidence of it.

Change-Id: I48f10923a710f4a21e44bc251e648db37ae504ec
This commit is contained in:
mpl 2016-01-22 22:08:46 +01:00
parent 39f50aa70e
commit b9aec12f31
3 changed files with 15 additions and 0 deletions

View File

@ -45,6 +45,7 @@ import (
"go4.org/fault"
"go4.org/jsonconfig"
"go4.org/syncutil"
)
var (
@ -59,6 +60,8 @@ var (
faultGet = fault.NewInjector("s3_get")
)
const maxParallelHTTP = 5
type s3Storage struct {
s3Client *s3.Client
bucket string
@ -88,6 +91,7 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
SecretAccessKey: config.RequiredString("aws_secret_access_key"),
Hostname: hostname,
},
PutGate: syncutil.NewGate(maxParallelHTTP),
}
bucket := config.RequiredString("bucket")
var dirPrefix string

View File

@ -80,6 +80,7 @@ func TestS3WriteFiles(t *testing.T) {
if _, err := schema.WriteFileFromReaderWithModTime(sto, name, time.Now(), f); err != nil {
t.Fatalf("Error while writing %v to S3: %v", name, err)
}
t.Logf("Wrote %v successfully to S3", name)
}
}

View File

@ -37,6 +37,8 @@ import (
"time"
"camlistore.org/pkg/blob"
"go4.org/syncutil"
)
const maxList = 1000
@ -45,6 +47,10 @@ const maxList = 1000
type Client struct {
*Auth
Transport http.RoundTripper // or nil for the default
// PutGate limits the number of concurrent PutObject calls, because
// apparently S3 throttles us if there are too many. No limit if nil.
// Default in S3 blobserver is 5.
PutGate *syncutil.Gate
}
type Bucket struct {
@ -129,6 +135,10 @@ func (c *Client) Stat(key, bucket string) (size int64, reterr error) {
}
func (c *Client) PutObject(key, bucket string, md5 hash.Hash, size int64, body io.Reader) error {
if c.PutGate != nil {
c.PutGate.Start()
defer c.PutGate.Done()
}
req := newReq(c.keyURL(bucket, key))
req.Method = "PUT"
req.ContentLength = size