Upload work.

This commit is contained in:
Brad Fitzpatrick 2010-12-24 07:46:12 -08:00
parent eca9778120
commit 0adecfb4cc
2 changed files with 129 additions and 55 deletions

View File

@ -18,16 +18,25 @@ import (
"log"
"os"
"strings"
"strconv"
)
// Things that can be uploaded. (at most one of these)
var flagBlob *bool = flag.Bool("blob", true, "upload a file's bytes as a single blob")
var flagFile *bool = flag.Bool("file", false, "upload a file's bytes as a blob, as well as its JSON file record")
var flagVerbose *bool = flag.Bool("verbose", false, "be verbose")
type UploadHandle struct {
blobref *blobref.BlobRef
contents io.ReadSeeker
BlobRef *blobref.BlobRef
Size int64
Contents io.ReadSeeker
}
type PutResult struct {
BlobRef *blobref.BlobRef
Size int64
Skipped bool // already present on blobserver
}
// Upload agent
@ -46,66 +55,75 @@ func encodeBase64(s string) string {
return string(buf)
}
func (a *Agent) Upload(h *UploadHandle) {
func jsonFromResponse(resp *http.Response) (map[string]interface{}, os.Error) {
// TODO: LimitReader here for paranoia
buf := new(bytes.Buffer)
io.Copy(buf, resp.Body)
resp.Body.Close()
fmt.Printf("Got HTTP response: [%s]\n", buf)
jmap := make(map[string]interface{})
if jerr := json.Unmarshal(buf.Bytes(), &jmap); jerr != nil {
return nil, jerr
}
return jmap, nil
}
func (a *Agent) Upload(h *UploadHandle) (*PutResult, os.Error) {
url := fmt.Sprintf("%s/camli/preupload", a.server)
fmt.Println("Need to upload: ", h, "to", url)
error := func(msg string, e os.Error) {
fmt.Fprintf(os.Stderr, "%s on %v: %v\n", msg, h.blobref, e)
return
error := func(msg string, e os.Error) (*PutResult, os.Error) {
err := os.NewError(fmt.Sprintf("Error uploading blob %s: %s; err=%s",
h.BlobRef, msg, e))
log.Print(err.String())
return nil, err
}
authHeader := "Basic " + encodeBase64("username:" + a.password)
blobRefString := h.BlobRef.String()
req := http.NewPostRequest(
url,
"application/x-www-form-urlencoded",
strings.NewReader("camliversion=1&blob1="+h.blobref.String()))
strings.NewReader("camliversion=1&blob1="+blobRefString))
req.Header["Authorization"] = authHeader
log.Printf("Request is %v", req.Request)
resp, err := req.Send()
if err != nil {
log.Exitf("Upload error for %v: %v\n", h.blobref, err)
return error("preupload http error", err)
}
fmt.Println("Got response:", resp)
buf := new(bytes.Buffer)
io.Copy(buf, resp.Body)
resp.Body.Close()
pur := make(map[string]interface{})
jerr := json.Unmarshal(buf.Bytes(), &pur)
if jerr != nil {
error("preupload parse error", jerr)
return
pur, err := jsonFromResponse(resp)
if err != nil {
return error("preupload json parse error", err)
}
uploadUrl, ok := pur["uploadUrl"].(string)
if uploadUrl == "" {
error("no uploadUrl in preupload response", nil)
return
return error("preupload json validity error: no 'uploadUrl'", nil)
}
alreadyHave, ok := pur["alreadyHave"].([]interface{})
if !ok {
error("no alreadyHave array in preupload response", nil)
return
return error("preupload json validity error: no 'alreadyHave'", nil)
}
pr := &PutResult{BlobRef: h.BlobRef, Size: h.Size}
for _, haveObj := range alreadyHave {
haveObj := haveObj.(map[string]interface{})
if haveObj["blobRef"].(string) == h.blobref.String() {
fmt.Println("already have it!")
// TODO: signal success
return
if haveObj["blobRef"].(string) == h.BlobRef.String() {
pr.Skipped = true
return pr, nil
}
}
fmt.Println("preupload done:", pur, alreadyHave)
boundary := "sdf8sd8f7s9df9s7df9sd7sdf9s879vs7d8v7sd8v7sd8v"
h.contents.Seek(0, 0)
h.Contents.Seek(0, 0)
req = http.NewPostRequest(uploadUrl,
"multipart/form-data; boundary="+boundary,
@ -113,47 +131,87 @@ func (a *Agent) Upload(h *UploadHandle) {
strings.NewReader(fmt.Sprintf(
"--%s\r\nContent-Disposition: form-data; name=\"%s\"\r\n\r\n",
boundary,
h.blobref)),
h.contents,
h.BlobRef)),
h.Contents,
strings.NewReader("\r\n--"+boundary+"--\r\n")))
req.Header["Authorization"] = authHeader
resp, err = req.Send()
if err != nil {
error("camli upload error", err)
return
return error("upload http error", err)
}
fmt.Println("Uploaded!")
fmt.Println("Got response: ", resp)
resp.Write(os.Stdout)
// The only valid HTTP responses are 200 and 303.
if resp.StatusCode != 200 && resp.StatusCode != 303 {
return error(fmt.Sprintf("invalid http response %d in upload response", resp.StatusCode), nil)
}
if resp.StatusCode == 303 {
// TODO
log.Exitf("TODO: handle 303? or does the Go http client do it already? how to enforce only 200 and 303 if so?")
}
ures, err := jsonFromResponse(resp)
if err != nil {
return error("json parse from upload error", err)
}
errorText, ok := ures["errorText"].(string)
if ok {
log.Printf("Blob server reports error: %s", errorText)
}
received, ok := ures["received"].([]interface{})
if !ok {
return error("upload json validity error: no 'received'", nil)
}
for _, rit := range received {
it, ok := rit.(map[string]string)
if !ok {
return error("upload json validity error: 'received' is malformed", nil)
}
if it["blobRef"] == blobRefString {
sizeStr, hasSize := it["size"]
if !hasSize {
return error("upload json validity error: 'received' is missing 'size'", nil)
}
gotSize, _ := strconv.Atoi64(sizeStr)
if gotSize == h.Size {
// Success!
return pr, nil
}
}
}
return nil, os.NewError("Server didn't receive blob.")
}
func (a *Agent) Wait() int {
// TODO
return 0
}
func blobName(contents io.ReadSeeker) *blobref.BlobRef {
func blobDetails(contents io.ReadSeeker) (bref *blobref.BlobRef, size int64, err os.Error) {
s1 := sha1.New()
contents.Seek(0, 0)
io.Copy(s1, contents)
return blobref.Parse(fmt.Sprintf("sha1-%x", s1.Sum()))
size, err = io.Copy(s1, contents)
if err != nil {
return
}
return blobref.Parse(fmt.Sprintf("sha1-%x", s1.Sum())), size, nil
}
func (a *Agent) UploadFileBlob(filename string) (*blobref.BlobRef, os.Error) {
func (a *Agent) UploadFileBlob(filename string) (*PutResult, os.Error) {
log.Printf("Uploading filename: %s", filename)
file, err := os.Open(filename, os.O_RDONLY, 0)
if err != nil {
return nil, err
}
log.Printf("blob is:", blobName(file))
handle := &UploadHandle{blobName(file), file}
a.Upload(handle)
return handle.blobref, nil
ref, size, err := blobDetails(file)
if err != nil {
return nil, err
}
handle := &UploadHandle{ref, size, file}
return a.Upload(handle)
}
func (a *Agent) UploadFile(filename string) (*blobref.BlobRef, os.Error) {
func (a *Agent) UploadFile(filename string) (*PutResult, os.Error) {
return nil, nil
}
@ -180,6 +238,21 @@ Usage: camliup
os.Exit(1)
}
var wereErrors = false
func handleResult(what string, pr *PutResult, err os.Error) {
if err != nil {
log.Printf("Error putting %s: %s", what, err)
wereErrors = true
return
}
if *flagVerbose {
fmt.Printf("Put %s: %q\n", what, pr)
} else {
fmt.Println(pr.BlobRef.String())
}
}
func main() {
flag.Parse()
@ -191,13 +264,14 @@ func main() {
if *flagFile || *flagBlob {
for n := 0; n < flag.NArg(); n++ {
if *flagBlob {
agent.UploadFileBlob(flag.Arg(n))
pr, err := agent.UploadFileBlob(flag.Arg(n))
handleResult("blob", pr, err)
} else {
agent.UploadFile(flag.Arg(n))
pr, err := agent.UploadFile(flag.Arg(n))
handleResult("file", pr, err)
}
}
}
stats := agent.Wait()
fmt.Println("Done uploading; stats:", stats)
}

View File

@ -118,7 +118,7 @@ Content-Type: application/octet-stream
--randomboundaryXYZ--
-----------------------------------------------------
Response (statis may be a 200 or a 303 to this data)
Response (status may be a 200 or a 303 to this data)
-----------------------------------------------------
HTTP/1.1 200 OK