AppEngine storage namespace support.

Change-Id: I41d83598cfdef4795d5fbc5763efcad81de84c0a
Brad Fitzpatrick 2011-10-12 18:10:58 -07:00
parent d2d1018f59
commit f345db5411
2 changed files with 171 additions and 51 deletions
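
In outline: blob bytes are written to the App Engine blobstore once per unique blobref; a blobEnt row (kind "Blob", keyed by the blobref string) records the blobstore key, the size, and a "|"-separated list of the namespaces that contain the blob; and a memEnt row (kind "NsBlobMember", keyed by "namespace|blobref") records membership so Stat and Enumerate can be answered per namespace. A minimal sketch of the key scheme the patch below introduces (helper names here are illustrative, not from the patch):

    // Sketch only; kinds and the '|' separator match the patch below.
    func blobRowKey(c appengine.Context, br *blobref.BlobRef) *datastore.Key {
        // one row per unique blob, shared by every namespace
        return datastore.NewKey(c, "Blob", br.String(), 0, nil)
    }

    func memberRowKey(c appengine.Context, ns string, br *blobref.BlobRef) *datastore.Key {
        // one row per (namespace, blob) pair; '|' is why a namespace may not contain a pipe
        return datastore.NewKey(c, "NsBlobMember", ns+"|"+br.String(), 0, nil)
    }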

@@ -24,6 +24,7 @@ import (
"io/ioutil"
"log"
"os"
"strings"
"appengine"
"appengine/datastore"
@@ -36,39 +37,102 @@ import (
var _ = log.Printf
const blobKind = "Blob"
const (
blobKind = "Blob"
memKind = "NsBlobMember" // blob membership in a namespace
)
type appengineStorage struct {
*blobserver.SimpleBlobHubPartitionMap
ctx appengine.Context
namespace string // never empty; config initializes to at least "-"
ctx appengine.Context
}
// blobEnt is stored once per unique blob, keyed by blobref.
type blobEnt struct {
Size []byte // an int64 as "%d" to make it unindexed
BlobKey []byte // an appengine.BlobKey
Size []byte // an int64 as "%d" to make it unindexed
BlobKey []byte // an appengine.BlobKey
Namespaces []byte // |-separated string of namespaces
// TODO(bradfitz): IsCamliSchemaBlob bool?
// TODO(bradfitz): IsCamliSchemaBlob bool? ... probably want
// on enumeration (memEnt) too.
}
// memEnt is stored once per blob in a namespace, keyed by "ns|blobref"
type memEnt struct {
Size []byte // an int64 as "%d" to make it unindexed
}
func (b *blobEnt) size() (int64, os.Error) {
return byteDecSize(b.Size)
}
func (m *memEnt) size() (int64, os.Error) {
return byteDecSize(m.Size)
}
func byteDecSize(b []byte) (int64, os.Error) {
var size int64
n, err := fmt.Fscanf(bytes.NewBuffer(b.Size), "%d", &size)
n, err := fmt.Fscanf(bytes.NewBuffer(b), "%d", &size)
if n != 1 || err != nil {
return 0, fmt.Errorf("invalid Size column in datastore: %q", string(b.Size))
return 0, fmt.Errorf("invalid Size column in datastore: %q", string(b))
}
return size, nil
}
func (b *blobEnt) inNamespace(ns string) (out bool) {
defer func() {
log.Printf("inNamespace(%q, %q) = %v", string(b.Namespaces), ns, out)
}()
for _, in := range strings.Split(string(b.Namespaces), "|") {
if ns == in {
return true
}
}
return false
}
func entKey(c appengine.Context, br *blobref.BlobRef) *datastore.Key {
return datastore.NewKey(c, blobKind, br.String(), 0, nil)
}
func (s *appengineStorage) memKey(c appengine.Context, br *blobref.BlobRef) *datastore.Key {
return datastore.NewKey(c, memKind, fmt.Sprintf("%s|%s", s.namespace, br.String()), 0, nil)
}
func fetchEnt(c appengine.Context, br *blobref.BlobRef) (*blobEnt, os.Error) {
row := new(blobEnt)
err := datastore.Get(c, entKey(c, br), row)
if err != nil {
return nil, err
}
return row, nil
}
var errNoContext = os.NewError("Internal error: no App Engine context is available")
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err os.Error) {
sto := &appengineStorage{
SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
}
sto.namespace = config.OptionalString("namespace", "")
if err := config.Validate(); err != nil {
return nil, err
}
if strings.Contains(sto.namespace, "|") {
return nil, fmt.Errorf("no pipe allowed in namespace %q", sto.namespace)
}
if strings.Contains(sto.namespace, "\x00") {
return nil, fmt.Errorf("no zero byte allowed in namespace %q", sto.namespace)
}
if sto.namespace == "-" {
return nil, fmt.Errorf("reserved namespace %q", sto.namespace)
}
if sto.namespace == "" {
sto.namespace = "-"
}
return sto, nil
}
@@ -86,10 +150,7 @@ func (sto *appengineStorage) FetchStreaming(br *blobref.BlobRef) (file io.ReadCl
err = errNoContext
return
}
key := datastore.NewKey(sto.ctx, blobKind, br.String(), 0, nil)
row := new(blobEnt)
err = datastore.Get(sto.ctx, key, row)
row, err := fetchEnt(sto.ctx, br)
if err == datastore.ErrNoSuchEntity {
err = os.ENOENT
return
@@ -97,6 +158,10 @@ func (sto *appengineStorage) FetchStreaming(br *blobref.BlobRef) (file io.ReadCl
if err != nil {
return
}
if !row.inNamespace(sto.namespace) {
err = os.ENOENT
return
}
size, err = row.size()
if err != nil {
return
@@ -105,6 +170,8 @@ func (sto *appengineStorage) FetchStreaming(br *blobref.BlobRef) (file io.ReadCl
return ioutil.NopCloser(reader), size, nil
}
var crossGroupTransaction = &datastore.TransactionOptions{XG: true}
func (sto *appengineStorage) ReceiveBlob(br *blobref.BlobRef, in io.Reader) (sb blobref.SizedBlobRef, err os.Error) {
if sto.ctx == nil {
err = errNoContext
@@ -117,42 +184,89 @@ func (sto *appengineStorage) ReceiveBlob(br *blobref.BlobRef, in io.Reader) (sb
if err != nil {
return
}
if !br.HashMatches(hash) {
err = blobserver.ErrCorruptBlob
return
}
mimeType := "application/octet-stream"
bw, err := blobstore.Create(sto.ctx, mimeType)
if err != nil {
return
}
written, err = io.Copy(bw, &b)
if err != nil {
// TODO(bradfitz): try to clean up; close it, see if we can find the key, delete it.
return
}
err = bw.Close()
if err != nil {
// TODO(bradfitz): try to clean up; see if we can find the key, delete it.
return
}
bkey, err := bw.Key()
if err != nil {
return
// bkey is non-empty once we've uploaded the blob.
var bkey appengine.BlobKey
// uploadBlob uploads the blob, unless it's already been done.
uploadBlob := func(ctx appengine.Context) os.Error {
if len(bkey) > 0 {
return nil // already done in previous transaction attempt
}
bw, err := blobstore.Create(ctx, "application/octet-stream")
if err != nil {
return err
}
_, err = io.Copy(bw, &b)
if err != nil {
// TODO(bradfitz): try to clean up; close it, see if we can find the key, delete it.
ctx.Errorf("blobstore Copy error: %v", err)
return err
}
err = bw.Close()
if err != nil {
// TODO(bradfitz): try to clean up; see if we can find the key, delete it.
ctx.Errorf("blobstore Close error: %v", err)
return err
}
k, err := bw.Key()
if err == nil {
bkey = k
}
return err
}
var ent blobEnt
ent.Size = []byte(fmt.Sprintf("%d", written))
ent.BlobKey = []byte(string(bkey))
tryFunc := func(tc appengine.Context) os.Error {
row, err := fetchEnt(sto.ctx, br)
switch err {
case datastore.ErrNoSuchEntity:
if err := uploadBlob(sto.ctx); err != nil {
tc.Errorf("uploadBlob failed: %v", err)
return err
}
row = &blobEnt{
Size: []byte(fmt.Sprintf("%d", written)),
BlobKey: []byte(string(bkey)),
Namespaces: []byte(sto.namespace),
}
_, err = datastore.Put(tc, entKey(tc, br), row)
if err != nil {
return err
}
case nil:
if row.inNamespace(sto.namespace) {
// Nothing to do
return nil
}
row.Namespaces = []byte(string(row.Namespaces) + "|" + sto.namespace)
_, err = datastore.Put(tc, entKey(tc, br), row)
if err != nil {
return err
}
default:
return err
}
dkey := datastore.NewKey(sto.ctx, blobKind, br.String(), 0, nil)
_, err = datastore.Put(sto.ctx, dkey, &ent)
// Add membership row
_, err = datastore.Put(tc, sto.memKey(tc, br), &memEnt{
Size: []byte(fmt.Sprintf("%d", written)),
})
return err
}
err = datastore.RunInTransaction(sto.ctx, tryFunc, crossGroupTransaction)
if err != nil {
blobstore.Delete(sto.ctx, bkey) // TODO: insert into task queue on error to try later?
if len(bkey) > 0 {
// If we just created this blob but we
// ultimately failed, try our best to delete
// it so it's not orphaned.
blobstore.Delete(sto.ctx, bkey)
}
return
}
return blobref.SizedBlobRef{br, written}, nil
}
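
The write path above buffers and hashes the incoming bytes, then runs a cross-group (XG) datastore transaction that either creates the blobEnt row (uploading the bytes to the blobstore first) or appends the namespace to an existing row, and in both cases writes the per-namespace memEnt row. Because datastore.RunInTransaction may invoke tryFunc more than once, the blobstore upload is memoized through bkey, and a blob uploaded by a transaction that ultimately fails is deleted to avoid orphaned bytes. A compressed sketch of that retry-safe shape (uploadOnce is a hypothetical stand-in for the blobstore.Create/Copy/Close/Key sequence, and the patch only uploads when no blobEnt row exists yet):

    var bkey appengine.BlobKey // set at most once across transaction retries

    try := func(tc appengine.Context) os.Error {
        if bkey == "" {
            k, err := uploadOnce(tc) // hypothetical helper wrapping blobstore.Create + io.Copy + Close + Key
            if err != nil {
                return err
            }
            bkey = k // remembered so a retried transaction skips the upload
        }
        // ... Put the blobEnt (kind "Blob") and memEnt (kind "NsBlobMember") rows ...
        return nil
    }

    if err := datastore.RunInTransaction(ctx, try, &datastore.TransactionOptions{XG: true}); err != nil {
        if bkey != "" {
            blobstore.Delete(ctx, bkey) // best effort: don't orphan the uploaded bytes
        }
        return blobref.SizedBlobRef{}, err
    }
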
@@ -173,8 +287,8 @@ func (sto *appengineStorage) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs [
errs = make([]os.Error, len(blobs))
)
for _, br := range blobs {
keys = append(keys, datastore.NewKey(sto.ctx, blobKind, br.String(), 0, nil))
out = append(out, new(blobEnt))
keys = append(keys, sto.memKey(sto.ctx, br))
out = append(out, new(memEnt))
}
err := datastore.GetMulti(sto.ctx, keys, out)
if merr, ok := err.(datastore.ErrMulti); ok {
@@ -193,12 +307,12 @@ func (sto *appengineStorage) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs [
err = errs[i] // just return last one found?
continue
}
ent := out[i].(*blobEnt)
ent := out[i].(*memEnt)
size, err := ent.size()
if err == nil {
dest <- blobref.SizedBlobRef{br, size}
} else {
sto.ctx.Warningf("skipping corrupt blob %s with Size %q during Stat", br, string(ent.Size))
sto.ctx.Warningf("skipping corrupt blob %s: %v", br, err)
}
}
return err
@@ -209,26 +323,26 @@ func (sto *appengineStorage) EnumerateBlobs(dest chan<- blobref.SizedBlobRef, af
if sto.ctx == nil {
return errNoContext
}
q := datastore.NewQuery(blobKind).Limit(int(limit))
if after != "" {
akey := datastore.NewKey(sto.ctx, blobKind, after, 0, nil)
q = q.Filter("__key__>", akey)
}
prefix := sto.namespace + "|"
keyBegin := datastore.NewKey(sto.ctx, memKind, prefix+after, 0, nil)
keyEnd := datastore.NewKey(sto.ctx, memKind, sto.namespace+"~", 0, nil)
q := datastore.NewQuery(memKind).Limit(int(limit)).Filter("__key__>", keyBegin).Filter("__key__<", keyEnd)
it := q.Run(sto.ctx)
var ent blobEnt
var row memEnt
for {
key, err := it.Next(&ent)
key, err := it.Next(&row)
if err == datastore.Done {
break
}
if err != nil {
return err
}
size, err := ent.size()
size, err := row.size()
if err != nil {
return err
}
dest <- blobref.SizedBlobRef{blobref.Parse(key.StringID()), size}
dest <- blobref.SizedBlobRef{blobref.Parse(key.StringID()[len(prefix):]), size}
}
return nil
}
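
The enumeration above works purely on the membership keys: they all look like "namespace|blobref", so the query scans the key range ("namespace|<after>", "namespace~") and strips the "namespace|" prefix before parsing the blobref. The '~' upper bound works because '|' is 0x7C and '~' is 0x7E, so every key in the namespace sorts strictly below it, and blobref strings themselves use only characters that sort below '|'. A small illustrative helper for those bounds (not part of the patch):

    func memberKeyRange(c appengine.Context, ns, after string) (lo, hi *datastore.Key) {
        lo = datastore.NewKey(c, "NsBlobMember", ns+"|"+after, 0, nil) // exclusive lower bound
        hi = datastore.NewKey(c, "NsBlobMember", ns+"~", 0, nil)       // exclusive upper bound, past every "ns|..." key
        return
    }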

@@ -7,6 +7,12 @@
"handler": "storage-appengine",
"handlerArgs": {
}
},
"/bs2/": {
"handler": "storage-appengine",
"handlerArgs": {
"namespace": "two"
}
}
}
}
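
The added "/bs2/" handler shows the new knob: two storage-appengine handlers can point at the same underlying blob rows while keeping separate namespaces, and the first handler, which passes no "namespace", gets the internal default "-". A standalone restatement of the rules newFromConfig enforces above (my own sketch, not code from the patch):

    func checkNamespace(ns string) (string, os.Error) {
        if strings.Contains(ns, "|") {
            return "", fmt.Errorf("no pipe allowed in namespace %q", ns) // '|' separates namespace and blobref in keys
        }
        if strings.Contains(ns, "\x00") {
            return "", fmt.Errorf("no zero byte allowed in namespace %q", ns)
        }
        if ns == "-" {
            return "", fmt.Errorf("reserved namespace %q", ns) // "-" is reserved for the default
        }
        if ns == "" {
            ns = "-" // unset in config means the default namespace
        }
        return ns, nil
    }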