diff --git a/lib/go/camli/blobserver/interface.go b/lib/go/camli/blobserver/interface.go index 12ba50530..7ab564321 100644 --- a/lib/go/camli/blobserver/interface.go +++ b/lib/go/camli/blobserver/interface.go @@ -22,6 +22,8 @@ import ( "os" ) +// TODO: put directional constraints on all the channel types + var CorruptBlobError = os.NewError("corrupt blob; digest doesn't match") type NamedPartition interface { diff --git a/lib/go/camli/blobserver/remote/remote.go b/lib/go/camli/blobserver/remote/remote.go index a114a9db0..4f88f8a6e 100644 --- a/lib/go/camli/blobserver/remote/remote.go +++ b/lib/go/camli/blobserver/remote/remote.go @@ -50,11 +50,11 @@ func newFromConfig(config jsonconfig.Obj) (storage blobserver.Storage, err os.Er } func (sto *remoteStorage) Remove(blobs []*blobref.BlobRef) os.Error { - return os.NewError("TODO: implement") + return sto.client.RemoveBlobs(blobs) } func (sto *remoteStorage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error { - return os.NewError("TODO: implement") + return sto.client.Stat(dest, blobs, waitSeconds) } func (sto *remoteStorage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) { diff --git a/lib/go/camli/client/client.go b/lib/go/camli/client/client.go index a660b8f9a..dc2c304cc 100644 --- a/lib/go/camli/client/client.go +++ b/lib/go/camli/client/client.go @@ -18,6 +18,7 @@ package client import ( "fmt" + "http" "log" "os" "sync" @@ -26,11 +27,11 @@ import ( type Stats struct { // The number of uploads that were requested, but perhaps // not actually performed if the server already had the items. - UploadRequests ByCountAndBytes + UploadRequests ByCountAndBytes // The uploads which were actually sent to the blobserver // due to the server not having the blobs - Uploads ByCountAndBytes + Uploads ByCountAndBytes } func (s *Stats) String() string { @@ -38,13 +39,15 @@ func (s *Stats) String() string { } type Client struct { - server string // URL prefix before "/camli/" + server string // URL prefix before "/camli/" password string - - statsMutex sync.Mutex + + httpClient *http.Client + + statsMutex sync.Mutex stats Stats - log *log.Logger // not nil + log *log.Logger // not nil } type ByCountAndBytes struct { @@ -58,17 +61,23 @@ func (bb *ByCountAndBytes) String() string { func New(server, password string) *Client { return &Client{ - server: server, - password: password, + server: server, + password: password, + httpClient: http.DefaultClient, } } +func (c *Client) SetHttpClient(client *http.Client) { + c.httpClient = client +} + func NewOrFail() *Client { log := log.New(os.Stderr, "", log.Ldate|log.Ltime) return &Client{server: blobServerOrDie(), password: passwordOrDie(), log: log} } type devNullWriter struct{} + func (_ *devNullWriter) Write(p []byte) (int, os.Error) { return len(p), nil } @@ -84,7 +93,7 @@ func (c *Client) SetLogger(logger *log.Logger) { func (c *Client) Stats() Stats { c.statsMutex.Lock() defer c.statsMutex.Unlock() - return c.stats // copy + return c.stats // copy } func (c *Client) HasAuthCredentials() bool { @@ -92,5 +101,5 @@ func (c *Client) HasAuthCredentials() bool { } func (c *Client) authHeader() string { - return "Basic " + encodeBase64("username:" + c.password) + return "Basic " + encodeBase64("username:"+c.password) } diff --git a/lib/go/camli/client/enumerate.go b/lib/go/camli/client/enumerate.go index 7144123f9..ac596b922 100644 --- a/lib/go/camli/client/enumerate.go +++ b/lib/go/camli/client/enumerate.go @@ -58,12 +58,12 @@ func (c *Client) EnumerateBlobsOpts(ch chan *blobref.SizedBlobRef, opts Enumerat url := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d&maxwaitsec=%d", c.server, http.URLEscape(after), enumerateBatchSize, waitSec) req := c.newRequest("GET", url) - resp, err := http.DefaultClient.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { return error("http request", err) } - json, err := c.jsonFromResponse(resp) + json, err := c.jsonFromResponse("enumerate-blobs", resp) if err != nil { return error("stat json parse error", err) } diff --git a/lib/go/camli/client/get.go b/lib/go/camli/client/get.go index f015cf452..3dbeaa734 100644 --- a/lib/go/camli/client/get.go +++ b/lib/go/camli/client/get.go @@ -59,7 +59,7 @@ func (c *Client) FetchVia(b *blobref.BlobRef, v []*blobref.BlobRef) (io.ReadClos } req := c.newRequest("GET", url) - resp, err := http.DefaultClient.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { return nil, 0, err } diff --git a/lib/go/camli/client/remove.go b/lib/go/camli/client/remove.go index a8b6c29ea..d49caabde 100644 --- a/lib/go/camli/client/remove.go +++ b/lib/go/camli/client/remove.go @@ -60,7 +60,7 @@ func (c *Client) RemoveBlobs(blobs []*blobref.BlobRef) os.Error { if c.HasAuthCredentials() { req.Header.Add("Authorization", c.authHeader()) } - resp, err := http.DefaultClient.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { return os.NewError(fmt.Sprintf("Got status code %d from blobserver for remove %s", resp.StatusCode, body)) diff --git a/lib/go/camli/client/upload.go b/lib/go/camli/client/upload.go index f4378ff90..22a9408dd 100644 --- a/lib/go/camli/client/upload.go +++ b/lib/go/camli/client/upload.go @@ -37,9 +37,9 @@ type UploadHandle struct { } type PutResult struct { - BlobRef *blobref.BlobRef - Size int64 - Skipped bool // already present on blobserver + BlobRef *blobref.BlobRef + Size int64 + Skipped bool // already present on blobserver } type nopCloser struct { @@ -63,11 +63,11 @@ func encodeBase64(s string) string { return string(buf) } -func (c *Client) jsonFromResponse(resp *http.Response) (map[string]interface{}, os.Error) { +func (c *Client) jsonFromResponse(requestName string, resp *http.Response) (map[string]interface{}, os.Error) { if resp.StatusCode != 200 { - log.Printf("Failed to JSON from response; status code is %d", resp.StatusCode) + log.Printf("After %s request, failed to JSON from response; status code is %d", requestName, resp.StatusCode) io.Copy(os.Stderr, resp.Body) - return nil, os.NewError(fmt.Sprintf("HTTP response code is %d; no JSON to parse.", resp.StatusCode)) + return nil, os.NewError(fmt.Sprintf("After %s request, HTTP response code is %d; no JSON to parse.", requestName, resp.StatusCode)) } // TODO: LimitReader here for paranoia buf := new(bytes.Buffer) @@ -80,6 +80,40 @@ func (c *Client) jsonFromResponse(resp *http.Response) (map[string]interface{}, return jmap, nil } +func (c *Client) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error { + if len(blobs) == 0 { + return nil + } + + // TODO: if len(blobs) > 1000 or something, cut this up into + // multiple http requests, and also if the server returns a + // 400 error, per the blob-stat-protocol.txt document. + var buf bytes.Buffer + fmt.Fprintf(&buf, "camliversion=1") + for n, blob := range blobs { + if blob == nil { + panic("nil blob") + } + fmt.Fprintf(&buf, "&blob%d=%s", n+1, blob) + } + + req, err := http.NewRequest("POST", fmt.Sprintf("%s/camli/stat", c.server), strings.NewReader(buf.String())) + if err != nil { + return err + } + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") + req.ContentLength = int64(buf.Len()) + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("stat HTTP error: %v", err) + } + + resp = resp + + return os.NewError("TODO: implement") +} + func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { error := func(msg string, e os.Error) (*PutResult, os.Error) { err := fmt.Errorf("Error uploading blob %s: %s; err=%v", h.BlobRef, msg, e) @@ -97,23 +131,23 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { // Pre-upload. Check whether the blob already exists on the // server and if not, the URL to upload it to. url := fmt.Sprintf("%s/camli/stat", c.server) - requestBody := "camliversion=1&blob1="+blobRefString + requestBody := "camliversion=1&blob1=" + blobRefString req := c.newRequest("POST", url) req.Header.Add("Content-Type", "application/x-www-form-urlencoded") req.Body = &nopCloser{strings.NewReader(requestBody)} req.ContentLength = int64(len(requestBody)) req.TransferEncoding = nil - resp, err := http.DefaultClient.Do(req) + resp, err := c.httpClient.Do(req) if err != nil { return error("stat http error", err) } - pur, err := c.jsonFromResponse(resp) + pur, err := c.jsonFromResponse("pre-upload stat", resp) if err != nil { return error("json parse error", fmt.Errorf("response from %s wasn't valid JSON; wrong URL prefix?", url)) } - + uploadUrl, ok := pur["uploadUrl"].(string) if uploadUrl == "" { return error("stat json validity error: no 'uploadUrl'", nil) @@ -138,12 +172,13 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { // TODO: use a proper random boundary boundary := "sdf8sd8f7s9df9s7df9sd7sdf9s879vs7d8v7sd8v7sd8v" + // TODO-GO: add a multipart writer class. multiPartHeader := fmt.Sprintf( - "--%s\r\nContent-Type: application/octet-stream\r\n" + - "Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"\r\n\r\n", - boundary, - h.BlobRef, h.BlobRef) - multiPartFooter := "\r\n--"+boundary+"--\r\n" + "--%s\r\nContent-Type: application/octet-stream\r\n"+ + "Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"\r\n\r\n", + boundary, + h.BlobRef, h.BlobRef) + multiPartFooter := "\r\n--" + boundary + "--\r\n" c.log.Printf("Uploading to URL: %s", uploadUrl) req = c.newRequest("POST", uploadUrl) @@ -151,11 +186,11 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { req.Body = &nopCloser{io.MultiReader( strings.NewReader(multiPartHeader), h.Contents, - strings.NewReader(multiPartFooter))} + strings.NewReader(multiPartFooter))} req.ContentLength = int64(len(multiPartHeader)) + h.Size + int64(len(multiPartFooter)) req.TransferEncoding = nil - resp, err = http.DefaultClient.Do(req) + resp, err = c.httpClient.Do(req) if err != nil { return error("upload http error", err) } @@ -182,7 +217,7 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { } } - ures, err := c.jsonFromResponse(resp) + ures, err := c.jsonFromResponse("upload", resp) if err != nil { return error("json parse from upload error", err) } @@ -216,7 +251,7 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) { return pr, nil } else { return error(fmt.Sprintf("Server got blob, but reports wrong length (%v; expected %d)", - size, h.Size), nil) + size, h.Size),nil) } default: return error("unsupported type of 'size' in received response", nil)