client: allow UploadHandle.Size to be -1, and associated cleanup, including:

-- new StorageConfiger interface & use it in camlistored.
-- a bunch of little bug fixes and TODOs done.
Brad Fitzpatrick 2011-05-10 14:55:12 -07:00
parent bce74ef24c
commit d43263035a
5 changed files with 127 additions and 39 deletions
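
The headline change lets a caller upload a blob whose length isn't known up front: UploadHandle.Size may now be -1, and the client counts the bytes itself as it streams them. A minimal sketch of that usage in the pre-Go1 style of this tree; the helper name and the camli/* import paths are assumptions, not part of this commit:

package example

import (
	"io"
	"os"

	"camli/blobref"
	"camli/client"
)

// uploadStream is a hypothetical helper: Size == -1 means "length unknown",
// so Upload counts the bytes of Contents itself as it streams them instead
// of requiring the length up front.
func uploadStream(c *client.Client, ref *blobref.BlobRef, r io.Reader) (*client.PutResult, os.Error) {
	h := &client.UploadHandle{
		BlobRef:  ref,
		Size:     -1,
		Contents: r,
	}
	return c.Upload(h)
}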

View File

@ -30,14 +30,15 @@ import (
"strings"
)
func CreateUploadHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) {
func CreateUploadHandler(storage blobserver.BlobReceiveConfiger) func(http.ResponseWriter, *http.Request) {
return func(conn http.ResponseWriter, req *http.Request) {
handleMultiPartUpload(conn, req, storage)
}
}
func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver) {
func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiveConfiger) {
if !(req.Method == "POST" && strings.Contains(req.URL.Path, "/camli/upload")) {
log.Printf("Inconfigured handler upload handler")
httputil.BadRequestError(conn, "Inconfigured handler.")
return
}
@ -106,8 +107,7 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece
}
log.Println("Done reading multipart body.")
configer, _ := blobReceiver.(blobserver.Configer) // TODO: ugly?
ret := commonUploadResponse(configer, req)
ret := commonUploadResponse(blobReceiver, req)
received := make([]map[string]interface{}, 0)
for _, got := range receivedBlobs {
@ -134,9 +134,7 @@ func commonUploadResponse(configer blobserver.Configer, req *http.Request) map[s
// TODO: camli/upload isn't part of the spec. we should pick
// something different here just to make it obvious that this
// isn't a well-known URL and doesn't accidentally encourage lazy clients.
if configer != nil {
ret["uploadUrl"] = configer.Config().URLBase + "/camli/upload"
}
ret["uploadUrl"] = configer.Config().URLBase + "/camli/upload"
return ret
}

View File

@ -25,15 +25,15 @@ import (
var CorruptBlobError = os.NewError("corrupt blob; digest doesn't match")
type NamedPartition interface {
Name() string // "" for default, "queue-indexer", etc
}
type Partition interface {
NamedPartition
Writable() bool // accepts direct uploads (excluding mirroring from default partition)
Readable() bool // can return blobs (e.g. indexer partition can't)
IsQueue() bool // is a temporary queue partition (supports deletes)
// TODO: rename this. just "UploadMirrors"?
GetMirrorPartitions() []Partition
@ -55,8 +55,9 @@ type BlobStatter interface {
// waitSeconds is the max time to wait for the blobs to exist,
// or 0 for no delay.
Stat(dest chan<- *blobref.SizedBlobRef,
blobs []*blobref.BlobRef,
waitSeconds int) os.Error
// TODO-GO: file a gofmt bug on how ugly those lines above look
}
// QueueCreator is implemented by Storage interfaces which support
@ -83,9 +84,9 @@ type BlobEnumerator interface {
// EnumerateBlobs must close the channel. (even if limit
// was hit and more blobs remain)
EnumerateBlobs(dest chan<- *blobref.SizedBlobRef,
after string,
limit uint,
waitSeconds int) os.Error
}
// Cache is the minimal interface expected of a blob cache.
@ -95,9 +96,14 @@ type Cache interface {
BlobStatter
}
type BlobReceiveConfiger interface {
BlobReceiver
Configer
}
type Config struct {
Writable, Readable bool
IsQueue bool // supports deletes
// the "http://host:port" and optional path (but without trailing slash) to have "/camli/*" appended
URLBase string
@ -121,3 +127,8 @@ type Storage interface {
// Returns the blob notification bus
GetBlobHub() BlobHub
}
type StorageConfiger interface {
Storage
Configer
}
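
The new StorageConfiger (and BlobReceiveConfiger above) lets handlers require a Config at compile time instead of type-asserting at runtime. A minimal sketch of satisfying it, assuming the blobserver package is imported and using a hypothetical wrapper name; the storageAndConfig type camlistored adds in the last file below has exactly this shape:

// configuredStorage is a hypothetical wrapper: embed any existing Storage
// and expose its Config to satisfy blobserver.StorageConfiger.
type configuredStorage struct {
	blobserver.Storage
	conf *blobserver.Config
}

func (s *configuredStorage) Config() *blobserver.Config { return s.conf }

// compile-time check that the wrapper implements the interface
var _ blobserver.StorageConfiger = (*configuredStorage)(nil)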

View File

@ -73,7 +73,12 @@ func (c *Client) SetHttpClient(client *http.Client) {
func NewOrFail() *Client {
log := log.New(os.Stderr, "", log.Ldate|log.Ltime)
return &Client{server: blobServerOrDie(), password: passwordOrDie(), log: log}
return &Client{
server: blobServerOrDie(),
password: passwordOrDie(),
httpClient: http.DefaultClient,
log: log,
}
}
type devNullWriter struct{}

View File

@ -31,9 +31,11 @@ import (
"strings"
)
var _ = log.Printf
type UploadHandle struct {
BlobRef *blobref.BlobRef
Size int64
Size int64 // or -1 if size isn't known
Contents io.Reader
}
@ -57,7 +59,7 @@ func newResFormatError(s string, arg ...interface{}) ResponseFormatError {
return ResponseFormatError(fmt.Errorf(s, arg...))
}
func parseStatResponse(r io.Reader) (*statResponse, os.Error) {
func parseStatResponse(r io.Reader) (sr *statResponse, _ os.Error) {
var (
ok bool
err os.Error
@ -67,6 +69,11 @@ func parseStatResponse(r io.Reader) (*statResponse, os.Error) {
if err = json.NewDecoder(io.LimitReader(r, 5<<20)).Decode(&jmap); err != nil {
return nil, ResponseFormatError(err)
}
defer func() {
if sr == nil {
log.Printf("parseStatResponse got map: %#v", jmap)
}
}()
s.uploadUrl, ok = jmap["uploadUrl"].(string)
if !ok {
@ -206,7 +213,9 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
c.statsMutex.Lock()
c.stats.UploadRequests.Blobs++
c.stats.UploadRequests.Bytes += h.Size
if h.Size != -1 {
c.stats.UploadRequests.Bytes += h.Size
}
c.statsMutex.Unlock()
blobRefString := h.BlobRef.String()
@ -238,6 +247,27 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
pr := &PutResult{BlobRef: h.BlobRef, Size: h.Size}
if _, ok := stat.HaveMap[h.BlobRef.String()]; ok {
pr.Skipped = true
// Drain the reader that was provided, for consistency.
// If the size is known and the reader is a Closer, just
// close it; otherwise read it to the end so we can fill
// in the size (and the stats) below.
closer, _ := h.Contents.(io.Closer)
if h.Size >= 0 && closer != nil {
closer.Close()
} else {
n, err := io.Copy(ioutil.Discard, h.Contents)
if err != nil {
return nil, err
}
if h.Size == -1 {
pr.Size = n
c.statsMutex.Lock()
c.stats.UploadRequests.Bytes += pr.Size
c.statsMutex.Unlock()
}
}
return pr, nil
}
@ -255,43 +285,55 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
c.log.Printf("Uploading to URL: %s", stat.uploadUrl)
req = c.newRequest("POST", stat.uploadUrl)
req.Header.Set("Content-Type", "multipart/form-data; boundary="+boundary)
contentsSize := int64(0)
req.Body = ioutil.NopCloser(io.MultiReader(
strings.NewReader(multiPartHeader),
h.Contents,
countingReader{h.Contents, &contentsSize},
strings.NewReader(multiPartFooter)))
req.ContentLength = int64(len(multiPartHeader)) + h.Size + int64(len(multiPartFooter))
if h.Size >= 0 {
req.ContentLength = int64(len(multiPartHeader)) + h.Size + int64(len(multiPartFooter))
}
req.TransferEncoding = nil
resp, err = c.httpClient.Do(req)
if err != nil {
return error("upload http error", err)
return error("upload http error: %v", err)
}
if h.Size >= 0 {
if contentsSize != h.Size {
return error("UploadHandle declared size %d but Contents length was %d", h.Size, contentsSize)
}
} else {
h.Size = contentsSize
}
// The only valid HTTP responses are 200 and 303.
if resp.StatusCode != 200 && resp.StatusCode != 303 {
return error(fmt.Sprintf("invalid http response %d in upload response", resp.StatusCode), nil)
return error("invalid http response %d in upload response", resp.StatusCode)
}
if resp.StatusCode == 303 {
otherLocation := resp.Header.Get("Location")
if otherLocation == "" {
return error("303 without a Location", nil)
return error("303 without a Location")
}
baseUrl, _ := http.ParseURL(stat.uploadUrl)
absUrl, err := baseUrl.ParseURL(otherLocation)
if err != nil {
return error("303 Location URL relative resolve error", err)
return error("303 Location URL relative resolve error: %v", err)
}
otherLocation = absUrl.String()
resp, _, err = http.Get(otherLocation)
if err != nil {
return error("error following 303 redirect after upload", err)
return error("error following 303 redirect after upload: %v", err)
}
}
ures, err := c.jsonFromResponse("upload", resp)
if err != nil {
return error("json parse from upload error", err)
return error("json parse from upload error: %v", err)
}
errorText, ok := ures["errorText"].(string)
@ -301,18 +343,18 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
received, ok := ures["received"].([]interface{})
if !ok {
return error("upload json validity error: no 'received'", nil)
return error("upload json validity error: no 'received'")
}
for _, rit := range received {
it, ok := rit.(map[string]interface{})
if !ok {
return error("upload json validity error: 'received' is malformed", nil)
return error("upload json validity error: 'received' is malformed")
}
if it["blobRef"] == blobRefString {
switch size := it["size"].(type) {
case nil:
return error("upload json validity error: 'received' is missing 'size'", nil)
return error("upload json validity error: 'received' is missing 'size'")
case float64:
if int64(size) == h.Size {
// Success!
@ -322,14 +364,25 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
c.statsMutex.Unlock()
return pr, nil
} else {
return error(fmt.Sprintf("Server got blob, but reports wrong length (%v; expected %d)",
size, h.Size),nil)
return error("Server got blob, but reports wrong length (%v; expected %d)",
size, h.Size)
}
default:
return error("unsupported type of 'size' in received response", nil)
return error("unsupported type of 'size' in received response")
}
}
}
return nil, os.NewError("Server didn't receive blob.")
}
type countingReader struct {
r io.Reader
n *int64
}
func (cr countingReader) Read(p []byte) (n int, err os.Error) {
n, err = cr.r.Read(p)
*cr.n += int64(n)
return
}
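
countingReader is what makes Size == -1 workable: the payload reader is wrapped before being stitched into the multipart body, so once the POST body has been consumed the shared counter holds exactly the number of content bytes, excluding the multipart framing. A small hypothetical sketch of the same idea, assuming the client package's existing imports (io, ioutil, log, strings):

// exampleCountingReader illustrates that only the wrapped payload reader
// is counted, not the framing readers around it.
func exampleCountingReader() {
	payload := strings.NewReader("hello, world")
	var n int64
	body := io.MultiReader(
		strings.NewReader("--b\r\n...part headers...\r\n\r\n"), // framing, not counted
		countingReader{payload, &n},
		strings.NewReader("\r\n--b--\r\n"), // framing, not counted
	)
	io.Copy(ioutil.Discard, body) // drain the body; afterwards n == 12
	log.Printf("payload bytes: %d", n)
}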

View File

@ -63,8 +63,31 @@ func unsupportedHandler(conn http.ResponseWriter, req *http.Request) {
httputil.BadRequestError(conn, "Unsupported camlistore path or method.")
}
type storageAndConfig struct {
blobserver.Storage
config *blobserver.Config
}
func (s *storageAndConfig) Config() *blobserver.Config {
return s.config
}
// prefix is like "/" or "/s3/", giving handlers for e.g. "/camli/*" or "/s3/camli/*"
func makeCamliHandler(prefix, baseURL string, storage blobserver.Storage) http.Handler {
if !strings.HasSuffix(prefix, "/") {
panic("expected prefix to end in slash")
}
baseURL = strings.TrimRight(baseURL, "/")
storageConfig := &storageAndConfig{
storage,
&blobserver.Config{
Writable: true,
Readable: true,
IsQueue: false,
URLBase: baseURL + prefix[:len(prefix)-1],
},
}
return http.HandlerFunc(func(conn http.ResponseWriter, req *http.Request) {
action, err := parseCamliPath(req.URL.Path[len(prefix)-1:])
if err != nil {
@ -73,13 +96,11 @@ func makeCamliHandler(prefix, baseURL string, storage blobserver.Storage) http.H
unsupportedHandler(conn, req)
return
}
// TODO: actually deal with partitions here
part := &partitionConfig{"", true, true, false, nil, baseURL + prefix[:len(prefix)-1]}
handleCamliUsingStorage(conn, req, action, part, storage)
handleCamliUsingStorage(conn, req, action, storageConfig)
})
}
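// For context, a hypothetical sketch of mounting this handler (wiring not
// shown in this diff): with prefix "/" it ends up serving "/camli/*", and
// URLBase stays the bare base URL. The function name is an assumption.
func registerCamliRoot(mux *http.ServeMux, baseURL string, store blobserver.Storage) {
	mux.Handle("/camli/", makeCamliHandler("/", baseURL, store))
}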
func handleCamliUsingStorage(conn http.ResponseWriter, req *http.Request, action string, partition blobserver.Partition, storage blobserver.Storage) {
func handleCamliUsingStorage(conn http.ResponseWriter, req *http.Request, action string, storage blobserver.StorageConfiger) {
handler := unsupportedHandler
switch req.Method {
case "GET":