perkeep/blobserver/go/camlistored.go

437 lines
9.9 KiB
Go
Raw Normal View History

2010-06-12 21:45:58 +00:00
// Copyright 2010 Brad Fitzpatrick <brad@danga.com>
//
// See LICENSE.
package main
import (
2010-07-11 04:58:30 +00:00
"container/vector"
"crypto/sha1"
"encoding/base64"
"flag"
"fmt"
"http"
"io"
"io/ioutil"
2010-07-11 04:58:30 +00:00
"json"
"os"
"regexp"
)
// For `make`:
//import "./util/_obj/util"
// For `gofr`:
import "util/util"
// import "mime/multipart"
2010-07-02 23:50:07 +00:00
// import multipart "github.com/bradfitz/golang-mime-multipart"
2010-06-12 21:45:58 +00:00
// Command-line flags and package-level state shared by the handlers.
var (
	// listen is the host:port the HTTP server binds to.
	listen = flag.String("listen", "0.0.0.0:3179", "host:port to listen on")

	// storageRoot is the root directory used for storing files; main
	// refuses to start unless it exists and is a directory.
	storageRoot = flag.String("root", "/tmp/camliroot", "Root directory to store files")

	// stealthMode selects the terse reply served by HandleRoot.
	stealthMode = flag.Bool("stealth", true, "Run in stealth mode.")

	// putPassword is the secret that request credentials are compared
	// against; main loads it from the CAMLI_PASSWORD environment variable.
	putPassword string

	// kBasicAuthPattern captures the base64 payload of a "Basic"
	// Authorization header value.
	kBasicAuthPattern = regexp.MustCompile(`^Basic ([a-zA-Z0-9\+/=]+)`)
)
2010-06-12 21:45:58 +00:00
func badRequestError(conn *http.Conn, errorMessage string) {
conn.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(conn, "%s\n", errorMessage)
2010-06-12 21:45:58 +00:00
}
func serverError(conn *http.Conn, err os.Error) {
conn.WriteHeader(http.StatusInternalServerError)
fmt.Fprintf(conn, "Server error: %s\n", err)
}
2010-06-14 04:51:18 +00:00
func putAllowed(req *http.Request) bool {
auth, present := req.Header["Authorization"]
if !present {
return false
}
matches := kBasicAuthPattern.MatchStrings(auth)
if len(matches) != 2 {
return false
}
var outBuf []byte = make([]byte, base64.StdEncoding.DecodedLen(len(matches[1])))
bytes, err := base64.StdEncoding.Decode(outBuf, []uint8(matches[1]))
2010-06-21 01:26:54 +00:00
if err != nil {
return false
}
password := string(outBuf)
2010-06-14 04:51:18 +00:00
fmt.Println("Decoded bytes:", bytes, " error: ", err)
2010-06-21 01:26:54 +00:00
fmt.Println("Got userPass:", password)
return password != "" && password == putPassword
2010-06-21 01:26:54 +00:00
}
func getAllowed(req *http.Request) bool {
// For now...
return putAllowed(req)
2010-06-14 04:51:18 +00:00
}
2010-07-02 23:50:07 +00:00
func handleCamliForm(conn *http.Conn, req *http.Request) {
fmt.Fprintf(conn, `
<html>
<body>
<form method='POST' enctype="multipart/form-data" action="/camli/testform">
2010-07-02 23:50:07 +00:00
<input type="hidden" name="имя" value="брэд" />
Text unix: <input type="file" name="file-unix"><br>
Text win: <input type="file" name="file-win"><br>
Text mac: <input type="file" name="file-mac"><br>
Image png: <input type="file" name="image-png"><br>
<input type=submit>
</form>
</body>
</html>
`)
}
2010-07-11 04:58:30 +00:00
func returnJson(conn *http.Conn, data interface{}) {
bytes, err := json.MarshalIndent(data, "", " ")
if err != nil {
badRequestError(conn, fmt.Sprintf(
"JSON serialization error: %v", err))
return
}
conn.Write(bytes)
conn.Write([]byte("\n"))
}
2010-06-12 21:45:58 +00:00
func handleCamli(conn *http.Conn, req *http.Request) {
switch req.Method {
case "GET":
2010-06-13 00:15:49 +00:00
handleGet(conn, req)
case "POST":
switch req.URL.Path {
case "/camli/preupload":
handlePreUpload(conn, req)
case "/camli/upload":
handleMultiPartUpload(conn, req)
case "/camli/testform": // debug only
handleTestForm(conn, req)
case "/camli/form": // debug only
handleCamliForm(conn, req)
default:
badRequestError(conn, "Unsupported POST path.")
}
case "PUT": // no longer part of spec
handlePut(conn, req)
default:
badRequestError(conn, "Unsupported method.")
2010-06-13 00:15:49 +00:00
}
2010-06-12 21:45:58 +00:00
}
2010-06-13 00:15:49 +00:00
func handleGet(conn *http.Conn, req *http.Request) {
2010-06-21 01:26:54 +00:00
if !getAllowed(req) {
conn.SetHeader("WWW-Authenticate", "Basic realm=\"camlistored\"")
conn.WriteHeader(http.StatusUnauthorized)
fmt.Fprintf(conn, "Authentication required.")
return
}
blobRef := ParsePath(req.URL.Path)
if blobRef == nil {
2010-06-13 00:15:49 +00:00
badRequestError(conn, "Malformed GET URL.")
return
2010-06-13 00:15:49 +00:00
}
fileName := blobRef.FileName()
2010-06-13 00:15:49 +00:00
stat, err := os.Stat(fileName)
if err == os.ENOENT {
conn.WriteHeader(http.StatusNotFound)
fmt.Fprintf(conn, "Object not found.")
return
}
if err != nil {
serverError(conn, err)
return
2010-06-13 00:15:49 +00:00
}
file, err := os.Open(fileName, os.O_RDONLY, 0)
if err != nil {
serverError(conn, err)
return
2010-06-13 00:15:49 +00:00
}
conn.SetHeader("Content-Type", "application/octet-stream")
bytesCopied, err := io.Copy(conn, file)
// If there's an error at this point, it's too late to tell the client,
// as they've already been receiving bytes. But they should be smart enough
// to verify the digest doesn't match. But we close the (chunked) response anyway,
// to further signal errors.
if err != nil {
fmt.Fprintf(os.Stderr, "Error sending file: %v, err=%v\n", blobRef, err)
2010-06-13 00:15:49 +00:00
closer, _, err := conn.Hijack()
if err != nil {
closer.Close()
}
return
}
2010-06-13 00:15:49 +00:00
if bytesCopied != stat.Size {
fmt.Fprintf(os.Stderr, "Error sending file: %v, copied= %d, not %d%v\n", blobRef,
2010-06-13 00:15:49 +00:00
bytesCopied, stat.Size)
closer, _, err := conn.Hijack()
if err != nil {
closer.Close()
}
return
2010-06-13 00:15:49 +00:00
}
}
func handleTestForm(conn *http.Conn, req *http.Request) {
if !(req.Method == "POST" && req.URL.Path == "/camli/testform") {
2010-06-21 06:05:50 +00:00
badRequestError(conn, "Inconfigured handler.")
return
}
multipart, err := req.MultipartReader()
if multipart == nil {
badRequestError(conn, fmt.Sprintf("Expected multipart/form-data POST request; %v", err))
return
2010-06-21 06:05:50 +00:00
}
for {
part, err := multipart.NextPart()
if err != nil {
fmt.Println("Error reading:", err)
break
}
if part == nil {
break
}
formName := part.FormName()
fmt.Printf("New value [%s], part=%v\n", formName, part)
2010-07-02 23:50:07 +00:00
sha1 := sha1.New()
io.Copy(sha1, part)
fmt.Printf("Got part digest: %x\n", sha1.Sum())
}
fmt.Println("Done reading multipart body.")
}
func handlePreUpload(conn *http.Conn, req *http.Request) {
if !(req.Method == "POST" && req.URL.Path == "/camli/preupload") {
badRequestError(conn, "Inconfigured handler.")
return
}
req.ParseForm()
camliVersion := req.FormValue("camliversion")
if camliVersion == "" {
badRequestError(conn, "No camliversion")
return
}
n := 0
2010-07-11 04:58:30 +00:00
haveVector := new(vector.Vector)
haveChan := make(chan *map[string]interface{})
for {
2010-07-11 04:58:30 +00:00
key := fmt.Sprintf("blob%v", n+1)
value := req.FormValue(key)
if value == "" {
break
}
ref := ParseBlobRef(value)
if ref == nil {
2010-07-11 04:58:30 +00:00
badRequestError(conn, "Bogus blobref for key "+key)
return
}
if !ref.IsSupported() {
2010-07-11 04:58:30 +00:00
badRequestError(conn, "Unsupported or bogus blobref "+key)
}
n++
// Parallel stat all the files...
go func() {
fi, err := os.Stat(ref.FileName())
if err == nil && fi.IsRegular() {
info := make(map[string]interface{})
info["blobRef"] = ref.String()
info["size"] = fi.Size
haveChan <- &info
} else {
haveChan <- nil
}
}()
}
if n > 0 {
for have := range haveChan {
if have != nil {
haveVector.Push(have)
}
n--
if n == 0 {
break
}
}
}
2010-07-11 04:58:30 +00:00
ret := make(map[string]interface{})
ret["maxUploadSize"] = 2147483647 // 2GB.. *shrug*
ret["alreadyHave"] = haveVector.Copy()
2010-07-11 04:58:30 +00:00
ret["uploadUrl"] = "http://localhost:3179/camli/upload"
ret["uploadUrlExpirationSeconds"] = 86400
returnJson(conn, ret)
}
func handleMultiPartUpload(conn *http.Conn, req *http.Request) {
if !(req.Method == "POST" && req.URL.Path == "/camli/upload") {
badRequestError(conn, "Inconfigured handler.")
return
}
multipart, err := req.MultipartReader()
if multipart == nil {
badRequestError(conn, fmt.Sprintf(
"Expected multipart/form-data POST request; %v", err))
return
}
for {
part, err := multipart.NextPart()
if err != nil {
fmt.Println("Error reading multipart section:", err)
break
}
if part == nil {
break
}
formName := part.FormName()
fmt.Printf("New value [%s], part=%v\n", formName, part)
ref := ParseBlobRef(formName)
if ref == nil {
fmt.Printf("Ignoring form key [%s]\n", formName)
continue
}
ok, err := receiveBlob(ref, part)
if !ok {
fmt.Printf("Error receiving blob %v: %v\n", ref, err)
} else {
fmt.Printf("Received blob %v\n", ref)
}
}
fmt.Println("Done reading multipart body.")
2010-06-21 06:05:50 +00:00
}
func receiveBlob(blobRef *BlobRef, source io.Reader) (ok bool, err os.Error) {
hashedDirectory := blobRef.DirectoryName()
err = os.MkdirAll(hashedDirectory, 0700)
2010-06-12 21:45:58 +00:00
if err != nil {
return
}
var tempFile *os.File
tempFile, err = ioutil.TempFile(hashedDirectory, blobRef.FileBaseName()+".tmp")
2010-06-12 21:45:58 +00:00
if err != nil {
return
}
2010-06-12 21:45:58 +00:00
success := false // set true later
2010-06-12 21:45:58 +00:00
defer func() {
if !success {
fmt.Println("Removing temp file: ", tempFile.Name())
os.Remove(tempFile.Name())
}
}()
2010-06-12 21:45:58 +00:00
sha1 := sha1.New()
var written int64
written, err = io.Copy(util.NewTee(sha1, tempFile), source)
2010-06-12 21:45:58 +00:00
if err != nil {
return
2010-06-12 21:45:58 +00:00
}
if err = tempFile.Close(); err != nil {
return
2010-06-12 21:45:58 +00:00
}
fileName := blobRef.FileName()
2010-06-12 21:45:58 +00:00
if err = os.Rename(tempFile.Name(), fileName); err != nil {
return
2010-06-12 21:45:58 +00:00
}
stat, err := os.Lstat(fileName)
if err != nil {
return
2010-06-12 21:45:58 +00:00
}
if !stat.IsRegular() || stat.Size != written {
return false, os.NewError("Written size didn't match.")
2010-06-12 21:45:58 +00:00
}
success = true
return true, nil
}
func handlePut(conn *http.Conn, req *http.Request) {
blobRef := ParsePath(req.URL.Path)
if blobRef == nil {
badRequestError(conn, "Malformed PUT URL.")
return
}
if !blobRef.IsSupported() {
badRequestError(conn, "unsupported object hash function")
return
}
if !putAllowed(req) {
conn.SetHeader("WWW-Authenticate", "Basic realm=\"camlistored\"")
conn.WriteHeader(http.StatusUnauthorized)
fmt.Fprintf(conn, "Authentication required.")
return
}
// TODO(bradfitz): authn/authz checks here.
_, err := receiveBlob(blobRef, req.Body)
if err != nil {
serverError(conn, err)
return
}
2010-06-12 21:45:58 +00:00
fmt.Fprint(conn, "OK")
}
func HandleRoot(conn *http.Conn, req *http.Request) {
2010-07-18 16:56:31 +00:00
if *stealthMode {
fmt.Fprintf(conn, "Hi.\n")
} else {
fmt.Fprintf(conn, `
2010-06-12 21:45:58 +00:00
This is camlistored, a Camlistore storage daemon.
`)
2010-07-18 16:56:31 +00:00
}
2010-06-12 21:45:58 +00:00
}
func main() {
flag.Parse()
2010-06-21 01:26:54 +00:00
putPassword = os.Getenv("CAMLI_PASSWORD")
if len(putPassword) == 0 {
2010-06-12 21:45:58 +00:00
fmt.Fprintf(os.Stderr,
"No CAMLI_PASSWORD environment variable set.\n")
os.Exit(1)
}
{
fi, err := os.Stat(*storageRoot)
if err != nil || !fi.IsDirectory() {
fmt.Fprintf(os.Stderr,
"Storage root '%s' doesn't exist or is not a directory.\n",
*storageRoot)
os.Exit(1)
}
}
mux := http.NewServeMux()
mux.HandleFunc("/", HandleRoot)
mux.HandleFunc("/camli/", handleCamli)
fmt.Printf("Starting to listen on http://%v/\n", *listen)
err := http.ListenAndServe(*listen, mux)
if err != nil {
fmt.Fprintf(os.Stderr,
"Error in http server: %v\n", err)
os.Exit(1)
}
}