diff --git a/clients/go/cammount/cacher.go b/clients/go/cammount/cacher.go
index 99a74a64a..434481dd5 100644
--- a/clients/go/cammount/cacher.go
+++ b/clients/go/cammount/cacher.go
@@ -47,7 +47,7 @@ func (cf *CachingFetcher) Fetch(br *blobref.BlobRef) (file blobref.ReadSeekClose
 		return nil, 0, err
 	}
 
-	_, err = cf.c.ReceiveBlob(br, sblob, nil)
+	_, err = cf.c.ReceiveBlob(br, sblob)
 	if err != nil {
 		return nil, 0, err
 	}
diff --git a/lib/go/camli/blobserver/blobhub.go b/lib/go/camli/blobserver/blobhub.go
index abb75c6a3..bd7d418d1 100644
--- a/lib/go/camli/blobserver/blobhub.go
+++ b/lib/go/camli/blobserver/blobhub.go
@@ -121,28 +121,18 @@ func (h *SimpleBlobHub) UnregisterBlobListener(blob *blobref.BlobRef, ch chan *b
 
 type SimpleBlobHubPartitionMap struct {
 	hubLock sync.Mutex
-	hubMap  map[string]BlobHub
+	hub     BlobHub
 }
 
-func (spm *SimpleBlobHubPartitionMap) GetBlobHub(partition Partition) BlobHub {
-	name := ""
-	if partition != nil {
-		name = partition.Name()
-	}
+func (spm *SimpleBlobHubPartitionMap) GetBlobHub() BlobHub {
 	spm.hubLock.Lock()
 	defer spm.hubLock.Unlock()
-	if spm.hubMap == nil {
-		spm.hubMap = make(map[string]BlobHub)
+	if spm.hub == nil {
+		// TODO: in the future, allow for different blob hub
+		// implementations rather than the
+		// everything-in-memory-on-a-single-machine SimpleBlobHub.
+		spm.hub = new(SimpleBlobHub)
 	}
-	if hub, ok := spm.hubMap[name]; ok {
-		return hub
-	}
-
-	// TODO: in the future, allow for different blob hub
-	// implementations rather than the
-	// everything-in-memory-on-a-single-machine SimpleBlobHub.
-	hub := new(SimpleBlobHub)
-	spm.hubMap[name] = hub
-	return hub
+	return spm.hub
 }
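[Editor's note — illustration only, not part of this change. With partitions gone, the embedded SimpleBlobHubPartitionMap always hands back a single hub. A minimal sketch of the long-poll/wake-up pattern the localdisk code below relies on; "storage" is a placeholder for any value embedding blobserver.SimpleBlobHubPartitionMap:]

	hub := storage.GetBlobHub() // no partition argument; always the same hub
	ch := make(chan *blobref.BlobRef, 1)
	hub.RegisterListener(ch) // wake us when any blob arrives
	defer hub.UnregisterListener(ch)

	// Elsewhere, the receive path fires the notification:
	//   hub.NotifyBlobReceived(blobRef)

	br := <-ch // blocks until some blob is received
	_ = br
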
"camli/test/asserts" "http" "http/httptest" @@ -43,7 +42,6 @@ type emptyEnumerator struct { } func (ee *emptyEnumerator) EnumerateBlobs(dest chan *blobref.SizedBlobRef, - partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error { @@ -75,7 +73,7 @@ func TestEnumerateInput(t *testing.T) { wr := httptest.NewRecorder() wr.Code = 200 // default req := makeGetRequest(test.url) - handleEnumerateBlobs(wr, req, enumerator, nil) // TODO: use better partition + handleEnumerateBlobs(wr, req, enumerator) ExpectInt(t, test.expectedCode, wr.Code, "response code for " + test.name) ExpectString(t, test.expectedBody, wr.Body.String(), "output for " + test.name) } diff --git a/lib/go/camli/blobserver/handlers/remove.go b/lib/go/camli/blobserver/handlers/remove.go index 36c1123c1..f515d2bb3 100644 --- a/lib/go/camli/blobserver/handlers/remove.go +++ b/lib/go/camli/blobserver/handlers/remove.go @@ -27,20 +27,25 @@ import ( const maxRemovesPerRequest = 1000 -func CreateRemoveHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) { +func CreateRemoveHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { - handleRemove(conn, req, storage, partition) + handleRemove(conn, req, storage) } } -func handleRemove(conn http.ResponseWriter, req *http.Request, storage blobserver.Storage, partition blobserver.Partition) { +func handleRemove(conn http.ResponseWriter, req *http.Request, storage blobserver.Storage) { if req.Method != "POST" { log.Fatalf("Invalid method; handlers misconfigured") } - - if !partition.IsQueue() { + configer, ok := storage.(blobserver.Configer) + if !ok { conn.WriteHeader(http.StatusForbidden) - fmt.Fprintf(conn, "Can only remove blobs from a queue partition.\n") + fmt.Fprintf(conn, "Remove handler's blobserver.Storage isn't a blobserver.Configuer; can't remove") + return + } + if !configer.Config().IsQueue { + conn.WriteHeader(http.StatusForbidden) + fmt.Fprintf(conn, "Can only remove blobs from a queue.\n") return } @@ -68,7 +73,7 @@ func handleRemove(conn http.ResponseWriter, req *http.Request, storage blobserve toRemoveStr = append(toRemoveStr, ref.String()) } - err := storage.Remove(partition, toRemove) + err := storage.Remove(toRemove) if err != nil { conn.WriteHeader(http.StatusInternalServerError) log.Printf("Server error during remove: %v", err) diff --git a/lib/go/camli/blobserver/handlers/stat.go b/lib/go/camli/blobserver/handlers/stat.go index beb783cfb..46bdf251a 100644 --- a/lib/go/camli/blobserver/handlers/stat.go +++ b/lib/go/camli/blobserver/handlers/stat.go @@ -27,15 +27,15 @@ import ( "strconv" ) -func CreateStatHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) { +func CreateStatHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { - handleStat(conn, req, storage, partition) + handleStat(conn, req, storage) } } const maxStatBlobs = 1000 -func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobStatter, partition blobserver.Partition) { +func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobStatter) { toStat := make([]*blobref.BlobRef, 0) switch req.Method { case "POST": @@ -91,7 +91,7 @@ func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver. 
blobch := make(chan *blobref.SizedBlobRef) resultch := make(chan os.Error, 1) go func() { - err := storage.Stat(blobch, partition, toStat, waitSeconds) + err := storage.Stat(blobch, toStat, waitSeconds) close(blobch) resultch <- err }() @@ -111,7 +111,8 @@ func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver. } } - ret := commonUploadResponse(partition, req) + configer, _ := storage.(blobserver.Configer) + ret := commonUploadResponse(configer, req) ret["stat"] = statRes ret["canLongPoll"] = true httputil.ReturnJson(conn, ret) diff --git a/lib/go/camli/blobserver/handlers/upload.go b/lib/go/camli/blobserver/handlers/upload.go index e99dbfc5b..d6011b972 100644 --- a/lib/go/camli/blobserver/handlers/upload.go +++ b/lib/go/camli/blobserver/handlers/upload.go @@ -30,13 +30,13 @@ import ( "strings" ) -func CreateUploadHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) { +func CreateUploadHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { - handleMultiPartUpload(conn, req, storage, partition) + handleMultiPartUpload(conn, req, storage) } } -func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver, partition blobserver.Partition) { +func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver) { if !(req.Method == "POST" && strings.Contains(req.URL.Path, "/camli/upload")) { httputil.BadRequestError(conn, "Inconfigured handler.") return @@ -96,7 +96,7 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece continue } - blobGot, err := blobReceiver.ReceiveBlob(ref, mimePart, partition.GetMirrorPartitions()) + blobGot, err := blobReceiver.ReceiveBlob(ref, mimePart) if err != nil { addError(fmt.Sprintf("Error receiving blob %v: %v\n", ref, err)) break @@ -106,7 +106,8 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece } log.Println("Done reading multipart body.") - ret := commonUploadResponse(partition, req) + configer, _ := blobReceiver.(blobserver.Configer) // TODO: ugly? + ret := commonUploadResponse(configer, req) received := make([]map[string]interface{}, 0) for _, got := range receivedBlobs { @@ -125,15 +126,17 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece httputil.ReturnJson(conn, ret) } -func commonUploadResponse(partition blobserver.Partition, req *http.Request) map[string]interface{} { +func commonUploadResponse(configer blobserver.Configer, req *http.Request) map[string]interface{} { ret := make(map[string]interface{}) - ret["maxUploadSize"] = 2147483647 // 2GB.. *shrug* + ret["maxUploadSize"] = 2147483647 // 2GB.. *shrug*. TODO: cut this down, standardize ret["uploadUrlExpirationSeconds"] = 86400 // TODO: camli/upload isn't part of the spec. we should pick // something different here just to make it obvious that this - // isn't a well-known URL and facilitate lazy clients. - ret["uploadUrl"] = partition.URLBase() + "/camli/upload" + // isn't a well-known URL and accidentally encourage lazy clients. 
+ if configer != nil { + ret["uploadUrl"] = configer.Config().URLBase + "/camli/upload" + } return ret } @@ -141,13 +144,13 @@ func commonUploadResponse(partition blobserver.Partition, req *http.Request) map var kPutPattern *regexp.Regexp = regexp.MustCompile(`^/camli/([a-z0-9]+)-([a-f0-9]+)$`) // NOTE: not part of the spec at present. old. might be re-introduced. -func CreateNonStandardPutHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) { +func CreateNonStandardPutHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { - handlePut(conn, req, storage, partition) + handlePut(conn, req, storage) } } -func handlePut(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver, partition blobserver.Partition) { +func handlePut(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver) { blobRef := blobref.FromPattern(kPutPattern, req.URL.Path) if blobRef == nil { httputil.BadRequestError(conn, "Malformed PUT URL.") @@ -159,7 +162,7 @@ func handlePut(conn http.ResponseWriter, req *http.Request, blobReceiver blobser return } - _, err := blobReceiver.ReceiveBlob(blobRef, req.Body, partition.GetMirrorPartitions()) + _, err := blobReceiver.ReceiveBlob(blobRef, req.Body) if err != nil { httputil.ServerError(conn, err) return diff --git a/lib/go/camli/blobserver/interface.go b/lib/go/camli/blobserver/interface.go index 5e6145c61..4ee03fe9e 100644 --- a/lib/go/camli/blobserver/interface.go +++ b/lib/go/camli/blobserver/interface.go @@ -45,20 +45,16 @@ type Partition interface { type BlobReceiver interface { // ReceiveBlob accepts a newly uploaded blob and writes it to // disk. - // - // mirrorPartitions may not be supported by all instances - // and may return an error if used. - ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []Partition) (*blobref.SizedBlobRef, os.Error) + ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) } type BlobStatter interface { // Stat checks for the existence of blobs, writing their sizes // (if found back to the dest channel), and returning an error // or nil. Stat() should NOT close the channel. - // waitSeconds is the max time to wait for the blobs to exist - // in the given partition, or 0 for no delay. + // waitSeconds is the max time to wait for the blobs to exist, + // or 0 for no delay. Stat(dest chan *blobref.SizedBlobRef, - partition Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error } @@ -74,12 +70,11 @@ type BlobEnumerator interface { // sorted, as long as they are lexigraphically greater than // after (if provided). // limit will be supplied and sanity checked by caller. - // waitSeconds is the max time to wait for any blobs to exist - // in the given partition, or 0 for no delay. + // waitSeconds is the max time to wait for any blobs to exist, + // or 0 for no delay. // EnumerateBlobs must close the channel. 
(even if limit // was hit and more blobs remain) EnumerateBlobs(dest chan *blobref.SizedBlobRef, - partition Partition, after string, limit uint, waitSeconds int) os.Error @@ -92,20 +87,29 @@ type Cache interface { BlobStatter } +type Config struct { + Writable, Readable bool + IsQueue bool // supports deletes + + // the "http://host:port" and optional path (but without trailing slash) to have "/camli/*" appended + URLBase string +} + +type Configer interface { + Config() *Config +} + type Storage interface { blobref.StreamingFetcher BlobReceiver BlobStatter BlobEnumerator - // Remove 0 or more blobs from provided partition, which - // should be empty for the default partition. Removal of - // non-existent items isn't an error. Returns failure if any - // items existed but failed to be deleted. - Remove(partition Partition, blobs []*blobref.BlobRef) os.Error + // Remove 0 or more blobs. Removal of non-existent items + // isn't an error. Returns failure if any items existed but + // failed to be deleted. + Remove(blobs []*blobref.BlobRef) os.Error - // Returns the blob notification bus for a given partition. - // Use nil for the default partition. - // TODO: move this to be a method on the Partition interface? - GetBlobHub(partition Partition) BlobHub + // Returns the blob notification bus + GetBlobHub() BlobHub } diff --git a/lib/go/camli/blobserver/localdisk/enumerate.go b/lib/go/camli/blobserver/localdisk/enumerate.go index e1b117dc1..6e789fe95 100644 --- a/lib/go/camli/blobserver/localdisk/enumerate.go +++ b/lib/go/camli/blobserver/localdisk/enumerate.go @@ -17,13 +17,13 @@ limitations under the License. package localdisk import ( - "camli/blobref" - "camli/blobserver" "fmt" "os" "sort" "strings" "time" + + "camli/blobref" ) type readBlobRequest struct { @@ -110,8 +110,8 @@ func readBlobs(opts readBlobRequest) os.Error { return nil } -func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error { - dirRoot := ds.PartitionRoot(partition) +func (ds *DiskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error { + dirRoot := ds.PartitionRoot(ds.partition) limitMutable := limit var err os.Error doScan := func() { @@ -132,7 +132,7 @@ func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition // The case where we have to wait for waitSeconds for any blob // to possibly appear. - hub := ds.GetBlobHub(partition) + hub := ds.GetBlobHub() ch := make(chan *blobref.BlobRef, 1) hub.RegisterListener(ch) defer hub.UnregisterListener(ch) diff --git a/lib/go/camli/blobserver/localdisk/enumerate_test.go b/lib/go/camli/blobserver/localdisk/enumerate_test.go index 37769970e..775ebf871 100644 --- a/lib/go/camli/blobserver/localdisk/enumerate_test.go +++ b/lib/go/camli/blobserver/localdisk/enumerate_test.go @@ -18,7 +18,6 @@ package localdisk import ( "camli/blobref" - "camli/blobserver" . 
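[Editor's note — illustration only, not part of this change. After this interface change, per-storage properties that used to ride on the Partition argument are read from Config(), and handlers type-assert their Storage to blobserver.Configer. A rough sketch of what a storage implementation now has to look like; memoryQueue is a hypothetical name and the method bodies are stubs in the style of the remote storage below:]

	type memoryQueue struct {
		*blobserver.SimpleBlobHubPartitionMap // provides GetBlobHub()
		config blobserver.Config
	}

	// Configer: lets the remove/upload/stat handlers discover IsQueue and URLBase.
	func (q *memoryQueue) Config() *blobserver.Config { return &q.config }

	func (q *memoryQueue) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) {
		return nil, os.NewError("TODO: implement")
	}

	func (q *memoryQueue) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
		return os.NewError("TODO: implement")
	}

	func (q *memoryQueue) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error {
		defer close(dest) // EnumerateBlobs must close the channel
		return os.NewError("TODO: implement")
	}

	func (q *memoryQueue) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, os.Error) {
		return nil, 0, os.NewError("TODO: implement")
	}

	func (q *memoryQueue) Remove(blobs []*blobref.BlobRef) os.Error {
		return os.NewError("TODO: implement")
	}

[A queue that accepts deletes would carry something like blobserver.Config{IsQueue: true, URLBase: "http://localhost:3179"}; the URL is a made-up example.]
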
"camli/test/asserts" "fmt" @@ -29,8 +28,6 @@ import ( "time" ) -var defaultPartition blobserver.Partition = nil - func TestEnumerate(t *testing.T) { ds := NewStorage(t) defer cleanUp(ds) @@ -49,7 +46,7 @@ func TestEnumerate(t *testing.T) { ch := make(chan *blobref.SizedBlobRef) errCh := make(chan os.Error) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, "", limit, waitSeconds) + errCh <- ds.EnumerateBlobs(ch, "", limit, waitSeconds) }() var sb *blobref.SizedBlobRef @@ -69,7 +66,7 @@ func TestEnumerate(t *testing.T) { // Now again, but skipping foo's blob ch = make(chan *blobref.SizedBlobRef) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, + errCh <- ds.EnumerateBlobs(ch, foo.BlobRef().String(), limit, waitSeconds) }() @@ -93,7 +90,7 @@ func TestEnumerateEmpty(t *testing.T) { ch := make(chan *blobref.SizedBlobRef) errCh := make(chan os.Error) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, + errCh <- ds.EnumerateBlobs(ch, "", limit, waitSeconds) }() @@ -110,7 +107,7 @@ func TestEnumerateEmptyLongPoll(t *testing.T) { ch := make(chan *blobref.SizedBlobRef) errCh := make(chan os.Error) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, + errCh <- ds.EnumerateBlobs(ch, "", limit, waitSeconds) }() @@ -177,7 +174,7 @@ func TestEnumerateIsSorted(t *testing.T) { ch := make(chan *blobref.SizedBlobRef) errCh := make(chan os.Error) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, test.after, limit, 0) + errCh <- ds.EnumerateBlobs(ch, test.after, limit, 0) }() var got = make([]*blobref.SizedBlobRef, 0, blobsToMake) for { diff --git a/lib/go/camli/blobserver/localdisk/localdisk.go b/lib/go/camli/blobserver/localdisk/localdisk.go index 4ac7202c5..da263c24e 100644 --- a/lib/go/camli/blobserver/localdisk/localdisk.go +++ b/lib/go/camli/blobserver/localdisk/localdisk.go @@ -27,15 +27,15 @@ import ( "camli/jsonconfig" ) -type diskStorage struct { +type DiskStorage struct { *blobserver.SimpleBlobHubPartitionMap root string -} -// TODO: lazy hack because I didn't want to rename diskStorage everywhere -// during an experiment. should just rename it now. -type DiskStorage struct { - *diskStorage + // the sub-partition to read from. 
+ partition blobserver.Partition + + // to mirror new blobs into (when partition above is the default partition) + mirrorPartitions []blobserver.Partition } func New(root string) (storage *DiskStorage, err os.Error) { @@ -45,15 +45,16 @@ func New(root string) (storage *DiskStorage, err os.Error) { err = os.NewError(fmt.Sprintf("Storage root %q doesn't exist or is not a directory.", root)) return } - storage = &DiskStorage{&diskStorage{ - &blobserver.SimpleBlobHubPartitionMap{}, - root, - }} + storage = &DiskStorage{ + SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{}, + root: root, + partition: nil, // TODO: this will probably crash elsewhere + } return } func newFromConfig(config jsonconfig.Obj) (storage blobserver.Storage, err os.Error) { - sto := &diskStorage{ + sto := &DiskStorage{ SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{}, root: config.RequiredString("path"), } @@ -74,11 +75,11 @@ func init() { blobserver.RegisterStorageConstructor("filesystem", blobserver.StorageConstructor(newFromConfig)) } -func (ds *diskStorage) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, os.Error) { +func (ds *DiskStorage) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, os.Error) { return ds.Fetch(blob) } -func (ds *diskStorage) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int64, os.Error) { +func (ds *DiskStorage) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int64, os.Error) { fileName := ds.blobPath(nil, blob) stat, err := os.Stat(fileName) if errorIsNoEnt(err) { @@ -94,9 +95,9 @@ func (ds *diskStorage) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int return file, stat.Size, nil } -func (ds *diskStorage) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error { +func (ds *DiskStorage) Remove(blobs []*blobref.BlobRef) os.Error { for _, blob := range blobs { - fileName := ds.blobPath(partition, blob) + fileName := ds.blobPath(ds.partition, blob) err := os.Remove(fileName) switch { case err == nil: diff --git a/lib/go/camli/blobserver/localdisk/localdisk_test.go b/lib/go/camli/blobserver/localdisk/localdisk_test.go index ea885db65..0d00fc9f1 100644 --- a/lib/go/camli/blobserver/localdisk/localdisk_test.go +++ b/lib/go/camli/blobserver/localdisk/localdisk_test.go @@ -30,7 +30,7 @@ import ( "time" ) -func cleanUp(ds *diskStorage) { +func cleanUp(ds *DiskStorage) { os.RemoveAll(ds.root) } @@ -39,7 +39,7 @@ var ( rootEpoch = 0 ) -func NewStorage(t *testing.T) *diskStorage { +func NewStorage(t *testing.T) *DiskStorage { epochLock.Lock() rootEpoch++ path := fmt.Sprintf("%s/camli-testroot-%d-%d", os.TempDir(), os.Getpid(), rootEpoch) @@ -51,7 +51,7 @@ func NewStorage(t *testing.T) *diskStorage { if err != nil { t.Fatalf("Failed to run New: %v", err) } - return ds.diskStorage + return ds } type testBlob struct { @@ -86,7 +86,7 @@ func (tb *testBlob) AssertMatches(t *testing.T, sb *blobref.SizedBlobRef) { } func (tb *testBlob) ExpectUploadBlob(t *testing.T, ds blobserver.BlobReceiver) { - sb, err := ds.ReceiveBlob(tb.BlobRef(), tb.Reader(), nil) + sb, err := ds.ReceiveBlob(tb.BlobRef(), tb.Reader()) if err != nil { t.Fatalf("ReceiveBlob error: %v", err) } @@ -103,7 +103,7 @@ func TestReceiveStat(t *testing.T) { ch := make(chan *blobref.SizedBlobRef, 0) errch := make(chan os.Error, 1) go func() { - errch <- ds.Stat(ch, defaultPartition, tb.BlobRefSlice(), 0) + errch <- ds.Stat(ch, tb.BlobRefSlice(), 0) close(ch) }() got := 0 @@ -126,7 +126,7 @@ func TestStatWait(t *testing.T) { ch := make(chan 
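[Editor's note — illustration only, not part of this change. With DiskStorage exported and the partition-free signatures, typical use from another package looks roughly like this; the root path, br, blobReader, and the error handling are placeholders:]

	ds, err := localdisk.New("/tmp/camli-blobs") // hypothetical root
	if err != nil {
		panic(err)
	}

	// Upload: no mirror-partition argument any more.
	sb, err := ds.ReceiveBlob(br, blobReader) // br and blobReader assumed to exist
	_ = sb

	// Enumerate everything, sorted, with no long-poll (waitSeconds = 0).
	ch := make(chan *blobref.SizedBlobRef)
	errCh := make(chan os.Error)
	go func() {
		errCh <- ds.EnumerateBlobs(ch, "", 1000, 0)
	}()
	for sb := range ch { // EnumerateBlobs closes ch when done
		fmt.Println(sb.BlobRef.String(), sb.Size)
	}
	err = <-errCh
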
diff --git a/lib/go/camli/blobserver/localdisk/localdisk_test.go b/lib/go/camli/blobserver/localdisk/localdisk_test.go
index ea885db65..0d00fc9f1 100644
--- a/lib/go/camli/blobserver/localdisk/localdisk_test.go
+++ b/lib/go/camli/blobserver/localdisk/localdisk_test.go
@@ -30,7 +30,7 @@ import (
 	"time"
 )
 
-func cleanUp(ds *diskStorage) {
+func cleanUp(ds *DiskStorage) {
 	os.RemoveAll(ds.root)
 }
 
@@ -39,7 +39,7 @@ var (
 	rootEpoch = 0
 )
 
-func NewStorage(t *testing.T) *diskStorage {
+func NewStorage(t *testing.T) *DiskStorage {
 	epochLock.Lock()
 	rootEpoch++
 	path := fmt.Sprintf("%s/camli-testroot-%d-%d", os.TempDir(), os.Getpid(), rootEpoch)
@@ -51,7 +51,7 @@ func NewStorage(t *testing.T) *diskStorage {
 	if err != nil {
 		t.Fatalf("Failed to run New: %v", err)
 	}
-	return ds.diskStorage
+	return ds
 }
 
 type testBlob struct {
@@ -86,7 +86,7 @@ func (tb *testBlob) AssertMatches(t *testing.T, sb *blobref.SizedBlobRef) {
 }
 
 func (tb *testBlob) ExpectUploadBlob(t *testing.T, ds blobserver.BlobReceiver) {
-	sb, err := ds.ReceiveBlob(tb.BlobRef(), tb.Reader(), nil)
+	sb, err := ds.ReceiveBlob(tb.BlobRef(), tb.Reader())
 	if err != nil {
 		t.Fatalf("ReceiveBlob error: %v", err)
 	}
@@ -103,7 +103,7 @@ func TestReceiveStat(t *testing.T) {
 	ch := make(chan *blobref.SizedBlobRef, 0)
 	errch := make(chan os.Error, 1)
 	go func() {
-		errch <- ds.Stat(ch, defaultPartition, tb.BlobRefSlice(), 0)
+		errch <- ds.Stat(ch, tb.BlobRefSlice(), 0)
 		close(ch)
 	}()
 	got := 0
@@ -126,7 +126,7 @@ func TestStatWait(t *testing.T) {
 	ch := make(chan *blobref.SizedBlobRef, 0)
 	errch := make(chan os.Error, 1)
 	go func() {
-		errch <- ds.Stat(ch, defaultPartition, tb.BlobRefSlice(), waitSeconds)
+		errch <- ds.Stat(ch, tb.BlobRefSlice(), waitSeconds)
 		close(ch)
 	}()
 
@@ -167,7 +167,6 @@ func TestMultiStat(t *testing.T) {
 	errch := make(chan os.Error, 1)
 	go func() {
 		errch <- ds.Stat(ch,
-			defaultPartition,
 			[]*blobref.BlobRef{blobfoo.BlobRef(), blobbar.BlobRef()},
 			0)
 		close(ch)
diff --git a/lib/go/camli/blobserver/localdisk/path.go b/lib/go/camli/blobserver/localdisk/path.go
index eb48dcebe..0df55b4ab 100644
--- a/lib/go/camli/blobserver/localdisk/path.go
+++ b/lib/go/camli/blobserver/localdisk/path.go
@@ -28,7 +28,7 @@ func BlobFileBaseName(b *blobref.BlobRef) string {
 	return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest())
 }
 
-func (ds *diskStorage) blobDirectory(partition blobserver.NamedPartition, b *blobref.BlobRef) string {
+func (ds *DiskStorage) blobDirectory(partition blobserver.NamedPartition, b *blobref.BlobRef) string {
 	d := b.Digest()
 	if len(d) < 6 {
 		d = d + "______"
@@ -36,11 +36,11 @@ func (ds *diskStorage) blobDirectory(partition blobserver.NamedPartition, b *blo
 	return fmt.Sprintf("%s/%s/%s/%s", ds.PartitionRoot(partition), b.HashName(), d[0:3], d[3:6])
 }
 
-func (ds *diskStorage) blobPath(partition blobserver.NamedPartition, b *blobref.BlobRef) string {
+func (ds *DiskStorage) blobPath(partition blobserver.NamedPartition, b *blobref.BlobRef) string {
 	return fmt.Sprintf("%s/%s", ds.blobDirectory(partition, b), BlobFileBaseName(b))
 }
 
-func (ds *diskStorage) PartitionRoot(partition blobserver.NamedPartition) string {
+func (ds *DiskStorage) PartitionRoot(partition blobserver.NamedPartition) string {
 	if partition == nil {
 		return ds.root
 	}
diff --git a/lib/go/camli/blobserver/localdisk/path_test.go b/lib/go/camli/blobserver/localdisk/path_test.go
index 16515f161..63b3be7cd 100644
--- a/lib/go/camli/blobserver/localdisk/path_test.go
+++ b/lib/go/camli/blobserver/localdisk/path_test.go
@@ -29,7 +29,7 @@ func (pn partitionName) Name() string {
 
 func TestPaths(t *testing.T) {
 	br := blobref.Parse("digalg-abc")
-	ds := &diskStorage{root: "/tmp/dir"}
+	ds := &DiskStorage{root: "/tmp/dir"}
 
 	if e, g := "/tmp/dir/digalg/abc/___", ds.blobDirectory(nil, br); e != g {
 		t.Errorf("short blobref dir; expected path %q; got %q", e, g)
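[Editor's note — illustration only, not part of this change. blobPath/PartitionRoot still take a NamedPartition, with nil meaning the default root, mirroring TestPaths above (this only compiles inside package localdisk, since root and blobPath are unexported):]

	ds := &DiskStorage{root: "/tmp/dir"}
	br := blobref.Parse("digalg-abc")
	// -> /tmp/dir/digalg/abc/___/digalg-abc.dat  (short digests are padded with underscores)
	fmt.Println(ds.blobPath(nil, br))
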
diff --git a/lib/go/camli/blobserver/localdisk/receive.go b/lib/go/camli/blobserver/localdisk/receive.go
index 7c60585a6..66828535d 100644
--- a/lib/go/camli/blobserver/localdisk/receive.go
+++ b/lib/go/camli/blobserver/localdisk/receive.go
@@ -29,7 +29,7 @@ import (
 
 var flagOpenImages = flag.Bool("showimages", false, "Show images on receiving them with eog.")
 
-func (ds *diskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, mirrorPartitions []blobserver.Partition) (blobGot *blobref.SizedBlobRef, err os.Error) {
+func (ds *DiskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader) (blobGot *blobref.SizedBlobRef, err os.Error) {
 	hashedDirectory := ds.blobDirectory(nil, blobRef)
 	err = os.MkdirAll(hashedDirectory, 0700)
 	if err != nil {
@@ -82,7 +82,7 @@ func (ds *diskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, m
 		return
 	}
 
-	for _, partition := range mirrorPartitions {
+	for _, partition := range ds.mirrorPartitions {
 		partitionDir := ds.blobDirectory(partition, blobRef)
 		if err = os.MkdirAll(partitionDir, 0700); err != nil {
 			return
@@ -107,11 +107,14 @@ func (ds *diskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, m
 			exec.MergeWithStdout)
 	}
 
-	hub := ds.GetBlobHub(nil)
+	hub := ds.GetBlobHub()
 	hub.NotifyBlobReceived(blobRef)
-	for _, partition := range mirrorPartitions {
-		hub = ds.GetBlobHub(partition)
-		hub.NotifyBlobReceived(blobRef)
+	for _, partition := range ds.mirrorPartitions {
+		partition = partition
+		// TODO: need to get the other blobserver.Storage for
+		// the mirrors, not just their partition names,
+		// because we need their blob hubs now to wake them
+		// up.
 	}
 	return
 }
diff --git a/lib/go/camli/blobserver/localdisk/stat.go b/lib/go/camli/blobserver/localdisk/stat.go
index 05ac759e3..33179220d 100644
--- a/lib/go/camli/blobserver/localdisk/stat.go
+++ b/lib/go/camli/blobserver/localdisk/stat.go
@@ -17,16 +17,16 @@ limitations under the License.
 package localdisk
 
 import (
-	"camli/blobref"
-	"camli/blobserver"
 	"os"
 	"sync"
 	"time"
+
+	"camli/blobref"
 )
 
 const maxParallelStats = 20
 
-func (ds *diskStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
+func (ds *DiskStorage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
 	if len(blobs) == 0 {
 		return nil
 	}
@@ -35,7 +35,7 @@ func (ds *diskStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserve
 	var missing []*blobref.BlobRef
 
 	statSend := func(ref *blobref.BlobRef, appendMissing bool) os.Error {
-		fi, err := os.Stat(ds.blobPath(partition, ref))
+		fi, err := os.Stat(ds.blobPath(ds.partition, ref))
 		switch {
 		case err == nil && fi.IsRegular():
 			dest <- &blobref.SizedBlobRef{BlobRef: ref, Size: fi.Size}
@@ -76,7 +76,7 @@ func (ds *diskStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserve
 		// TODO: use a flag, defaulting to 60?
 		waitSeconds = 60
 	}
-	hub := ds.GetBlobHub(partition)
+	hub := ds.GetBlobHub()
 	ch := make(chan *blobref.BlobRef, 1)
 	for _, missblob := range missing {
 		hub.RegisterBlobListener(missblob, ch)
diff --git a/lib/go/camli/blobserver/remote/remote.go b/lib/go/camli/blobserver/remote/remote.go
index ee2f9659d..a114a9db0 100644
--- a/lib/go/camli/blobserver/remote/remote.go
+++ b/lib/go/camli/blobserver/remote/remote.go
@@ -49,15 +49,15 @@ func newFromConfig(config jsonconfig.Obj) (storage blobserver.Storage, err os.Er
 	return sto, nil
 }
 
-func (sto *remoteStorage) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error {
+func (sto *remoteStorage) Remove(blobs []*blobref.BlobRef) os.Error {
 	return os.NewError("TODO: implement")
 }
 
-func (sto *remoteStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
+func (sto *remoteStorage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
 	return os.NewError("TODO: implement")
 }
 
-func (sto *remoteStorage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) {
+func (sto *remoteStorage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) {
 	return nil, os.NewError("TODO: implement")
 }
 
@@ -67,7 +67,7 @@ func (sto *remoteStorage) FetchStreaming(b *blobref.BlobRef) (file io.ReadCloser
 
 func (sto *remoteStorage) MaxEnumerate() uint { return 1000 }
 
-func (sto *remoteStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error {
+func (sto *remoteStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error {
 	defer close(dest)
 	return os.NewError("TODO: implement")
 }
diff --git a/lib/go/camli/blobserver/s3/enumerate.go b/lib/go/camli/blobserver/s3/enumerate.go
index f7a01dbcf..b336ab7f7 100644
--- a/lib/go/camli/blobserver/s3/enumerate.go
+++ b/lib/go/camli/blobserver/s3/enumerate.go
@@ -21,14 +21,13 @@ import (
 	"os"
 
 	"camli/blobref"
-	"camli/blobserver"
 )
 
 var _ = log.Printf
 
 func (sto *s3Storage) MaxEnumerate() uint { return 1000 }
 
-func (sto *s3Storage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error {
+func (sto *s3Storage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error {
 	defer close(dest)
 	objs, err := sto.s3Client.ListBucket(sto.bucket, after, limit)
 	if err != nil {
diff --git a/lib/go/camli/blobserver/s3/receive.go b/lib/go/camli/blobserver/s3/receive.go
index 17ddd7153..9b916bf8a 100644
--- a/lib/go/camli/blobserver/s3/receive.go
+++ b/lib/go/camli/blobserver/s3/receive.go
@@ -100,7 +100,7 @@ func (as *amazonSlurper) Cleanup() {
 	}
 }
 
-func (sto *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) {
+func (sto *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) {
 	slurper := newAmazonSlurper(blob)
 	defer slurper.Cleanup()
 
diff --git a/lib/go/camli/blobserver/s3/remove.go b/lib/go/camli/blobserver/s3/remove.go
index dd2917bc8..ae2ddeb7e 100644
--- a/lib/go/camli/blobserver/s3/remove.go
+++ b/lib/go/camli/blobserver/s3/remove.go
@@ -21,12 +21,11 @@ import (
 	"os"
 
 	"camli/blobref"
-	"camli/blobserver"
 )
 
 var _ = log.Printf
 
-func (sto *s3Storage) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error {
+func (sto *s3Storage) Remove(blobs []*blobref.BlobRef) os.Error {
 	// TODO: do these in parallel
 	var reterr os.Error
 	for _, blob := range blobs {
diff --git a/lib/go/camli/blobserver/s3/stat.go b/lib/go/camli/blobserver/s3/stat.go
index be1c57dd7..be8302a6d 100644
--- a/lib/go/camli/blobserver/s3/stat.go
+++ b/lib/go/camli/blobserver/s3/stat.go
@@ -21,12 +21,11 @@ import (
 	"os"
 
 	"camli/blobref"
-	"camli/blobserver"
 )
 
 var _ = log.Printf
 
-func (sto *s3Storage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
+func (sto *s3Storage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
 	// TODO: do n stats in parallel
 	for _, br := range blobs {
 		size, err := sto.s3Client.Stat(br.String(), sto.bucket)
diff --git a/lib/go/camli/mysqlindexer/enumerate.go b/lib/go/camli/mysqlindexer/enumerate.go
index ec7023252..2020ce881 100644
--- a/lib/go/camli/mysqlindexer/enumerate.go
+++ b/lib/go/camli/mysqlindexer/enumerate.go
@@ -17,11 +17,10 @@ limitations under the License.
 package mysqlindexer
 
 import (
-	"camli/blobref"
-	"camli/blobserver"
-
 	"os"
 
+	"camli/blobref"
+
 	mysql "camli/third_party/github.com/Philio/GoMySQL"
 )
 
@@ -30,7 +29,7 @@ type blobRow struct {
 	size int64
 }
 
-func (mi *Indexer) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) (err os.Error) {
+func (mi *Indexer) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) (err os.Error) {
 	// MySQL connection stuff.
 	var client *mysql.Client
 	client, err = mi.getConnection()
diff --git a/lib/go/camli/mysqlindexer/mysqlindexer.go b/lib/go/camli/mysqlindexer/mysqlindexer.go
index 36af5ab8c..995206be6 100644
--- a/lib/go/camli/mysqlindexer/mysqlindexer.go
+++ b/lib/go/camli/mysqlindexer/mysqlindexer.go
@@ -127,6 +127,6 @@ func (mi *Indexer) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64,
 	return nil, 0, os.NewError("Fetch isn't supported by the MySQL indexer")
 }
 
-func (mi *Indexer) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error {
+func (mi *Indexer) Remove(blobs []*blobref.BlobRef) os.Error {
 	return os.NewError("Remove isn't supported by the MySQL indexer")
 }
diff --git a/lib/go/camli/mysqlindexer/receive.go b/lib/go/camli/mysqlindexer/receive.go
index b0976eb7c..fd3dd09ad 100644
--- a/lib/go/camli/mysqlindexer/receive.go
+++ b/lib/go/camli/mysqlindexer/receive.go
@@ -91,7 +91,7 @@ func (sn *blobSniffer) bufferIsCamliJson() bool {
 	return true
 }
 
-func (mi *Indexer) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (retsb *blobref.SizedBlobRef, err os.Error) {
+func (mi *Indexer) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader) (retsb *blobref.SizedBlobRef, err os.Error) {
 	sniffer := new(blobSniffer)
 	hash := blobRef.Hash()
 	var written int64
diff --git a/lib/go/camli/mysqlindexer/stat.go b/lib/go/camli/mysqlindexer/stat.go
index 88dc371f2..153242bfc 100644
--- a/lib/go/camli/mysqlindexer/stat.go
+++ b/lib/go/camli/mysqlindexer/stat.go
@@ -18,7 +18,6 @@ package mysqlindexer
 
 import (
 	"camli/blobref"
-	"camli/blobserver"
 
 	"log"
 	"fmt"
@@ -26,7 +25,7 @@ import (
 	"strings"
 )
 
-func (mi *Indexer) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
+func (mi *Indexer) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
 	error := func(err os.Error) os.Error {
 		log.Printf("mysqlindexer: stat error: %v", err)
 		return err
diff --git a/server/go/camlistored/camlistored.go b/server/go/camlistored/camlistored.go
index 83763c8cd..6f326880a 100644
--- a/server/go/camlistored/camlistored.go
+++ b/server/go/camlistored/camlistored.go
@@ -97,24 +97,23 @@ func handleCamliUsingStorage(conn http.ResponseWriter, req *http.Request, action
 	case "GET":
 		switch action {
 		case "enumerate-blobs":
-			handler = auth.RequireAuth(handlers.CreateEnumerateHandler(storage, partition))
+			handler = auth.RequireAuth(handlers.CreateEnumerateHandler(storage))
 		case "stat":
-			handler = auth.RequireAuth(handlers.CreateStatHandler(storage, partition))
+			handler = auth.RequireAuth(handlers.CreateStatHandler(storage))
 		default:
 			handler = handlers.CreateGetHandler(storage)
 		}
 	case "POST":
 		switch action {
 		case "stat":
-			handler = auth.RequireAuth(handlers.CreateStatHandler(storage, partition))
+			handler = auth.RequireAuth(handlers.CreateStatHandler(storage))
 		case "upload":
-			handler = auth.RequireAuth(handlers.CreateUploadHandler(storage, partition))
+			handler = auth.RequireAuth(handlers.CreateUploadHandler(storage))
 		case "remove":
-			// Currently only allows removing from a non-main partition.
-			handler = auth.RequireAuth(handlers.CreateRemoveHandler(storage, partition))
+			handler = auth.RequireAuth(handlers.CreateRemoveHandler(storage))
 		}
 	case "PUT": // no longer part of spec
-		handler = auth.RequireAuth(handlers.CreateNonStandardPutHandler(storage, partition))
+		handler = auth.RequireAuth(handlers.CreateNonStandardPutHandler(storage))
 	}
 	handler(conn, req)
 }
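[Editor's note — illustration only, not part of this change. camlistored picks a handler by switching on the request's action (see handleCamliUsingStorage above); a mux-style wiring of the now partition-free handler constructors would look roughly like this, with the URL paths being placeholders:]

	http.HandleFunc("/camli/enumerate-blobs", auth.RequireAuth(handlers.CreateEnumerateHandler(storage)))
	http.HandleFunc("/camli/stat", auth.RequireAuth(handlers.CreateStatHandler(storage)))
	http.HandleFunc("/camli/upload", auth.RequireAuth(handlers.CreateUploadHandler(storage)))
	http.HandleFunc("/camli/remove", auth.RequireAuth(handlers.CreateRemoveHandler(storage)))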