From 46542e2e3f71d8a75f904b1442f2295785ceff80 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 4 Apr 2011 15:15:09 -0700 Subject: [PATCH] Amazon S3 stat/upload support. Basically works. Might be a bug or two yet, and requires a yet-unsubmitted fix to Go's HTTP library. --- lib/go/camli/blobserver/s3/receive.go | 93 ++++++++++++++++++++++++++- lib/go/camli/blobserver/s3/stat.go | 17 +++-- lib/go/camli/misc/amazon/s3/client.go | 60 +++++++++++++++++ 3 files changed, 161 insertions(+), 9 deletions(-) diff --git a/lib/go/camli/blobserver/s3/receive.go b/lib/go/camli/blobserver/s3/receive.go index 54f788782..17ddd7153 100644 --- a/lib/go/camli/blobserver/s3/receive.go +++ b/lib/go/camli/blobserver/s3/receive.go @@ -17,7 +17,11 @@ limitations under the License. package s3 import ( + "bytes" + "crypto/md5" + "hash" "io" + "io/ioutil" "log" "os" @@ -28,7 +32,90 @@ import ( var _ = log.Printf -func (s3 *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) { - log.Printf("want to upload to s3: %s", blob) - return nil, os.NewError("no s3 upload") +const maxInMemorySlurp = 4 << 20 // 4MB. *shrug* + +// amazonSlurper slurps up a blob to memory (or spilling to disk if +// over maxInMemorySlurp) to verify its digest (and also gets its MD5 +// for Amazon's Content-MD5 header, even if the original blobref +// is e.g. 
sha1-xxxx)
+//
+// A slurper is written to first and read back afterwards: the first
+// Read rewinds any spill file, and a Write after that panics. Call
+// Cleanup when done to release the spill file.
+type amazonSlurper struct {
+	blob    *blobref.BlobRef // only used for tempfile's prefix
+	buf     *bytes.Buffer    // in-memory contents; set to nil once spilled to file
+	md5     hash.Hash        // running MD5 of every byte passed to Write
+	file    *os.File         // nil until allocated
+	reading bool             // transitions at most once from false -> true
+}
+
+// newAmazonSlurper returns an empty slurper for blob with a fresh
+// in-memory buffer and MD5 state.
+func newAmazonSlurper(blob *blobref.BlobRef) *amazonSlurper {
+	return &amazonSlurper{
+		blob: blob,
+		buf:  new(bytes.Buffer),
+		md5:  md5.New(),
+	}
+}
+
+// Read returns the slurped bytes, from the spill file if one exists
+// and from the in-memory buffer otherwise. The first call rewinds the
+// spill file to its start; a failure to rewind is returned instead of
+// being ignored.
+func (as *amazonSlurper) Read(p []byte) (n int, err os.Error) {
+	if !as.reading {
+		if as.file != nil {
+			// Only switch to reading once the rewind succeeded, so
+			// a later Read retries it rather than reading from the
+			// end of the file.
+			if _, err = as.file.Seek(0, 0); err != nil {
+				return 0, err
+			}
+		}
+		as.reading = true
+	}
+	if as.file != nil {
+		return as.file.Read(p)
+	}
+	return as.buf.Read(p)
+}
+
+// Write adds p to the MD5 state and stores it, in memory while the
+// total stays within maxInMemorySlurp and in a temp file after that.
+// It panics if called once reading has started.
+func (as *amazonSlurper) Write(p []byte) (n int, err os.Error) {
+	if as.reading {
+		panic("write after read")
+	}
+	as.md5.Write(p)
+	if as.file != nil {
+		// Already spilled to disk.
+		return as.file.Write(p)
+	}
+	if as.buf.Len()+len(p) <= maxInMemorySlurp {
+		return as.buf.Write(p)
+	}
+
+	// This write would cross the in-memory limit: move what has been
+	// buffered so far into a temp file and continue there.
+	as.file, err = ioutil.TempFile("", as.blob.String())
+	if err != nil {
+		return 0, err
+	}
+	if _, err = io.Copy(as.file, as.buf); err != nil {
+		return 0, err
+	}
+	as.buf = nil
+	return as.file.Write(p)
+}
+
+// Cleanup closes and removes the spill file, if one was created.
+// Errors are ignored; this is best-effort tidying.
+func (as *amazonSlurper) Cleanup() {
+	if as.file != nil {
+		// Close before removing so the descriptor is released too,
+		// not just the directory entry.
+		as.file.Close()
+		os.Remove(as.file.Name())
+	}
+}
+
+// ReceiveBlob copies source into a slurper while hashing it with
+// blob's own hash, returns blobserver.CorruptBlobError if the digest
+// does not match blob, and otherwise uploads the slurped bytes with
+// PutObject under the name blob.String(). On success it returns blob
+// together with the number of bytes copied. mirrorPartions is not
+// used.
+func (sto *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) {
+	slurper := newAmazonSlurper(blob)
+	defer slurper.Cleanup()
+
+	// Named h, not hash, so the imported hash package stays visible.
+	h := blob.Hash()
+	size, err := io.Copy(io.MultiWriter(h, slurper), source)
+	if err != nil {
+		return nil, err
+	}
+	if !blob.HashMatches(h) {
+		return nil, blobserver.CorruptBlobError
+	}
+	err = sto.s3Client.PutObject(blob.String(), sto.bucket, slurper.md5, size, slurper)
+	if err != nil {
+		return nil, err
+	}
+	sb := &blobref.SizedBlobRef{BlobRef: blob, Size: size}
+	return sb, nil
 }
diff --git a/lib/go/camli/blobserver/s3/stat.go b/lib/go/camli/blobserver/s3/stat.go
index 093a44728..be1c57dd7 100644
--- a/lib/go/camli/blobserver/s3/stat.go
+++
b/lib/go/camli/blobserver/s3/stat.go
@@ -22,16 +22,29 @@ import (
 
 	"camli/blobref"
 	"camli/blobserver"
-	//"camli/misc/amazon/s3"
 )
 
 var _ = log.Printf
 
-func (s3 *s3Storage) Stat(dest chan *blobref.SizedBlobRef,
-	partition blobserver.Partition,
-	blobs []*blobref.BlobRef,
-	waitSeconds int) os.Error {
-	// TODO: implement
+// Stat asks S3 for the size of each blob in blobs, in order, and
+// sends a SizedBlobRef on dest for every one that exists. Blobs the
+// client reports as os.ENOENT are skipped. Any other error stops the
+// loop and is returned to the caller. partition and waitSeconds are
+// not consulted.
+func (sto *s3Storage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
+	// TODO: do n stats in parallel
+	for _, br := range blobs {
+		size, err := sto.s3Client.Stat(br.String(), sto.bucket)
+		log.Printf("stat of %s: %d, %v", br.String(), size, err)
+		if err == os.ENOENT {
+			// Not on S3: nothing to report for this blob.
+			continue
+		}
+		if err != nil {
+			return err
+		}
+		dest <- &blobref.SizedBlobRef{BlobRef: br, Size: size}
+	}
 	return nil
 }
diff --git a/lib/go/camli/misc/amazon/s3/client.go b/lib/go/camli/misc/amazon/s3/client.go
index 788b6ad72..4f42d28e0 100644
--- a/lib/go/camli/misc/amazon/s3/client.go
+++ b/lib/go/camli/misc/amazon/s3/client.go
@@ -17,11 +17,16 @@ limitations under the License.
 package s3
 
 import (
+	"bytes"
+	"encoding/base64"
 	"fmt"
+	"hash"
 	"http"
+	"io"
 	"io/ioutil"
 	"log"
 	"os"
+	"strconv"
 )
 
 var _ = log.Printf
@@ -51,6 +56,7 @@ func newReq(url string) *http.Request {
 	return &http.Request{
 		Method:     "GET",
 		URL:        u,
+		Host:       u.Host,
 		Proto:      "HTTP/1.1",
 		ProtoMajor: 1,
 		ProtoMinor: 1,
@@ -67,6 +73,60 @@ func (c *Client) Buckets() ([]*Bucket, os.Error) {
 		return nil, err
 	}
 	slurp, _ := ioutil.ReadAll(res.Body)
+	res.Body.Close()
 	log.Printf("got: %q", string(slurp))
 	return nil, nil
 }
+
+// Returns 0, os.ENOENT if not on S3, otherwise reterr is real.
+//
+// Stat sends a signed HEAD request for the object name in bucket and
+// reports the size parsed from the response's Content-Length header.
+// A 404 response yields os.ENOENT; any status other than 200 or 404
+// is returned as an error rather than being parsed as a size. Once
+// the request itself has succeeded, the response body is closed on
+// every return path.
+func (c *Client) Stat(name, bucket string) (size int64, reterr os.Error) {
+	req := newReq("http://" + bucket + ".s3.amazonaws.com/" + name)
+	req.Method = "HEAD"
+	c.Auth.SignRequest(req)
+	res, err := c.httpClient().Do(req)
+	if err != nil {
+		return 0, err
+	}
+	if res.Body != nil {
+		// Registered before any status check so the early returns
+		// below release the body as well.
+		defer res.Body.Close()
+	}
+
+	switch res.StatusCode {
+	case http.StatusNotFound:
+		return 0, os.ENOENT
+	case http.StatusOK:
+		return strconv.Atoi64(res.Header.Get("Content-Length"))
+	}
+	return 0, fmt.Errorf("s3: unexpected status code %d statting object %v", res.StatusCode, name)
+}
+
+// PutObject uploads body as the object name in bucket with a signed
+// PUT request whose Content-Length is size. When md5 is non-nil, its
+// current sum is sent base64-encoded in the Content-MD5 header. A
+// response status other than 200 is written to os.Stderr and reported
+// as an error.
+func (c *Client) PutObject(name, bucket string, md5 hash.Hash, size int64, body io.Reader) os.Error {
+	req := newReq("https://" + bucket + ".s3.amazonaws.com/" + name)
+	req.Method = "PUT"
+	req.ContentLength = size
+	if md5 != nil {
+		var b64 bytes.Buffer
+		encoder := base64.NewEncoder(base64.StdEncoding, &b64)
+		encoder.Write(md5.Sum())
+		// Close flushes any partially encoded trailing bytes.
+		encoder.Close()
+		req.Header.Set("Content-MD5", b64.String())
+	}
+	// Signing happens after Content-MD5 is in place; presumably the
+	// signature covers that header, so keep this order.
+	c.Auth.SignRequest(req)
+	req.Body = ioutil.NopCloser(body)
+
+	res, err := c.httpClient().Do(req)
+	if res != nil && res.Body != nil {
+		defer res.Body.Close()
+	}
+	if err != nil {
+		return err
+	}
+	if res.StatusCode != http.StatusOK {
+		res.Write(os.Stderr)
+		return fmt.Errorf("Got response code %d from s3", res.StatusCode)
+	}
+	return nil
+}