2011-10-05 21:34:55 +00:00
|
|
|
/*
Copyright 2011 Google Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
|
|
|
|
package appengine
|
|
|
|
|
|
|
|
import (
|
2011-10-07 01:10:29 +00:00
|
|
|
"bytes"
|
2011-10-07 00:44:30 +00:00
|
|
|
"http"
|
2011-10-05 21:34:55 +00:00
|
|
|
"io"
|
2011-10-10 01:41:54 +00:00
|
|
|
"io/ioutil"
|
2011-10-07 06:24:40 +00:00
|
|
|
"log"
|
2011-10-05 21:34:55 +00:00
|
|
|
"os"
|
|
|
|
|
2011-10-07 00:44:30 +00:00
|
|
|
"appengine"
|
2011-10-07 01:10:29 +00:00
|
|
|
"appengine/datastore"
|
|
|
|
"appengine/blobstore"
|
2011-10-07 00:44:30 +00:00
|
|
|
|
2011-10-05 21:34:55 +00:00
|
|
|
"camli/blobref"
|
|
|
|
"camli/blobserver"
|
|
|
|
"camli/jsonconfig"
|
|
|
|
)
|
|
|
|
|
2011-10-07 06:24:40 +00:00
|
|
|
// Reference log.Printf so the "log" import stays used even if no other code
// in this file calls into the log package.
var _ = log.Printf
|
|
|
|
|
|
|
|
// blobKind is the datastore entity kind passed to datastore.NewKey when
// building keys for blobEnt rows.
const blobKind = "Blob"
|
|
|
|
|
2011-10-05 21:34:55 +00:00
|
|
|
type appengineStorage struct {
|
|
|
|
*blobserver.SimpleBlobHubPartitionMap
|
2011-10-07 00:44:30 +00:00
|
|
|
|
|
|
|
ctx appengine.Context
|
2011-10-05 21:34:55 +00:00
|
|
|
}
|
|
|
|
|
2011-10-07 01:10:29 +00:00
|
|
|
type blobEnt struct {
|
|
|
|
BlobRefStr string
|
|
|
|
Size int64
|
|
|
|
BlobKey appengine.BlobKey
|
|
|
|
// TODO(bradfitz): IsCamliSchemaBlob bool
|
|
|
|
}
|
|
|
|
|
2011-10-07 00:44:30 +00:00
|
|
|
// errNoContext is returned by appengineStorage methods that are invoked while
// the receiver's ctx field is nil.
var errNoContext = os.NewError("Internal error: no App Engine context is available")
|
|
|
|
|
2011-10-05 21:34:55 +00:00
|
|
|
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err os.Error) {
|
|
|
|
sto := &appengineStorage{
|
|
|
|
SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
|
|
|
|
}
|
|
|
|
if err := config.Validate(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return sto, nil
|
|
|
|
}
|
|
|
|
|
2011-10-07 00:44:30 +00:00
|
|
|
// Compile-time check that *appengineStorage implements blobserver.ContextWrapper.
var _ blobserver.ContextWrapper = (*appengineStorage)(nil)
|
|
|
|
|
|
|
|
func (sto *appengineStorage) WrapContext(req *http.Request) blobserver.Storage {
|
|
|
|
s2 := new(appengineStorage)
|
|
|
|
*s2 = *sto
|
|
|
|
s2.ctx = appengine.NewContext(req)
|
|
|
|
return s2
|
|
|
|
}
|
|
|
|
|
2011-10-05 21:34:55 +00:00
|
|
|
func (sto *appengineStorage) FetchStreaming(br *blobref.BlobRef) (file io.ReadCloser, size int64, err os.Error) {
|
2011-10-07 00:44:30 +00:00
|
|
|
if sto.ctx == nil {
|
|
|
|
err = errNoContext
|
|
|
|
return
|
|
|
|
}
|
2011-10-10 01:41:54 +00:00
|
|
|
|
|
|
|
key := datastore.NewKey(sto.ctx, blobKind, br.String(), 0, nil)
|
|
|
|
row := new(blobEnt)
|
|
|
|
err = datastore.Get(sto.ctx, key, row)
|
|
|
|
if err == datastore.ErrNoSuchEntity {
|
|
|
|
err = os.ENOENT
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
reader := blobstore.NewReader(sto.ctx, row.BlobKey)
|
|
|
|
return ioutil.NopCloser(reader), row.Size, nil
|
2011-10-05 21:34:55 +00:00
|
|
|
}
|
|
|
|
|
2011-10-07 01:10:29 +00:00
|
|
|
func (sto *appengineStorage) ReceiveBlob(br *blobref.BlobRef, in io.Reader) (sb blobref.SizedBlobRef, err os.Error) {
|
2011-10-07 00:44:30 +00:00
|
|
|
if sto.ctx == nil {
|
|
|
|
err = errNoContext
|
|
|
|
return
|
|
|
|
}
|
2011-10-07 01:10:29 +00:00
|
|
|
|
|
|
|
var b bytes.Buffer
|
|
|
|
hash := br.Hash()
|
|
|
|
written, err := io.Copy(io.MultiWriter(hash, &b), in)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
if !br.HashMatches(hash) {
|
|
|
|
err = blobserver.ErrCorruptBlob
|
|
|
|
return
|
|
|
|
}
|
|
|
|
mimeType := "application/octet-stream"
|
|
|
|
bw, err := blobstore.Create(sto.ctx, mimeType)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
written, err = io.Copy(bw, &b)
|
|
|
|
if err != nil {
|
|
|
|
// TODO(bradfitz): try to clean up; close it, see if we can find the key, delete it.
|
|
|
|
return
|
|
|
|
}
|
|
|
|
err = bw.Close()
|
|
|
|
if err != nil {
|
|
|
|
// TODO(bradfitz): try to clean up; see if we can find the key, delete it.
|
|
|
|
return
|
|
|
|
}
|
|
|
|
bkey, err := bw.Key()
|
|
|
|
if err != nil {
|
2011-10-07 06:24:40 +00:00
|
|
|
return
|
|
|
|
}
|
2011-10-07 01:10:29 +00:00
|
|
|
|
|
|
|
var ent blobEnt
|
|
|
|
ent.BlobRefStr = br.String()
|
|
|
|
ent.Size = written
|
|
|
|
ent.BlobKey = bkey
|
|
|
|
|
2011-10-07 06:24:40 +00:00
|
|
|
dkey := datastore.NewKey(sto.ctx, blobKind, br.String(), 0, nil)
|
2011-10-07 01:10:29 +00:00
|
|
|
_, err = datastore.Put(sto.ctx, dkey, &ent)
|
|
|
|
if err != nil {
|
2011-10-07 06:24:40 +00:00
|
|
|
blobstore.Delete(sto.ctx, bkey) // TODO: insert into task queue on error to try later?
|
2011-10-07 01:10:29 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
return blobref.SizedBlobRef{br, written}, nil
|
2011-10-05 21:34:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (sto *appengineStorage) RemoveBlobs(blobs []*blobref.BlobRef) os.Error {
|
2011-10-07 00:44:30 +00:00
|
|
|
if sto.ctx == nil {
|
|
|
|
return errNoContext
|
|
|
|
}
|
|
|
|
return os.NewError("TODO-AppEngine-RemoveBlobs")
|
2011-10-05 21:34:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (sto *appengineStorage) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
|
2011-10-07 00:44:30 +00:00
|
|
|
if sto.ctx == nil {
|
|
|
|
return errNoContext
|
|
|
|
}
|
2011-10-07 06:24:40 +00:00
|
|
|
var (
|
|
|
|
keys = make([]*datastore.Key, 0, len(blobs))
|
|
|
|
out = make([]interface{}, 0, len(blobs))
|
|
|
|
errs = make([]os.Error, len(blobs))
|
|
|
|
)
|
|
|
|
for _, br := range blobs {
|
|
|
|
keys = append(keys, datastore.NewKey(sto.ctx, blobKind, br.String(), 0, nil))
|
|
|
|
out = append(out, new(blobEnt))
|
|
|
|
}
|
|
|
|
err := datastore.GetMulti(sto.ctx, keys, out)
|
|
|
|
if merr, ok := err.(datastore.ErrMulti); ok {
|
|
|
|
errs = []os.Error(merr)
|
|
|
|
err = nil
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for i, br := range blobs {
|
|
|
|
thisErr := errs[i]
|
|
|
|
if thisErr == datastore.ErrNoSuchEntity {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
if thisErr != nil {
|
|
|
|
err = errs[i] // just return last one found?
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
ent := out[i].(*blobEnt)
|
|
|
|
if ent.BlobRefStr != "" {
|
|
|
|
dest <- blobref.SizedBlobRef{br, ent.Size}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return err
|
2011-10-05 21:34:55 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (sto *appengineStorage) EnumerateBlobs(dest chan<- blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error {
|
2011-10-07 00:44:30 +00:00
|
|
|
if sto.ctx == nil {
|
|
|
|
return errNoContext
|
|
|
|
}
|
|
|
|
return os.NewError("TODO-AppEngine-EnumerateBlobs")
|
2011-10-05 21:34:55 +00:00
|
|
|
}
|