From bed26de507192611f09c96dd66f62ac7ce875bf4 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 9 May 2011 09:11:18 -0700 Subject: [PATCH] Cleanup: remove partitions from interfaces. WIP but compiles. Still need to fix up blobhub notification for localdisk mirroring, since now localdisk can't find the notification hub for mirrored partitions. (and also can't be configured yet) --- clients/go/cammount/cacher.go | 2 +- lib/go/camli/blobserver/blobhub.go | 26 ++++-------- lib/go/camli/blobserver/handlers/enumerate.go | 8 ++-- .../blobserver/handlers/enumerate_test.go | 4 +- lib/go/camli/blobserver/handlers/remove.go | 19 +++++---- lib/go/camli/blobserver/handlers/stat.go | 11 ++--- lib/go/camli/blobserver/handlers/upload.go | 29 +++++++------ lib/go/camli/blobserver/interface.go | 42 ++++++++++--------- .../camli/blobserver/localdisk/enumerate.go | 10 ++--- .../blobserver/localdisk/enumerate_test.go | 13 +++--- .../camli/blobserver/localdisk/localdisk.go | 31 +++++++------- .../blobserver/localdisk/localdisk_test.go | 13 +++--- lib/go/camli/blobserver/localdisk/path.go | 6 +-- .../camli/blobserver/localdisk/path_test.go | 2 +- lib/go/camli/blobserver/localdisk/receive.go | 15 ++++--- lib/go/camli/blobserver/localdisk/stat.go | 10 ++--- lib/go/camli/blobserver/remote/remote.go | 8 ++-- lib/go/camli/blobserver/s3/enumerate.go | 3 +- lib/go/camli/blobserver/s3/receive.go | 2 +- lib/go/camli/blobserver/s3/remove.go | 3 +- lib/go/camli/blobserver/s3/stat.go | 3 +- lib/go/camli/mysqlindexer/enumerate.go | 7 ++-- lib/go/camli/mysqlindexer/mysqlindexer.go | 2 +- lib/go/camli/mysqlindexer/receive.go | 2 +- lib/go/camli/mysqlindexer/stat.go | 3 +- server/go/camlistored/camlistored.go | 13 +++--- 26 files changed, 141 insertions(+), 146 deletions(-) diff --git a/clients/go/cammount/cacher.go b/clients/go/cammount/cacher.go index 99a74a64a..434481dd5 100644 --- a/clients/go/cammount/cacher.go +++ b/clients/go/cammount/cacher.go @@ -47,7 +47,7 @@ func (cf *CachingFetcher) Fetch(br *blobref.BlobRef) (file blobref.ReadSeekClose return nil, 0, err } - _, err = cf.c.ReceiveBlob(br, sblob, nil) + _, err = cf.c.ReceiveBlob(br, sblob) if err != nil { return nil, 0, err } diff --git a/lib/go/camli/blobserver/blobhub.go b/lib/go/camli/blobserver/blobhub.go index abb75c6a3..bd7d418d1 100644 --- a/lib/go/camli/blobserver/blobhub.go +++ b/lib/go/camli/blobserver/blobhub.go @@ -121,28 +121,18 @@ func (h *SimpleBlobHub) UnregisterBlobListener(blob *blobref.BlobRef, ch chan *b type SimpleBlobHubPartitionMap struct { hubLock sync.Mutex - hubMap map[string]BlobHub + hub BlobHub } -func (spm *SimpleBlobHubPartitionMap) GetBlobHub(partition Partition) BlobHub { - name := "" - if partition != nil { - name = partition.Name() - } +func (spm *SimpleBlobHubPartitionMap) GetBlobHub() BlobHub { spm.hubLock.Lock() defer spm.hubLock.Unlock() - if spm.hubMap == nil { - spm.hubMap = make(map[string]BlobHub) + if spm.hub == nil { + // TODO: in the future, allow for different blob hub + // implementations rather than the + // everything-in-memory-on-a-single-machine SimpleBlobHub. + spm.hub = new(SimpleBlobHub) } - if hub, ok := spm.hubMap[name]; ok { - return hub - } - - // TODO: in the future, allow for different blob hub - // implementations rather than the - // everything-in-memory-on-a-single-machine SimpleBlobHub. - hub := new(SimpleBlobHub) - spm.hubMap[name] = hub - return hub + return spm.hub } diff --git a/lib/go/camli/blobserver/handlers/enumerate.go b/lib/go/camli/blobserver/handlers/enumerate.go index 211ed3a2a..100af909a 100644 --- a/lib/go/camli/blobserver/handlers/enumerate.go +++ b/lib/go/camli/blobserver/handlers/enumerate.go @@ -36,15 +36,15 @@ type blobInfo struct { } -func CreateEnumerateHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) { +func CreateEnumerateHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { - handleEnumerateBlobs(conn, req, storage, partition) + handleEnumerateBlobs(conn, req, storage) } } const errMsgMaxWaitSecWithAfter = "Can't use 'maxwaitsec' with 'after'.\n" -func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobEnumerator, partition blobserver.Partition) { +func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobEnumerator) { // Potential input parameters formValueLimit := req.FormValue("limit") formValueMaxWaitSec := req.FormValue("maxwaitsec") @@ -92,7 +92,7 @@ func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage b blobch := make(chan *blobref.SizedBlobRef, 100) resultch := make(chan os.Error, 1) go func() { - resultch <- storage.EnumerateBlobs(blobch, partition, formValueAfter, limit+1, waitSeconds) + resultch <- storage.EnumerateBlobs(blobch, formValueAfter, limit+1, waitSeconds) }() after := "" diff --git a/lib/go/camli/blobserver/handlers/enumerate_test.go b/lib/go/camli/blobserver/handlers/enumerate_test.go index 3a5388266..ab2f73925 100644 --- a/lib/go/camli/blobserver/handlers/enumerate_test.go +++ b/lib/go/camli/blobserver/handlers/enumerate_test.go @@ -18,7 +18,6 @@ package handlers import ( "camli/blobref" - "camli/blobserver" . "camli/test/asserts" "http" "http/httptest" @@ -43,7 +42,6 @@ type emptyEnumerator struct { } func (ee *emptyEnumerator) EnumerateBlobs(dest chan *blobref.SizedBlobRef, - partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error { @@ -75,7 +73,7 @@ func TestEnumerateInput(t *testing.T) { wr := httptest.NewRecorder() wr.Code = 200 // default req := makeGetRequest(test.url) - handleEnumerateBlobs(wr, req, enumerator, nil) // TODO: use better partition + handleEnumerateBlobs(wr, req, enumerator) ExpectInt(t, test.expectedCode, wr.Code, "response code for " + test.name) ExpectString(t, test.expectedBody, wr.Body.String(), "output for " + test.name) } diff --git a/lib/go/camli/blobserver/handlers/remove.go b/lib/go/camli/blobserver/handlers/remove.go index 36c1123c1..f515d2bb3 100644 --- a/lib/go/camli/blobserver/handlers/remove.go +++ b/lib/go/camli/blobserver/handlers/remove.go @@ -27,20 +27,25 @@ import ( const maxRemovesPerRequest = 1000 -func CreateRemoveHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) { +func CreateRemoveHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { - handleRemove(conn, req, storage, partition) + handleRemove(conn, req, storage) } } -func handleRemove(conn http.ResponseWriter, req *http.Request, storage blobserver.Storage, partition blobserver.Partition) { +func handleRemove(conn http.ResponseWriter, req *http.Request, storage blobserver.Storage) { if req.Method != "POST" { log.Fatalf("Invalid method; handlers misconfigured") } - - if !partition.IsQueue() { + configer, ok := storage.(blobserver.Configer) + if !ok { conn.WriteHeader(http.StatusForbidden) - fmt.Fprintf(conn, "Can only remove blobs from a queue partition.\n") + fmt.Fprintf(conn, "Remove handler's blobserver.Storage isn't a blobserver.Configuer; can't remove") + return + } + if !configer.Config().IsQueue { + conn.WriteHeader(http.StatusForbidden) + fmt.Fprintf(conn, "Can only remove blobs from a queue.\n") return } @@ -68,7 +73,7 @@ func handleRemove(conn http.ResponseWriter, req *http.Request, storage blobserve toRemoveStr = append(toRemoveStr, ref.String()) } - err := storage.Remove(partition, toRemove) + err := storage.Remove(toRemove) if err != nil { conn.WriteHeader(http.StatusInternalServerError) log.Printf("Server error during remove: %v", err) diff --git a/lib/go/camli/blobserver/handlers/stat.go b/lib/go/camli/blobserver/handlers/stat.go index beb783cfb..46bdf251a 100644 --- a/lib/go/camli/blobserver/handlers/stat.go +++ b/lib/go/camli/blobserver/handlers/stat.go @@ -27,15 +27,15 @@ import ( "strconv" ) -func CreateStatHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) { +func CreateStatHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { - handleStat(conn, req, storage, partition) + handleStat(conn, req, storage) } } const maxStatBlobs = 1000 -func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobStatter, partition blobserver.Partition) { +func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobStatter) { toStat := make([]*blobref.BlobRef, 0) switch req.Method { case "POST": @@ -91,7 +91,7 @@ func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver. blobch := make(chan *blobref.SizedBlobRef) resultch := make(chan os.Error, 1) go func() { - err := storage.Stat(blobch, partition, toStat, waitSeconds) + err := storage.Stat(blobch, toStat, waitSeconds) close(blobch) resultch <- err }() @@ -111,7 +111,8 @@ func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver. } } - ret := commonUploadResponse(partition, req) + configer, _ := storage.(blobserver.Configer) + ret := commonUploadResponse(configer, req) ret["stat"] = statRes ret["canLongPoll"] = true httputil.ReturnJson(conn, ret) diff --git a/lib/go/camli/blobserver/handlers/upload.go b/lib/go/camli/blobserver/handlers/upload.go index e99dbfc5b..d6011b972 100644 --- a/lib/go/camli/blobserver/handlers/upload.go +++ b/lib/go/camli/blobserver/handlers/upload.go @@ -30,13 +30,13 @@ import ( "strings" ) -func CreateUploadHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) { +func CreateUploadHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { - handleMultiPartUpload(conn, req, storage, partition) + handleMultiPartUpload(conn, req, storage) } } -func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver, partition blobserver.Partition) { +func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver) { if !(req.Method == "POST" && strings.Contains(req.URL.Path, "/camli/upload")) { httputil.BadRequestError(conn, "Inconfigured handler.") return @@ -96,7 +96,7 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece continue } - blobGot, err := blobReceiver.ReceiveBlob(ref, mimePart, partition.GetMirrorPartitions()) + blobGot, err := blobReceiver.ReceiveBlob(ref, mimePart) if err != nil { addError(fmt.Sprintf("Error receiving blob %v: %v\n", ref, err)) break @@ -106,7 +106,8 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece } log.Println("Done reading multipart body.") - ret := commonUploadResponse(partition, req) + configer, _ := blobReceiver.(blobserver.Configer) // TODO: ugly? + ret := commonUploadResponse(configer, req) received := make([]map[string]interface{}, 0) for _, got := range receivedBlobs { @@ -125,15 +126,17 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece httputil.ReturnJson(conn, ret) } -func commonUploadResponse(partition blobserver.Partition, req *http.Request) map[string]interface{} { +func commonUploadResponse(configer blobserver.Configer, req *http.Request) map[string]interface{} { ret := make(map[string]interface{}) - ret["maxUploadSize"] = 2147483647 // 2GB.. *shrug* + ret["maxUploadSize"] = 2147483647 // 2GB.. *shrug*. TODO: cut this down, standardize ret["uploadUrlExpirationSeconds"] = 86400 // TODO: camli/upload isn't part of the spec. we should pick // something different here just to make it obvious that this - // isn't a well-known URL and facilitate lazy clients. - ret["uploadUrl"] = partition.URLBase() + "/camli/upload" + // isn't a well-known URL and accidentally encourage lazy clients. + if configer != nil { + ret["uploadUrl"] = configer.Config().URLBase + "/camli/upload" + } return ret } @@ -141,13 +144,13 @@ func commonUploadResponse(partition blobserver.Partition, req *http.Request) map var kPutPattern *regexp.Regexp = regexp.MustCompile(`^/camli/([a-z0-9]+)-([a-f0-9]+)$`) // NOTE: not part of the spec at present. old. might be re-introduced. -func CreateNonStandardPutHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) { +func CreateNonStandardPutHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) { return func(conn http.ResponseWriter, req *http.Request) { - handlePut(conn, req, storage, partition) + handlePut(conn, req, storage) } } -func handlePut(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver, partition blobserver.Partition) { +func handlePut(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver) { blobRef := blobref.FromPattern(kPutPattern, req.URL.Path) if blobRef == nil { httputil.BadRequestError(conn, "Malformed PUT URL.") @@ -159,7 +162,7 @@ func handlePut(conn http.ResponseWriter, req *http.Request, blobReceiver blobser return } - _, err := blobReceiver.ReceiveBlob(blobRef, req.Body, partition.GetMirrorPartitions()) + _, err := blobReceiver.ReceiveBlob(blobRef, req.Body) if err != nil { httputil.ServerError(conn, err) return diff --git a/lib/go/camli/blobserver/interface.go b/lib/go/camli/blobserver/interface.go index 5e6145c61..4ee03fe9e 100644 --- a/lib/go/camli/blobserver/interface.go +++ b/lib/go/camli/blobserver/interface.go @@ -45,20 +45,16 @@ type Partition interface { type BlobReceiver interface { // ReceiveBlob accepts a newly uploaded blob and writes it to // disk. - // - // mirrorPartitions may not be supported by all instances - // and may return an error if used. - ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []Partition) (*blobref.SizedBlobRef, os.Error) + ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) } type BlobStatter interface { // Stat checks for the existence of blobs, writing their sizes // (if found back to the dest channel), and returning an error // or nil. Stat() should NOT close the channel. - // waitSeconds is the max time to wait for the blobs to exist - // in the given partition, or 0 for no delay. + // waitSeconds is the max time to wait for the blobs to exist, + // or 0 for no delay. Stat(dest chan *blobref.SizedBlobRef, - partition Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error } @@ -74,12 +70,11 @@ type BlobEnumerator interface { // sorted, as long as they are lexigraphically greater than // after (if provided). // limit will be supplied and sanity checked by caller. - // waitSeconds is the max time to wait for any blobs to exist - // in the given partition, or 0 for no delay. + // waitSeconds is the max time to wait for any blobs to exist, + // or 0 for no delay. // EnumerateBlobs must close the channel. (even if limit // was hit and more blobs remain) EnumerateBlobs(dest chan *blobref.SizedBlobRef, - partition Partition, after string, limit uint, waitSeconds int) os.Error @@ -92,20 +87,29 @@ type Cache interface { BlobStatter } +type Config struct { + Writable, Readable bool + IsQueue bool // supports deletes + + // the "http://host:port" and optional path (but without trailing slash) to have "/camli/*" appended + URLBase string +} + +type Configer interface { + Config() *Config +} + type Storage interface { blobref.StreamingFetcher BlobReceiver BlobStatter BlobEnumerator - // Remove 0 or more blobs from provided partition, which - // should be empty for the default partition. Removal of - // non-existent items isn't an error. Returns failure if any - // items existed but failed to be deleted. - Remove(partition Partition, blobs []*blobref.BlobRef) os.Error + // Remove 0 or more blobs. Removal of non-existent items + // isn't an error. Returns failure if any items existed but + // failed to be deleted. + Remove(blobs []*blobref.BlobRef) os.Error - // Returns the blob notification bus for a given partition. - // Use nil for the default partition. - // TODO: move this to be a method on the Partition interface? - GetBlobHub(partition Partition) BlobHub + // Returns the blob notification bus + GetBlobHub() BlobHub } diff --git a/lib/go/camli/blobserver/localdisk/enumerate.go b/lib/go/camli/blobserver/localdisk/enumerate.go index e1b117dc1..6e789fe95 100644 --- a/lib/go/camli/blobserver/localdisk/enumerate.go +++ b/lib/go/camli/blobserver/localdisk/enumerate.go @@ -17,13 +17,13 @@ limitations under the License. package localdisk import ( - "camli/blobref" - "camli/blobserver" "fmt" "os" "sort" "strings" "time" + + "camli/blobref" ) type readBlobRequest struct { @@ -110,8 +110,8 @@ func readBlobs(opts readBlobRequest) os.Error { return nil } -func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error { - dirRoot := ds.PartitionRoot(partition) +func (ds *DiskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error { + dirRoot := ds.PartitionRoot(ds.partition) limitMutable := limit var err os.Error doScan := func() { @@ -132,7 +132,7 @@ func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition // The case where we have to wait for waitSeconds for any blob // to possibly appear. - hub := ds.GetBlobHub(partition) + hub := ds.GetBlobHub() ch := make(chan *blobref.BlobRef, 1) hub.RegisterListener(ch) defer hub.UnregisterListener(ch) diff --git a/lib/go/camli/blobserver/localdisk/enumerate_test.go b/lib/go/camli/blobserver/localdisk/enumerate_test.go index 37769970e..775ebf871 100644 --- a/lib/go/camli/blobserver/localdisk/enumerate_test.go +++ b/lib/go/camli/blobserver/localdisk/enumerate_test.go @@ -18,7 +18,6 @@ package localdisk import ( "camli/blobref" - "camli/blobserver" . "camli/test/asserts" "fmt" @@ -29,8 +28,6 @@ import ( "time" ) -var defaultPartition blobserver.Partition = nil - func TestEnumerate(t *testing.T) { ds := NewStorage(t) defer cleanUp(ds) @@ -49,7 +46,7 @@ func TestEnumerate(t *testing.T) { ch := make(chan *blobref.SizedBlobRef) errCh := make(chan os.Error) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, "", limit, waitSeconds) + errCh <- ds.EnumerateBlobs(ch, "", limit, waitSeconds) }() var sb *blobref.SizedBlobRef @@ -69,7 +66,7 @@ func TestEnumerate(t *testing.T) { // Now again, but skipping foo's blob ch = make(chan *blobref.SizedBlobRef) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, + errCh <- ds.EnumerateBlobs(ch, foo.BlobRef().String(), limit, waitSeconds) }() @@ -93,7 +90,7 @@ func TestEnumerateEmpty(t *testing.T) { ch := make(chan *blobref.SizedBlobRef) errCh := make(chan os.Error) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, + errCh <- ds.EnumerateBlobs(ch, "", limit, waitSeconds) }() @@ -110,7 +107,7 @@ func TestEnumerateEmptyLongPoll(t *testing.T) { ch := make(chan *blobref.SizedBlobRef) errCh := make(chan os.Error) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, + errCh <- ds.EnumerateBlobs(ch, "", limit, waitSeconds) }() @@ -177,7 +174,7 @@ func TestEnumerateIsSorted(t *testing.T) { ch := make(chan *blobref.SizedBlobRef) errCh := make(chan os.Error) go func() { - errCh <- ds.EnumerateBlobs(ch, defaultPartition, test.after, limit, 0) + errCh <- ds.EnumerateBlobs(ch, test.after, limit, 0) }() var got = make([]*blobref.SizedBlobRef, 0, blobsToMake) for { diff --git a/lib/go/camli/blobserver/localdisk/localdisk.go b/lib/go/camli/blobserver/localdisk/localdisk.go index 4ac7202c5..da263c24e 100644 --- a/lib/go/camli/blobserver/localdisk/localdisk.go +++ b/lib/go/camli/blobserver/localdisk/localdisk.go @@ -27,15 +27,15 @@ import ( "camli/jsonconfig" ) -type diskStorage struct { +type DiskStorage struct { *blobserver.SimpleBlobHubPartitionMap root string -} -// TODO: lazy hack because I didn't want to rename diskStorage everywhere -// during an experiment. should just rename it now. -type DiskStorage struct { - *diskStorage + // the sub-partition to read from. + partition blobserver.Partition + + // to mirror new blobs into (when partition above is the default partition) + mirrorPartitions []blobserver.Partition } func New(root string) (storage *DiskStorage, err os.Error) { @@ -45,15 +45,16 @@ func New(root string) (storage *DiskStorage, err os.Error) { err = os.NewError(fmt.Sprintf("Storage root %q doesn't exist or is not a directory.", root)) return } - storage = &DiskStorage{&diskStorage{ - &blobserver.SimpleBlobHubPartitionMap{}, - root, - }} + storage = &DiskStorage{ + SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{}, + root: root, + partition: nil, // TODO: this will probably crash elsewhere + } return } func newFromConfig(config jsonconfig.Obj) (storage blobserver.Storage, err os.Error) { - sto := &diskStorage{ + sto := &DiskStorage{ SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{}, root: config.RequiredString("path"), } @@ -74,11 +75,11 @@ func init() { blobserver.RegisterStorageConstructor("filesystem", blobserver.StorageConstructor(newFromConfig)) } -func (ds *diskStorage) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, os.Error) { +func (ds *DiskStorage) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, os.Error) { return ds.Fetch(blob) } -func (ds *diskStorage) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int64, os.Error) { +func (ds *DiskStorage) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int64, os.Error) { fileName := ds.blobPath(nil, blob) stat, err := os.Stat(fileName) if errorIsNoEnt(err) { @@ -94,9 +95,9 @@ func (ds *diskStorage) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int return file, stat.Size, nil } -func (ds *diskStorage) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error { +func (ds *DiskStorage) Remove(blobs []*blobref.BlobRef) os.Error { for _, blob := range blobs { - fileName := ds.blobPath(partition, blob) + fileName := ds.blobPath(ds.partition, blob) err := os.Remove(fileName) switch { case err == nil: diff --git a/lib/go/camli/blobserver/localdisk/localdisk_test.go b/lib/go/camli/blobserver/localdisk/localdisk_test.go index ea885db65..0d00fc9f1 100644 --- a/lib/go/camli/blobserver/localdisk/localdisk_test.go +++ b/lib/go/camli/blobserver/localdisk/localdisk_test.go @@ -30,7 +30,7 @@ import ( "time" ) -func cleanUp(ds *diskStorage) { +func cleanUp(ds *DiskStorage) { os.RemoveAll(ds.root) } @@ -39,7 +39,7 @@ var ( rootEpoch = 0 ) -func NewStorage(t *testing.T) *diskStorage { +func NewStorage(t *testing.T) *DiskStorage { epochLock.Lock() rootEpoch++ path := fmt.Sprintf("%s/camli-testroot-%d-%d", os.TempDir(), os.Getpid(), rootEpoch) @@ -51,7 +51,7 @@ func NewStorage(t *testing.T) *diskStorage { if err != nil { t.Fatalf("Failed to run New: %v", err) } - return ds.diskStorage + return ds } type testBlob struct { @@ -86,7 +86,7 @@ func (tb *testBlob) AssertMatches(t *testing.T, sb *blobref.SizedBlobRef) { } func (tb *testBlob) ExpectUploadBlob(t *testing.T, ds blobserver.BlobReceiver) { - sb, err := ds.ReceiveBlob(tb.BlobRef(), tb.Reader(), nil) + sb, err := ds.ReceiveBlob(tb.BlobRef(), tb.Reader()) if err != nil { t.Fatalf("ReceiveBlob error: %v", err) } @@ -103,7 +103,7 @@ func TestReceiveStat(t *testing.T) { ch := make(chan *blobref.SizedBlobRef, 0) errch := make(chan os.Error, 1) go func() { - errch <- ds.Stat(ch, defaultPartition, tb.BlobRefSlice(), 0) + errch <- ds.Stat(ch, tb.BlobRefSlice(), 0) close(ch) }() got := 0 @@ -126,7 +126,7 @@ func TestStatWait(t *testing.T) { ch := make(chan *blobref.SizedBlobRef, 0) errch := make(chan os.Error, 1) go func() { - errch <- ds.Stat(ch, defaultPartition, tb.BlobRefSlice(), waitSeconds) + errch <- ds.Stat(ch, tb.BlobRefSlice(), waitSeconds) close(ch) }() @@ -167,7 +167,6 @@ func TestMultiStat(t *testing.T) { errch := make(chan os.Error, 1) go func() { errch <- ds.Stat(ch, - defaultPartition, []*blobref.BlobRef{blobfoo.BlobRef(), blobbar.BlobRef()}, 0) close(ch) diff --git a/lib/go/camli/blobserver/localdisk/path.go b/lib/go/camli/blobserver/localdisk/path.go index eb48dcebe..0df55b4ab 100644 --- a/lib/go/camli/blobserver/localdisk/path.go +++ b/lib/go/camli/blobserver/localdisk/path.go @@ -28,7 +28,7 @@ func BlobFileBaseName(b *blobref.BlobRef) string { return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest()) } -func (ds *diskStorage) blobDirectory(partition blobserver.NamedPartition, b *blobref.BlobRef) string { +func (ds *DiskStorage) blobDirectory(partition blobserver.NamedPartition, b *blobref.BlobRef) string { d := b.Digest() if len(d) < 6 { d = d + "______" @@ -36,11 +36,11 @@ func (ds *diskStorage) blobDirectory(partition blobserver.NamedPartition, b *blo return fmt.Sprintf("%s/%s/%s/%s", ds.PartitionRoot(partition), b.HashName(), d[0:3], d[3:6]) } -func (ds *diskStorage) blobPath(partition blobserver.NamedPartition, b *blobref.BlobRef) string { +func (ds *DiskStorage) blobPath(partition blobserver.NamedPartition, b *blobref.BlobRef) string { return fmt.Sprintf("%s/%s", ds.blobDirectory(partition, b), BlobFileBaseName(b)) } -func (ds *diskStorage) PartitionRoot(partition blobserver.NamedPartition) string { +func (ds *DiskStorage) PartitionRoot(partition blobserver.NamedPartition) string { if partition == nil { return ds.root } diff --git a/lib/go/camli/blobserver/localdisk/path_test.go b/lib/go/camli/blobserver/localdisk/path_test.go index 16515f161..63b3be7cd 100644 --- a/lib/go/camli/blobserver/localdisk/path_test.go +++ b/lib/go/camli/blobserver/localdisk/path_test.go @@ -29,7 +29,7 @@ func (pn partitionName) Name() string { func TestPaths(t *testing.T) { br := blobref.Parse("digalg-abc") - ds := &diskStorage{root: "/tmp/dir"} + ds := &DiskStorage{root: "/tmp/dir"} if e, g := "/tmp/dir/digalg/abc/___", ds.blobDirectory(nil, br); e != g { t.Errorf("short blobref dir; expected path %q; got %q", e, g) diff --git a/lib/go/camli/blobserver/localdisk/receive.go b/lib/go/camli/blobserver/localdisk/receive.go index 7c60585a6..66828535d 100644 --- a/lib/go/camli/blobserver/localdisk/receive.go +++ b/lib/go/camli/blobserver/localdisk/receive.go @@ -29,7 +29,7 @@ import ( var flagOpenImages = flag.Bool("showimages", false, "Show images on receiving them with eog.") -func (ds *diskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, mirrorPartitions []blobserver.Partition) (blobGot *blobref.SizedBlobRef, err os.Error) { +func (ds *DiskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader) (blobGot *blobref.SizedBlobRef, err os.Error) { hashedDirectory := ds.blobDirectory(nil, blobRef) err = os.MkdirAll(hashedDirectory, 0700) if err != nil { @@ -82,7 +82,7 @@ func (ds *diskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, m return } - for _, partition := range mirrorPartitions { + for _, partition := range ds.mirrorPartitions { partitionDir := ds.blobDirectory(partition, blobRef) if err = os.MkdirAll(partitionDir, 0700); err != nil { return @@ -107,11 +107,14 @@ func (ds *diskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, m exec.MergeWithStdout) } - hub := ds.GetBlobHub(nil) + hub := ds.GetBlobHub() hub.NotifyBlobReceived(blobRef) - for _, partition := range mirrorPartitions { - hub = ds.GetBlobHub(partition) - hub.NotifyBlobReceived(blobRef) + for _, partition := range ds.mirrorPartitions { + partition = partition + // TODO: need to get the other blobserver.Storage for + // the mirrors, not just their partition names, + // because we need their blob hubs now to wake them + // up. } return } diff --git a/lib/go/camli/blobserver/localdisk/stat.go b/lib/go/camli/blobserver/localdisk/stat.go index 05ac759e3..33179220d 100644 --- a/lib/go/camli/blobserver/localdisk/stat.go +++ b/lib/go/camli/blobserver/localdisk/stat.go @@ -17,16 +17,16 @@ limitations under the License. package localdisk import ( - "camli/blobref" - "camli/blobserver" "os" "sync" "time" + + "camli/blobref" ) const maxParallelStats = 20 -func (ds *diskStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error { +func (ds *DiskStorage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error { if len(blobs) == 0 { return nil } @@ -35,7 +35,7 @@ func (ds *diskStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserve var missing []*blobref.BlobRef statSend := func(ref *blobref.BlobRef, appendMissing bool) os.Error { - fi, err := os.Stat(ds.blobPath(partition, ref)) + fi, err := os.Stat(ds.blobPath(ds.partition, ref)) switch { case err == nil && fi.IsRegular(): dest <- &blobref.SizedBlobRef{BlobRef: ref, Size: fi.Size} @@ -76,7 +76,7 @@ func (ds *diskStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserve // TODO: use a flag, defaulting to 60? waitSeconds = 60 } - hub := ds.GetBlobHub(partition) + hub := ds.GetBlobHub() ch := make(chan *blobref.BlobRef, 1) for _, missblob := range missing { hub.RegisterBlobListener(missblob, ch) diff --git a/lib/go/camli/blobserver/remote/remote.go b/lib/go/camli/blobserver/remote/remote.go index ee2f9659d..a114a9db0 100644 --- a/lib/go/camli/blobserver/remote/remote.go +++ b/lib/go/camli/blobserver/remote/remote.go @@ -49,15 +49,15 @@ func newFromConfig(config jsonconfig.Obj) (storage blobserver.Storage, err os.Er return sto, nil } -func (sto *remoteStorage) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error { +func (sto *remoteStorage) Remove(blobs []*blobref.BlobRef) os.Error { return os.NewError("TODO: implement") } -func (sto *remoteStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error { +func (sto *remoteStorage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error { return os.NewError("TODO: implement") } -func (sto *remoteStorage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) { +func (sto *remoteStorage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) { return nil, os.NewError("TODO: implement") } @@ -67,7 +67,7 @@ func (sto *remoteStorage) FetchStreaming(b *blobref.BlobRef) (file io.ReadCloser func (sto *remoteStorage) MaxEnumerate() uint { return 1000 } -func (sto *remoteStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error { +func (sto *remoteStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error { defer close(dest) return os.NewError("TODO: implement") } diff --git a/lib/go/camli/blobserver/s3/enumerate.go b/lib/go/camli/blobserver/s3/enumerate.go index f7a01dbcf..b336ab7f7 100644 --- a/lib/go/camli/blobserver/s3/enumerate.go +++ b/lib/go/camli/blobserver/s3/enumerate.go @@ -21,14 +21,13 @@ import ( "os" "camli/blobref" - "camli/blobserver" ) var _ = log.Printf func (sto *s3Storage) MaxEnumerate() uint { return 1000 } -func (sto *s3Storage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error { +func (sto *s3Storage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error { defer close(dest) objs, err := sto.s3Client.ListBucket(sto.bucket, after, limit) if err != nil { diff --git a/lib/go/camli/blobserver/s3/receive.go b/lib/go/camli/blobserver/s3/receive.go index 17ddd7153..9b916bf8a 100644 --- a/lib/go/camli/blobserver/s3/receive.go +++ b/lib/go/camli/blobserver/s3/receive.go @@ -100,7 +100,7 @@ func (as *amazonSlurper) Cleanup() { } } -func (sto *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) { +func (sto *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) { slurper := newAmazonSlurper(blob) defer slurper.Cleanup() diff --git a/lib/go/camli/blobserver/s3/remove.go b/lib/go/camli/blobserver/s3/remove.go index dd2917bc8..ae2ddeb7e 100644 --- a/lib/go/camli/blobserver/s3/remove.go +++ b/lib/go/camli/blobserver/s3/remove.go @@ -21,12 +21,11 @@ import ( "os" "camli/blobref" - "camli/blobserver" ) var _ = log.Printf -func (sto *s3Storage) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error { +func (sto *s3Storage) Remove(blobs []*blobref.BlobRef) os.Error { // TODO: do these in parallel var reterr os.Error for _, blob := range blobs { diff --git a/lib/go/camli/blobserver/s3/stat.go b/lib/go/camli/blobserver/s3/stat.go index be1c57dd7..be8302a6d 100644 --- a/lib/go/camli/blobserver/s3/stat.go +++ b/lib/go/camli/blobserver/s3/stat.go @@ -21,12 +21,11 @@ import ( "os" "camli/blobref" - "camli/blobserver" ) var _ = log.Printf -func (sto *s3Storage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error { +func (sto *s3Storage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error { // TODO: do n stats in parallel for _, br := range blobs { size, err := sto.s3Client.Stat(br.String(), sto.bucket) diff --git a/lib/go/camli/mysqlindexer/enumerate.go b/lib/go/camli/mysqlindexer/enumerate.go index ec7023252..2020ce881 100644 --- a/lib/go/camli/mysqlindexer/enumerate.go +++ b/lib/go/camli/mysqlindexer/enumerate.go @@ -17,11 +17,10 @@ limitations under the License. package mysqlindexer import ( - "camli/blobref" - "camli/blobserver" - "os" + "camli/blobref" + mysql "camli/third_party/github.com/Philio/GoMySQL" ) @@ -30,7 +29,7 @@ type blobRow struct { size int64 } -func (mi *Indexer) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) (err os.Error) { +func (mi *Indexer) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) (err os.Error) { // MySQL connection stuff. var client *mysql.Client client, err = mi.getConnection() diff --git a/lib/go/camli/mysqlindexer/mysqlindexer.go b/lib/go/camli/mysqlindexer/mysqlindexer.go index 36af5ab8c..995206be6 100644 --- a/lib/go/camli/mysqlindexer/mysqlindexer.go +++ b/lib/go/camli/mysqlindexer/mysqlindexer.go @@ -127,6 +127,6 @@ func (mi *Indexer) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, return nil, 0, os.NewError("Fetch isn't supported by the MySQL indexer") } -func (mi *Indexer) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error { +func (mi *Indexer) Remove(blobs []*blobref.BlobRef) os.Error { return os.NewError("Remove isn't supported by the MySQL indexer") } diff --git a/lib/go/camli/mysqlindexer/receive.go b/lib/go/camli/mysqlindexer/receive.go index b0976eb7c..fd3dd09ad 100644 --- a/lib/go/camli/mysqlindexer/receive.go +++ b/lib/go/camli/mysqlindexer/receive.go @@ -91,7 +91,7 @@ func (sn *blobSniffer) bufferIsCamliJson() bool { return true } -func (mi *Indexer) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (retsb *blobref.SizedBlobRef, err os.Error) { +func (mi *Indexer) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader) (retsb *blobref.SizedBlobRef, err os.Error) { sniffer := new(blobSniffer) hash := blobRef.Hash() var written int64 diff --git a/lib/go/camli/mysqlindexer/stat.go b/lib/go/camli/mysqlindexer/stat.go index 88dc371f2..153242bfc 100644 --- a/lib/go/camli/mysqlindexer/stat.go +++ b/lib/go/camli/mysqlindexer/stat.go @@ -18,7 +18,6 @@ package mysqlindexer import ( "camli/blobref" - "camli/blobserver" "log" "fmt" @@ -26,7 +25,7 @@ import ( "strings" ) -func (mi *Indexer) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error { +func (mi *Indexer) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error { error := func(err os.Error) os.Error { log.Printf("mysqlindexer: stat error: %v", err) return err diff --git a/server/go/camlistored/camlistored.go b/server/go/camlistored/camlistored.go index 83763c8cd..6f326880a 100644 --- a/server/go/camlistored/camlistored.go +++ b/server/go/camlistored/camlistored.go @@ -97,24 +97,23 @@ func handleCamliUsingStorage(conn http.ResponseWriter, req *http.Request, action case "GET": switch action { case "enumerate-blobs": - handler = auth.RequireAuth(handlers.CreateEnumerateHandler(storage, partition)) + handler = auth.RequireAuth(handlers.CreateEnumerateHandler(storage)) case "stat": - handler = auth.RequireAuth(handlers.CreateStatHandler(storage, partition)) + handler = auth.RequireAuth(handlers.CreateStatHandler(storage)) default: handler = handlers.CreateGetHandler(storage) } case "POST": switch action { case "stat": - handler = auth.RequireAuth(handlers.CreateStatHandler(storage, partition)) + handler = auth.RequireAuth(handlers.CreateStatHandler(storage)) case "upload": - handler = auth.RequireAuth(handlers.CreateUploadHandler(storage, partition)) + handler = auth.RequireAuth(handlers.CreateUploadHandler(storage)) case "remove": - // Currently only allows removing from a non-main partition. - handler = auth.RequireAuth(handlers.CreateRemoveHandler(storage, partition)) + handler = auth.RequireAuth(handlers.CreateRemoveHandler(storage)) } case "PUT": // no longer part of spec - handler = auth.RequireAuth(handlers.CreateNonStandardPutHandler(storage, partition)) + handler = auth.RequireAuth(handlers.CreateNonStandardPutHandler(storage)) } handler(conn, req) }