2012-03-26 18:11:57 +00:00
|
|
|
// +build appengine
|
|
|
|
|
2011-10-05 21:34:55 +00:00
|
|
|
/*
|
|
|
|
Copyright 2011 Google Inc.
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package appengine
|
|
|
|
|
|
|
|
import (
|
2011-10-07 01:10:29 +00:00
|
|
|
"bytes"
|
2011-10-11 01:48:34 +00:00
|
|
|
"fmt"
|
2011-10-05 21:34:55 +00:00
|
|
|
"io"
|
|
|
|
"os"
|
2011-10-13 01:10:58 +00:00
|
|
|
"strings"
|
2013-01-09 03:43:09 +00:00
|
|
|
"sync"
|
2011-10-05 21:34:55 +00:00
|
|
|
|
2011-10-07 00:44:30 +00:00
|
|
|
"appengine"
|
2011-10-07 01:10:29 +00:00
|
|
|
"appengine/blobstore"
|
2013-01-09 02:41:06 +00:00
|
|
|
"appengine/datastore"
|
2011-10-07 00:44:30 +00:00
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
"camlistore.org/pkg/blob"
|
Update from r60 to [almost] Go 1.
A lot is still broken, but most stuff at least compiles now.
The directory tree has been rearranged now too. Go libraries are now
under "pkg". Fully qualified, they are e.g. "camlistore.org/pkg/jsonsign".
The go tool cannot yet fetch from arbitrary domains, but discussion is
happening now on which mechanism to use to allow that.
For now, put the camlistore root under $GOPATH/src. Typically $GOPATH
is $HOME, so Camlistore should be at $HOME/src/camlistore.org.
Then you can:
$ go build ./server/camlistored
... etc
The build.pl script is currently disabled. It'll be resurrected at
some point, but with a very different role (helping create a fake
GOPATH and running the go build command, if things are installed at
the wrong place, and/or running fileembed generators).
Many things are certainly broken.
Many things are disabled. (MySQL, all indexing, etc).
Many things need to be moved into
camlistore.org/third_party/{code.google.com,github.com} and updated
from their r60 to Go 1 versions, where applicable.
The GoMySQL stuff should be updated to use database/sql and the ziutek
library implementing database/sql/driver.
Help wanted.
Change-Id: If71217dc5c8f0e70dbe46e9504ca5131c6eeacde
2012-02-19 05:53:06 +00:00
|
|
|
"camlistore.org/pkg/blobserver"
|
2013-12-02 21:20:51 +00:00
|
|
|
"camlistore.org/pkg/context"
|
Update from r60 to [almost] Go 1.
A lot is still broken, but most stuff at least compiles now.
The directory tree has been rearranged now too. Go libraries are now
under "pkg". Fully qualified, they are e.g. "camlistore.org/pkg/jsonsign".
The go tool cannot yet fetch from arbitrary domains, but discussion is
happening now on which mechanism to use to allow that.
For now, put the camlistore root under $GOPATH/src. Typically $GOPATH
is $HOME, so Camlistore should be at $HOME/src/camlistore.org.
Then you can:
$ go build ./server/camlistored
... etc
The build.pl script is currently disabled. It'll be resurrected at
some point, but with a very different role (helping create a fake
GOPATH and running the go build command, if things are installed at
the wrong place, and/or running fileembed generators).
Many things are certainly broken.
Many things are disabled. (MySQL, all indexing, etc).
Many things need to be moved into
camlistore.org/third_party/{code.google.com,github.com} and updated
from their r60 to Go 1 versions, where applicable.
The GoMySQL stuff should be updated to use database/sql and the ziutek
library implementing database/sql/driver.
Help wanted.
Change-Id: If71217dc5c8f0e70dbe46e9504ca5131c6eeacde
2012-02-19 05:53:06 +00:00
|
|
|
"camlistore.org/pkg/jsonconfig"
|
2011-10-05 21:34:55 +00:00
|
|
|
)
|
|
|
|
|
2011-10-13 01:10:58 +00:00
|
|
|
const (
	// blobKind is the datastore kind holding one row per unique blob,
	// keyed by the blobref string (see entKey).
	blobKind = "Blob"

	// memKind is the datastore kind recording a blob's membership in a
	// storage namespace, keyed by "namespace|blobref" (see memKey).
	memKind = "NsBlobMember" // blob membership in a namespace
)
|
2011-10-07 06:24:40 +00:00
|
|
|
|
2011-11-02 04:37:10 +00:00
|
|
|
// Compile-time check that appengineStorage implements blobserver.Storage.
var _ blobserver.Storage = (*appengineStorage)(nil)

// appengineStorage stores blob contents in App Engine's blobstore and
// keeps per-blob (blobEnt) and per-namespace membership (memEnt) index
// rows in the datastore.
type appengineStorage struct {
	namespace string // never empty; config initializes to at least "-"
}
|
|
|
|
|
2011-10-13 01:10:58 +00:00
|
|
|
// blobEnt is stored once per unique blob, keyed by blobref.
// The same row is shared by all namespaces containing the blob.
type blobEnt struct {
	// Size is the blob's size in bytes.
	Size int64 `datastore:"Size,noindex"`
	// BlobKey is the App Engine blobstore key where the blob's bytes live.
	BlobKey appengine.BlobKey `datastore:"BlobKey,noindex"`
	// Namespaces records which storage namespaces contain this blob.
	Namespaces string `datastore:"Namespaces,noindex"` // |-separated string of namespaces

	// TODO(bradfitz): IsCamliSchemaBlob bool? ... probably want
	// on enumeration (memEnt) too.
}
|
|
|
|
|
|
|
|
// memEnt is stored once per blob in a namespace, keyed by "ns|blobref"
type memEnt struct {
	// Size is the blob's size in bytes, duplicated from blobEnt so
	// StatBlobs and EnumerateBlobs can answer from this row alone.
	Size int64 `datastore:"Size,noindex"`
}
|
|
|
|
|
2013-01-08 01:20:38 +00:00
|
|
|
// byteDecSize decodes a decimal size value, as stored in the datastore
// Size column, from its raw byte form. It returns an error describing
// the bad input when b does not start with a decimal integer.
func byteDecSize(b []byte) (int64, error) {
	var size int64
	if n, err := fmt.Fscanf(bytes.NewBuffer(b), "%d", &size); n != 1 || err != nil {
		return 0, fmt.Errorf("invalid Size column in datastore: %q", string(b))
	}
	return size, nil
}
|
|
|
|
|
2011-10-13 01:10:58 +00:00
|
|
|
func (b *blobEnt) inNamespace(ns string) (out bool) {
|
2013-01-09 22:23:22 +00:00
|
|
|
for _, in := range strings.Split(b.Namespaces, "|") {
|
2011-10-13 01:10:58 +00:00
|
|
|
if ns == in {
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return false
|
|
|
|
}
|
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
// entKey returns the datastore key of the blobEnt row for br: kind
// blobKind, named by the blobref's string form, with no parent key.
func entKey(c appengine.Context, br blob.Ref) *datastore.Key {
	return datastore.NewKey(c, blobKind, br.String(), 0, nil)
}
|
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
func (s *appengineStorage) memKey(c appengine.Context, br blob.Ref) *datastore.Key {
|
2011-10-13 01:10:58 +00:00
|
|
|
return datastore.NewKey(c, memKind, fmt.Sprintf("%s|%s", s.namespace, br.String()), 0, nil)
|
|
|
|
}
|
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
func fetchEnt(c appengine.Context, br blob.Ref) (*blobEnt, error) {
|
2011-10-13 01:10:58 +00:00
|
|
|
row := new(blobEnt)
|
|
|
|
err := datastore.Get(c, entKey(c, br), row)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return row, nil
|
|
|
|
}
|
|
|
|
|
2013-01-08 01:20:38 +00:00
|
|
|
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
|
2011-10-05 21:34:55 +00:00
|
|
|
sto := &appengineStorage{
|
2013-08-21 23:51:10 +00:00
|
|
|
namespace: config.OptionalString("namespace", ""),
|
2011-10-05 21:34:55 +00:00
|
|
|
}
|
|
|
|
if err := config.Validate(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2011-10-22 01:42:09 +00:00
|
|
|
sto.namespace, err = sanitizeNamespace(sto.namespace)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
2011-10-13 01:10:58 +00:00
|
|
|
}
|
2011-10-05 21:34:55 +00:00
|
|
|
return sto, nil
|
|
|
|
}
|
|
|
|
|
2014-03-14 19:11:08 +00:00
|
|
|
// Fetch returns a reader for the blob br and its size. It returns
// os.ErrNotExist when the blob is unknown or is not a member of sto's
// namespace.
func (sto *appengineStorage) Fetch(br blob.Ref) (file io.ReadCloser, size uint32, err error) {
	// Borrow an App Engine context from the pool. On the error paths the
	// deferred func returns the loan; on success, ownership is handed to
	// the returned ReadCloser below, so the defer must not fire then.
	loan := ctxPool.Get()
	ctx := loan
	defer func() {
		if loan != nil {
			loan.Return()
		}
	}()

	row, err := fetchEnt(ctx, br)
	if err == datastore.ErrNoSuchEntity {
		err = os.ErrNotExist
		return
	}
	if err != nil {
		return
	}
	// The blobEnt row is shared across namespaces; a blob present only in
	// other namespaces must report "not exist" for this storage.
	if !row.inNamespace(sto.namespace) {
		err = os.ErrNotExist
		return
	}

	// Transfer the context loan to the caller: the pool gets it back
	// exactly once, when the returned ReadCloser is closed (onceCloser
	// guarantees the once).
	closeLoan := loan
	var c io.Closer = &onceCloser{fn: func() { closeLoan.Return() }}
	loan = nil // take it, so it's not defer-closed

	reader := blobstore.NewReader(ctx, appengine.BlobKey(string(row.BlobKey)))
	type readCloser struct {
		io.Reader
		io.Closer
	}
	// NOTE(review): row.Size is int64 but truncated to uint32 here;
	// assumes blobs are smaller than 4GB — confirm upstream enforces that.
	return readCloser{reader, c}, uint32(row.Size), nil
}
|
|
|
|
|
|
|
|
// onceCloser is an io.Closer that runs fn only on the first Close call;
// subsequent calls are no-ops.
type onceCloser struct {
	once sync.Once
	fn   func()
}

// Close runs the wrapped fn at most once and always reports success.
func (oc *onceCloser) Close() error {
	oc.once.Do(oc.fn)
	return nil
}
|
|
|
|
|
2011-10-13 01:10:58 +00:00
|
|
|
// crossGroupTransaction enables cross-group (XG) transactions: a single
// ReceiveBlob/RemoveBlobs transaction writes both a blobEnt key and a
// memEnt key, which are root keys in different entity groups.
var crossGroupTransaction = &datastore.TransactionOptions{XG: true}
|
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
// ReceiveBlob stores the blob br with the contents of in. The blob is
// fully buffered in memory, uploaded to the blobstore if not already
// present, and indexed (blobEnt plus this namespace's memEnt row) in a
// single cross-group datastore transaction.
func (sto *appengineStorage) ReceiveBlob(br blob.Ref, in io.Reader) (sb blob.SizedRef, err error) {
	loan := ctxPool.Get()
	defer loan.Return()
	ctx := loan

	// Buffer the whole blob up front so the transaction body below can
	// be retried without re-reading in.
	var b bytes.Buffer
	written, err := io.Copy(&b, in)
	if err != nil {
		return
	}

	// bkey is non-empty once we've uploaded the blob.
	var bkey appengine.BlobKey

	// uploadBlob uploads the blob, unless it's already been done.
	// It memoizes the resulting key in bkey so a retried transaction
	// does not upload (and orphan) a second copy.
	uploadBlob := func(ctx appengine.Context) error {
		if len(bkey) > 0 {
			return nil // already done in previous transaction attempt
		}
		bw, err := blobstore.Create(ctx, "application/octet-stream")
		if err != nil {
			return err
		}
		_, err = io.Copy(bw, &b)
		if err != nil {
			// TODO(bradfitz): try to clean up; close it, see if we can find the key, delete it.
			ctx.Errorf("blobstore Copy error: %v", err)
			return err
		}
		err = bw.Close()
		if err != nil {
			// TODO(bradfitz): try to clean up; see if we can find the key, delete it.
			ctx.Errorf("blobstore Close error: %v", err)
			return err
		}
		k, err := bw.Key()
		if err == nil {
			bkey = k
		}
		return err
	}

	// tryFunc is the transaction body: create the blobEnt row (uploading
	// the bytes first) or append this namespace to the existing row, then
	// write the namespace membership row.
	tryFunc := func(tc appengine.Context) error {
		row, err := fetchEnt(tc, br)
		switch err {
		case datastore.ErrNoSuchEntity:
			// Brand-new blob: upload bytes, then record the row.
			if err := uploadBlob(tc); err != nil {
				tc.Errorf("uploadBlob failed: %v", err)
				return err
			}
			row = &blobEnt{
				Size:       written,
				BlobKey:    bkey,
				Namespaces: sto.namespace,
			}
			_, err = datastore.Put(tc, entKey(tc, br), row)
			if err != nil {
				return err
			}
		case nil:
			// Blob already known. If it's already in our namespace the
			// memEnt row must exist too, so there's nothing to write.
			if row.inNamespace(sto.namespace) {
				// Nothing to do
				return nil
			}
			row.Namespaces = row.Namespaces + "|" + sto.namespace
			_, err = datastore.Put(tc, entKey(tc, br), row)
			if err != nil {
				return err
			}
		default:
			return err
		}

		// Add membership row
		_, err = datastore.Put(tc, sto.memKey(tc, br), &memEnt{
			Size: written,
		})
		return err
	}
	err = datastore.RunInTransaction(ctx, tryFunc, crossGroupTransaction)
	if err != nil {
		if len(bkey) > 0 {
			// If we just created this blob but we
			// ultimately failed, try our best to delete
			// it so it's not orphaned.
			blobstore.Delete(ctx, bkey)
		}
		return
	}
	return blob.SizedRef{br, uint32(written)}, nil
}
|
|
|
|
|
2011-11-02 07:05:02 +00:00
|
|
|
// RemoveBlobs removes each of blobs from sto's namespace: the namespace
// is stripped from the shared blobEnt row and the per-namespace memEnt
// row is deleted, using one cross-group transaction per blob. The
// underlying blobstore bytes are never deleted here, even when the last
// namespace is removed.
//
// NOTE(bslatkin): this implementation has never been verified to work.
func (sto *appengineStorage) RemoveBlobs(blobs []blob.Ref) error {
	loan := ctxPool.Get()
	defer loan.Return()
	ctx := loan

	tryFunc := func(tc appengine.Context, br blob.Ref) error {
		// TODO(bslatkin): Make the DB gets in this a multi-get.
		// Remove the namespace from the blobEnt
		row, err := fetchEnt(tc, br)
		switch err {
		case datastore.ErrNoSuchEntity:
			// Doesn't exist, that means there should be no memEnt, but let's be
			// paranoid and double check anyways.
		case nil:
			// blobEnt exists, remove our namespace from it if possible.
			newNS := []string{}
			for _, val := range strings.Split(string(row.Namespaces), "|") {
				if val != sto.namespace {
					newNS = append(newNS, val)
				}
			}
			// Only rewrite the row if our namespace was actually present.
			if v := strings.Join(newNS, "|"); v != row.Namespaces {
				row.Namespaces = v
				_, err = datastore.Put(tc, entKey(tc, br), row)
				if err != nil {
					return err
				}
			}
		default:
			return err
		}

		// Blindly delete the memEnt.
		err = datastore.Delete(tc, sto.memKey(tc, br))
		return err
	}

	// One transaction per blob; stop at the first failure.
	for _, br := range blobs {
		ret := datastore.RunInTransaction(
			ctx,
			func(tc appengine.Context) error {
				return tryFunc(tc, br)
			},
			crossGroupTransaction)
		if ret != nil {
			return ret
		}
	}
	return nil
}
|
|
|
|
|
2013-08-21 23:51:10 +00:00
|
|
|
// StatBlobs sends a SizedRef on dest for each of blobs that is a member
// of sto's namespace. Blobs not found are silently skipped; any other
// per-blob lookup error is remembered and the last one is returned
// after all blobs have been processed.
func (sto *appengineStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
	loan := ctxPool.Get()
	defer loan.Return()
	ctx := loan

	var (
		keys = make([]*datastore.Key, 0, len(blobs))
		out  = make([]interface{}, 0, len(blobs))
		errs = make([]error, len(blobs))
	)
	for _, br := range blobs {
		keys = append(keys, sto.memKey(ctx, br))
		out = append(out, new(memEnt))
	}
	// Batched lookup of the membership rows. A MultiError carries one
	// error slot per key; unpack it and continue. Any other non-nil
	// error is a whole-batch failure.
	err := datastore.GetMulti(ctx, keys, out)
	if merr, ok := err.(appengine.MultiError); ok {
		errs = []error(merr)
		err = nil
	}
	if err != nil {
		return err
	}
	for i, br := range blobs {
		thisErr := errs[i]
		if thisErr == datastore.ErrNoSuchEntity {
			// Not in this namespace: simply omitted from the results.
			continue
		}
		if thisErr != nil {
			err = errs[i] // just return last one found?
			continue
		}
		ent := out[i].(*memEnt)
		dest <- blob.SizedRef{br, uint32(ent.Size)}
	}
	return err
}
|
|
|
|
|
2013-12-02 21:20:51 +00:00
|
|
|
func (sto *appengineStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
|
2011-10-11 00:58:59 +00:00
|
|
|
defer close(dest)
|
2013-01-09 03:43:09 +00:00
|
|
|
|
2013-08-21 23:51:10 +00:00
|
|
|
loan := ctxPool.Get()
|
|
|
|
defer loan.Return()
|
2013-12-02 21:20:51 +00:00
|
|
|
actx := loan
|
2013-01-09 03:43:09 +00:00
|
|
|
|
2011-10-13 01:10:58 +00:00
|
|
|
prefix := sto.namespace + "|"
|
2013-12-02 21:20:51 +00:00
|
|
|
keyBegin := datastore.NewKey(actx, memKind, prefix+after, 0, nil)
|
|
|
|
keyEnd := datastore.NewKey(actx, memKind, sto.namespace+"~", 0, nil)
|
2011-10-13 01:10:58 +00:00
|
|
|
|
|
|
|
q := datastore.NewQuery(memKind).Limit(int(limit)).Filter("__key__>", keyBegin).Filter("__key__<", keyEnd)
|
2013-12-02 21:20:51 +00:00
|
|
|
it := q.Run(actx)
|
2011-10-13 01:10:58 +00:00
|
|
|
var row memEnt
|
2011-10-11 00:58:59 +00:00
|
|
|
for {
|
2011-10-13 01:10:58 +00:00
|
|
|
key, err := it.Next(&row)
|
2011-10-11 00:58:59 +00:00
|
|
|
if err == datastore.Done {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2013-12-02 21:20:51 +00:00
|
|
|
select {
|
2014-02-08 17:25:21 +00:00
|
|
|
case dest <- blob.SizedRef{blob.ParseOrZero(key.StringID()[len(prefix):]), uint32(row.Size)}:
|
2013-12-02 21:20:51 +00:00
|
|
|
case <-ctx.Done():
|
|
|
|
return context.ErrCanceled
|
|
|
|
}
|
2011-10-11 00:58:59 +00:00
|
|
|
}
|
|
|
|
return nil
|
2011-10-05 21:34:55 +00:00
|
|
|
}
|
2011-12-04 23:19:28 +00:00
|
|
|
|
Get rid of QueueCreator and all its associated complexity.
Previous TODO entry was:
-- Get rid of QueueCreator entirely. Plan:
-- sync handler still has a source and dest (one pair) but
instead of calling CreateQueue on the source, it instead
has an index.Storage (configured via a RequiredObject
so it can be a kvfile, leveldb, mysql, postgres etc)
-- make all the index.Storage types be instantiable
from a jsonconfig Object, perhaps with constructors keyed
on a "type" field.
-- make sync handler support blobserver.Receiver (or StatReceiver)
like indexes, so it can receive blobs. but all it needs to
do to acknowledge the ReceiveBlob is write and flush to its
index.Storage. the syncing is async by default. (otherwise callers
could just use "replica" if they wanted sync replication).
But maybe for ease of configuration switching, we could also
support a sync mode. when it needs to replicate a blob,
it uses the source.
-- future option: sync mirror to an alternate path on ReceiveBlob
that can delete. e.g. you're uploading to s3 and google,
but don't want to upload to both at once, so you use the localdisk
as a buffer to spread out your upstream bandwidth.
-- end result: no more hardlinks or queue creator.
Change-Id: I6244fc4f3a655f08470ae3160502659399f468ed
2013-11-22 22:33:31 +00:00
|
|
|
// TODO(bslatkin): sync does not work on App Engine yet because there are no
|
2011-12-04 23:19:28 +00:00
|
|
|
// background threads to do the sync loop. The plan is to break the
|
|
|
|
// syncer code up into two parts: 1) accepts notifications of new blobs to
|
|
|
|
// sync, 2) does one unit of work enumerating recent blobs and syncing them.
|
|
|
|
// In App Engine land, 1) will result in a task to be enqueued, and 2) will
|
|
|
|
// be called from within that queue context.
|