From bfb0d1e8de4e9d99a25c6cb9df96a01b599dcb5e Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 16 Mar 2014 19:39:43 -0700 Subject: [PATCH] s3: more fault injection, and simplify receive, removing spilling to disk Change-Id: Id7fb01c40dc863bee483887bbf14934161eb03a0 --- pkg/blobserver/s3/enumerate.go | 5 +- pkg/blobserver/s3/fetch.go | 3 ++ pkg/blobserver/s3/receive.go | 90 +++------------------------------- pkg/blobserver/s3/s3.go | 8 +++ pkg/blobserver/s3/stat.go | 6 ++- 5 files changed, 25 insertions(+), 87 deletions(-) diff --git a/pkg/blobserver/s3/enumerate.go b/pkg/blobserver/s3/enumerate.go index d561de891..90ac39ec0 100644 --- a/pkg/blobserver/s3/enumerate.go +++ b/pkg/blobserver/s3/enumerate.go @@ -46,8 +46,11 @@ func nextStr(s string) string { return string(b) } -func (sto *s3Storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { +func (sto *s3Storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) { defer close(dest) + if faultEnumerate.FailErr(&err) { + return + } startAt := after if _, ok := blob.Parse(after); ok { startAt = nextStr(after) diff --git a/pkg/blobserver/s3/fetch.go b/pkg/blobserver/s3/fetch.go index d3d3e3795..ddbb20e34 100644 --- a/pkg/blobserver/s3/fetch.go +++ b/pkg/blobserver/s3/fetch.go @@ -23,6 +23,9 @@ import ( ) func (sto *s3Storage) Fetch(blob blob.Ref) (file io.ReadCloser, size uint32, err error) { + if faultGet.FailErr(&err) { + return + } file, sz, err := sto.s3Client.Get(sto.bucket, blob.String()) return file, uint32(sz), err } diff --git a/pkg/blobserver/s3/receive.go b/pkg/blobserver/s3/receive.go index 86b14ad98..afd970a55 100644 --- a/pkg/blobserver/s3/receive.go +++ b/pkg/blobserver/s3/receive.go @@ -19,103 +19,25 @@ package s3 import ( "bytes" "crypto/md5" - "errors" - "hash" "io" - "io/ioutil" - "math/rand" - "os" - "strconv" "camlistore.org/pkg/blob" ) -const maxInMemorySlurp = 4 << 20 // 4MB. *shrug* - -// amazonSlurper slurps up a blob to memory (or spilling to disk if -// over maxInMemorySlurp) to verify its digest (and also gets its MD5 -// for Amazon's Content-MD5 header, even if the original blobref -// is e.g. sha1-xxxx) -type amazonSlurper struct { - blob blob.Ref // only used for tempfile's prefix - buf *bytes.Buffer - md5 hash.Hash - file *os.File // nil until allocated - reading bool // transitions at most once from false -> true -} - -func newAmazonSlurper(blob blob.Ref) *amazonSlurper { - return &amazonSlurper{ - blob: blob, - buf: new(bytes.Buffer), - md5: md5.New(), - } -} - -func (as *amazonSlurper) Read(p []byte) (n int, err error) { - if !as.reading { - as.reading = true - if as.file != nil { - as.file.Seek(0, 0) - } - } - if as.file != nil { - return as.file.Read(p) - } - return as.buf.Read(p) -} - -func (as *amazonSlurper) Write(p []byte) (n int, err error) { - if as.reading { - panic("write after read") - } - as.md5.Write(p) - if as.file != nil { - n, err = as.file.Write(p) - return - } - - if as.buf.Len()+len(p) > maxInMemorySlurp { - as.file, err = ioutil.TempFile("", as.blob.String()) - if err != nil { - return - } - _, err = io.Copy(as.file, as.buf) - if err != nil { - return - } - as.buf = nil - n, err = as.file.Write(p) - return - } - - return as.buf.Write(p) -} - -func (as *amazonSlurper) Cleanup() { - if as.file != nil { - os.Remove(as.file.Name()) - } -} - -var failPercent, _ = strconv.Atoi(os.Getenv("CAMLI_S3_FAIL_PERCENT")) - func (sto *s3Storage) ReceiveBlob(b blob.Ref, source io.Reader) (sr blob.SizedRef, err error) { - slurper := newAmazonSlurper(b) - defer slurper.Cleanup() + var buf bytes.Buffer + md5h := md5.New() - size, err := io.Copy(slurper, source) + size, err := io.Copy(io.MultiWriter(&buf, md5h), source) if err != nil { return sr, err } - if failPercent > 0 && failPercent > rand.Intn(100) { - // TODO(bradfitz): move this to its own package/type, for re-use in - // many places. - return sr, errors.New("fake injected error for testing") + if faultReceive.FailErr(&err) { + return } - err = sto.s3Client.PutObject(b.String(), sto.bucket, slurper.md5, size, slurper) + err = sto.s3Client.PutObject(b.String(), sto.bucket, md5h, size, &buf) if err != nil { return sr, err } diff --git a/pkg/blobserver/s3/s3.go b/pkg/blobserver/s3/s3.go index 982dbb66b..ac12588b2 100644 --- a/pkg/blobserver/s3/s3.go +++ b/pkg/blobserver/s3/s3.go @@ -38,10 +38,18 @@ import ( "net/http" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/fault" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/misc/amazon/s3" ) +var ( + faultReceive = fault.NewInjector("s3_receive") + faultEnumerate = fault.NewInjector("s3_enumerate") + faultStat = fault.NewInjector("s3_stat") + faultGet = fault.NewInjector("s3_get") +) + type s3Storage struct { s3Client *s3.Client bucket string diff --git a/pkg/blobserver/s3/stat.go b/pkg/blobserver/s3/stat.go index 0e0e7f3f9..6a2b3a3fc 100644 --- a/pkg/blobserver/s3/stat.go +++ b/pkg/blobserver/s3/stat.go @@ -26,9 +26,11 @@ import ( var statGate = syncutil.NewGate(20) // arbitrary -func (sto *s3Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error { +func (sto *s3Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) (err error) { + if faultStat.FailErr(&err) { + return + } var wg syncutil.Group - for _, br := range blobs { br := br statGate.Start()