Amazon S3 stat/upload support. Basically works.

Might be a bug or two yet, and requires a yet-unsubmitted fix to
Go's HTTP library.
This commit is contained in:
Brad Fitzpatrick 2011-04-04 15:15:09 -07:00
parent 17dc73b291
commit 46542e2e3f
3 changed files with 161 additions and 9 deletions

View File

@ -17,7 +17,11 @@ limitations under the License.
package s3
import (
"bytes"
"crypto/md5"
"hash"
"io"
"io/ioutil"
"log"
"os"
@ -28,7 +32,90 @@ import (
var _ = log.Printf
func (s3 *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) {
log.Printf("want to upload to s3: %s", blob)
return nil, os.NewError("no s3 upload")
// Blobs at or below this size are buffered entirely in memory;
// larger ones spill to a temp file.
const maxInMemorySlurp = 4 << 20 // 4MB. *shrug*

// amazonSlurper slurps up a blob to memory (or spilling to disk if
// over maxInMemorySlurp) to verify its digest (and also gets its MD5
// for Amazon's Content-MD5 header, even if the original blobref
// is e.g. sha1-xxxx)
type amazonSlurper struct {
	blob    *blobref.BlobRef // only used for tempfile's prefix
	buf     *bytes.Buffer    // in-memory buffer; set to nil once spilled to file
	md5     hash.Hash        // running MD5 of everything written
	file    *os.File         // nil until allocated
	reading bool             // transitions at most once from false -> true
}
// newAmazonSlurper returns a slurper ready to buffer blob's bytes in
// memory (spilling to a temp file once past maxInMemorySlurp) while
// computing their MD5.
func newAmazonSlurper(blob *blobref.BlobRef) *amazonSlurper {
	slurper := new(amazonSlurper)
	slurper.blob = blob
	slurper.buf = new(bytes.Buffer)
	slurper.md5 = md5.New()
	return slurper
}
// Read implements io.Reader, replaying the bytes previously written to
// the slurper. The first call flips the slurper into read mode and, if
// the data spilled to disk, rewinds the temp file to its start.
func (as *amazonSlurper) Read(p []byte) (n int, err os.Error) {
	if !as.reading {
		as.reading = true
		if as.file != nil {
			as.file.Seek(0, 0)
		}
	}
	if as.file == nil {
		return as.buf.Read(p)
	}
	return as.file.Read(p)
}
// Write implements io.Writer, folding p into the running MD5 and
// appending it to the in-memory buffer until the total would exceed
// maxInMemorySlurp, at which point all buffered bytes move to a temp
// file and writing continues there. Writing after the first Read is a
// programming error and panics.
func (as *amazonSlurper) Write(p []byte) (n int, err os.Error) {
	if as.reading {
		panic("write after read")
	}
	as.md5.Write(p)

	// Already spilled to disk? Keep appending there.
	if as.file != nil {
		return as.file.Write(p)
	}

	// Still fits in memory?
	if as.buf.Len()+len(p) <= maxInMemorySlurp {
		return as.buf.Write(p)
	}

	// Crossing the threshold: migrate the buffered bytes to a temp
	// file, drop the buffer, and append p there.
	as.file, err = ioutil.TempFile("", as.blob.String())
	if err != nil {
		return
	}
	if _, err = io.Copy(as.file, as.buf); err != nil {
		return
	}
	as.buf = nil
	return as.file.Write(p)
}
// Cleanup releases the temp spill file, if one was created. Callers
// should defer it as soon as the slurper is constructed.
func (as *amazonSlurper) Cleanup() {
	if as.file != nil {
		// Close the descriptor before unlinking; removing alone
		// leaves the fd (and the unlinked file's disk space) held
		// until GC finalizes it.
		as.file.Close()
		os.Remove(as.file.Name())
	}
}
// ReceiveBlob reads the blob's bytes from source, verifies they match
// blob's digest, and uploads them to the configured S3 bucket with a
// Content-MD5 header. mirrorPartitions is currently ignored. Returns
// the blob and its size on success.
func (sto *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartitions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) {
	slurper := newAmazonSlurper(blob)
	defer slurper.Cleanup()

	// Tee source into both the blobref's own digest (for integrity
	// verification) and the slurper (which buffers the bytes and
	// computes the MD5 Amazon wants, even when blob is e.g. sha1-*).
	// Named h, not hash, to avoid shadowing the imported hash package.
	h := blob.Hash()
	size, err := io.Copy(io.MultiWriter(h, slurper), source)
	if err != nil {
		return nil, err
	}
	if !blob.HashMatches(h) {
		return nil, blobserver.CorruptBlobError
	}
	err = sto.s3Client.PutObject(blob.String(), sto.bucket, slurper.md5, size, slurper)
	if err != nil {
		return nil, err
	}
	return &blobref.SizedBlobRef{BlobRef: blob, Size: size}, nil
}

View File

@ -22,16 +22,21 @@ import (
"camli/blobref"
"camli/blobserver"
//"camli/misc/amazon/s3"
)
var _ = log.Printf
func (s3 *s3Storage) Stat(dest chan *blobref.SizedBlobRef,
partition blobserver.Partition,
blobs []*blobref.BlobRef,
waitSeconds int) os.Error {
// TODO: implement
// Stat checks each blob in blobs against the S3 bucket and sends a
// SizedBlobRef down dest for every one that exists. Stat errors
// (including not-found) are currently skipped silently; partition and
// waitSeconds are unused.
func (sto *s3Storage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
	// TODO: do n stats in parallel
	for _, br := range blobs {
		size, err := sto.s3Client.Stat(br.String(), sto.bucket)
		log.Printf("stat of %s: %d, %v", br.String(), size, err)
		if err != nil {
			// TODO: handle
			continue
		}
		dest <- &blobref.SizedBlobRef{BlobRef: br, Size: size}
	}
	return nil
}

View File

@ -17,11 +17,16 @@ limitations under the License.
package s3
import (
"bytes"
"encoding/base64"
"fmt"
"hash"
"http"
"io"
"io/ioutil"
"log"
"os"
"strconv"
)
var _ = log.Printf
@ -51,6 +56,7 @@ func newReq(url string) *http.Request {
return &http.Request{
Method: "GET",
URL: u,
Host: u.Host,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
@ -67,6 +73,60 @@ func (c *Client) Buckets() ([]*Bucket, os.Error) {
return nil, err
}
slurp, _ := ioutil.ReadAll(res.Body)
res.Body.Close()
log.Printf("got: %q", string(slurp))
return nil, nil
}
// Returns 0, os.ENOENT if not on S3, otherwise reterr is real.
func (c *Client) Stat(name, bucket string) (size int64, reterr os.Error) {
req := newReq("http://" + bucket + ".s3.amazonaws.com/" + name)
req.Method = "HEAD"
c.Auth.SignRequest(req)
log.Printf("pre-Stat Do")
res, err := c.httpClient().Do(req)
log.Printf("post-Stat Do: res=%v, err=%v", res, err)
if err != nil {
return 0, err
}
if res != nil {
res.Write(os.Stderr)
}
if res.StatusCode == http.StatusNotFound {
log.Printf("state of %s == EOF", name)
return 0, os.ENOENT
}
if res.Body != nil {
//defer res.Body.Close()
}
return strconv.Atoi64(res.Header.Get("Content-Length"))
}
// PutObject uploads size bytes from body as the named object in
// bucket via an HTTP PUT. If md5 is non-nil, its digest is sent as
// the Content-MD5 header so S3 can verify the payload end-to-end.
// Returns nil only on an HTTP 200 response.
func (c *Client) PutObject(name, bucket string, md5 hash.Hash, size int64, body io.Reader) os.Error {
	req := newReq("https://" + bucket + ".s3.amazonaws.com/" + name)
	req.Method = "PUT"
	req.ContentLength = size
	if md5 != nil {
		// Base64-encode the raw MD5 digest for the Content-MD5
		// header. Close flushes the encoder's trailing partial
		// block, so it must run before b64.String() is read.
		b64 := new(bytes.Buffer)
		encoder := base64.NewEncoder(base64.StdEncoding, b64)
		encoder.Write(md5.Sum())
		encoder.Close()
		req.Header.Set("Content-MD5", b64.String())
	}
	// NOTE(review): signing happens after all headers are set —
	// presumably the signature covers Content-MD5; confirm against
	// SignRequest's implementation.
	c.Auth.SignRequest(req)
	req.Body = ioutil.NopCloser(body)
	res, err := c.httpClient().Do(req)
	if res != nil && res.Body != nil {
		defer res.Body.Close()
	}
	if err != nil {
		return err
	}
	if res.StatusCode != 200 {
		// Dump the full response to stderr to aid debugging the
		// failed upload.
		res.Write(os.Stderr)
		return fmt.Errorf("Got response code %d from s3", res.StatusCode)
	}
	return nil
}