From 0adecfb4cc7e4cd76d9d8f4d7020f754ff40cb65 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Fri, 24 Dec 2010 07:46:12 -0800 Subject: [PATCH] Upload work. --- clients/go/camput/camput.go | 182 ++++++++++++++++++-------- doc/protocol/blob-upload-protocol.txt | 2 +- 2 files changed, 129 insertions(+), 55 deletions(-) diff --git a/clients/go/camput/camput.go b/clients/go/camput/camput.go index f862d44d6..6fd1ac60b 100644 --- a/clients/go/camput/camput.go +++ b/clients/go/camput/camput.go @@ -18,16 +18,25 @@ import ( "log" "os" "strings" + "strconv" ) // Things that can be uploaded. (at most one of these) var flagBlob *bool = flag.Bool("blob", true, "upload a file's bytes as a single blob") var flagFile *bool = flag.Bool("file", false, "upload a file's bytes as a blob, as well as its JSON file record") +var flagVerbose *bool = flag.Bool("verbose", false, "be verbose") type UploadHandle struct { - blobref *blobref.BlobRef - contents io.ReadSeeker + BlobRef *blobref.BlobRef + Size int64 + Contents io.ReadSeeker +} + +type PutResult struct { + BlobRef *blobref.BlobRef + Size int64 + Skipped bool // already present on blobserver } // Upload agent @@ -46,66 +55,75 @@ func encodeBase64(s string) string { return string(buf) } -func (a *Agent) Upload(h *UploadHandle) { +func jsonFromResponse(resp *http.Response) (map[string]interface{}, os.Error) { + // TODO: LimitReader here for paranoia + buf := new(bytes.Buffer) + io.Copy(buf, resp.Body) + resp.Body.Close() + + fmt.Printf("Got HTTP response: [%s]\n", buf) + jmap := make(map[string]interface{}) + if jerr := json.Unmarshal(buf.Bytes(), &jmap); jerr != nil { + return nil, jerr + } + return jmap, nil +} + +func (a *Agent) Upload(h *UploadHandle) (*PutResult, os.Error) { url := fmt.Sprintf("%s/camli/preupload", a.server) fmt.Println("Need to upload: ", h, "to", url) - error := func(msg string, e os.Error) { - fmt.Fprintf(os.Stderr, "%s on %v: %v\n", msg, h.blobref, e) - return + error := func(msg string, e os.Error) (*PutResult, os.Error) { + err := os.NewError(fmt.Sprintf("Error uploading blob %s: %s; err=%s", + h.BlobRef, msg, e)) + log.Print(err.String()) + return nil, err } authHeader := "Basic " + encodeBase64("username:" + a.password) + blobRefString := h.BlobRef.String() req := http.NewPostRequest( url, "application/x-www-form-urlencoded", - strings.NewReader("camliversion=1&blob1="+h.blobref.String())) + strings.NewReader("camliversion=1&blob1="+blobRefString)) req.Header["Authorization"] = authHeader log.Printf("Request is %v", req.Request) resp, err := req.Send() if err != nil { - log.Exitf("Upload error for %v: %v\n", h.blobref, err) + return error("preupload http error", err) } - fmt.Println("Got response:", resp) - buf := new(bytes.Buffer) - io.Copy(buf, resp.Body) - resp.Body.Close() - - pur := make(map[string]interface{}) - jerr := json.Unmarshal(buf.Bytes(), &pur) - if jerr != nil { - error("preupload parse error", jerr) - return + pur, err := jsonFromResponse(resp) + if err != nil { + return error("preupload json parse error", err) } - + uploadUrl, ok := pur["uploadUrl"].(string) if uploadUrl == "" { - error("no uploadUrl in preupload response", nil) - return + return error("preupload json validity error: no 'uploadUrl'", nil) } alreadyHave, ok := pur["alreadyHave"].([]interface{}) if !ok { - error("no alreadyHave array in preupload response", nil) - return + return error("preupload json validity error: no 'alreadyHave'", nil) } + pr := &PutResult{BlobRef: h.BlobRef, Size: h.Size} + for _, haveObj := range alreadyHave { haveObj := haveObj.(map[string]interface{}) - if haveObj["blobRef"].(string) == h.blobref.String() { - fmt.Println("already have it!") - // TODO: signal success - return + if haveObj["blobRef"].(string) == h.BlobRef.String() { + pr.Skipped = true + return pr, nil } } fmt.Println("preupload done:", pur, alreadyHave) boundary := "sdf8sd8f7s9df9s7df9sd7sdf9s879vs7d8v7sd8v7sd8v" - h.contents.Seek(0, 0) + h.Contents.Seek(0, 0) req = http.NewPostRequest(uploadUrl, "multipart/form-data; boundary="+boundary, @@ -113,47 +131,87 @@ func (a *Agent) Upload(h *UploadHandle) { strings.NewReader(fmt.Sprintf( "--%s\r\nContent-Disposition: form-data; name=\"%s\"\r\n\r\n", boundary, - h.blobref)), - h.contents, + h.BlobRef)), + h.Contents, strings.NewReader("\r\n--"+boundary+"--\r\n"))) req.Header["Authorization"] = authHeader resp, err = req.Send() - if err != nil { - error("camli upload error", err) - return + return error("upload http error", err) } - fmt.Println("Uploaded!") - fmt.Println("Got response: ", resp) - resp.Write(os.Stdout) + + // The only valid HTTP responses are 200 and 303. + if resp.StatusCode != 200 && resp.StatusCode != 303 { + return error(fmt.Sprintf("invalid http response %d in upload response", resp.StatusCode), nil) + } + + if resp.StatusCode == 303 { + // TODO + log.Exitf("TODO: handle 303? or does the Go http client do it already? how to enforce only 200 and 303 if so?") + } + + ures, err := jsonFromResponse(resp) + if err != nil { + return error("json parse from upload error", err) + } + + errorText, ok := ures["errorText"].(string) + if ok { + log.Printf("Blob server reports error: %s", errorText) + } + + received, ok := ures["received"].([]interface{}) + if !ok { + return error("upload json validity error: no 'received'", nil) + } + + for _, rit := range received { + it, ok := rit.(map[string]string) + if !ok { + return error("upload json validity error: 'received' is malformed", nil) + } + if it["blobRef"] == blobRefString { + sizeStr, hasSize := it["size"] + if !hasSize { + return error("upload json validity error: 'received' is missing 'size'", nil) + } + gotSize, _ := strconv.Atoi64(sizeStr) + if gotSize == h.Size { + // Success! + return pr, nil + } + } + } + + return nil, os.NewError("Server didn't receive blob.") } -func (a *Agent) Wait() int { - // TODO - return 0 -} - -func blobName(contents io.ReadSeeker) *blobref.BlobRef { +func blobDetails(contents io.ReadSeeker) (bref *blobref.BlobRef, size int64, err os.Error) { s1 := sha1.New() contents.Seek(0, 0) - io.Copy(s1, contents) - return blobref.Parse(fmt.Sprintf("sha1-%x", s1.Sum())) + size, err = io.Copy(s1, contents) + if err != nil { + return + } + return blobref.Parse(fmt.Sprintf("sha1-%x", s1.Sum())), size, nil } -func (a *Agent) UploadFileBlob(filename string) (*blobref.BlobRef, os.Error) { +func (a *Agent) UploadFileBlob(filename string) (*PutResult, os.Error) { log.Printf("Uploading filename: %s", filename) file, err := os.Open(filename, os.O_RDONLY, 0) if err != nil { return nil, err } - log.Printf("blob is:", blobName(file)) - handle := &UploadHandle{blobName(file), file} - a.Upload(handle) - return handle.blobref, nil + ref, size, err := blobDetails(file) + if err != nil { + return nil, err + } + handle := &UploadHandle{ref, size, file} + return a.Upload(handle) } -func (a *Agent) UploadFile(filename string) (*blobref.BlobRef, os.Error) { +func (a *Agent) UploadFile(filename string) (*PutResult, os.Error) { return nil, nil } @@ -180,6 +238,21 @@ Usage: camliup os.Exit(1) } +var wereErrors = false + +func handleResult(what string, pr *PutResult, err os.Error) { + if err != nil { + log.Printf("Error putting %s: %s", what, err) + wereErrors = true + return + } + if *flagVerbose { + fmt.Printf("Put %s: %q\n", what, pr) + } else { + fmt.Println(pr.BlobRef.String()) + } +} + func main() { flag.Parse() @@ -191,13 +264,14 @@ func main() { if *flagFile || *flagBlob { for n := 0; n < flag.NArg(); n++ { if *flagBlob { - agent.UploadFileBlob(flag.Arg(n)) + pr, err := agent.UploadFileBlob(flag.Arg(n)) + handleResult("blob", pr, err) } else { - agent.UploadFile(flag.Arg(n)) + pr, err := agent.UploadFile(flag.Arg(n)) + handleResult("file", pr, err) } } } - stats := agent.Wait() - fmt.Println("Done uploading; stats:", stats) + } diff --git a/doc/protocol/blob-upload-protocol.txt b/doc/protocol/blob-upload-protocol.txt index 5a201be77..8ec9f83cc 100644 --- a/doc/protocol/blob-upload-protocol.txt +++ b/doc/protocol/blob-upload-protocol.txt @@ -118,7 +118,7 @@ Content-Type: application/octet-stream --randomboundaryXYZ-- ----------------------------------------------------- -Response (statis may be a 200 or a 303 to this data) +Response (status may be a 200 or a 303 to this data) ----------------------------------------------------- HTTP/1.1 200 OK