Some work on remote blobserver & camli client; allow client to use alt http clients

This commit is contained in:
Brad Fitzpatrick 2011-05-10 06:25:18 -07:00
parent a31431a11b
commit c4eb6bb68d
7 changed files with 81 additions and 35 deletions

View File

@ -22,6 +22,8 @@ import (
"os"
)
// TODO: put directional constraints on all the channel types
var CorruptBlobError = os.NewError("corrupt blob; digest doesn't match")
type NamedPartition interface {

View File

@ -50,11 +50,11 @@ func newFromConfig(config jsonconfig.Obj) (storage blobserver.Storage, err os.Er
}
func (sto *remoteStorage) Remove(blobs []*blobref.BlobRef) os.Error {
return os.NewError("TODO: implement")
return sto.client.RemoveBlobs(blobs)
}
func (sto *remoteStorage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
return os.NewError("TODO: implement")
return sto.client.Stat(dest, blobs, waitSeconds)
}
func (sto *remoteStorage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) {

View File

@ -18,6 +18,7 @@ package client
import (
"fmt"
"http"
"log"
"os"
"sync"
@ -26,11 +27,11 @@ import (
type Stats struct {
// The number of uploads that were requested, but perhaps
// not actually performed if the server already had the items.
UploadRequests ByCountAndBytes
UploadRequests ByCountAndBytes
// The uploads which were actually sent to the blobserver
// due to the server not having the blobs
Uploads ByCountAndBytes
Uploads ByCountAndBytes
}
func (s *Stats) String() string {
@ -38,13 +39,15 @@ func (s *Stats) String() string {
}
type Client struct {
server string // URL prefix before "/camli/"
server string // URL prefix before "/camli/"
password string
statsMutex sync.Mutex
httpClient *http.Client
statsMutex sync.Mutex
stats Stats
log *log.Logger // not nil
log *log.Logger // not nil
}
type ByCountAndBytes struct {
@ -58,17 +61,23 @@ func (bb *ByCountAndBytes) String() string {
func New(server, password string) *Client {
return &Client{
server: server,
password: password,
server: server,
password: password,
httpClient: http.DefaultClient,
}
}
func (c *Client) SetHttpClient(client *http.Client) {
c.httpClient = client
}
func NewOrFail() *Client {
log := log.New(os.Stderr, "", log.Ldate|log.Ltime)
return &Client{server: blobServerOrDie(), password: passwordOrDie(), log: log}
}
type devNullWriter struct{}
func (_ *devNullWriter) Write(p []byte) (int, os.Error) {
return len(p), nil
}
@ -84,7 +93,7 @@ func (c *Client) SetLogger(logger *log.Logger) {
func (c *Client) Stats() Stats {
c.statsMutex.Lock()
defer c.statsMutex.Unlock()
return c.stats // copy
return c.stats // copy
}
func (c *Client) HasAuthCredentials() bool {
@ -92,5 +101,5 @@ func (c *Client) HasAuthCredentials() bool {
}
func (c *Client) authHeader() string {
return "Basic " + encodeBase64("username:" + c.password)
return "Basic " + encodeBase64("username:"+c.password)
}

View File

@ -58,12 +58,12 @@ func (c *Client) EnumerateBlobsOpts(ch chan *blobref.SizedBlobRef, opts Enumerat
url := fmt.Sprintf("%s/camli/enumerate-blobs?after=%s&limit=%d&maxwaitsec=%d",
c.server, http.URLEscape(after), enumerateBatchSize, waitSec)
req := c.newRequest("GET", url)
resp, err := http.DefaultClient.Do(req)
resp, err := c.httpClient.Do(req)
if err != nil {
return error("http request", err)
}
json, err := c.jsonFromResponse(resp)
json, err := c.jsonFromResponse("enumerate-blobs", resp)
if err != nil {
return error("stat json parse error", err)
}

View File

@ -59,7 +59,7 @@ func (c *Client) FetchVia(b *blobref.BlobRef, v []*blobref.BlobRef) (io.ReadClos
}
req := c.newRequest("GET", url)
resp, err := http.DefaultClient.Do(req)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, 0, err
}

View File

@ -60,7 +60,7 @@ func (c *Client) RemoveBlobs(blobs []*blobref.BlobRef) os.Error {
if c.HasAuthCredentials() {
req.Header.Add("Authorization", c.authHeader())
}
resp, err := http.DefaultClient.Do(req)
resp, err := c.httpClient.Do(req)
if err != nil {
return os.NewError(fmt.Sprintf("Got status code %d from blobserver for remove %s", resp.StatusCode, body))

View File

@ -37,9 +37,9 @@ type UploadHandle struct {
}
type PutResult struct {
BlobRef *blobref.BlobRef
Size int64
Skipped bool // already present on blobserver
BlobRef *blobref.BlobRef
Size int64
Skipped bool // already present on blobserver
}
type nopCloser struct {
@ -63,11 +63,11 @@ func encodeBase64(s string) string {
return string(buf)
}
func (c *Client) jsonFromResponse(resp *http.Response) (map[string]interface{}, os.Error) {
func (c *Client) jsonFromResponse(requestName string, resp *http.Response) (map[string]interface{}, os.Error) {
if resp.StatusCode != 200 {
log.Printf("Failed to JSON from response; status code is %d", resp.StatusCode)
log.Printf("After %s request, failed to JSON from response; status code is %d", requestName, resp.StatusCode)
io.Copy(os.Stderr, resp.Body)
return nil, os.NewError(fmt.Sprintf("HTTP response code is %d; no JSON to parse.", resp.StatusCode))
return nil, os.NewError(fmt.Sprintf("After %s request, HTTP response code is %d; no JSON to parse.", requestName, resp.StatusCode))
}
// TODO: LimitReader here for paranoia
buf := new(bytes.Buffer)
@ -80,6 +80,40 @@ func (c *Client) jsonFromResponse(resp *http.Response) (map[string]interface{},
return jmap, nil
}
func (c *Client) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
if len(blobs) == 0 {
return nil
}
// TODO: if len(blobs) > 1000 or something, cut this up into
// multiple http requests, and also if the server returns a
// 400 error, per the blob-stat-protocol.txt document.
var buf bytes.Buffer
fmt.Fprintf(&buf, "camliversion=1")
for n, blob := range blobs {
if blob == nil {
panic("nil blob")
}
fmt.Fprintf(&buf, "&blob%d=%s", n+1, blob)
}
req, err := http.NewRequest("POST", fmt.Sprintf("%s/camli/stat", c.server), strings.NewReader(buf.String()))
if err != nil {
return err
}
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.ContentLength = int64(buf.Len())
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("stat HTTP error: %v", err)
}
resp = resp
return os.NewError("TODO: implement")
}
func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
error := func(msg string, e os.Error) (*PutResult, os.Error) {
err := fmt.Errorf("Error uploading blob %s: %s; err=%v", h.BlobRef, msg, e)
@ -97,23 +131,23 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
// Pre-upload. Check whether the blob already exists on the
// server and if not, the URL to upload it to.
url := fmt.Sprintf("%s/camli/stat", c.server)
requestBody := "camliversion=1&blob1="+blobRefString
requestBody := "camliversion=1&blob1=" + blobRefString
req := c.newRequest("POST", url)
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")
req.Body = &nopCloser{strings.NewReader(requestBody)}
req.ContentLength = int64(len(requestBody))
req.TransferEncoding = nil
resp, err := http.DefaultClient.Do(req)
resp, err := c.httpClient.Do(req)
if err != nil {
return error("stat http error", err)
}
pur, err := c.jsonFromResponse(resp)
pur, err := c.jsonFromResponse("pre-upload stat", resp)
if err != nil {
return error("json parse error", fmt.Errorf("response from %s wasn't valid JSON; wrong URL prefix?", url))
}
uploadUrl, ok := pur["uploadUrl"].(string)
if uploadUrl == "" {
return error("stat json validity error: no 'uploadUrl'", nil)
@ -138,12 +172,13 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
// TODO: use a proper random boundary
boundary := "sdf8sd8f7s9df9s7df9sd7sdf9s879vs7d8v7sd8v7sd8v"
// TODO-GO: add a multipart writer class.
multiPartHeader := fmt.Sprintf(
"--%s\r\nContent-Type: application/octet-stream\r\n" +
"Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"\r\n\r\n",
boundary,
h.BlobRef, h.BlobRef)
multiPartFooter := "\r\n--"+boundary+"--\r\n"
"--%s\r\nContent-Type: application/octet-stream\r\n"+
"Content-Disposition: form-data; name=\"%s\"; filename=\"%s\"\r\n\r\n",
boundary,
h.BlobRef, h.BlobRef)
multiPartFooter := "\r\n--" + boundary + "--\r\n"
c.log.Printf("Uploading to URL: %s", uploadUrl)
req = c.newRequest("POST", uploadUrl)
@ -151,11 +186,11 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
req.Body = &nopCloser{io.MultiReader(
strings.NewReader(multiPartHeader),
h.Contents,
strings.NewReader(multiPartFooter))}
strings.NewReader(multiPartFooter))}
req.ContentLength = int64(len(multiPartHeader)) + h.Size + int64(len(multiPartFooter))
req.TransferEncoding = nil
resp, err = http.DefaultClient.Do(req)
resp, err = c.httpClient.Do(req)
if err != nil {
return error("upload http error", err)
}
@ -182,7 +217,7 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
}
}
ures, err := c.jsonFromResponse(resp)
ures, err := c.jsonFromResponse("upload", resp)
if err != nil {
return error("json parse from upload error", err)
}
@ -216,7 +251,7 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, os.Error) {
return pr, nil
} else {
return error(fmt.Sprintf("Server got blob, but reports wrong length (%v; expected %d)",
size, h.Size), nil)
size, h.Size),nil)
}
default:
return error("unsupported type of 'size' in received response", nil)