diff --git a/pkg/blobserver/blobhub.go b/pkg/blobserver/blobhub.go index 40837b9e3..99a143f2c 100644 --- a/pkg/blobserver/blobhub.go +++ b/pkg/blobserver/blobhub.go @@ -33,13 +33,13 @@ type BlobHub interface { // // If any synchronous receive hooks are registered, they're run before // NotifyBlobReceived returns and their error is returned. - NotifyBlobReceived(blob blob.Ref) error + NotifyBlobReceived(blob.SizedRef) error // AddReceiveHook adds a hook that is synchronously run // whenever blobs are received. All registered hooks are run // on each blob upload but if more than one returns an error, // NotifyBlobReceived will only return one of the errors. - AddReceiveHook(func(blob.Ref) error) + AddReceiveHook(func(blob.SizedRef) error) RegisterListener(ch chan<- blob.Ref) UnregisterListener(ch chan<- blob.Ref) @@ -107,15 +107,17 @@ func WaitForBlob(storage interface{}, deadline time.Time, blobs []blob.Ref) { type memHub struct { mu sync.RWMutex - hooks []func(blob.Ref) error + hooks []func(blob.SizedRef) error listeners map[chan<- blob.Ref]bool blobListeners map[blob.Ref]map[chan<- blob.Ref]bool } -func (h *memHub) NotifyBlobReceived(br blob.Ref) error { +func (h *memHub) NotifyBlobReceived(sb blob.SizedRef) error { h.mu.RLock() defer h.mu.RUnlock() + br := sb.Ref + // Callback channels to notify, nil until non-empty var notify []chan<- blob.Ref @@ -145,7 +147,7 @@ func (h *memHub) NotifyBlobReceived(br blob.Ref) error { var ret error for _, hook := range h.hooks { - if err := hook(br); err != nil && ret == nil { + if err := hook(sb); err != nil && ret == nil { ret = err } } @@ -200,7 +202,7 @@ func (h *memHub) UnregisterBlobListener(br blob.Ref, ch chan<- blob.Ref) { } } -func (h *memHub) AddReceiveHook(hook func(blob.Ref) error) { +func (h *memHub) AddReceiveHook(hook func(blob.SizedRef) error) { h.mu.Lock() defer h.mu.Unlock() h.hooks = append(h.hooks, hook) diff --git a/pkg/blobserver/blobhub_test.go b/pkg/blobserver/blobhub_test.go index 324c6c159..088e04487 100644 --- a/pkg/blobserver/blobhub_test.go +++ b/pkg/blobserver/blobhub_test.go @@ -17,10 +17,11 @@ limitations under the License. package blobserver import ( - "camlistore.org/pkg/blob" - . "camlistore.org/pkg/test/asserts" "testing" "time" + + "camlistore.org/pkg/blob" + . "camlistore.org/pkg/test/asserts" ) func TestHubRegistration(t *testing.T) { @@ -72,14 +73,14 @@ func TestHubFiring(t *testing.T) { blob1 := blob.MustParse("sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33") blobsame := blob.MustParse("sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33") - hub.NotifyBlobReceived(blob1) // no-op + hub.NotifyBlobReceived(blob.SizedRef{blob1, 123}) // no-op hub.RegisterListener(ch) hub.RegisterBlobListener(blob1, bch) - hub.NotifyBlobReceived(blobsame) + hub.NotifyBlobReceived(blob.SizedRef{blobsame, 456}) - tmr1 := time.NewTimer(1e9) + tmr1 := time.NewTimer(1 * time.Second) select { case <-tmr1.C: t.Fatal("timer expired on receiving from ch") diff --git a/pkg/blobserver/receive.go b/pkg/blobserver/receive.go index 48d3d6622..820702aa4 100644 --- a/pkg/blobserver/receive.go +++ b/pkg/blobserver/receive.go @@ -50,7 +50,7 @@ func receive(dst BlobReceiver, br blob.Ref, src io.Reader, checkHash bool) (sb b if err != nil { return } - err = GetHub(dst).NotifyBlobReceived(sb.Ref) + err = GetHub(dst).NotifyBlobReceived(sb) return } diff --git a/pkg/server/sync.go b/pkg/server/sync.go index 9a086cc57..883a4416a 100644 --- a/pkg/server/sync.go +++ b/pkg/server/sync.go @@ -142,6 +142,7 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, go synch.syncQueueLoop() } + blobserver.GetHub(fromBs).AddReceiveHook(synch.enqueue) return synch, nil } @@ -472,12 +473,21 @@ func (sh *SyncHandler) ReceiveBlob(br blob.Ref, r io.Reader) (sb blob.SizedRef, if err != nil { return } + sb = blob.SizedRef{br, n} + return sb, sh.enqueue(sb) +} + +func (sh *SyncHandler) enqueue(sb blob.SizedRef) error { // TODO: include current time in encoded value, to attempt to // do in-order delivery to remote side later? Possible // friendly optimization later. Might help peer's indexer have // less missing deps. - err = sh.queue.Set(br.String(), fmt.Sprint(n)) - return blob.SizedRef{br, n}, err + if err := sh.queue.Set(sb.Ref.String(), fmt.Sprint(sb.Size)); err != nil { + return err + } + // TODO(bradfitz): non-blocking send to wake up looping + // goroutine if it's sleeping. + return nil } // TODO(bradfitz): implement these? what do they mean? possibilities: