Cleanup: remove partitions from interfaces. WIP but compiles.

Still need to fix up blobhub notification for localdisk
mirroring, since now localdisk can't find the notification
hub for mirrored partitions.  (and also can't be configured
yet)
This commit is contained in:
Brad Fitzpatrick 2011-05-09 09:11:18 -07:00
parent 77cfa748f5
commit bed26de507
26 changed files with 141 additions and 146 deletions

View File

@ -47,7 +47,7 @@ func (cf *CachingFetcher) Fetch(br *blobref.BlobRef) (file blobref.ReadSeekClose
return nil, 0, err
}
_, err = cf.c.ReceiveBlob(br, sblob, nil)
_, err = cf.c.ReceiveBlob(br, sblob)
if err != nil {
return nil, 0, err
}

View File

@ -121,28 +121,18 @@ func (h *SimpleBlobHub) UnregisterBlobListener(blob *blobref.BlobRef, ch chan *b
type SimpleBlobHubPartitionMap struct {
hubLock sync.Mutex
hubMap map[string]BlobHub
hub BlobHub
}
func (spm *SimpleBlobHubPartitionMap) GetBlobHub(partition Partition) BlobHub {
name := ""
if partition != nil {
name = partition.Name()
}
func (spm *SimpleBlobHubPartitionMap) GetBlobHub() BlobHub {
spm.hubLock.Lock()
defer spm.hubLock.Unlock()
if spm.hubMap == nil {
spm.hubMap = make(map[string]BlobHub)
if spm.hub == nil {
// TODO: in the future, allow for different blob hub
// implementations rather than the
// everything-in-memory-on-a-single-machine SimpleBlobHub.
spm.hub = new(SimpleBlobHub)
}
if hub, ok := spm.hubMap[name]; ok {
return hub
}
// TODO: in the future, allow for different blob hub
// implementations rather than the
// everything-in-memory-on-a-single-machine SimpleBlobHub.
hub := new(SimpleBlobHub)
spm.hubMap[name] = hub
return hub
return spm.hub
}

View File

@ -36,15 +36,15 @@ type blobInfo struct {
}
func CreateEnumerateHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) {
func CreateEnumerateHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) {
return func(conn http.ResponseWriter, req *http.Request) {
handleEnumerateBlobs(conn, req, storage, partition)
handleEnumerateBlobs(conn, req, storage)
}
}
const errMsgMaxWaitSecWithAfter = "Can't use 'maxwaitsec' with 'after'.\n"
func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobEnumerator, partition blobserver.Partition) {
func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobEnumerator) {
// Potential input parameters
formValueLimit := req.FormValue("limit")
formValueMaxWaitSec := req.FormValue("maxwaitsec")
@ -92,7 +92,7 @@ func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage b
blobch := make(chan *blobref.SizedBlobRef, 100)
resultch := make(chan os.Error, 1)
go func() {
resultch <- storage.EnumerateBlobs(blobch, partition, formValueAfter, limit+1, waitSeconds)
resultch <- storage.EnumerateBlobs(blobch, formValueAfter, limit+1, waitSeconds)
}()
after := ""

View File

@ -18,7 +18,6 @@ package handlers
import (
"camli/blobref"
"camli/blobserver"
. "camli/test/asserts"
"http"
"http/httptest"
@ -43,7 +42,6 @@ type emptyEnumerator struct {
}
func (ee *emptyEnumerator) EnumerateBlobs(dest chan *blobref.SizedBlobRef,
partition blobserver.Partition,
after string,
limit uint,
waitSeconds int) os.Error {
@ -75,7 +73,7 @@ func TestEnumerateInput(t *testing.T) {
wr := httptest.NewRecorder()
wr.Code = 200 // default
req := makeGetRequest(test.url)
handleEnumerateBlobs(wr, req, enumerator, nil) // TODO: use better partition
handleEnumerateBlobs(wr, req, enumerator)
ExpectInt(t, test.expectedCode, wr.Code, "response code for " + test.name)
ExpectString(t, test.expectedBody, wr.Body.String(), "output for " + test.name)
}

View File

@ -27,20 +27,25 @@ import (
const maxRemovesPerRequest = 1000
func CreateRemoveHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) {
func CreateRemoveHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) {
return func(conn http.ResponseWriter, req *http.Request) {
handleRemove(conn, req, storage, partition)
handleRemove(conn, req, storage)
}
}
func handleRemove(conn http.ResponseWriter, req *http.Request, storage blobserver.Storage, partition blobserver.Partition) {
func handleRemove(conn http.ResponseWriter, req *http.Request, storage blobserver.Storage) {
if req.Method != "POST" {
log.Fatalf("Invalid method; handlers misconfigured")
}
if !partition.IsQueue() {
configer, ok := storage.(blobserver.Configer)
if !ok {
conn.WriteHeader(http.StatusForbidden)
fmt.Fprintf(conn, "Can only remove blobs from a queue partition.\n")
fmt.Fprintf(conn, "Remove handler's blobserver.Storage isn't a blobserver.Configuer; can't remove")
return
}
if !configer.Config().IsQueue {
conn.WriteHeader(http.StatusForbidden)
fmt.Fprintf(conn, "Can only remove blobs from a queue.\n")
return
}
@ -68,7 +73,7 @@ func handleRemove(conn http.ResponseWriter, req *http.Request, storage blobserve
toRemoveStr = append(toRemoveStr, ref.String())
}
err := storage.Remove(partition, toRemove)
err := storage.Remove(toRemove)
if err != nil {
conn.WriteHeader(http.StatusInternalServerError)
log.Printf("Server error during remove: %v", err)

View File

@ -27,15 +27,15 @@ import (
"strconv"
)
func CreateStatHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) {
func CreateStatHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) {
return func(conn http.ResponseWriter, req *http.Request) {
handleStat(conn, req, storage, partition)
handleStat(conn, req, storage)
}
}
const maxStatBlobs = 1000
func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobStatter, partition blobserver.Partition) {
func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobStatter) {
toStat := make([]*blobref.BlobRef, 0)
switch req.Method {
case "POST":
@ -91,7 +91,7 @@ func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver.
blobch := make(chan *blobref.SizedBlobRef)
resultch := make(chan os.Error, 1)
go func() {
err := storage.Stat(blobch, partition, toStat, waitSeconds)
err := storage.Stat(blobch, toStat, waitSeconds)
close(blobch)
resultch <- err
}()
@ -111,7 +111,8 @@ func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver.
}
}
ret := commonUploadResponse(partition, req)
configer, _ := storage.(blobserver.Configer)
ret := commonUploadResponse(configer, req)
ret["stat"] = statRes
ret["canLongPoll"] = true
httputil.ReturnJson(conn, ret)

View File

@ -30,13 +30,13 @@ import (
"strings"
)
func CreateUploadHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) {
func CreateUploadHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) {
return func(conn http.ResponseWriter, req *http.Request) {
handleMultiPartUpload(conn, req, storage, partition)
handleMultiPartUpload(conn, req, storage)
}
}
func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver, partition blobserver.Partition) {
func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver) {
if !(req.Method == "POST" && strings.Contains(req.URL.Path, "/camli/upload")) {
httputil.BadRequestError(conn, "Inconfigured handler.")
return
@ -96,7 +96,7 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece
continue
}
blobGot, err := blobReceiver.ReceiveBlob(ref, mimePart, partition.GetMirrorPartitions())
blobGot, err := blobReceiver.ReceiveBlob(ref, mimePart)
if err != nil {
addError(fmt.Sprintf("Error receiving blob %v: %v\n", ref, err))
break
@ -106,7 +106,8 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece
}
log.Println("Done reading multipart body.")
ret := commonUploadResponse(partition, req)
configer, _ := blobReceiver.(blobserver.Configer) // TODO: ugly?
ret := commonUploadResponse(configer, req)
received := make([]map[string]interface{}, 0)
for _, got := range receivedBlobs {
@ -125,15 +126,17 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece
httputil.ReturnJson(conn, ret)
}
func commonUploadResponse(partition blobserver.Partition, req *http.Request) map[string]interface{} {
func commonUploadResponse(configer blobserver.Configer, req *http.Request) map[string]interface{} {
ret := make(map[string]interface{})
ret["maxUploadSize"] = 2147483647 // 2GB.. *shrug*
ret["maxUploadSize"] = 2147483647 // 2GB.. *shrug*. TODO: cut this down, standardize
ret["uploadUrlExpirationSeconds"] = 86400
// TODO: camli/upload isn't part of the spec. we should pick
// something different here just to make it obvious that this
// isn't a well-known URL and facilitate lazy clients.
ret["uploadUrl"] = partition.URLBase() + "/camli/upload"
// isn't a well-known URL and accidentally encourage lazy clients.
if configer != nil {
ret["uploadUrl"] = configer.Config().URLBase + "/camli/upload"
}
return ret
}
@ -141,13 +144,13 @@ func commonUploadResponse(partition blobserver.Partition, req *http.Request) map
var kPutPattern *regexp.Regexp = regexp.MustCompile(`^/camli/([a-z0-9]+)-([a-f0-9]+)$`)
// NOTE: not part of the spec at present. old. might be re-introduced.
func CreateNonStandardPutHandler(storage blobserver.Storage, partition blobserver.Partition) func(http.ResponseWriter, *http.Request) {
func CreateNonStandardPutHandler(storage blobserver.Storage) func(http.ResponseWriter, *http.Request) {
return func(conn http.ResponseWriter, req *http.Request) {
handlePut(conn, req, storage, partition)
handlePut(conn, req, storage)
}
}
func handlePut(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver, partition blobserver.Partition) {
func handlePut(conn http.ResponseWriter, req *http.Request, blobReceiver blobserver.BlobReceiver) {
blobRef := blobref.FromPattern(kPutPattern, req.URL.Path)
if blobRef == nil {
httputil.BadRequestError(conn, "Malformed PUT URL.")
@ -159,7 +162,7 @@ func handlePut(conn http.ResponseWriter, req *http.Request, blobReceiver blobser
return
}
_, err := blobReceiver.ReceiveBlob(blobRef, req.Body, partition.GetMirrorPartitions())
_, err := blobReceiver.ReceiveBlob(blobRef, req.Body)
if err != nil {
httputil.ServerError(conn, err)
return

View File

@ -45,20 +45,16 @@ type Partition interface {
type BlobReceiver interface {
// ReceiveBlob accepts a newly uploaded blob and writes it to
// disk.
//
// mirrorPartitions may not be supported by all instances
// and may return an error if used.
ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []Partition) (*blobref.SizedBlobRef, os.Error)
ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error)
}
type BlobStatter interface {
// Stat checks for the existence of blobs, writing their sizes
// (if found back to the dest channel), and returning an error
// or nil. Stat() should NOT close the channel.
// waitSeconds is the max time to wait for the blobs to exist
// in the given partition, or 0 for no delay.
// waitSeconds is the max time to wait for the blobs to exist,
// or 0 for no delay.
Stat(dest chan *blobref.SizedBlobRef,
partition Partition,
blobs []*blobref.BlobRef,
waitSeconds int) os.Error
}
@ -74,12 +70,11 @@ type BlobEnumerator interface {
// sorted, as long as they are lexigraphically greater than
// after (if provided).
// limit will be supplied and sanity checked by caller.
// waitSeconds is the max time to wait for any blobs to exist
// in the given partition, or 0 for no delay.
// waitSeconds is the max time to wait for any blobs to exist,
// or 0 for no delay.
// EnumerateBlobs must close the channel. (even if limit
// was hit and more blobs remain)
EnumerateBlobs(dest chan *blobref.SizedBlobRef,
partition Partition,
after string,
limit uint,
waitSeconds int) os.Error
@ -92,20 +87,29 @@ type Cache interface {
BlobStatter
}
type Config struct {
Writable, Readable bool
IsQueue bool // supports deletes
// the "http://host:port" and optional path (but without trailing slash) to have "/camli/*" appended
URLBase string
}
type Configer interface {
Config() *Config
}
type Storage interface {
blobref.StreamingFetcher
BlobReceiver
BlobStatter
BlobEnumerator
// Remove 0 or more blobs from provided partition, which
// should be empty for the default partition. Removal of
// non-existent items isn't an error. Returns failure if any
// items existed but failed to be deleted.
Remove(partition Partition, blobs []*blobref.BlobRef) os.Error
// Remove 0 or more blobs. Removal of non-existent items
// isn't an error. Returns failure if any items existed but
// failed to be deleted.
Remove(blobs []*blobref.BlobRef) os.Error
// Returns the blob notification bus for a given partition.
// Use nil for the default partition.
// TODO: move this to be a method on the Partition interface?
GetBlobHub(partition Partition) BlobHub
// Returns the blob notification bus
GetBlobHub() BlobHub
}

View File

@ -17,13 +17,13 @@ limitations under the License.
package localdisk
import (
"camli/blobref"
"camli/blobserver"
"fmt"
"os"
"sort"
"strings"
"time"
"camli/blobref"
)
type readBlobRequest struct {
@ -110,8 +110,8 @@ func readBlobs(opts readBlobRequest) os.Error {
return nil
}
func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error {
dirRoot := ds.PartitionRoot(partition)
func (ds *DiskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error {
dirRoot := ds.PartitionRoot(ds.partition)
limitMutable := limit
var err os.Error
doScan := func() {
@ -132,7 +132,7 @@ func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition
// The case where we have to wait for waitSeconds for any blob
// to possibly appear.
hub := ds.GetBlobHub(partition)
hub := ds.GetBlobHub()
ch := make(chan *blobref.BlobRef, 1)
hub.RegisterListener(ch)
defer hub.UnregisterListener(ch)

View File

@ -18,7 +18,6 @@ package localdisk
import (
"camli/blobref"
"camli/blobserver"
. "camli/test/asserts"
"fmt"
@ -29,8 +28,6 @@ import (
"time"
)
var defaultPartition blobserver.Partition = nil
func TestEnumerate(t *testing.T) {
ds := NewStorage(t)
defer cleanUp(ds)
@ -49,7 +46,7 @@ func TestEnumerate(t *testing.T) {
ch := make(chan *blobref.SizedBlobRef)
errCh := make(chan os.Error)
go func() {
errCh <- ds.EnumerateBlobs(ch, defaultPartition, "", limit, waitSeconds)
errCh <- ds.EnumerateBlobs(ch, "", limit, waitSeconds)
}()
var sb *blobref.SizedBlobRef
@ -69,7 +66,7 @@ func TestEnumerate(t *testing.T) {
// Now again, but skipping foo's blob
ch = make(chan *blobref.SizedBlobRef)
go func() {
errCh <- ds.EnumerateBlobs(ch, defaultPartition,
errCh <- ds.EnumerateBlobs(ch,
foo.BlobRef().String(),
limit, waitSeconds)
}()
@ -93,7 +90,7 @@ func TestEnumerateEmpty(t *testing.T) {
ch := make(chan *blobref.SizedBlobRef)
errCh := make(chan os.Error)
go func() {
errCh <- ds.EnumerateBlobs(ch, defaultPartition,
errCh <- ds.EnumerateBlobs(ch,
"", limit, waitSeconds)
}()
@ -110,7 +107,7 @@ func TestEnumerateEmptyLongPoll(t *testing.T) {
ch := make(chan *blobref.SizedBlobRef)
errCh := make(chan os.Error)
go func() {
errCh <- ds.EnumerateBlobs(ch, defaultPartition,
errCh <- ds.EnumerateBlobs(ch,
"", limit, waitSeconds)
}()
@ -177,7 +174,7 @@ func TestEnumerateIsSorted(t *testing.T) {
ch := make(chan *blobref.SizedBlobRef)
errCh := make(chan os.Error)
go func() {
errCh <- ds.EnumerateBlobs(ch, defaultPartition, test.after, limit, 0)
errCh <- ds.EnumerateBlobs(ch, test.after, limit, 0)
}()
var got = make([]*blobref.SizedBlobRef, 0, blobsToMake)
for {

View File

@ -27,15 +27,15 @@ import (
"camli/jsonconfig"
)
type diskStorage struct {
type DiskStorage struct {
*blobserver.SimpleBlobHubPartitionMap
root string
}
// TODO: lazy hack because I didn't want to rename diskStorage everywhere
// during an experiment. should just rename it now.
type DiskStorage struct {
*diskStorage
// the sub-partition to read from.
partition blobserver.Partition
// to mirror new blobs into (when partition above is the default partition)
mirrorPartitions []blobserver.Partition
}
func New(root string) (storage *DiskStorage, err os.Error) {
@ -45,15 +45,16 @@ func New(root string) (storage *DiskStorage, err os.Error) {
err = os.NewError(fmt.Sprintf("Storage root %q doesn't exist or is not a directory.", root))
return
}
storage = &DiskStorage{&diskStorage{
&blobserver.SimpleBlobHubPartitionMap{},
root,
}}
storage = &DiskStorage{
SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
root: root,
partition: nil, // TODO: this will probably crash elsewhere
}
return
}
func newFromConfig(config jsonconfig.Obj) (storage blobserver.Storage, err os.Error) {
sto := &diskStorage{
sto := &DiskStorage{
SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
root: config.RequiredString("path"),
}
@ -74,11 +75,11 @@ func init() {
blobserver.RegisterStorageConstructor("filesystem", blobserver.StorageConstructor(newFromConfig))
}
func (ds *diskStorage) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, os.Error) {
func (ds *DiskStorage) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, os.Error) {
return ds.Fetch(blob)
}
func (ds *diskStorage) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int64, os.Error) {
func (ds *DiskStorage) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int64, os.Error) {
fileName := ds.blobPath(nil, blob)
stat, err := os.Stat(fileName)
if errorIsNoEnt(err) {
@ -94,9 +95,9 @@ func (ds *diskStorage) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int
return file, stat.Size, nil
}
func (ds *diskStorage) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error {
func (ds *DiskStorage) Remove(blobs []*blobref.BlobRef) os.Error {
for _, blob := range blobs {
fileName := ds.blobPath(partition, blob)
fileName := ds.blobPath(ds.partition, blob)
err := os.Remove(fileName)
switch {
case err == nil:

View File

@ -30,7 +30,7 @@ import (
"time"
)
func cleanUp(ds *diskStorage) {
func cleanUp(ds *DiskStorage) {
os.RemoveAll(ds.root)
}
@ -39,7 +39,7 @@ var (
rootEpoch = 0
)
func NewStorage(t *testing.T) *diskStorage {
func NewStorage(t *testing.T) *DiskStorage {
epochLock.Lock()
rootEpoch++
path := fmt.Sprintf("%s/camli-testroot-%d-%d", os.TempDir(), os.Getpid(), rootEpoch)
@ -51,7 +51,7 @@ func NewStorage(t *testing.T) *diskStorage {
if err != nil {
t.Fatalf("Failed to run New: %v", err)
}
return ds.diskStorage
return ds
}
type testBlob struct {
@ -86,7 +86,7 @@ func (tb *testBlob) AssertMatches(t *testing.T, sb *blobref.SizedBlobRef) {
}
func (tb *testBlob) ExpectUploadBlob(t *testing.T, ds blobserver.BlobReceiver) {
sb, err := ds.ReceiveBlob(tb.BlobRef(), tb.Reader(), nil)
sb, err := ds.ReceiveBlob(tb.BlobRef(), tb.Reader())
if err != nil {
t.Fatalf("ReceiveBlob error: %v", err)
}
@ -103,7 +103,7 @@ func TestReceiveStat(t *testing.T) {
ch := make(chan *blobref.SizedBlobRef, 0)
errch := make(chan os.Error, 1)
go func() {
errch <- ds.Stat(ch, defaultPartition, tb.BlobRefSlice(), 0)
errch <- ds.Stat(ch, tb.BlobRefSlice(), 0)
close(ch)
}()
got := 0
@ -126,7 +126,7 @@ func TestStatWait(t *testing.T) {
ch := make(chan *blobref.SizedBlobRef, 0)
errch := make(chan os.Error, 1)
go func() {
errch <- ds.Stat(ch, defaultPartition, tb.BlobRefSlice(), waitSeconds)
errch <- ds.Stat(ch, tb.BlobRefSlice(), waitSeconds)
close(ch)
}()
@ -167,7 +167,6 @@ func TestMultiStat(t *testing.T) {
errch := make(chan os.Error, 1)
go func() {
errch <- ds.Stat(ch,
defaultPartition,
[]*blobref.BlobRef{blobfoo.BlobRef(), blobbar.BlobRef()},
0)
close(ch)

View File

@ -28,7 +28,7 @@ func BlobFileBaseName(b *blobref.BlobRef) string {
return fmt.Sprintf("%s-%s.dat", b.HashName(), b.Digest())
}
func (ds *diskStorage) blobDirectory(partition blobserver.NamedPartition, b *blobref.BlobRef) string {
func (ds *DiskStorage) blobDirectory(partition blobserver.NamedPartition, b *blobref.BlobRef) string {
d := b.Digest()
if len(d) < 6 {
d = d + "______"
@ -36,11 +36,11 @@ func (ds *diskStorage) blobDirectory(partition blobserver.NamedPartition, b *blo
return fmt.Sprintf("%s/%s/%s/%s", ds.PartitionRoot(partition), b.HashName(), d[0:3], d[3:6])
}
func (ds *diskStorage) blobPath(partition blobserver.NamedPartition, b *blobref.BlobRef) string {
func (ds *DiskStorage) blobPath(partition blobserver.NamedPartition, b *blobref.BlobRef) string {
return fmt.Sprintf("%s/%s", ds.blobDirectory(partition, b), BlobFileBaseName(b))
}
func (ds *diskStorage) PartitionRoot(partition blobserver.NamedPartition) string {
func (ds *DiskStorage) PartitionRoot(partition blobserver.NamedPartition) string {
if partition == nil {
return ds.root
}

View File

@ -29,7 +29,7 @@ func (pn partitionName) Name() string {
func TestPaths(t *testing.T) {
br := blobref.Parse("digalg-abc")
ds := &diskStorage{root: "/tmp/dir"}
ds := &DiskStorage{root: "/tmp/dir"}
if e, g := "/tmp/dir/digalg/abc/___", ds.blobDirectory(nil, br); e != g {
t.Errorf("short blobref dir; expected path %q; got %q", e, g)

View File

@ -29,7 +29,7 @@ import (
var flagOpenImages = flag.Bool("showimages", false, "Show images on receiving them with eog.")
func (ds *diskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, mirrorPartitions []blobserver.Partition) (blobGot *blobref.SizedBlobRef, err os.Error) {
func (ds *DiskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader) (blobGot *blobref.SizedBlobRef, err os.Error) {
hashedDirectory := ds.blobDirectory(nil, blobRef)
err = os.MkdirAll(hashedDirectory, 0700)
if err != nil {
@ -82,7 +82,7 @@ func (ds *diskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, m
return
}
for _, partition := range mirrorPartitions {
for _, partition := range ds.mirrorPartitions {
partitionDir := ds.blobDirectory(partition, blobRef)
if err = os.MkdirAll(partitionDir, 0700); err != nil {
return
@ -107,11 +107,14 @@ func (ds *diskStorage) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, m
exec.MergeWithStdout)
}
hub := ds.GetBlobHub(nil)
hub := ds.GetBlobHub()
hub.NotifyBlobReceived(blobRef)
for _, partition := range mirrorPartitions {
hub = ds.GetBlobHub(partition)
hub.NotifyBlobReceived(blobRef)
for _, partition := range ds.mirrorPartitions {
partition = partition
// TODO: need to get the other blobserver.Storage for
// the mirrors, not just their partition names,
// because we need their blob hubs now to wake them
// up.
}
return
}

View File

@ -17,16 +17,16 @@ limitations under the License.
package localdisk
import (
"camli/blobref"
"camli/blobserver"
"os"
"sync"
"time"
"camli/blobref"
)
const maxParallelStats = 20
func (ds *diskStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
func (ds *DiskStorage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
if len(blobs) == 0 {
return nil
}
@ -35,7 +35,7 @@ func (ds *diskStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserve
var missing []*blobref.BlobRef
statSend := func(ref *blobref.BlobRef, appendMissing bool) os.Error {
fi, err := os.Stat(ds.blobPath(partition, ref))
fi, err := os.Stat(ds.blobPath(ds.partition, ref))
switch {
case err == nil && fi.IsRegular():
dest <- &blobref.SizedBlobRef{BlobRef: ref, Size: fi.Size}
@ -76,7 +76,7 @@ func (ds *diskStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserve
// TODO: use a flag, defaulting to 60?
waitSeconds = 60
}
hub := ds.GetBlobHub(partition)
hub := ds.GetBlobHub()
ch := make(chan *blobref.BlobRef, 1)
for _, missblob := range missing {
hub.RegisterBlobListener(missblob, ch)

View File

@ -49,15 +49,15 @@ func newFromConfig(config jsonconfig.Obj) (storage blobserver.Storage, err os.Er
return sto, nil
}
func (sto *remoteStorage) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error {
func (sto *remoteStorage) Remove(blobs []*blobref.BlobRef) os.Error {
return os.NewError("TODO: implement")
}
func (sto *remoteStorage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
func (sto *remoteStorage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
return os.NewError("TODO: implement")
}
func (sto *remoteStorage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) {
func (sto *remoteStorage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) {
return nil, os.NewError("TODO: implement")
}
@ -67,7 +67,7 @@ func (sto *remoteStorage) FetchStreaming(b *blobref.BlobRef) (file io.ReadCloser
func (sto *remoteStorage) MaxEnumerate() uint { return 1000 }
func (sto *remoteStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error {
func (sto *remoteStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error {
defer close(dest)
return os.NewError("TODO: implement")
}

View File

@ -21,14 +21,13 @@ import (
"os"
"camli/blobref"
"camli/blobserver"
)
var _ = log.Printf
func (sto *s3Storage) MaxEnumerate() uint { return 1000 }
func (sto *s3Storage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error {
func (sto *s3Storage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error {
defer close(dest)
objs, err := sto.s3Client.ListBucket(sto.bucket, after, limit)
if err != nil {

View File

@ -100,7 +100,7 @@ func (as *amazonSlurper) Cleanup() {
}
}
func (sto *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (*blobref.SizedBlobRef, os.Error) {
func (sto *s3Storage) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (*blobref.SizedBlobRef, os.Error) {
slurper := newAmazonSlurper(blob)
defer slurper.Cleanup()

View File

@ -21,12 +21,11 @@ import (
"os"
"camli/blobref"
"camli/blobserver"
)
var _ = log.Printf
func (sto *s3Storage) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error {
func (sto *s3Storage) Remove(blobs []*blobref.BlobRef) os.Error {
// TODO: do these in parallel
var reterr os.Error
for _, blob := range blobs {

View File

@ -21,12 +21,11 @@ import (
"os"
"camli/blobref"
"camli/blobserver"
)
var _ = log.Printf
func (sto *s3Storage) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
func (sto *s3Storage) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
// TODO: do n stats in parallel
for _, br := range blobs {
size, err := sto.s3Client.Stat(br.String(), sto.bucket)

View File

@ -17,11 +17,10 @@ limitations under the License.
package mysqlindexer
import (
"camli/blobref"
"camli/blobserver"
"os"
"camli/blobref"
mysql "camli/third_party/github.com/Philio/GoMySQL"
)
@ -30,7 +29,7 @@ type blobRow struct {
size int64
}
func (mi *Indexer) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) (err os.Error) {
func (mi *Indexer) EnumerateBlobs(dest chan *blobref.SizedBlobRef, after string, limit uint, waitSeconds int) (err os.Error) {
// MySQL connection stuff.
var client *mysql.Client
client, err = mi.getConnection()

View File

@ -127,6 +127,6 @@ func (mi *Indexer) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64,
return nil, 0, os.NewError("Fetch isn't supported by the MySQL indexer")
}
func (mi *Indexer) Remove(partition blobserver.Partition, blobs []*blobref.BlobRef) os.Error {
func (mi *Indexer) Remove(blobs []*blobref.BlobRef) os.Error {
return os.NewError("Remove isn't supported by the MySQL indexer")
}

View File

@ -91,7 +91,7 @@ func (sn *blobSniffer) bufferIsCamliJson() bool {
return true
}
func (mi *Indexer) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader, mirrorPartions []blobserver.Partition) (retsb *blobref.SizedBlobRef, err os.Error) {
func (mi *Indexer) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader) (retsb *blobref.SizedBlobRef, err os.Error) {
sniffer := new(blobSniffer)
hash := blobRef.Hash()
var written int64

View File

@ -18,7 +18,6 @@ package mysqlindexer
import (
"camli/blobref"
"camli/blobserver"
"log"
"fmt"
@ -26,7 +25,7 @@ import (
"strings"
)
func (mi *Indexer) Stat(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
func (mi *Indexer) Stat(dest chan *blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
error := func(err os.Error) os.Error {
log.Printf("mysqlindexer: stat error: %v", err)
return err

View File

@ -97,24 +97,23 @@ func handleCamliUsingStorage(conn http.ResponseWriter, req *http.Request, action
case "GET":
switch action {
case "enumerate-blobs":
handler = auth.RequireAuth(handlers.CreateEnumerateHandler(storage, partition))
handler = auth.RequireAuth(handlers.CreateEnumerateHandler(storage))
case "stat":
handler = auth.RequireAuth(handlers.CreateStatHandler(storage, partition))
handler = auth.RequireAuth(handlers.CreateStatHandler(storage))
default:
handler = handlers.CreateGetHandler(storage)
}
case "POST":
switch action {
case "stat":
handler = auth.RequireAuth(handlers.CreateStatHandler(storage, partition))
handler = auth.RequireAuth(handlers.CreateStatHandler(storage))
case "upload":
handler = auth.RequireAuth(handlers.CreateUploadHandler(storage, partition))
handler = auth.RequireAuth(handlers.CreateUploadHandler(storage))
case "remove":
// Currently only allows removing from a non-main partition.
handler = auth.RequireAuth(handlers.CreateRemoveHandler(storage, partition))
handler = auth.RequireAuth(handlers.CreateRemoveHandler(storage))
}
case "PUT": // no longer part of spec
handler = auth.RequireAuth(handlers.CreateNonStandardPutHandler(storage, partition))
handler = auth.RequireAuth(handlers.CreateNonStandardPutHandler(storage))
}
handler(conn, req)
}