From d43263035ac1ff078782ad582415e1800368e11e Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Tue, 10 May 2011 14:55:12 -0700 Subject: [PATCH] client: allow UploadHandle.Size to be -1, and associated cleanup, including: -- new StorageConfiger interface & use it in camlistored. -- bunch of little bug fixes and TODOs done. --- lib/go/camli/blobserver/handlers/upload.go | 12 ++- lib/go/camli/blobserver/interface.go | 31 +++++--- lib/go/camli/client/client.go | 7 +- lib/go/camli/client/upload.go | 87 +++++++++++++++++----- server/go/camlistored/camlistored.go | 29 +++++++- 5 files changed, 127 insertions(+), 39 deletions(-) diff --git a/lib/go/camli/blobserver/handlers/upload.go b/lib/go/camli/blobserver/handlers/upload.go index d6011b972..4c049cbec 100644 --- a/lib/go/camli/blobserver/handlers/upload.go +++ b/lib/go/camli/blobserver/handlers/upload.go @@ -30,14 +30,15 @@ import ( "strings" ) -func CreateUploadHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { +func CreateUploadHandler(storage blobserver.BlobReceiveConfiger) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { handleMultiPartUpload(conn, req, storage) } } -func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver) { +func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiveConfiger) { if !(req.Method == "POST" && strings.Contains(req.URL.Path, "/camli/upload")) { + log.Printf("Inconfigured handler upload handler") httputil.BadRequestError(conn, "Inconfigured handler.") return } @@ -106,8 +107,7 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece } log.Println("Done reading multipart body.") - configer, _ := blobReceiver.(blobserver.Configer) // TODO: ugly? - ret := commonUploadResponse(configer, req) + ret := commonUploadResponse(blobReceiver, req) received := make([]map[string]interface{}, 0) for _, got := range receivedBlobs { @@ -134,9 +134,7 @@ func commonUploadResponse(configer blobserver.Configer, req *http.Request) map[s // TODO: camli/upload isn't part of the spec. we should pick // something different here just to make it obvious that this // isn't a well-known URL and accidentally encourage lazy clients. - if configer != nil { - ret["uploadUrl"] = configer.Config().URLBase + "/camli/upload" - } + ret["uploadUrl"] = configer.Config().URLBase + "/camli/upload" return ret } diff --git a/lib/go/camli/blobserver/interface.go b/lib/go/camli/blobserver/interface.go index 3aea397f6..0aa09b6fe 100644 --- a/lib/go/camli/blobserver/interface.go +++ b/lib/go/camli/blobserver/interface.go @@ -25,15 +25,15 @@ import ( var CorruptBlobError = os.NewError("corrupt blob; digest doesn't match") type NamedPartition interface { - Name() string // "" for default, "queue-indexer", etc + Name() string // "" for default, "queue-indexer", etc } type Partition interface { NamedPartition - Writable() bool // accepts direct uploads (excluding mirroring from default partition) - Readable() bool // can return blobs (e.g. indexer partition can't) - IsQueue() bool // is a temporary queue partition (supports deletes) + Writable() bool // accepts direct uploads (excluding mirroring from default partition) + Readable() bool // can return blobs (e.g. indexer partition can't) + IsQueue() bool // is a temporary queue partition (supports deletes) // TODO: rename this. just "UploadMirrors"? GetMirrorPartitions() []Partition @@ -55,8 +55,9 @@ type BlobStatter interface { // waitSeconds is the max time to wait for the blobs to exist, // or 0 for no delay. Stat(dest chan<- *blobref.SizedBlobRef, - blobs []*blobref.BlobRef, - waitSeconds int) os.Error + blobs []*blobref.BlobRef, + waitSeconds int) os.Error + // TODO-GO: file a gofmt bug on how ugly those lines above look } // QueueCreator is implemented by Storage interfaces which support @@ -83,9 +84,9 @@ type BlobEnumerator interface { // EnumerateBlobs must close the channel. (even if limit // was hit and more blobs remain) EnumerateBlobs(dest chan<- *blobref.SizedBlobRef, - after string, - limit uint, - waitSeconds int) os.Error + after string, + limit uint, + waitSeconds int) os.Error } // Cache is the minimal interface expected of a blob cache. @@ -95,9 +96,14 @@ type Cache interface { BlobStatter } +type BlobReceiveConfiger interface { + BlobReceiver + Configer +} + type Config struct { Writable, Readable bool - IsQueue bool // supports deletes + IsQueue bool // supports deletes // the "http://host:port" and optional path (but without trailing slash) to have "/camli/*" appended URLBase string @@ -121,3 +127,8 @@ type Storage interface { // Returns the blob notification bus GetBlobHub() BlobHub } + +type StorageConfiger interface { + Storage + Configer +} diff --git a/lib/go/camli/client/client.go b/lib/go/camli/client/client.go index bb1326d94..128fc07cb 100644 --- a/lib/go/camli/client/client.go +++ b/lib/go/camli/client/client.go @@ -73,7 +73,12 @@ func (c *Client) SetHttpClient(client *http.Client) { func NewOrFail() *Client { log := log.New(os.Stderr, "", log.Ldate|log.Ltime) - return &Client{server: blobServerOrDie(), password: passwordOrDie(), log: log} + return &Client{ + server: blobServerOrDie(), + password: passwordOrDie(), + httpClient: http.DefaultClient, + log: log, + } } type devNullWriter struct{} diff --git a/lib/go/camli/client/upload.go b/lib/go/camli/client/upload.go index 7b5fd48d2..565873b87 100644 --- a/lib/go/camli/client/upload.go +++ b/lib/go/camli/client/upload.go @@ -31,9 +31,11 @@ import ( "strings" ) +var _ = log.Printf + type UploadHandle struct { BlobRef *blobref.BlobRef - Size int64 + Size int64 // or -1 if size isn't known Contents io.Reader } @@ -57,7 +59,7 @@ func newResFormatError(s string, arg ...interface{}) ResponseFormatError { return ResponseFormatError(fmt.Errorf(s, arg...)) } -func parseStatResponse(r io.Reader) (*statResponse, os.Error) { +func parseStatResponse(r io.Reader) (sr *statResponse, _ os.Error) { var ( ok bool err os.Error @@ -67,6 +69,11 @@ func parseStatResponse(r io.Reader) (*statResponse, os.Error) { if err = json.NewDecoder(io.LimitReader(r, 5<<20)).Decode(&jmap); err != nil { return nil, ResponseFormatError(err) } + defer func() { + if sr == nil { + log.Printf("parseStatResponse got map: %#v", jmap) + } + }() s.uploadUrl, ok = jmap["uploadUrl"].(string) if !ok { @@ -206,7 +213,9 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { c.statsMutex.Lock() c.stats.UploadRequests.Blobs++ - c.stats.UploadRequests.Bytes += h.Size + if h.Size != -1 { + c.stats.UploadRequests.Bytes += h.Size + } c.statsMutex.Unlock() blobRefString := h.BlobRef.String() @@ -238,6 +247,27 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { pr := &PutResult{BlobRef: h.BlobRef, Size: h.Size} if _, ok := stat.HaveMap[h.BlobRef.String()]; ok { pr.Skipped = true + + // Consume the buffer that was provided, just for + // consistency. But if it's a closer, do that + // instead. But if they didn't provide a size, + // we consume it anyway just to get the size + // for stats. + closer, _ := h.Contents.(io.Closer) + if h.Size >= 0 && closer != nil { + closer.Close() + } else { + n, err := io.Copy(ioutil.Discard, h.Contents) + if err != nil { + return nil, err + } + if h.Size == -1 { + pr.Size = n + c.statsMutex.Lock() + c.stats.UploadRequests.Bytes += pr.Size + c.statsMutex.Unlock() + } + } return pr, nil } @@ -255,43 +285,55 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { c.log.Printf("Uploading to URL: %s", stat.uploadUrl) req = c.newRequest("POST", stat.uploadUrl) req.Header.Set("Content-Type", "multipart/form-data; boundary="+boundary) + + contentsSize := int64(0) req.Body = ioutil.NopCloser(io.MultiReader( strings.NewReader(multiPartHeader), - h.Contents, + countingReader{h.Contents, &contentsSize}, strings.NewReader(multiPartFooter))) - req.ContentLength = int64(len(multiPartHeader)) + h.Size + int64(len(multiPartFooter)) + if h.Size >= 0 { + req.ContentLength = int64(len(multiPartHeader)) + h.Size + int64(len(multiPartFooter)) + } req.TransferEncoding = nil resp, err = c.httpClient.Do(req) if err != nil { - return error("upload http error", err) + return error("upload http error: %v", err) + } + + if h.Size >= 0 { + if contentsSize != h.Size { + return error("UploadHandle declared size %d but Contents length was %d", h.Size, contentsSize) + } + } else { + h.Size = contentsSize } // The only valid HTTP responses are 200 and 303. if resp.StatusCode != 200 && resp.StatusCode != 303 { - return error(fmt.Sprintf("invalid http response %d in upload response", resp.StatusCode), nil) + return error("invalid http response %d in upload response", resp.StatusCode) } if resp.StatusCode == 303 { otherLocation := resp.Header.Get("Location") if otherLocation == "" { - return error("303 without a Location", nil) + return error("303 without a Location") } baseUrl, _ := http.ParseURL(stat.uploadUrl) absUrl, err := baseUrl.ParseURL(otherLocation) if err != nil { - return error("303 Location URL relative resolve error", err) + return error("303 Location URL relative resolve error: %v", err) } otherLocation = absUrl.String() resp, _, err = http.Get(otherLocation) if err != nil { - return error("error following 303 redirect after upload", err) + return error("error following 303 redirect after upload: %v", err) } } ures, err := c.jsonFromResponse("upload", resp) if err != nil { - return error("json parse from upload error", err) + return error("json parse from upload error: %v", err) } errorText, ok := ures["errorText"].(string) @@ -301,18 +343,18 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { received, ok := ures["received"].([]interface{}) if !ok { - return error("upload json validity error: no 'received'", nil) + return error("upload json validity error: no 'received'") } for _, rit := range received { it, ok := rit.(map[string]interface{}) if !ok { - return error("upload json validity error: 'received' is malformed", nil) + return error("upload json validity error: 'received' is malformed") } if it["blobRef"] == blobRefString { switch size := it["size"].(type) { case nil: - return error("upload json validity error: 'received' is missing 'size'", nil) + return error("upload json validity error: 'received' is missing 'size'") case float64: if int64(size) == h.Size { // Success! @@ -322,14 +364,25 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { c.statsMutex.Unlock() return pr, nil } else { - return error(fmt.Sprintf("Server got blob, but reports wrong length (%v; expected %d)", - size, h.Size),nil) + return error("Server got blob, but reports wrong length (%v; expected %d)", + size, h.Size) } default: - return error("unsupported type of 'size' in received response", nil) + return error("unsupported type of 'size' in received response") } } } return nil, os.NewError("Server didn't receive blob.") } + +type countingReader struct { + r io.Reader + n *int64 +} + +func (cr countingReader) Read(p []byte) (n int, err os.Error) { + n, err = cr.r.Read(p) + *cr.n += int64(n) + return +} diff --git a/server/go/camlistored/camlistored.go b/server/go/camlistored/camlistored.go index 5215132e7..febe20c8c 100644 --- a/server/go/camlistored/camlistored.go +++ b/server/go/camlistored/camlistored.go @@ -63,8 +63,31 @@ func unsupportedHandler(conn http.ResponseWriter, req *http.Request) { httputil.BadRequestError(conn, "Unsupported camlistore path or method.") } +type storageAndConfig struct { + blobserver.Storage + config *blobserver.Config +} + +func (s *storageAndConfig) Config() *blobserver.Config { + return s.config +} + // where prefix is like "/" or "/s3/" for e.g. "/camli/" or "/s3/camli/*" func makeCamliHandler(prefix, baseURL string, storage blobserver.Storage) http.Handler { + if !strings.HasSuffix(prefix, "/") { + panic("expected prefix to end in slash") + } + baseURL = strings.TrimRight(baseURL, "/") + + storageConfig := &storageAndConfig{ + storage, + &blobserver.Config{ + Writable: true, + Readable: true, + IsQueue: false, + URLBase: baseURL + prefix[:len(prefix)-1], + }, + } return http.HandlerFunc(func(conn http.ResponseWriter, req *http.Request) { action, err := parseCamliPath(req.URL.Path[len(prefix)-1:]) if err != nil { @@ -73,13 +96,11 @@ func makeCamliHandler(prefix, baseURL string, storage blobserver.Storage) http.H unsupportedHandler(conn, req) return } - // TODO: actually deal with partitions here - part := &partitionConfig{"", true, true, false, nil, baseURL + prefix[:len(prefix)-1]} - handleCamliUsingStorage(conn, req, action, part, storage) + handleCamliUsingStorage(conn, req, action, storageConfig) }) } -func handleCamliUsingStorage(conn http.ResponseWriter, req *http.Request, action string, partition blobserver.Partition, storage blobserver.Storage) { +func handleCamliUsingStorage(conn http.ResponseWriter, req *http.Request, action string, storage blobserver.StorageConfiger) { handler := unsupportedHandler switch req.Method { case "GET":