Start of new context package and *context.Context type.

Will eventually be plumbed through lots of APIs, especially those requiring or benefiting from
cancelation notification and/or those needing access to the HTTP context (e.g. App Engine).

Change-Id: I591496725d620126e09d49eb07cade7707c7fc64
This commit is contained in:
Brad Fitzpatrick 2013-12-02 13:20:51 -08:00
parent 0c73a65f78
commit b82b8efe4c
25 changed files with 234 additions and 78 deletions

View File

@ -30,6 +30,7 @@ import (
"camlistore.org/pkg/blobserver/localdisk"
"camlistore.org/pkg/client"
"camlistore.org/pkg/cmdmain"
"camlistore.org/pkg/context"
)
type syncCmd struct {
@ -236,16 +237,16 @@ func (c *syncCmd) discoClient() *client.Client {
return cl
}
func enumerateAllBlobs(s blobserver.Storage, destc chan<- blob.SizedRef) error {
func enumerateAllBlobs(ctx *context.Context, s blobserver.Storage, destc chan<- blob.SizedRef) error {
// Use *client.Client's support for enumerating all blobs if
// possible, since it could probably do a better job knowing
// HTTP boundaries and such.
if c, ok := s.(*client.Client); ok {
return c.SimpleEnumerateBlobs(destc)
return c.SimpleEnumerateBlobs(ctx, destc)
}
defer close(destc)
return blobserver.EnumerateAll(s, func(sb blob.SizedRef) error {
return blobserver.EnumerateAll(ctx, s, func(sb blob.SizedRef) error {
destc <- sb
return nil
})
@ -263,8 +264,10 @@ func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStat
srcErr := make(chan error, 1)
destErr := make(chan error, 1)
ctx := context.TODO()
defer ctx.Cancel()
go func() {
srcErr <- enumerateAllBlobs(src, srcBlobs)
srcErr <- enumerateAllBlobs(ctx, src, srcBlobs)
}()
checkSourceError := func() {
if err := <-srcErr; err != nil {
@ -281,7 +284,7 @@ func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStat
}
go func() {
destErr <- enumerateAllBlobs(dest, destBlobs)
destErr <- enumerateAllBlobs(ctx, dest, destBlobs)
}()
checkDestError := func() {
if err := <-destErr; err != nil {
@ -306,7 +309,7 @@ func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStat
thirdBlobs := make(chan blob.SizedRef, 100)
thirdErr := make(chan error, 1)
go func() {
thirdErr <- enumerateAllBlobs(thirdLeg, thirdBlobs)
thirdErr <- enumerateAllBlobs(ctx, thirdLeg, thirdBlobs)
}()
checkThirdError = func() {
if err := <-thirdErr; err != nil {

View File

@ -47,6 +47,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/schema"
)
@ -202,9 +203,9 @@ func (sto *condStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) e
return errors.New("cond: Read not configured")
}
func (sto *condStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (sto *condStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
if sto.read != nil {
return sto.read.EnumerateBlobs(dest, after, limit)
return sto.read.EnumerateBlobs(ctx, dest, after, limit)
}
return errors.New("cond: Read not configured")
}

View File

@ -43,6 +43,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/local"
"camlistore.org/pkg/context"
"camlistore.org/pkg/index/kvfile"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/readerutil"
@ -266,8 +267,16 @@ func (s *storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) (err er
return wg.Err()
}
func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) (err error) {
func (s *storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
defer close(dest)
t := s.index.Find(after)
defer func() {
closeErr := t.Close()
if err == nil {
err = closeErr
}
}()
for i := 0; i < limit && t.Next(); {
key := t.Key()
if key <= after {
@ -276,22 +285,20 @@ func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit
}
br, ok := blob.Parse(key)
if !ok {
err = fmt.Errorf("diskpacked: couldn't parse index key %q", key)
continue
return fmt.Errorf("diskpacked: couldn't parse index key %q", key)
}
m, ok := parseBlobMeta(t.Value())
if !ok {
err = fmt.Errorf("diskpacked: couldn't parse index value %q: %q", key, t.Value())
continue
return fmt.Errorf("diskpacked: couldn't parse index value %q: %q", key, t.Value())
}
select {
case dest <- m.SizedRef(br):
case <-ctx.Done():
return context.ErrCanceled
}
dest <- m.SizedRef(br)
i++
}
if err2 := t.Close(); err == nil && err2 != nil {
err = err2
}
close(dest)
return
return nil
}
func (s *storage) ReceiveBlob(br blob.Ref, source io.Reader) (sbr blob.SizedRef, err error) {

View File

@ -50,6 +50,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/sorted"
)
@ -325,7 +326,7 @@ func (s *storage) FetchStreaming(plainBR blob.Ref) (file io.ReadCloser, size int
}, plainSize, nil
}
func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (s *storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
defer close(dest)
iter := s.index.Find(after)
n := 0
@ -341,7 +342,11 @@ func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit
if !ok {
panic("Bogus encrypt index value: " + iter.Value())
}
dest <- blob.SizedRef{br, plainSize}
select {
case dest <- blob.SizedRef{br, plainSize}:
case <-ctx.Done():
return context.ErrCanceled
}
n++
if limit != 0 && n >= limit {
break
@ -421,7 +426,7 @@ func (s *storage) readAllMetaBlobs() error {
enumErrc := make(chan error, 1)
go func() {
var wg sync.WaitGroup
enumErrc <- blobserver.EnumerateAll(s.meta, func(sb blob.SizedRef) error {
enumErrc <- blobserver.EnumerateAll(context.TODO(), s.meta, func(sb blob.SizedRef) error {
select {
case <-stopEnumerate:
return errors.New("enumeration stopped")

View File

@ -20,12 +20,13 @@ import (
"sync"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
)
// EnumerateAll runs fn for each blob in src.
// If fn returns an error, iteration stops and fn isn't called again.
// EnumerateAll will not return concurrently with fn.
func EnumerateAll(src BlobEnumerator, fn func(blob.SizedRef) error) error {
func EnumerateAll(ctx *context.Context, src BlobEnumerator, fn func(blob.SizedRef) error) error {
const batchSize = 1000
var mu sync.Mutex // protects returning with an error while fn is still running
after := ""
@ -47,7 +48,7 @@ func EnumerateAll(src BlobEnumerator, fn func(blob.SizedRef) error) error {
}
errc <- err
}()
err := src.EnumerateBlobs(ch, after, batchSize)
err := src.EnumerateBlobs(ctx, ch, after, batchSize)
if err != nil {
mu.Lock() // make sure fn callback finished; no need to unlock
return err

View File

@ -27,6 +27,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/googlestorage"
"camlistore.org/pkg/jsonconfig"
)
@ -59,7 +60,7 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
return gs, nil
}
func (gs *Storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (gs *Storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
defer close(dest)
objs, err := gs.client.EnumerateObjects(gs.bucket, after, limit)
if err != nil {
@ -71,7 +72,11 @@ func (gs *Storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit
if !ok {
continue
}
dest <- blob.SizedRef{Ref: br, Size: obj.Size}
select {
case dest <- blob.SizedRef{Ref: br, Size: obj.Size}:
case <-ctx.Done():
return context.ErrCanceled
}
}
return nil
}

View File

@ -19,13 +19,14 @@ package drive
import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
)
var _ blobserver.MaxEnumerateConfig = (*driveStorage)(nil)
func (sto *driveStorage) MaxEnumerate() int { return 1000 }
func (sto *driveStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (sto *driveStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
defer close(dest)
panic("not implemented")
return nil

View File

@ -26,6 +26,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
)
const defaultMaxEnumerate = 10000
@ -100,7 +101,7 @@ func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage b
blobch := make(chan blob.SizedRef, 100)
resultch := make(chan error, 1)
go func() {
resultch <- storage.EnumerateBlobs(blobch, formValueAfter, limit+1)
resultch <- storage.EnumerateBlobs(context.TODO(), blobch, formValueAfter, limit+1)
}()
endsReached := 0

View File

@ -22,13 +22,14 @@ import (
"testing"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
. "camlistore.org/pkg/test/asserts"
)
type emptyEnumerator struct {
}
func (ee *emptyEnumerator) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (ee *emptyEnumerator) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
close(dest)
return nil
}

View File

@ -24,6 +24,7 @@ import (
"time"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
)
// MaxBlobSize is the size of a single blob in Camlistore.
@ -90,8 +91,10 @@ type BlobEnumerator interface {
// after (if provided).
// limit will be supplied and sanity checked by caller.
// EnumerateBlobs must close the channel. (even if limit
// was hit and more blobs remain)
EnumerateBlobs(dest chan<- blob.SizedRef,
// was hit and more blobs remain, or an error is returned, or
// the ctx is canceled)
EnumerateBlobs(ctx *context.Context,
dest chan<- blob.SizedRef,
after string,
limit int) error
}

View File

@ -25,9 +25,11 @@ import (
"strings"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
)
type readBlobRequest struct {
done <-chan struct{}
ch chan<- blob.SizedRef
after string
remain *int // limit countdown
@ -148,7 +150,11 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
continue
}
if blobRef, ok := blob.Parse(blobName); ok {
opts.ch <- blob.SizedRef{Ref: blobRef, Size: fi.Size()}
select {
case opts.ch <- blob.SizedRef{Ref: blobRef, Size: fi.Size()}:
case <-opts.done:
return context.ErrCanceled
}
(*opts.remain)--
}
continue
@ -158,7 +164,7 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error {
return nil
}
func (ds *DiskStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (ds *DiskStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
defer close(dest)
if limit == 0 {
log.Printf("Warning: localdisk.EnumerateBlobs called with a limit of 0")
@ -166,6 +172,7 @@ func (ds *DiskStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, l
limitMutable := limit
return ds.readBlobs(readBlobRequest{
done: ctx.Done(),
ch: dest,
dirRoot: ds.root,
after: after,

View File

@ -24,6 +24,7 @@ import (
"testing"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
"camlistore.org/pkg/test"
. "camlistore.org/pkg/test/asserts"
)
@ -45,7 +46,7 @@ func TestEnumerate(t *testing.T) {
ch := make(chan blob.SizedRef)
errCh := make(chan error)
go func() {
errCh <- ds.EnumerateBlobs(ch, "", limit)
errCh <- ds.EnumerateBlobs(context.New(), ch, "", limit)
}()
var (
@ -68,7 +69,8 @@ func TestEnumerate(t *testing.T) {
// Now again, but skipping foo's blob
ch = make(chan blob.SizedRef)
go func() {
errCh <- ds.EnumerateBlobs(ch,
errCh <- ds.EnumerateBlobs(context.New(),
ch,
foo.BlobRef().String(),
limit)
}()
@ -91,7 +93,7 @@ func TestEnumerateEmpty(t *testing.T) {
ch := make(chan blob.SizedRef)
errCh := make(chan error)
go func() {
errCh <- ds.EnumerateBlobs(ch, "", limit)
errCh <- ds.EnumerateBlobs(context.New(), ch, "", limit)
}()
_, ok := <-ch
@ -157,7 +159,7 @@ func TestEnumerateIsSorted(t *testing.T) {
ch := make(chan blob.SizedRef)
errCh := make(chan error)
go func() {
errCh <- ds.EnumerateBlobs(ch, test.after, limit)
errCh <- ds.EnumerateBlobs(context.New(), ch, test.after, limit)
}()
got := make([]blob.SizedRef, 0, blobsToMake)
for sb := range ch {

View File

@ -18,6 +18,7 @@ package blobserver
import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
)
const buffered = 8
@ -25,14 +26,14 @@ const buffered = 8
// TODO: it'd be nice to make sources be []BlobEnumerator, but that
// makes callers more complex since assignable interfaces' slice forms
// aren't assignable.
func MergedEnumerate(dest chan<- blob.SizedRef, sources []Storage, after string, limit int) error {
func MergedEnumerate(ctx *context.Context, dest chan<- blob.SizedRef, sources []Storage, after string, limit int) error {
defer close(dest)
startEnum := func(source Storage) (*blob.ChanPeeker, <-chan error) {
ch := make(chan blob.SizedRef, buffered)
errch := make(chan error, 1)
go func() {
errch <- source.EnumerateBlobs(ch, after, limit)
errch <- source.EnumerateBlobs(ctx, ch, after, limit)
}()
return &blob.ChanPeeker{Ch: ch}, errch
}

View File

@ -22,6 +22,7 @@ import (
"os"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
"camlistore.org/pkg/types"
)
@ -48,7 +49,7 @@ func (nis *NoImplStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref)
return errors.New("Stat not implemented")
}
func (nis *NoImplStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (nis *NoImplStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
return errors.New("EnumerateBlobs not implemented")
}

View File

@ -38,6 +38,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/client"
"camlistore.org/pkg/context"
"camlistore.org/pkg/jsonconfig"
)
@ -75,7 +76,7 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserv
// correct.
// TODO(bradfitz,mpl): skip this operation smartly if it turns out this is annoying/slow for whatever reason.
c := make(chan blob.SizedRef, 1)
err = sto.EnumerateBlobs(c, "", 1)
err = sto.EnumerateBlobs(context.TODO(), c, "", 1)
if err != nil {
return nil, err
}
@ -115,8 +116,8 @@ func (sto *remoteStorage) FetchStreaming(b blob.Ref) (file io.ReadCloser, size i
func (sto *remoteStorage) MaxEnumerate() int { return 1000 }
func (sto *remoteStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
return sto.client.EnumerateBlobsOpts(dest, client.EnumerateOpts{
func (sto *remoteStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
return sto.client.EnumerateBlobsOpts(ctx, dest, client.EnumerateOpts{
After: after,
Limit: limit,
})

View File

@ -44,6 +44,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/jsonconfig"
)
@ -253,8 +254,8 @@ func (sto *replicaStorage) RemoveBlobs(blobs []blob.Ref) error {
return reterr
}
func (sto *replicaStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
return blobserver.MergedEnumerate(dest, sto.readReplicas, after, limit)
func (sto *replicaStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
return blobserver.MergedEnumerate(ctx, dest, sto.readReplicas, after, limit)
}
func init() {

View File

@ -21,13 +21,14 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
)
var _ blobserver.MaxEnumerateConfig = (*s3Storage)(nil)
func (sto *s3Storage) MaxEnumerate() int { return 1000 }
func (sto *s3Storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (sto *s3Storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
defer close(dest)
objs, err := sto.s3Client.ListBucket(sto.bucket, after, limit)
if err != nil {
@ -39,7 +40,11 @@ func (sto *s3Storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, li
if !ok {
continue
}
dest <- blob.SizedRef{Ref: br, Size: obj.Size}
select {
case dest <- blob.SizedRef{Ref: br, Size: obj.Size}:
case <-ctx.Done():
return context.ErrCanceled
}
}
return nil
}

View File

@ -37,6 +37,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/jsonconfig"
)
@ -117,8 +118,8 @@ func (sto *shardStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref)
})
}
func (sto *shardStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
return blobserver.MergedEnumerate(dest, sto.shards, after, limit)
func (sto *shardStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
return blobserver.MergedEnumerate(ctx, dest, sto.shards, after, limit)
}
func init() {

View File

@ -30,6 +30,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/syncutil"
"camlistore.org/pkg/test"
"camlistore.org/pkg/types"
@ -47,12 +48,6 @@ func Test(t *testing.T, fn func(*testing.T) (sto blobserver.Storage, cleanup fun
t.Logf("Testing blobserver storage %T", sto)
t.Logf("Testing Enumerate for empty")
dest := make(chan blob.SizedRef)
go func() {
if err := sto.EnumerateBlobs(dest, "", 1000); err != nil {
t.Fatalf("EnumerateBlob: %v", err)
}
}()
testEnumerate(t, sto, nil)
var blobs []*test.Blob
@ -117,7 +112,7 @@ func Test(t *testing.T, fn func(*testing.T) (sto blobserver.Storage, cleanup fun
}
t.Logf("Testing Stat")
dest = make(chan blob.SizedRef)
dest := make(chan blob.SizedRef)
go func() {
if err := sto.StatBlobs(dest, blobRefs); err != nil {
t.Fatalf("error stating blobs %s: %v", blobRefs, err)
@ -200,7 +195,7 @@ func testEnumerate(t *testing.T, sto blobserver.Storage, wantUnsorted []blob.Siz
var grp syncutil.Group
sawEnd := make(chan bool, 1)
grp.Go(func() error {
if err := sto.EnumerateBlobs(sbc, after, n); err != nil {
if err := sto.EnumerateBlobs(context.New(), sbc, after, n); err != nil {
return fmt.Errorf("EnumerateBlobs(%q, %d): %v", after, n)
}
return nil

View File

@ -24,6 +24,7 @@ import (
"time"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
)
type EnumerateOpts struct {
@ -33,17 +34,17 @@ type EnumerateOpts struct {
}
// Note: closes ch.
func (c *Client) SimpleEnumerateBlobs(ch chan<- blob.SizedRef) error {
return c.EnumerateBlobsOpts(ch, EnumerateOpts{})
func (c *Client) SimpleEnumerateBlobs(ctx *context.Context, ch chan<- blob.SizedRef) error {
return c.EnumerateBlobsOpts(ctx, ch, EnumerateOpts{})
}
func (c *Client) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (c *Client) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
if limit == 0 {
log.Printf("Warning: Client.EnumerateBlobs called with a limit of zero")
close(dest)
return nil
}
return c.EnumerateBlobsOpts(dest, EnumerateOpts{
return c.EnumerateBlobsOpts(ctx, dest, EnumerateOpts{
After: after,
Limit: limit,
})
@ -52,7 +53,7 @@ func (c *Client) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit i
const enumerateBatchSize = 1000
// Note: closes ch.
func (c *Client) EnumerateBlobsOpts(ch chan<- blob.SizedRef, opts EnumerateOpts) error {
func (c *Client) EnumerateBlobsOpts(ctx *context.Context, ch chan<- blob.SizedRef, opts EnumerateOpts) error {
defer close(ch)
if opts.After != "" && opts.MaxWait != 0 {
return errors.New("client error: it's invalid to use enumerate After and MaxWaitSec together")
@ -115,7 +116,11 @@ func (c *Client) EnumerateBlobsOpts(ch chan<- blob.SizedRef, opts EnumerateOpts)
if !ok {
return error("item in 'blobs' had invalid blobref.", nil)
}
ch <- blob.SizedRef{Ref: br, Size: size}
select {
case ch <- blob.SizedRef{Ref: br, Size: size}:
case <-ctx.Done():
return context.ErrCanceled
}
nSent++
if opts.Limit == nSent {
// nSent can't be zero at this point, so opts.Limit being 0

83
pkg/context/context.go Normal file
View File

@ -0,0 +1,83 @@
/*
Copyright 2013 The Camlistore Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Packaage context provides a Context type to propagate state and cancellation
// information.
package context
import (
"errors"
"sync"
)
// ErrCanceled may be returned by code when it receives from a Context.Done channel.
var ErrCanceled = errors.New("canceled")
// TODO returns a dummy context. It's a signal that the caller code is not yet correct,
// and needs its own context to propagate.
func TODO() *Context {
return nil
}
// A Context is carries state and cancellation information between calls.
// A nil Context pointer is valid, for now.
type Context struct {
cancelOnce sync.Once
done chan struct{}
}
func New() *Context {
return &Context{
done: make(chan struct{}),
}
}
// New returns a child context attached to the receiver parent context c.
// The returned context is done when the parent is done, but the returned child
// context can be canceled indepedently without affecting the parent.
func (c *Context) New() *Context {
subc := New()
if c == nil {
return subc
}
go func() {
<-c.Done()
subc.Cancel()
}()
return subc
}
// Done returns a channel that is closed when the Context is cancelled
// or finished.
func (c *Context) Done() <-chan struct{} {
if c == nil {
return nil
}
return c.done
}
func (c *Context) Cancel() {
if c == nil {
return
}
c.cancelOnce.Do(c.cancel)
}
func (c *Context) cancel() {
if c.done != nil {
close(c.done)
}
}

View File

@ -22,26 +22,41 @@ import (
"strings"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
"camlistore.org/pkg/sorted"
)
func (ix *Index) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (ix *Index) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) {
defer close(dest)
it := ix.s.Find("have:" + after)
defer func() {
closeErr := it.Close()
if err == nil {
err = closeErr
}
}()
n := int(0)
for n < limit && it.Next() {
k := it.Key()
if k <= after {
continue
}
if !strings.HasPrefix(k, "have:") {
break
}
n++
br, ok := blob.Parse(k[len("have:"):])
size, err := strconv.ParseInt(it.Value(), 10, 64)
size, err := strconv.ParseUint(it.Value(), 10, 32)
if ok && err == nil {
dest <- blob.SizedRef{br, size}
select {
case dest <- blob.SizedRef{br, int64(size)}:
case <-ctx.Done():
return context.ErrCanceled
}
}
}
return it.Close()
return nil
}
func (ix *Index) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {

View File

@ -30,6 +30,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/readerutil"
"camlistore.org/pkg/sorted"
@ -141,7 +142,7 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
go func() {
n := sh.runSync("queue", sh.enumerateQueuedBlobs)
sh.logf("Queue sync copied %d blobs", n)
n = sh.runSync("full", blobserverEnumerator(fromBs))
n = sh.runSync("full", blobserverEnumerator(context.TODO(), fromBs))
sh.logf("Full sync copied %d blobs", n)
didFullSync <- true
sh.syncQueueLoop()
@ -286,9 +287,9 @@ type copyResult struct {
err error
}
func blobserverEnumerator(src blobserver.BlobEnumerator) func(chan<- blob.SizedRef, <-chan struct{}) error {
func blobserverEnumerator(ctx *context.Context, src blobserver.BlobEnumerator) func(chan<- blob.SizedRef, <-chan struct{}) error {
return func(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
return blobserver.EnumerateAll(src, func(sb blob.SizedRef) error {
return blobserver.EnumerateAll(ctx, src, func(sb blob.SizedRef) error {
select {
case dst <- sb:
case <-intr:

View File

@ -27,6 +27,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/types"
)
@ -142,7 +143,7 @@ func (tf *Fetcher) BlobrefStrings() []string {
return s
}
func (tf *Fetcher) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (tf *Fetcher) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
defer close(dest)
tf.l.Lock()
defer tf.l.Unlock()
@ -152,7 +153,11 @@ func (tf *Fetcher) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit
continue
}
b := tf.m[k]
dest <- blob.SizedRef{b.BlobRef(), b.Size()}
select {
case dest <- blob.SizedRef{b.BlobRef(), b.Size()}:
case <-ctx.Done():
return context.ErrCanceled
}
n++
if limit > 0 && n == limit {
break

View File

@ -33,6 +33,7 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context"
"camlistore.org/pkg/jsonconfig"
)
@ -340,19 +341,19 @@ func (sto *appengineStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.R
return err
}
func (sto *appengineStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
func (sto *appengineStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
defer close(dest)
loan := ctxPool.Get()
defer loan.Return()
ctx := loan
actx := loan
prefix := sto.namespace + "|"
keyBegin := datastore.NewKey(ctx, memKind, prefix+after, 0, nil)
keyEnd := datastore.NewKey(ctx, memKind, sto.namespace+"~", 0, nil)
keyBegin := datastore.NewKey(actx, memKind, prefix+after, 0, nil)
keyEnd := datastore.NewKey(actx, memKind, sto.namespace+"~", 0, nil)
q := datastore.NewQuery(memKind).Limit(int(limit)).Filter("__key__>", keyBegin).Filter("__key__<", keyEnd)
it := q.Run(ctx)
it := q.Run(actx)
var row memEnt
for {
key, err := it.Next(&row)
@ -362,7 +363,11 @@ func (sto *appengineStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after str
if err != nil {
return err
}
dest <- blob.SizedRef{blob.ParseOrZero(key.StringID()[len(prefix):]), row.Size}
select {
case dest <- blob.SizedRef{blob.ParseOrZero(key.StringID()[len(prefix):]), row.Size}:
case <-ctx.Done():
return context.ErrCanceled
}
}
return nil
}