sync: have handler register receive hook with its source.

Lets legacy configs work, even without replicating directly to it.

Change-Id: I8bdb8651040794ae346f19d6dd67a0da07505f07
This commit is contained in:
Brad Fitzpatrick 2013-11-24 16:20:11 -08:00
parent b9fd11a8f6
commit cf388c2f2a
4 changed files with 27 additions and 14 deletions

View File

@ -33,13 +33,13 @@ type BlobHub interface {
//
// If any synchronous receive hooks are registered, they're run before
// NotifyBlobReceived returns and their error is returned.
NotifyBlobReceived(blob blob.Ref) error
NotifyBlobReceived(blob.SizedRef) error
// AddReceiveHook adds a hook that is synchronously run
// whenever blobs are received. All registered hooks are run
// on each blob upload but if more than one returns an error,
// NotifyBlobReceived will only return one of the errors.
AddReceiveHook(func(blob.Ref) error)
AddReceiveHook(func(blob.SizedRef) error)
RegisterListener(ch chan<- blob.Ref)
UnregisterListener(ch chan<- blob.Ref)
@ -107,15 +107,17 @@ func WaitForBlob(storage interface{}, deadline time.Time, blobs []blob.Ref) {
type memHub struct {
mu sync.RWMutex
hooks []func(blob.Ref) error
hooks []func(blob.SizedRef) error
listeners map[chan<- blob.Ref]bool
blobListeners map[blob.Ref]map[chan<- blob.Ref]bool
}
func (h *memHub) NotifyBlobReceived(br blob.Ref) error {
func (h *memHub) NotifyBlobReceived(sb blob.SizedRef) error {
h.mu.RLock()
defer h.mu.RUnlock()
br := sb.Ref
// Callback channels to notify, nil until non-empty
var notify []chan<- blob.Ref
@ -145,7 +147,7 @@ func (h *memHub) NotifyBlobReceived(br blob.Ref) error {
var ret error
for _, hook := range h.hooks {
if err := hook(br); err != nil && ret == nil {
if err := hook(sb); err != nil && ret == nil {
ret = err
}
}
@ -200,7 +202,7 @@ func (h *memHub) UnregisterBlobListener(br blob.Ref, ch chan<- blob.Ref) {
}
}
func (h *memHub) AddReceiveHook(hook func(blob.Ref) error) {
func (h *memHub) AddReceiveHook(hook func(blob.SizedRef) error) {
h.mu.Lock()
defer h.mu.Unlock()
h.hooks = append(h.hooks, hook)

View File

@ -17,10 +17,11 @@ limitations under the License.
package blobserver
import (
"camlistore.org/pkg/blob"
. "camlistore.org/pkg/test/asserts"
"testing"
"time"
"camlistore.org/pkg/blob"
. "camlistore.org/pkg/test/asserts"
)
func TestHubRegistration(t *testing.T) {
@ -72,14 +73,14 @@ func TestHubFiring(t *testing.T) {
blob1 := blob.MustParse("sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")
blobsame := blob.MustParse("sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33")
hub.NotifyBlobReceived(blob1) // no-op
hub.NotifyBlobReceived(blob.SizedRef{blob1, 123}) // no-op
hub.RegisterListener(ch)
hub.RegisterBlobListener(blob1, bch)
hub.NotifyBlobReceived(blobsame)
hub.NotifyBlobReceived(blob.SizedRef{blobsame, 456})
tmr1 := time.NewTimer(1e9)
tmr1 := time.NewTimer(1 * time.Second)
select {
case <-tmr1.C:
t.Fatal("timer expired on receiving from ch")

View File

@ -50,7 +50,7 @@ func receive(dst BlobReceiver, br blob.Ref, src io.Reader, checkHash bool) (sb b
if err != nil {
return
}
err = GetHub(dst).NotifyBlobReceived(sb.Ref)
err = GetHub(dst).NotifyBlobReceived(sb)
return
}

View File

@ -142,6 +142,7 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
go synch.syncQueueLoop()
}
blobserver.GetHub(fromBs).AddReceiveHook(synch.enqueue)
return synch, nil
}
@ -472,12 +473,21 @@ func (sh *SyncHandler) ReceiveBlob(br blob.Ref, r io.Reader) (sb blob.SizedRef,
if err != nil {
return
}
sb = blob.SizedRef{br, n}
return sb, sh.enqueue(sb)
}
func (sh *SyncHandler) enqueue(sb blob.SizedRef) error {
// TODO: include current time in encoded value, to attempt to
// do in-order delivery to remote side later? Possible
// friendly optimization later. Might help peer's indexer have
// less missing deps.
err = sh.queue.Set(br.String(), fmt.Sprint(n))
return blob.SizedRef{br, n}, err
if err := sh.queue.Set(sb.Ref.String(), fmt.Sprint(sb.Size)); err != nil {
return err
}
// TODO(bradfitz): non-blocking send to wake up looping
// goroutine if it's sleeping.
return nil
}
// TODO(bradfitz): implement these? what do they mean? possibilities: