From f345db541184530343f852b77d57a7531bf236c8 Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Wed, 12 Oct 2011 18:10:58 -0700
Subject: [PATCH] AppEngine storage namespace support.

Change-Id: I41d83598cfdef4795d5fbc5763efcad81de84c0a
---
 .../go/appengine/camli/appengine/storage.go |  216 +++++++++++++-----
 server/go/appengine/config.json             |    6 +
 2 files changed, 171 insertions(+), 51 deletions(-)

diff --git a/server/go/appengine/camli/appengine/storage.go b/server/go/appengine/camli/appengine/storage.go
index 7471bda5c..cf60b3ced 100644
--- a/server/go/appengine/camli/appengine/storage.go
+++ b/server/go/appengine/camli/appengine/storage.go
@@ -24,6 +24,7 @@ import (
 	"io/ioutil"
 	"log"
 	"os"
+	"strings"
 
 	"appengine"
 	"appengine/datastore"
@@ -36,39 +37,102 @@ import (
 
 var _ = log.Printf
 
-const blobKind = "Blob"
+const (
+	blobKind = "Blob"
+	memKind  = "NsBlobMember" // blob membership in a namespace
+)
 
 type appengineStorage struct {
 	*blobserver.SimpleBlobHubPartitionMap
-
-	ctx appengine.Context
+	namespace string // never empty; config initializes to at least "-"
+	ctx       appengine.Context
 }
 
+// blobEnt is stored once per unique blob, keyed by blobref.
 type blobEnt struct {
-	Size    []byte // an int64 as "%d" to make it unindexed
-	BlobKey []byte // an appengine.BlobKey
+	Size       []byte // an int64 as "%d" to make it unindexed
+	BlobKey    []byte // an appengine.BlobKey
+	Namespaces []byte // |-separated string of namespaces
 
-	// TODO(bradfitz): IsCamliSchemaBlob bool?
+	// TODO(bradfitz): IsCamliSchemaBlob bool? ... probably want
+	// on enumeration (memEnt) too.
+}
+
+// memEnt is stored once per blob in a namespace, keyed by "ns|blobref"
+type memEnt struct {
+	Size []byte // an int64 as "%d" to make it unindexed
 }
 
 func (b *blobEnt) size() (int64, os.Error) {
+	return byteDecSize(b.Size)
+}
+
+func (m *memEnt) size() (int64, os.Error) {
+	return byteDecSize(m.Size)
+}
+
+func byteDecSize(b []byte) (int64, os.Error) {
 	var size int64
-	n, err := fmt.Fscanf(bytes.NewBuffer(b.Size), "%d", &size)
+	n, err := fmt.Fscanf(bytes.NewBuffer(b), "%d", &size)
 	if n != 1 || err != nil {
-		return 0, fmt.Errorf("invalid Size column in datastore: %q", string(b.Size))
+		return 0, fmt.Errorf("invalid Size column in datastore: %q", string(b))
 	}
 	return size, nil
 }
 
+func (b *blobEnt) inNamespace(ns string) (out bool) {
+	defer func() {
+		log.Printf("inNamespace(%q, %q) = %v", string(b.Namespaces), ns, out)
+	}()
+	for _, in := range strings.Split(string(b.Namespaces), "|") {
+		if ns == in {
+			return true
+		}
+	}
+	return false
+}
+
+func entKey(c appengine.Context, br *blobref.BlobRef) *datastore.Key {
+	return datastore.NewKey(c, blobKind, br.String(), 0, nil)
+}
+
+func (s *appengineStorage) memKey(c appengine.Context, br *blobref.BlobRef) *datastore.Key {
+	return datastore.NewKey(c, memKind, fmt.Sprintf("%s|%s", s.namespace, br.String()), 0, nil)
+}
+
+func fetchEnt(c appengine.Context, br *blobref.BlobRef) (*blobEnt, os.Error) {
+	row := new(blobEnt)
+	err := datastore.Get(c, entKey(c, br), row)
+	if err != nil {
+		return nil, err
+	}
+	return row, nil
+}
+
 var errNoContext = os.NewError("Internal error: no App Engine context is available")
 
 func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err os.Error) {
 	sto := &appengineStorage{
 		SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
 	}
+	sto.namespace = config.OptionalString("namespace", "")
 	if err := config.Validate(); err != nil {
 		return nil, err
 	}
+
+	if strings.Contains(sto.namespace, "|") {
+		return nil, fmt.Errorf("no pipe allowed in namespace %q", sto.namespace)
+	}
+	if strings.Contains(sto.namespace, "\x00") {
+		return nil, fmt.Errorf("no zero byte allowed in namespace %q", sto.namespace)
+	}
+	if sto.namespace == "-" {
+		return nil, fmt.Errorf("reserved namespace %q", sto.namespace)
+	}
+	if sto.namespace == "" {
+		sto.namespace = "-"
+	}
+
 	return sto, nil
 }
 
@@ -86,10 +150,7 @@ func (sto *appengineStorage) FetchStreaming(br *blobref.BlobRef) (file io.ReadCl
 		err = errNoContext
 		return
 	}
-
-	key := datastore.NewKey(sto.ctx, blobKind, br.String(), 0, nil)
-	row := new(blobEnt)
-	err = datastore.Get(sto.ctx, key, row)
+	row, err := fetchEnt(sto.ctx, br)
 	if err == datastore.ErrNoSuchEntity {
 		err = os.ENOENT
 		return
@@ -97,6 +158,10 @@ func (sto *appengineStorage) FetchStreaming(br *blobref.BlobRef) (file io.ReadCl
 	if err != nil {
 		return
 	}
+	if !row.inNamespace(sto.namespace) {
+		err = os.ENOENT
+		return
+	}
 	size, err = row.size()
 	if err != nil {
 		return
@@ -105,6 +170,8 @@ func (sto *appengineStorage) FetchStreaming(br *blobref.BlobRef) (file io.ReadCl
 	return ioutil.NopCloser(reader), size, nil
 }
 
+var crossGroupTransaction = &datastore.TransactionOptions{XG: true}
+
 func (sto *appengineStorage) ReceiveBlob(br *blobref.BlobRef, in io.Reader) (sb blobref.SizedBlobRef, err os.Error) {
 	if sto.ctx == nil {
 		err = errNoContext
@@ -117,42 +184,89 @@ func (sto *appengineStorage) ReceiveBlob(br *blobref.BlobRef, in io.Reader) (sb
 	if err != nil {
 		return
 	}
-
 	if !br.HashMatches(hash) {
 		err = blobserver.ErrCorruptBlob
 		return
 	}
-	mimeType := "application/octet-stream"
-	bw, err := blobstore.Create(sto.ctx, mimeType)
-	if err != nil {
-		return
-	}
-	written, err = io.Copy(bw, &b)
-	if err != nil {
-		// TODO(bradfitz): try to clean up; close it, see if we can find the key, delete it.
-		return
-	}
-	err = bw.Close()
-	if err != nil {
-		// TODO(bradfitz): try to clean up; see if we can find the key, delete it.
-		return
-	}
-	bkey, err := bw.Key()
-	if err != nil {
-		return
+
+	// bkey is non-empty once we've uploaded the blob.
+	var bkey appengine.BlobKey
+
+	// uploadBlob uploads the blob, unless it's already been done.
+	uploadBlob := func(ctx appengine.Context) os.Error {
+		if len(bkey) > 0 {
+			return nil // already done in previous transaction attempt
+		}
+		bw, err := blobstore.Create(ctx, "application/octet-stream")
+		if err != nil {
+			return err
+		}
+		_, err = io.Copy(bw, &b)
+		if err != nil {
+			// TODO(bradfitz): try to clean up; close it, see if we can find the key, delete it.
+			ctx.Errorf("blobstore Copy error: %v", err)
+			return err
+		}
+		err = bw.Close()
+		if err != nil {
+			// TODO(bradfitz): try to clean up; see if we can find the key, delete it.
+			ctx.Errorf("blobstore Close error: %v", err)
+			return err
+		}
+		k, err := bw.Key()
+		if err == nil {
+			bkey = k
+		}
+		return err
 	}
 
-	var ent blobEnt
-	ent.Size = []byte(fmt.Sprintf("%d", written))
-	ent.BlobKey = []byte(string(bkey))
+	tryFunc := func(tc appengine.Context) os.Error {
+		row, err := fetchEnt(sto.ctx, br)
+		switch err {
+		case datastore.ErrNoSuchEntity:
+			if err := uploadBlob(sto.ctx); err != nil {
+				tc.Errorf("uploadBlob failed: %v", err)
+				return err
+			}
+			row = &blobEnt{
+				Size:       []byte(fmt.Sprintf("%d", written)),
+				BlobKey:    []byte(string(bkey)),
+				Namespaces: []byte(sto.namespace),
+			}
+			_, err = datastore.Put(tc, entKey(tc, br), row)
+			if err != nil {
+				return err
+			}
+		case nil:
+			if row.inNamespace(sto.namespace) {
+				// Nothing to do
+				return nil
+			}
+			row.Namespaces = []byte(string(row.Namespaces) + "|" + sto.namespace)
+			_, err = datastore.Put(tc, entKey(tc, br), row)
+			if err != nil {
+				return err
+			}
+		default:
+			return err
+		}
 
-	dkey := datastore.NewKey(sto.ctx, blobKind, br.String(), 0, nil)
-	_, err = datastore.Put(sto.ctx, dkey, &ent)
+		// Add membership row
+		_, err = datastore.Put(tc, sto.memKey(tc, br), &memEnt{
+			Size: []byte(fmt.Sprintf("%d", written)),
+		})
+		return err
+	}
+	err = datastore.RunInTransaction(sto.ctx, tryFunc, crossGroupTransaction)
 	if err != nil {
-		blobstore.Delete(sto.ctx, bkey) // TODO: insert into task queue on error to try later?
+		if len(bkey) > 0 {
+			// If we just created this blob but we
+			// ultimately failed, try our best to delete
+			// it so it's not orphaned.
+			blobstore.Delete(sto.ctx, bkey)
+		}
 		return
 	}
-
 	return blobref.SizedBlobRef{br, written}, nil
 }
 
@@ -173,8 +287,8 @@ func (sto *appengineStorage) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs [
 		errs = make([]os.Error, len(blobs))
 	)
 	for _, br := range blobs {
-		keys = append(keys, datastore.NewKey(sto.ctx, blobKind, br.String(), 0, nil))
-		out = append(out, new(blobEnt))
+		keys = append(keys, sto.memKey(sto.ctx, br))
+		out = append(out, new(memEnt))
 	}
 	err := datastore.GetMulti(sto.ctx, keys, out)
 	if merr, ok := err.(datastore.ErrMulti); ok {
@@ -193,12 +307,12 @@ func (sto *appengineStorage) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs [
 			err = errs[i] // just return last one found?
 			continue
 		}
-		ent := out[i].(*blobEnt)
+		ent := out[i].(*memEnt)
 		size, err := ent.size()
 		if err == nil {
 			dest <- blobref.SizedBlobRef{br, size}
 		} else {
-			sto.ctx.Warningf("skipping corrupt blob %s with Size %q during Stat", br, string(ent.Size))
+			sto.ctx.Warningf("skipping corrupt blob %s: %v", br, err)
 		}
 	}
 	return err
@@ -209,26 +323,26 @@ func (sto *appengineStorage) EnumerateBlobs(dest chan<- blobref.SizedBlobRef, af
 	if sto.ctx == nil {
 		return errNoContext
 	}
-	q := datastore.NewQuery(blobKind).Limit(int(limit))
-	if after != "" {
-		akey := datastore.NewKey(sto.ctx, blobKind, after, 0, nil)
-		q = q.Filter("__key__>", akey)
-	}
+	prefix := sto.namespace + "|"
+	keyBegin := datastore.NewKey(sto.ctx, memKind, prefix+after, 0, nil)
+	keyEnd := datastore.NewKey(sto.ctx, memKind, sto.namespace+"~", 0, nil)
+
+	q := datastore.NewQuery(memKind).Limit(int(limit)).Filter("__key__>", keyBegin).Filter("__key__<", keyEnd)
 	it := q.Run(sto.ctx)
-	var ent blobEnt
+	var row memEnt
 	for {
-		key, err := it.Next(&ent)
+		key, err := it.Next(&row)
 		if err == datastore.Done {
 			break
 		}
 		if err != nil {
 			return err
 		}
-		size, err := ent.size()
+		size, err := row.size()
 		if err != nil {
 			return err
 		}
-		dest <- blobref.SizedBlobRef{blobref.Parse(key.StringID()), size}
+		dest <- blobref.SizedBlobRef{blobref.Parse(key.StringID()[len(prefix):]), size}
 	}
 	return nil
 }
diff --git a/server/go/appengine/config.json b/server/go/appengine/config.json
index 2d9f21cb5..647454bea 100644
--- a/server/go/appengine/config.json
+++ b/server/go/appengine/config.json
@@ -7,6 +7,12 @@
 			"handler": "storage-appengine",
 			"handlerArgs": {
 			}
+		},
+		"/bs2/": {
+			"handler": "storage-appengine",
+			"handlerArgs": {
+				"namespace": "two"
+			}
 		}
 	}
 }
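
Not part of the patch: a minimal standalone sketch of the datastore key naming the change relies on. Blob rows stay keyed by blobref, while per-namespace membership rows ("NsBlobMember") are keyed "namespace|blobref", and EnumerateBlobs scans the key range strictly between namespace+"|"+after and namespace+"~" (in ASCII, '~' sorts after '|', so the range spans exactly one namespace's keys). The helper names and the blobref value below are made up for illustration only.

package main

import "fmt"

// memKeyName mirrors the "namespace|blobref" key naming used by memKey
// in the patch (illustrative only, no App Engine SDK involved).
func memKeyName(namespace, blobref string) string {
	return fmt.Sprintf("%s|%s", namespace, blobref)
}

// enumRange mirrors the key range EnumerateBlobs queries over: keys
// strictly greater than namespace+"|"+after and strictly less than
// namespace+"~".
func enumRange(namespace, after string) (begin, end string) {
	return namespace + "|" + after, namespace + "~"
}

func main() {
	br := "sha1-0000000000000000000000000000000000000000" // made-up blobref
	fmt.Println(memKeyName("two", br))                     // prints: two|sha1-00...
	begin, end := enumRange("two", "")
	fmt.Println(begin, end) // "two|" and "two~" bound the namespace's membership keys
}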