diff --git a/blobserver/go/Makefile b/blobserver/go/Makefile index ee1dfd94a..1e9f2a9ce 100644 --- a/blobserver/go/Makefile +++ b/blobserver/go/Makefile @@ -7,7 +7,14 @@ include $(GOROOT)/src/Make.$(GOARCH) TARG=camlistored GOFILES=\ camlistored.go\ + auth.go\ blobref.go\ + enumerate.go\ + get.go\ + http_util.go\ + preupload.go\ + temp_testing.go\ + upload.go\ include $(GOROOT)/src/Make.cmd diff --git a/blobserver/go/auth.go b/blobserver/go/auth.go new file mode 100644 index 000000000..b6e2faeca --- /dev/null +++ b/blobserver/go/auth.go @@ -0,0 +1,53 @@ +package main + +import ( + "encoding/base64" + "fmt" + "http" + "regexp" + "strings" +) + +var kBasicAuthPattern *regexp.Regexp = regexp.MustCompile(`^Basic ([a-zA-Z0-9\+/=]+)`) + +var accessPassword string + +func isAuthorized(req *http.Request) bool { + auth, present := req.Header["Authorization"] + if !present { + return false + } + matches := kBasicAuthPattern.MatchStrings(auth) + if len(matches) != 2 { + return false + } + encoded := matches[1] + enc := base64.StdEncoding + decBuf := make([]byte, enc.DecodedLen(len(encoded))) + n, err := enc.Decode(decBuf, []byte(encoded)) + if err != nil { + return false + } + userpass := strings.Split(string(decBuf[0:n]), ":", 2) + if len(userpass) != 2 { + fmt.Println("didn't get two pieces") + return false + } + password := userpass[1] // username at index 0 is currently unused + return password != "" && password == accessPassword +} + +// requireAuth wraps a function with another function that enforces +// HTTP Basic Auth. +func requireAuth(handler func(conn *http.Conn, req *http.Request)) func (conn *http.Conn, req *http.Request) { + return func (conn *http.Conn, req *http.Request) { + if !isAuthorized(req) { + conn.SetHeader("WWW-Authenticate", "Basic realm=\"camlistored\"") + conn.WriteHeader(http.StatusUnauthorized) + fmt.Fprintf(conn, "Authentication required.\n") + return + } + handler(conn, req) + } +} + diff --git a/blobserver/go/blobref.go b/blobserver/go/blobref.go index 318a9c87e..8715affe4 100644 --- a/blobserver/go/blobref.go +++ b/blobserver/go/blobref.go @@ -67,7 +67,8 @@ func (o *BlobRef) FileBaseName() string { } func (o *BlobRef) DirectoryName() string { - return fmt.Sprintf("%s/%s/%s", *storageRoot, o.Digest[0:3], o.Digest[3:6]) + return fmt.Sprintf("%s/%s/%s/%s", + *storageRoot, o.HashName, o.Digest[0:3], o.Digest[3:6]) } func (o *BlobRef) FileName() string { diff --git a/blobserver/go/camlistored.go b/blobserver/go/camlistored.go index 1db75e022..d6e8aa449 100644 --- a/blobserver/go/camlistored.go +++ b/blobserver/go/camlistored.go @@ -5,120 +5,16 @@ package main import ( - "container/vector" - "crypto/sha1" - "encoding/base64" "flag" "fmt" "http" - "io" - "io/ioutil" - "json" "os" - "regexp" - "strings" ) -// For `make`: -//import "./util/_obj/util" -// For `gofr`: -import "util/util" - -// import "mime/multipart" -// import multipart "github.com/bradfitz/golang-mime-multipart" - var listen *string = flag.String("listen", "0.0.0.0:3179", "host:port to listen on") var storageRoot *string = flag.String("root", "/tmp/camliroot", "Root directory to store files") var stealthMode *bool = flag.Bool("stealth", true, "Run in stealth mode.") -var accessPassword string - -var kBasicAuthPattern *regexp.Regexp = regexp.MustCompile(`^Basic ([a-zA-Z0-9\+/=]+)`) - -func badRequestError(conn *http.Conn, errorMessage string) { - conn.WriteHeader(http.StatusBadRequest) - fmt.Fprintf(conn, "%s\n", errorMessage) -} - -func serverError(conn *http.Conn, err os.Error) { - conn.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(conn, "Server error: %s\n", err) -} - -func putAllowed(req *http.Request) bool { - return isAuthorized(req) -} - -func isAuthorized(req *http.Request) bool { - auth, present := req.Header["Authorization"] - if !present { - return false - } - matches := kBasicAuthPattern.MatchStrings(auth) - if len(matches) != 2 { - return false - } - encoded := matches[1] - enc := base64.StdEncoding - decBuf := make([]byte, enc.DecodedLen(len(encoded))) - n, err := enc.Decode(decBuf, []byte(encoded)) - if err != nil { - return false - } - userpass := strings.Split(string(decBuf[0:n]), ":", 2) - if len(userpass) != 2 { - fmt.Println("didn't get two pieces") - return false - } - password := userpass[1] // username at index 0 is currently unused - return password != "" && password == accessPassword -} - -func getAllowed(req *http.Request) bool { - // For now... - return putAllowed(req) -} - -func handleCamliForm(conn *http.Conn, req *http.Request) { - fmt.Fprintf(conn, ` - - -
- -Text unix:
-Text win:
-Text mac:
-Image png:
- -
- - -`) -} - -func returnJson(conn *http.Conn, data interface{}) { - bytes, err := json.MarshalIndent(data, "", " ") - if err != nil { - badRequestError(conn, fmt.Sprintf( - "JSON serialization error: %v", err)) - return - } - conn.Write(bytes) - conn.Write([]byte("\n")) -} - -func requireAuth(handler func(conn *http.Conn, req *http.Request)) func (conn *http.Conn, req *http.Request) { - return func (conn *http.Conn, req *http.Request) { - if !isAuthorized(req) { - conn.SetHeader("WWW-Authenticate", "Basic realm=\"camlistored\"") - conn.WriteHeader(http.StatusUnauthorized) - fmt.Fprintf(conn, "Authentication required.\n") - return - } - handler(conn, req) - } -} - func handleCamli(conn *http.Conn, req *http.Request) { handler := func (conn *http.Conn, req *http.Request) { badRequestError(conn, "Unsupported path or method.") @@ -132,6 +28,8 @@ func handleCamli(conn *http.Conn, req *http.Request) { handler = requireAuth(handlePreUpload) case "/camli/upload": handler = requireAuth(handleMultiPartUpload) + case "/camli/enumerate-blobs": + handler = requireAuth(handleEnumerateBlobs) case "/camli/testform": // debug only handler = handleTestForm case "/camli/form": // debug only @@ -143,300 +41,11 @@ func handleCamli(conn *http.Conn, req *http.Request) { handler(conn, req) } -func handleGet(conn *http.Conn, req *http.Request) { - if !getAllowed(req) { - conn.SetHeader("WWW-Authenticate", "Basic realm=\"camlistored\"") - conn.WriteHeader(http.StatusUnauthorized) - fmt.Fprintf(conn, "Authentication required.") - return - } - - blobRef := ParsePath(req.URL.Path) - if blobRef == nil { - badRequestError(conn, "Malformed GET URL.") - return - } - fileName := blobRef.FileName() - stat, err := os.Stat(fileName) - if err == os.ENOENT { - conn.WriteHeader(http.StatusNotFound) - fmt.Fprintf(conn, "Object not found.") - return - } - if err != nil { - serverError(conn, err) - return - } - file, err := os.Open(fileName, os.O_RDONLY, 0) - if err != nil { - serverError(conn, err) - return - } - conn.SetHeader("Content-Type", "application/octet-stream") - bytesCopied, err := io.Copy(conn, file) - - // If there's an error at this point, it's too late to tell the client, - // as they've already been receiving bytes. But they should be smart enough - // to verify the digest doesn't match. But we close the (chunked) response anyway, - // to further signal errors. - if err != nil { - fmt.Fprintf(os.Stderr, "Error sending file: %v, err=%v\n", blobRef, err) - closer, _, err := conn.Hijack() - if err != nil { - closer.Close() - } - return - } - if bytesCopied != stat.Size { - fmt.Fprintf(os.Stderr, "Error sending file: %v, copied= %d, not %d%v\n", blobRef, - bytesCopied, stat.Size) - closer, _, err := conn.Hijack() - if err != nil { - closer.Close() - } - return - } -} - -func handleTestForm(conn *http.Conn, req *http.Request) { - if !(req.Method == "POST" && req.URL.Path == "/camli/testform") { - badRequestError(conn, "Inconfigured handler.") - return - } - - multipart, err := req.MultipartReader() - if multipart == nil { - badRequestError(conn, fmt.Sprintf("Expected multipart/form-data POST request; %v", err)) - return - } - - for { - part, err := multipart.NextPart() - if err != nil { - fmt.Println("Error reading:", err) - break - } - if part == nil { - break - } - formName := part.FormName() - fmt.Printf("New value [%s], part=%v\n", formName, part) - - sha1 := sha1.New() - io.Copy(sha1, part) - fmt.Printf("Got part digest: %x\n", sha1.Sum()) - - } - fmt.Println("Done reading multipart body.") - -} - -func handlePreUpload(conn *http.Conn, req *http.Request) { - if !(req.Method == "POST" && req.URL.Path == "/camli/preupload") { - badRequestError(conn, "Inconfigured handler.") - return - } - - req.ParseForm() - camliVersion := req.FormValue("camliversion") - if camliVersion == "" { - badRequestError(conn, "No camliversion") - return - } - n := 0 - haveVector := new(vector.Vector) - - haveChan := make(chan *map[string]interface{}) - for { - key := fmt.Sprintf("blob%v", n+1) - value := req.FormValue(key) - if value == "" { - break - } - ref := ParseBlobRef(value) - if ref == nil { - badRequestError(conn, "Bogus blobref for key "+key) - return - } - if !ref.IsSupported() { - badRequestError(conn, "Unsupported or bogus blobref "+key) - } - n++ - - // Parallel stat all the files... - go func() { - fi, err := os.Stat(ref.FileName()) - if err == nil && fi.IsRegular() { - info := make(map[string]interface{}) - info["blobRef"] = ref.String() - info["size"] = fi.Size - haveChan <- &info - } else { - haveChan <- nil - } - }() - } - - if n > 0 { - for have := range haveChan { - if have != nil { - haveVector.Push(have) - } - n-- - if n == 0 { - break - } - } - } - - ret := make(map[string]interface{}) - ret["maxUploadSize"] = 2147483647 // 2GB.. *shrug* - ret["alreadyHave"] = haveVector.Copy() - ret["uploadUrlExpirationSeconds"] = 86400 - - if len(req.Host) > 0 { - scheme := "http" // TODO: https - ret["uploadUrl"] = fmt.Sprintf("%s://%s/camli/upload", - scheme, req.Host) - } else { - ret["uploadUrl"] = "/camli/upload" - } - - - returnJson(conn, ret) -} - -func handleMultiPartUpload(conn *http.Conn, req *http.Request) { - if !(req.Method == "POST" && req.URL.Path == "/camli/upload") { - badRequestError(conn, "Inconfigured handler.") - return - } - - if !putAllowed(req) { - conn.SetHeader("WWW-Authenticate", "Basic realm=\"camlistored\"") - conn.WriteHeader(http.StatusUnauthorized) - fmt.Fprintf(conn, "Authentication required.") - return - } - - multipart, err := req.MultipartReader() - if multipart == nil { - badRequestError(conn, fmt.Sprintf( - "Expected multipart/form-data POST request; %v", err)) - return - } - - for { - part, err := multipart.NextPart() - if err != nil { - fmt.Println("Error reading multipart section:", err) - break - } - if part == nil { - break - } - formName := part.FormName() - fmt.Printf("New value [%s], part=%v\n", formName, part) - - ref := ParseBlobRef(formName) - if ref == nil { - fmt.Printf("Ignoring form key [%s]\n", formName) - continue - } - - ok, err := receiveBlob(ref, part) - if !ok { - fmt.Printf("Error receiving blob %v: %v\n", ref, err) - } else { - fmt.Printf("Received blob %v\n", ref) - } - } - fmt.Println("Done reading multipart body.") -} - -func receiveBlob(blobRef *BlobRef, source io.Reader) (ok bool, err os.Error) { - hashedDirectory := blobRef.DirectoryName() - err = os.MkdirAll(hashedDirectory, 0700) - if err != nil { - return - } - - var tempFile *os.File - tempFile, err = ioutil.TempFile(hashedDirectory, blobRef.FileBaseName()+".tmp") - if err != nil { - return - } - - success := false // set true later - defer func() { - if !success { - fmt.Println("Removing temp file: ", tempFile.Name()) - os.Remove(tempFile.Name()) - } - }() - - sha1 := sha1.New() - var written int64 - written, err = io.Copy(util.NewTee(sha1, tempFile), source) - if err != nil { - return - } - if err = tempFile.Close(); err != nil { - return - } - - fileName := blobRef.FileName() - if err = os.Rename(tempFile.Name(), fileName); err != nil { - return - } - - stat, err := os.Lstat(fileName) - if err != nil { - return - } - if !stat.IsRegular() || stat.Size != written { - return false, os.NewError("Written size didn't match.") - } - - success = true - return true, nil -} - -func handlePut(conn *http.Conn, req *http.Request) { - blobRef := ParsePath(req.URL.Path) - if blobRef == nil { - badRequestError(conn, "Malformed PUT URL.") - return - } - - if !blobRef.IsSupported() { - badRequestError(conn, "unsupported object hash function") - return - } - - if !putAllowed(req) { - conn.SetHeader("WWW-Authenticate", "Basic realm=\"camlistored\"") - conn.WriteHeader(http.StatusUnauthorized) - fmt.Fprintf(conn, "Authentication required.") - return - } - - _, err := receiveBlob(blobRef, req.Body) - if err != nil { - serverError(conn, err) - return - } - - fmt.Fprint(conn, "OK") -} - -func HandleRoot(conn *http.Conn, req *http.Request) { +func handleRoot(conn *http.Conn, req *http.Request) { if *stealthMode { fmt.Fprintf(conn, "Hi.\n") } else { - fmt.Fprintf(conn, ` -This is camlistored, a Camlistore storage daemon. -`) + fmt.Fprintf(conn, "This is camlistored, a Camlistore storage daemon.\n"); } } @@ -461,7 +70,7 @@ func main() { } mux := http.NewServeMux() - mux.HandleFunc("/", HandleRoot) + mux.HandleFunc("/", handleRoot) mux.HandleFunc("/camli/", handleCamli) fmt.Printf("Starting to listen on http://%v/\n", *listen) diff --git a/blobserver/go/enumerate.go b/blobserver/go/enumerate.go new file mode 100644 index 000000000..456a6e360 --- /dev/null +++ b/blobserver/go/enumerate.go @@ -0,0 +1,11 @@ +package main + +import ( + "fmt" + "http" +) + +func handleEnumerateBlobs(conn *http.Conn, req *http.Request) { + fmt.Fprintf(conn, "Unsupported."); +} + diff --git a/blobserver/go/get.go b/blobserver/go/get.go new file mode 100644 index 000000000..444a07ea5 --- /dev/null +++ b/blobserver/go/get.go @@ -0,0 +1,56 @@ +package main + +import ( + "fmt" + "http" + "os" + "io" +) + +func handleGet(conn *http.Conn, req *http.Request) { + blobRef := ParsePath(req.URL.Path) + if blobRef == nil { + badRequestError(conn, "Malformed GET URL.") + return + } + fileName := blobRef.FileName() + stat, err := os.Stat(fileName) + if err == os.ENOENT { + conn.WriteHeader(http.StatusNotFound) + fmt.Fprintf(conn, "Object not found.") + return + } + if err != nil { + serverError(conn, err) + return + } + file, err := os.Open(fileName, os.O_RDONLY, 0) + if err != nil { + serverError(conn, err) + return + } + conn.SetHeader("Content-Type", "application/octet-stream") + bytesCopied, err := io.Copy(conn, file) + + // If there's an error at this point, it's too late to tell the client, + // as they've already been receiving bytes. But they should be smart enough + // to verify the digest doesn't match. But we close the (chunked) response anyway, + // to further signal errors. + if err != nil { + fmt.Fprintf(os.Stderr, "Error sending file: %v, err=%v\n", blobRef, err) + closer, _, err := conn.Hijack() + if err != nil { + closer.Close() + } + return + } + if bytesCopied != stat.Size { + fmt.Fprintf(os.Stderr, "Error sending file: %v, copied= %d, not %d%v\n", blobRef, + bytesCopied, stat.Size) + closer, _, err := conn.Hijack() + if err != nil { + closer.Close() + } + return + } +} diff --git a/blobserver/go/http_util.go b/blobserver/go/http_util.go new file mode 100644 index 000000000..9c953edb6 --- /dev/null +++ b/blobserver/go/http_util.go @@ -0,0 +1,29 @@ +package main + +import ( + "fmt" + "http" + "json" + "os" +) + +func badRequestError(conn *http.Conn, errorMessage string) { + conn.WriteHeader(http.StatusBadRequest) + fmt.Fprintf(conn, "%s\n", errorMessage) +} + +func serverError(conn *http.Conn, err os.Error) { + conn.WriteHeader(http.StatusInternalServerError) + fmt.Fprintf(conn, "Server error: %s\n", err) +} + +func returnJson(conn *http.Conn, data interface{}) { + bytes, err := json.MarshalIndent(data, "", " ") + if err != nil { + badRequestError(conn, fmt.Sprintf( + "JSON serialization error: %v", err)) + return + } + conn.Write(bytes) + conn.Write([]byte("\n")) +} diff --git a/blobserver/go/preupload.go b/blobserver/go/preupload.go new file mode 100644 index 000000000..dde40209d --- /dev/null +++ b/blobserver/go/preupload.go @@ -0,0 +1,83 @@ +package main + +import ( + "http" + "container/vector" + "fmt" + "os" +) + +func handlePreUpload(conn *http.Conn, req *http.Request) { + if !(req.Method == "POST" && req.URL.Path == "/camli/preupload") { + badRequestError(conn, "Inconfigured handler.") + return + } + + req.ParseForm() + camliVersion := req.FormValue("camliversion") + if camliVersion == "" { + badRequestError(conn, "No camliversion") + return + } + n := 0 + haveVector := new(vector.Vector) + + haveChan := make(chan *map[string]interface{}) + for { + key := fmt.Sprintf("blob%v", n+1) + value := req.FormValue(key) + if value == "" { + break + } + ref := ParseBlobRef(value) + if ref == nil { + badRequestError(conn, "Bogus blobref for key "+key) + return + } + if !ref.IsSupported() { + badRequestError(conn, "Unsupported or bogus blobref "+key) + } + n++ + + // Parallel stat all the files... + go func() { + fi, err := os.Stat(ref.FileName()) + if err == nil && fi.IsRegular() { + info := make(map[string]interface{}) + info["blobRef"] = ref.String() + info["size"] = fi.Size + haveChan <- &info + } else { + haveChan <- nil + } + }() + } + + if n > 0 { + for have := range haveChan { + if have != nil { + haveVector.Push(have) + } + n-- + if n == 0 { + break + } + } + } + + ret := make(map[string]interface{}) + ret["maxUploadSize"] = 2147483647 // 2GB.. *shrug* + ret["alreadyHave"] = haveVector.Copy() + ret["uploadUrlExpirationSeconds"] = 86400 + + if len(req.Host) > 0 { + scheme := "http" // TODO: https + ret["uploadUrl"] = fmt.Sprintf("%s://%s/camli/upload", + scheme, req.Host) + } else { + ret["uploadUrl"] = "/camli/upload" + } + + returnJson(conn, ret) +} + diff --git a/blobserver/go/temp_testing.go b/blobserver/go/temp_testing.go new file mode 100644 index 000000000..c83ab9995 --- /dev/null +++ b/blobserver/go/temp_testing.go @@ -0,0 +1,59 @@ +package main + +import ( + "crypto/sha1" + "fmt" + "http" + "io" +) + +func handleCamliForm(conn *http.Conn, req *http.Request) { + fmt.Fprintf(conn, ` + + +
+ +Text unix:
+Text win:
+Text mac:
+Image png:
+ +
+ + +`) +} + + +func handleTestForm(conn *http.Conn, req *http.Request) { + if !(req.Method == "POST" && req.URL.Path == "/camli/testform") { + badRequestError(conn, "Inconfigured handler.") + return + } + + multipart, err := req.MultipartReader() + if multipart == nil { + badRequestError(conn, fmt.Sprintf("Expected multipart/form-data POST request; %v", err)) + return + } + + for { + part, err := multipart.NextPart() + if err != nil { + fmt.Println("Error reading:", err) + break + } + if part == nil { + break + } + formName := part.FormName() + fmt.Printf("New value [%s], part=%v\n", formName, part) + + sha1 := sha1.New() + io.Copy(sha1, part) + fmt.Printf("Got part digest: %x\n", sha1.Sum()) + + } + fmt.Println("Done reading multipart body.") + +} diff --git a/blobserver/go/upload.go b/blobserver/go/upload.go new file mode 100644 index 000000000..22042f210 --- /dev/null +++ b/blobserver/go/upload.go @@ -0,0 +1,125 @@ +package main + +import ( + "http" + "fmt" + "io" + "io/ioutil" + "os" + ) + +// For `make`: +//import "./util/_obj/util" +// For `gofr`: +import "util/util" + +func handleMultiPartUpload(conn *http.Conn, req *http.Request) { + if !(req.Method == "POST" && req.URL.Path == "/camli/upload") { + badRequestError(conn, "Inconfigured handler.") + return + } + + multipart, err := req.MultipartReader() + if multipart == nil { + badRequestError(conn, fmt.Sprintf( + "Expected multipart/form-data POST request; %v", err)) + return + } + + for { + part, err := multipart.NextPart() + if err != nil { + fmt.Println("Error reading multipart section:", err) + break + } + if part == nil { + break + } + formName := part.FormName() + fmt.Printf("New value [%s], part=%v\n", formName, part) + + ref := ParseBlobRef(formName) + if ref == nil { + fmt.Printf("Ignoring form key [%s]\n", formName) + continue + } + + ok, err := receiveBlob(ref, part) + if !ok { + fmt.Printf("Error receiving blob %v: %v\n", ref, err) + } else { + fmt.Printf("Received blob %v\n", ref) + } + } + fmt.Println("Done reading multipart body.") +} + +func receiveBlob(blobRef *BlobRef, source io.Reader) (ok bool, err os.Error) { + hashedDirectory := blobRef.DirectoryName() + err = os.MkdirAll(hashedDirectory, 0700) + if err != nil { + return + } + + var tempFile *os.File + tempFile, err = ioutil.TempFile(hashedDirectory, blobRef.FileBaseName()+".tmp") + if err != nil { + return + } + + success := false // set true later + defer func() { + if !success { + fmt.Println("Removing temp file: ", tempFile.Name()) + os.Remove(tempFile.Name()) + } + }() + + hash := blobRef.Hash() + var written int64 + written, err = io.Copy(util.NewTee(hash, tempFile), source) + if err != nil { + return + } + if err = tempFile.Close(); err != nil { + return + } + + fileName := blobRef.FileName() + if err = os.Rename(tempFile.Name(), fileName); err != nil { + return + } + + stat, err := os.Lstat(fileName) + if err != nil { + return + } + if !stat.IsRegular() || stat.Size != written { + return false, os.NewError("Written size didn't match.") + } + + success = true + return true, nil +} + +func handlePut(conn *http.Conn, req *http.Request) { + blobRef := ParsePath(req.URL.Path) + if blobRef == nil { + badRequestError(conn, "Malformed PUT URL.") + return + } + + if !blobRef.IsSupported() { + badRequestError(conn, "unsupported object hash function") + return + } + + _, err := receiveBlob(blobRef, req.Body) + if err != nil { + serverError(conn, err) + return + } + + fmt.Fprint(conn, "OK") +} +