From b82b8efe4c8a5f4b97bb0c7c7d7ecb9c07ae78b4 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Mon, 2 Dec 2013 13:20:51 -0800 Subject: [PATCH] Start of new context package and *context.Context type. Will eventually be plumbed through lots of APIs, especially those requiring or benefiting from cancelation notification and/or those needing access to the HTTP context (e.g. App Engine). Change-Id: I591496725d620126e09d49eb07cade7707c7fc64 --- cmd/camtool/sync.go | 15 ++-- pkg/blobserver/cond/cond.go | 5 +- pkg/blobserver/diskpacked/diskpacked.go | 29 ++++--- pkg/blobserver/encrypt/encrypt.go | 11 ++- pkg/blobserver/enumerate.go | 5 +- pkg/blobserver/google/cloudstorage/storage.go | 9 +- pkg/blobserver/google/drive/enumerate.go | 3 +- pkg/blobserver/handlers/enumerate.go | 3 +- pkg/blobserver/handlers/enumerate_test.go | 3 +- pkg/blobserver/interface.go | 7 +- pkg/blobserver/localdisk/enumerate.go | 11 ++- pkg/blobserver/localdisk/enumerate_test.go | 10 ++- pkg/blobserver/mergedenum.go | 5 +- pkg/blobserver/noimpl.go | 3 +- pkg/blobserver/remote/remote.go | 7 +- pkg/blobserver/replica/replica.go | 5 +- pkg/blobserver/s3/enumerate.go | 9 +- pkg/blobserver/shard/shard.go | 5 +- pkg/blobserver/storagetest/storagetest.go | 11 +-- pkg/client/enumerate.go | 17 ++-- pkg/context/context.go | 83 +++++++++++++++++++ pkg/index/enumstat.go | 23 ++++- pkg/server/sync.go | 7 +- pkg/test/fetcher.go | 9 +- server/appengine/camli/storage.go | 17 ++-- 25 files changed, 234 insertions(+), 78 deletions(-) create mode 100644 pkg/context/context.go diff --git a/cmd/camtool/sync.go b/cmd/camtool/sync.go index 4a1f45792..bcf46134e 100644 --- a/cmd/camtool/sync.go +++ b/cmd/camtool/sync.go @@ -30,6 +30,7 @@ import ( "camlistore.org/pkg/blobserver/localdisk" "camlistore.org/pkg/client" "camlistore.org/pkg/cmdmain" + "camlistore.org/pkg/context" ) type syncCmd struct { @@ -236,16 +237,16 @@ func (c *syncCmd) discoClient() *client.Client { return cl } -func enumerateAllBlobs(s blobserver.Storage, destc chan<- blob.SizedRef) error { +func enumerateAllBlobs(ctx *context.Context, s blobserver.Storage, destc chan<- blob.SizedRef) error { // Use *client.Client's support for enumerating all blobs if // possible, since it could probably do a better job knowing // HTTP boundaries and such. 
if c, ok := s.(*client.Client); ok { - return c.SimpleEnumerateBlobs(destc) + return c.SimpleEnumerateBlobs(ctx, destc) } defer close(destc) - return blobserver.EnumerateAll(s, func(sb blob.SizedRef) error { + return blobserver.EnumerateAll(ctx, s, func(sb blob.SizedRef) error { destc <- sb return nil }) @@ -263,8 +264,10 @@ func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStat srcErr := make(chan error, 1) destErr := make(chan error, 1) + ctx := context.TODO() + defer ctx.Cancel() go func() { - srcErr <- enumerateAllBlobs(src, srcBlobs) + srcErr <- enumerateAllBlobs(ctx, src, srcBlobs) }() checkSourceError := func() { if err := <-srcErr; err != nil { @@ -281,7 +284,7 @@ func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStat } go func() { - destErr <- enumerateAllBlobs(dest, destBlobs) + destErr <- enumerateAllBlobs(ctx, dest, destBlobs) }() checkDestError := func() { if err := <-destErr; err != nil { @@ -306,7 +309,7 @@ func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStat thirdBlobs := make(chan blob.SizedRef, 100) thirdErr := make(chan error, 1) go func() { - thirdErr <- enumerateAllBlobs(thirdLeg, thirdBlobs) + thirdErr <- enumerateAllBlobs(ctx, thirdLeg, thirdBlobs) }() checkThirdError = func() { if err := <-thirdErr; err != nil { diff --git a/pkg/blobserver/cond/cond.go b/pkg/blobserver/cond/cond.go index 1cdd28c22..0bd000146 100644 --- a/pkg/blobserver/cond/cond.go +++ b/pkg/blobserver/cond/cond.go @@ -47,6 +47,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/schema" ) @@ -202,9 +203,9 @@ func (sto *condStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) e return errors.New("cond: Read not configured") } -func (sto *condStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (sto *condStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { if sto.read != nil { - return sto.read.EnumerateBlobs(dest, after, limit) + return sto.read.EnumerateBlobs(ctx, dest, after, limit) } return errors.New("cond: Read not configured") } diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go index 2b0f948c0..c9984e477 100644 --- a/pkg/blobserver/diskpacked/diskpacked.go +++ b/pkg/blobserver/diskpacked/diskpacked.go @@ -43,6 +43,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver/local" + "camlistore.org/pkg/context" "camlistore.org/pkg/index/kvfile" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/readerutil" @@ -266,8 +267,16 @@ func (s *storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) (err er return wg.Err() } -func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) (err error) { +func (s *storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) { + defer close(dest) + t := s.index.Find(after) + defer func() { + closeErr := t.Close() + if err == nil { + err = closeErr + } + }() for i := 0; i < limit && t.Next(); { key := t.Key() if key <= after { @@ -276,22 +285,20 @@ func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit } br, ok := blob.Parse(key) if !ok { - err = fmt.Errorf("diskpacked: couldn't parse index key %q", key) - continue + return fmt.Errorf("diskpacked: couldn't parse index key %q", key) } m, ok 
:= parseBlobMeta(t.Value()) if !ok { - err = fmt.Errorf("diskpacked: couldn't parse index value %q: %q", key, t.Value()) - continue + return fmt.Errorf("diskpacked: couldn't parse index value %q: %q", key, t.Value()) + } + select { + case dest <- m.SizedRef(br): + case <-ctx.Done(): + return context.ErrCanceled } - dest <- m.SizedRef(br) i++ } - if err2 := t.Close(); err == nil && err2 != nil { - err = err2 - } - close(dest) - return + return nil } func (s *storage) ReceiveBlob(br blob.Ref, source io.Reader) (sbr blob.SizedRef, err error) { diff --git a/pkg/blobserver/encrypt/encrypt.go b/pkg/blobserver/encrypt/encrypt.go index 60ffbedef..059833ebf 100644 --- a/pkg/blobserver/encrypt/encrypt.go +++ b/pkg/blobserver/encrypt/encrypt.go @@ -50,6 +50,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/sorted" ) @@ -325,7 +326,7 @@ func (s *storage) FetchStreaming(plainBR blob.Ref) (file io.ReadCloser, size int }, plainSize, nil } -func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (s *storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) iter := s.index.Find(after) n := 0 @@ -341,7 +342,11 @@ func (s *storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit if !ok { panic("Bogus encrypt index value: " + iter.Value()) } - dest <- blob.SizedRef{br, plainSize} + select { + case dest <- blob.SizedRef{br, plainSize}: + case <-ctx.Done(): + return context.ErrCanceled + } n++ if limit != 0 && n >= limit { break @@ -421,7 +426,7 @@ func (s *storage) readAllMetaBlobs() error { enumErrc := make(chan error, 1) go func() { var wg sync.WaitGroup - enumErrc <- blobserver.EnumerateAll(s.meta, func(sb blob.SizedRef) error { + enumErrc <- blobserver.EnumerateAll(context.TODO(), s.meta, func(sb blob.SizedRef) error { select { case <-stopEnumerate: return errors.New("enumeration stopped") diff --git a/pkg/blobserver/enumerate.go b/pkg/blobserver/enumerate.go index 8f5036f2a..e49d33e2e 100644 --- a/pkg/blobserver/enumerate.go +++ b/pkg/blobserver/enumerate.go @@ -20,12 +20,13 @@ import ( "sync" "camlistore.org/pkg/blob" + "camlistore.org/pkg/context" ) // EnumerateAll runs fn for each blob in src. // If fn returns an error, iteration stops and fn isn't called again. // EnumerateAll will not return concurrently with fn. 
-func EnumerateAll(src BlobEnumerator, fn func(blob.SizedRef) error) error { +func EnumerateAll(ctx *context.Context, src BlobEnumerator, fn func(blob.SizedRef) error) error { const batchSize = 1000 var mu sync.Mutex // protects returning with an error while fn is still running after := "" @@ -47,7 +48,7 @@ func EnumerateAll(src BlobEnumerator, fn func(blob.SizedRef) error) error { } errc <- err }() - err := src.EnumerateBlobs(ch, after, batchSize) + err := src.EnumerateBlobs(ctx, ch, after, batchSize) if err != nil { mu.Lock() // make sure fn callback finished; no need to unlock return err diff --git a/pkg/blobserver/google/cloudstorage/storage.go b/pkg/blobserver/google/cloudstorage/storage.go index 23f4ac4dd..0adb929d3 100644 --- a/pkg/blobserver/google/cloudstorage/storage.go +++ b/pkg/blobserver/google/cloudstorage/storage.go @@ -27,6 +27,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/googlestorage" "camlistore.org/pkg/jsonconfig" ) @@ -59,7 +60,7 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora return gs, nil } -func (gs *Storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (gs *Storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) objs, err := gs.client.EnumerateObjects(gs.bucket, after, limit) if err != nil { @@ -71,7 +72,11 @@ func (gs *Storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit if !ok { continue } - dest <- blob.SizedRef{Ref: br, Size: obj.Size} + select { + case dest <- blob.SizedRef{Ref: br, Size: obj.Size}: + case <-ctx.Done(): + return context.ErrCanceled + } } return nil } diff --git a/pkg/blobserver/google/drive/enumerate.go b/pkg/blobserver/google/drive/enumerate.go index 4b2d3d0ab..49b508963 100644 --- a/pkg/blobserver/google/drive/enumerate.go +++ b/pkg/blobserver/google/drive/enumerate.go @@ -19,13 +19,14 @@ package drive import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" ) var _ blobserver.MaxEnumerateConfig = (*driveStorage)(nil) func (sto *driveStorage) MaxEnumerate() int { return 1000 } -func (sto *driveStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (sto *driveStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) panic("not implemented") return nil diff --git a/pkg/blobserver/handlers/enumerate.go b/pkg/blobserver/handlers/enumerate.go index 944db4438..e4d7ae0ca 100644 --- a/pkg/blobserver/handlers/enumerate.go +++ b/pkg/blobserver/handlers/enumerate.go @@ -26,6 +26,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" ) const defaultMaxEnumerate = 10000 @@ -100,7 +101,7 @@ func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage b blobch := make(chan blob.SizedRef, 100) resultch := make(chan error, 1) go func() { - resultch <- storage.EnumerateBlobs(blobch, formValueAfter, limit+1) + resultch <- storage.EnumerateBlobs(context.TODO(), blobch, formValueAfter, limit+1) }() endsReached := 0 diff --git a/pkg/blobserver/handlers/enumerate_test.go b/pkg/blobserver/handlers/enumerate_test.go index a0eeec518..5def77176 100644 --- a/pkg/blobserver/handlers/enumerate_test.go +++ b/pkg/blobserver/handlers/enumerate_test.go @@ -22,13 +22,14 @@ import ( "testing" "camlistore.org/pkg/blob" + 
"camlistore.org/pkg/context" . "camlistore.org/pkg/test/asserts" ) type emptyEnumerator struct { } -func (ee *emptyEnumerator) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (ee *emptyEnumerator) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { close(dest) return nil } diff --git a/pkg/blobserver/interface.go b/pkg/blobserver/interface.go index 1c47720ea..c8a666d54 100644 --- a/pkg/blobserver/interface.go +++ b/pkg/blobserver/interface.go @@ -24,6 +24,7 @@ import ( "time" "camlistore.org/pkg/blob" + "camlistore.org/pkg/context" ) // MaxBlobSize is the size of a single blob in Camlistore. @@ -90,8 +91,10 @@ type BlobEnumerator interface { // after (if provided). // limit will be supplied and sanity checked by caller. // EnumerateBlobs must close the channel. (even if limit - // was hit and more blobs remain) - EnumerateBlobs(dest chan<- blob.SizedRef, + // was hit and more blobs remain, or an error is returned, or + // the ctx is canceled) + EnumerateBlobs(ctx *context.Context, + dest chan<- blob.SizedRef, after string, limit int) error } diff --git a/pkg/blobserver/localdisk/enumerate.go b/pkg/blobserver/localdisk/enumerate.go index cdb3f47b0..4d71797c4 100644 --- a/pkg/blobserver/localdisk/enumerate.go +++ b/pkg/blobserver/localdisk/enumerate.go @@ -25,9 +25,11 @@ import ( "strings" "camlistore.org/pkg/blob" + "camlistore.org/pkg/context" ) type readBlobRequest struct { + done <-chan struct{} ch chan<- blob.SizedRef after string remain *int // limit countdown @@ -148,7 +150,11 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error { continue } if blobRef, ok := blob.Parse(blobName); ok { - opts.ch <- blob.SizedRef{Ref: blobRef, Size: fi.Size()} + select { + case opts.ch <- blob.SizedRef{Ref: blobRef, Size: fi.Size()}: + case <-opts.done: + return context.ErrCanceled + } (*opts.remain)-- } continue @@ -158,7 +164,7 @@ func (ds *DiskStorage) readBlobs(opts readBlobRequest) error { return nil } -func (ds *DiskStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (ds *DiskStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) if limit == 0 { log.Printf("Warning: localdisk.EnumerateBlobs called with a limit of 0") @@ -166,6 +172,7 @@ func (ds *DiskStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, l limitMutable := limit return ds.readBlobs(readBlobRequest{ + done: ctx.Done(), ch: dest, dirRoot: ds.root, after: after, diff --git a/pkg/blobserver/localdisk/enumerate_test.go b/pkg/blobserver/localdisk/enumerate_test.go index c7da3db85..d5c678213 100644 --- a/pkg/blobserver/localdisk/enumerate_test.go +++ b/pkg/blobserver/localdisk/enumerate_test.go @@ -24,6 +24,7 @@ import ( "testing" "camlistore.org/pkg/blob" + "camlistore.org/pkg/context" "camlistore.org/pkg/test" . 
"camlistore.org/pkg/test/asserts" ) @@ -45,7 +46,7 @@ func TestEnumerate(t *testing.T) { ch := make(chan blob.SizedRef) errCh := make(chan error) go func() { - errCh <- ds.EnumerateBlobs(ch, "", limit) + errCh <- ds.EnumerateBlobs(context.New(), ch, "", limit) }() var ( @@ -68,7 +69,8 @@ func TestEnumerate(t *testing.T) { // Now again, but skipping foo's blob ch = make(chan blob.SizedRef) go func() { - errCh <- ds.EnumerateBlobs(ch, + errCh <- ds.EnumerateBlobs(context.New(), + ch, foo.BlobRef().String(), limit) }() @@ -91,7 +93,7 @@ func TestEnumerateEmpty(t *testing.T) { ch := make(chan blob.SizedRef) errCh := make(chan error) go func() { - errCh <- ds.EnumerateBlobs(ch, "", limit) + errCh <- ds.EnumerateBlobs(context.New(), ch, "", limit) }() _, ok := <-ch @@ -157,7 +159,7 @@ func TestEnumerateIsSorted(t *testing.T) { ch := make(chan blob.SizedRef) errCh := make(chan error) go func() { - errCh <- ds.EnumerateBlobs(ch, test.after, limit) + errCh <- ds.EnumerateBlobs(context.New(), ch, test.after, limit) }() got := make([]blob.SizedRef, 0, blobsToMake) for sb := range ch { diff --git a/pkg/blobserver/mergedenum.go b/pkg/blobserver/mergedenum.go index 06c4a1142..f4ddbcc3a 100644 --- a/pkg/blobserver/mergedenum.go +++ b/pkg/blobserver/mergedenum.go @@ -18,6 +18,7 @@ package blobserver import ( "camlistore.org/pkg/blob" + "camlistore.org/pkg/context" ) const buffered = 8 @@ -25,14 +26,14 @@ const buffered = 8 // TODO: it'd be nice to make sources be []BlobEnumerator, but that // makes callers more complex since assignable interfaces' slice forms // aren't assignable. -func MergedEnumerate(dest chan<- blob.SizedRef, sources []Storage, after string, limit int) error { +func MergedEnumerate(ctx *context.Context, dest chan<- blob.SizedRef, sources []Storage, after string, limit int) error { defer close(dest) startEnum := func(source Storage) (*blob.ChanPeeker, <-chan error) { ch := make(chan blob.SizedRef, buffered) errch := make(chan error, 1) go func() { - errch <- source.EnumerateBlobs(ch, after, limit) + errch <- source.EnumerateBlobs(ctx, ch, after, limit) }() return &blob.ChanPeeker{Ch: ch}, errch } diff --git a/pkg/blobserver/noimpl.go b/pkg/blobserver/noimpl.go index d80ed709c..8ac0894d0 100644 --- a/pkg/blobserver/noimpl.go +++ b/pkg/blobserver/noimpl.go @@ -22,6 +22,7 @@ import ( "os" "camlistore.org/pkg/blob" + "camlistore.org/pkg/context" "camlistore.org/pkg/types" ) @@ -48,7 +49,7 @@ func (nis *NoImplStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) return errors.New("Stat not implemented") } -func (nis *NoImplStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (nis *NoImplStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { return errors.New("EnumerateBlobs not implemented") } diff --git a/pkg/blobserver/remote/remote.go b/pkg/blobserver/remote/remote.go index 5217b45a2..89cef5b9b 100644 --- a/pkg/blobserver/remote/remote.go +++ b/pkg/blobserver/remote/remote.go @@ -38,6 +38,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/client" + "camlistore.org/pkg/context" "camlistore.org/pkg/jsonconfig" ) @@ -75,7 +76,7 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (storage blobserv // correct. // TODO(bradfitz,mpl): skip this operation smartly if it turns out this is annoying/slow for whatever reason. 
c := make(chan blob.SizedRef, 1) - err = sto.EnumerateBlobs(c, "", 1) + err = sto.EnumerateBlobs(context.TODO(), c, "", 1) if err != nil { return nil, err } @@ -115,8 +116,8 @@ func (sto *remoteStorage) FetchStreaming(b blob.Ref) (file io.ReadCloser, size i func (sto *remoteStorage) MaxEnumerate() int { return 1000 } -func (sto *remoteStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { - return sto.client.EnumerateBlobsOpts(dest, client.EnumerateOpts{ +func (sto *remoteStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { + return sto.client.EnumerateBlobsOpts(ctx, dest, client.EnumerateOpts{ After: after, Limit: limit, }) diff --git a/pkg/blobserver/replica/replica.go b/pkg/blobserver/replica/replica.go index 5c5a774ac..4305ed3ed 100644 --- a/pkg/blobserver/replica/replica.go +++ b/pkg/blobserver/replica/replica.go @@ -44,6 +44,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/jsonconfig" ) @@ -253,8 +254,8 @@ func (sto *replicaStorage) RemoveBlobs(blobs []blob.Ref) error { return reterr } -func (sto *replicaStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { - return blobserver.MergedEnumerate(dest, sto.readReplicas, after, limit) +func (sto *replicaStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { + return blobserver.MergedEnumerate(ctx, dest, sto.readReplicas, after, limit) } func init() { diff --git a/pkg/blobserver/s3/enumerate.go b/pkg/blobserver/s3/enumerate.go index d70565a60..d9d84ca3c 100644 --- a/pkg/blobserver/s3/enumerate.go +++ b/pkg/blobserver/s3/enumerate.go @@ -21,13 +21,14 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" ) var _ blobserver.MaxEnumerateConfig = (*s3Storage)(nil) func (sto *s3Storage) MaxEnumerate() int { return 1000 } -func (sto *s3Storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (sto *s3Storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) objs, err := sto.s3Client.ListBucket(sto.bucket, after, limit) if err != nil { @@ -39,7 +40,11 @@ func (sto *s3Storage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, li if !ok { continue } - dest <- blob.SizedRef{Ref: br, Size: obj.Size} + select { + case dest <- blob.SizedRef{Ref: br, Size: obj.Size}: + case <-ctx.Done(): + return context.ErrCanceled + } } return nil } diff --git a/pkg/blobserver/shard/shard.go b/pkg/blobserver/shard/shard.go index aa23c21cd..9d7d601be 100644 --- a/pkg/blobserver/shard/shard.go +++ b/pkg/blobserver/shard/shard.go @@ -37,6 +37,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/jsonconfig" ) @@ -117,8 +118,8 @@ func (sto *shardStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) }) } -func (sto *shardStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { - return blobserver.MergedEnumerate(dest, sto.shards, after, limit) +func (sto *shardStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { + return blobserver.MergedEnumerate(ctx, dest, sto.shards, after, limit) } func init() { diff --git a/pkg/blobserver/storagetest/storagetest.go b/pkg/blobserver/storagetest/storagetest.go index a6ecac0a8..492ca68ef 100644 --- 
a/pkg/blobserver/storagetest/storagetest.go +++ b/pkg/blobserver/storagetest/storagetest.go @@ -30,6 +30,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/syncutil" "camlistore.org/pkg/test" "camlistore.org/pkg/types" @@ -47,12 +48,6 @@ func Test(t *testing.T, fn func(*testing.T) (sto blobserver.Storage, cleanup fun t.Logf("Testing blobserver storage %T", sto) t.Logf("Testing Enumerate for empty") - dest := make(chan blob.SizedRef) - go func() { - if err := sto.EnumerateBlobs(dest, "", 1000); err != nil { - t.Fatalf("EnumerateBlob: %v", err) - } - }() testEnumerate(t, sto, nil) var blobs []*test.Blob @@ -117,7 +112,7 @@ func Test(t *testing.T, fn func(*testing.T) (sto blobserver.Storage, cleanup fun } t.Logf("Testing Stat") - dest = make(chan blob.SizedRef) + dest := make(chan blob.SizedRef) go func() { if err := sto.StatBlobs(dest, blobRefs); err != nil { t.Fatalf("error stating blobs %s: %v", blobRefs, err) @@ -200,7 +195,7 @@ func testEnumerate(t *testing.T, sto blobserver.Storage, wantUnsorted []blob.Siz var grp syncutil.Group sawEnd := make(chan bool, 1) grp.Go(func() error { - if err := sto.EnumerateBlobs(sbc, after, n); err != nil { + if err := sto.EnumerateBlobs(context.New(), sbc, after, n); err != nil { return fmt.Errorf("EnumerateBlobs(%q, %d): %v", after, n) } return nil diff --git a/pkg/client/enumerate.go b/pkg/client/enumerate.go index abbdce5cb..be2b95639 100644 --- a/pkg/client/enumerate.go +++ b/pkg/client/enumerate.go @@ -24,6 +24,7 @@ import ( "time" "camlistore.org/pkg/blob" + "camlistore.org/pkg/context" ) type EnumerateOpts struct { @@ -33,17 +34,17 @@ type EnumerateOpts struct { } // Note: closes ch. -func (c *Client) SimpleEnumerateBlobs(ch chan<- blob.SizedRef) error { - return c.EnumerateBlobsOpts(ch, EnumerateOpts{}) +func (c *Client) SimpleEnumerateBlobs(ctx *context.Context, ch chan<- blob.SizedRef) error { + return c.EnumerateBlobsOpts(ctx, ch, EnumerateOpts{}) } -func (c *Client) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (c *Client) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { if limit == 0 { log.Printf("Warning: Client.EnumerateBlobs called with a limit of zero") close(dest) return nil } - return c.EnumerateBlobsOpts(dest, EnumerateOpts{ + return c.EnumerateBlobsOpts(ctx, dest, EnumerateOpts{ After: after, Limit: limit, }) @@ -52,7 +53,7 @@ func (c *Client) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit i const enumerateBatchSize = 1000 // Note: closes ch. 
-func (c *Client) EnumerateBlobsOpts(ch chan<- blob.SizedRef, opts EnumerateOpts) error { +func (c *Client) EnumerateBlobsOpts(ctx *context.Context, ch chan<- blob.SizedRef, opts EnumerateOpts) error { defer close(ch) if opts.After != "" && opts.MaxWait != 0 { return errors.New("client error: it's invalid to use enumerate After and MaxWaitSec together") @@ -115,7 +116,11 @@ func (c *Client) EnumerateBlobsOpts(ch chan<- blob.SizedRef, opts EnumerateOpts) if !ok { return error("item in 'blobs' had invalid blobref.", nil) } - ch <- blob.SizedRef{Ref: br, Size: size} + select { + case ch <- blob.SizedRef{Ref: br, Size: size}: + case <-ctx.Done(): + return context.ErrCanceled + } nSent++ if opts.Limit == nSent { // nSent can't be zero at this point, so opts.Limit being 0 diff --git a/pkg/context/context.go b/pkg/context/context.go new file mode 100644 index 000000000..8d3d9ae07 --- /dev/null +++ b/pkg/context/context.go @@ -0,0 +1,83 @@ +/* +Copyright 2013 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package context provides a Context type to propagate state and cancellation +// information. +package context + +import ( + "errors" + "sync" +) + +// ErrCanceled may be returned by code when it receives from a Context.Done channel. +var ErrCanceled = errors.New("canceled") + +// TODO returns a dummy context. It's a signal that the caller code is not yet correct, +// and needs its own context to propagate. +func TODO() *Context { + return nil +} + +// A Context carries state and cancellation information between calls. +// A nil Context pointer is valid, for now. +type Context struct { + cancelOnce sync.Once + done chan struct{} +} + +func New() *Context { + return &Context{ + done: make(chan struct{}), + } +} + +// New returns a child context attached to the receiver parent context c. +// The returned context is done when the parent is done, but the returned child +// context can be canceled independently without affecting the parent. +func (c *Context) New() *Context { + subc := New() + if c == nil { + return subc + } + go func() { + <-c.Done() + subc.Cancel() + }() + return subc +} + +// Done returns a channel that is closed when the Context is canceled +// or finished.
+func (c *Context) Done() <-chan struct{} { + if c == nil { + return nil + } + return c.done +} + +func (c *Context) Cancel() { + if c == nil { + return + } + c.cancelOnce.Do(c.cancel) +} + +func (c *Context) cancel() { + if c.done != nil { + close(c.done) + } +} diff --git a/pkg/index/enumstat.go b/pkg/index/enumstat.go index 7899076df..f413ab827 100644 --- a/pkg/index/enumstat.go +++ b/pkg/index/enumstat.go @@ -22,26 +22,41 @@ import ( "strings" "camlistore.org/pkg/blob" + "camlistore.org/pkg/context" "camlistore.org/pkg/sorted" ) -func (ix *Index) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (ix *Index) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) (err error) { defer close(dest) it := ix.s.Find("have:" + after) + defer func() { + closeErr := it.Close() + if err == nil { + err = closeErr + } + }() + n := int(0) for n < limit && it.Next() { k := it.Key() + if k <= after { + continue + } if !strings.HasPrefix(k, "have:") { break } n++ br, ok := blob.Parse(k[len("have:"):]) - size, err := strconv.ParseInt(it.Value(), 10, 64) + size, err := strconv.ParseUint(it.Value(), 10, 32) if ok && err == nil { - dest <- blob.SizedRef{br, size} + select { + case dest <- blob.SizedRef{br, int64(size)}: + case <-ctx.Done(): + return context.ErrCanceled + } } } - return it.Close() + return nil } func (ix *Index) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error { diff --git a/pkg/server/sync.go b/pkg/server/sync.go index 8fd164bfb..b024be579 100644 --- a/pkg/server/sync.go +++ b/pkg/server/sync.go @@ -30,6 +30,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/readerutil" "camlistore.org/pkg/sorted" @@ -141,7 +142,7 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, go func() { n := sh.runSync("queue", sh.enumerateQueuedBlobs) sh.logf("Queue sync copied %d blobs", n) - n = sh.runSync("full", blobserverEnumerator(fromBs)) + n = sh.runSync("full", blobserverEnumerator(context.TODO(), fromBs)) sh.logf("Full sync copied %d blobs", n) didFullSync <- true sh.syncQueueLoop() @@ -286,9 +287,9 @@ type copyResult struct { err error } -func blobserverEnumerator(src blobserver.BlobEnumerator) func(chan<- blob.SizedRef, <-chan struct{}) error { +func blobserverEnumerator(ctx *context.Context, src blobserver.BlobEnumerator) func(chan<- blob.SizedRef, <-chan struct{}) error { return func(dst chan<- blob.SizedRef, intr <-chan struct{}) error { - return blobserver.EnumerateAll(src, func(sb blob.SizedRef) error { + return blobserver.EnumerateAll(ctx, src, func(sb blob.SizedRef) error { select { case dst <- sb: case <-intr: diff --git a/pkg/test/fetcher.go b/pkg/test/fetcher.go index a488f1cda..e43833b0d 100644 --- a/pkg/test/fetcher.go +++ b/pkg/test/fetcher.go @@ -27,6 +27,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/types" ) @@ -142,7 +143,7 @@ func (tf *Fetcher) BlobrefStrings() []string { return s } -func (tf *Fetcher) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (tf *Fetcher) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) tf.l.Lock() defer tf.l.Unlock() @@ -152,7 +153,11 @@ func (tf *Fetcher) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit continue } b := tf.m[k] - dest <- blob.SizedRef{b.BlobRef(), 
b.Size()} + select { + case dest <- blob.SizedRef{b.BlobRef(), b.Size()}: + case <-ctx.Done(): + return context.ErrCanceled + } n++ if limit > 0 && n == limit { break diff --git a/server/appengine/camli/storage.go b/server/appengine/camli/storage.go index a4a6c4d9b..dc2701429 100644 --- a/server/appengine/camli/storage.go +++ b/server/appengine/camli/storage.go @@ -33,6 +33,7 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/context" "camlistore.org/pkg/jsonconfig" ) @@ -340,19 +341,19 @@ func (sto *appengineStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.R return err } -func (sto *appengineStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error { +func (sto *appengineStorage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) loan := ctxPool.Get() defer loan.Return() - ctx := loan + actx := loan prefix := sto.namespace + "|" - keyBegin := datastore.NewKey(ctx, memKind, prefix+after, 0, nil) - keyEnd := datastore.NewKey(ctx, memKind, sto.namespace+"~", 0, nil) + keyBegin := datastore.NewKey(actx, memKind, prefix+after, 0, nil) + keyEnd := datastore.NewKey(actx, memKind, sto.namespace+"~", 0, nil) q := datastore.NewQuery(memKind).Limit(int(limit)).Filter("__key__>", keyBegin).Filter("__key__<", keyEnd) - it := q.Run(ctx) + it := q.Run(actx) var row memEnt for { key, err := it.Next(&row) @@ -362,7 +363,11 @@ func (sto *appengineStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after str if err != nil { return err } - dest <- blob.SizedRef{blob.ParseOrZero(key.StringID()[len(prefix):]), row.Size} + select { + case dest <- blob.SizedRef{blob.ParseOrZero(key.StringID()[len(prefix):]), row.Size}: + case <-ctx.Done(): + return context.ErrCanceled + } } return nil }
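
Reviewer note (not part of the patch): a minimal caller-side sketch of how the new *context.Context plumbs through the updated EnumerateBlobs signature. It assumes a blobserver.Storage value sto obtained elsewhere and an arbitrary batch limit of 1000; everything else uses only identifiers introduced or touched above (context.New, Cancel, Done, ErrCanceled).

package example

import (
	"log"

	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/blobserver"
	"camlistore.org/pkg/context"
)

// listFirstN prints at most n blobs from sto, then cancels the enumeration.
// Cancel unblocks an enumerator stuck sending on dest; the enumerator then
// returns context.ErrCanceled, which the caller treats as a clean stop.
func listFirstN(sto blobserver.Storage, n int) error {
	ctx := context.New()
	defer ctx.Cancel() // Cancel is idempotent (sync.Once), so the early Cancel below is fine too

	dest := make(chan blob.SizedRef, 16)
	errc := make(chan error, 1)
	go func() {
		// Per the BlobEnumerator contract, EnumerateBlobs closes dest
		// even if it returns an error or the ctx is canceled.
		errc <- sto.EnumerateBlobs(ctx, dest, "", 1000)
	}()

	seen := 0
	for sb := range dest {
		log.Printf("%v (%d bytes)", sb.Ref, sb.Size)
		seen++
		if seen == n {
			ctx.Cancel() // stop the enumerator; it selects on ctx.Done() and returns
			break
		}
	}
	if err := <-errc; err != nil && err != context.ErrCanceled {
		return err
	}
	return nil
}

Where a sub-operation needs its own lifetime, the child constructor added above fits the same pattern: subctx := ctx.New() is canceled automatically once ctx is done, while subctx.Cancel() stops only the child and leaves the parent untouched.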