Start of making QueueCreator optional for replication sources.

Change-Id: I82290991208c6e8953bdc63424760b378437674d
This commit is contained in:
Brad Fitzpatrick 2013-09-01 16:19:07 -07:00
parent 9ae1a64c38
commit 0457e9359a
1 changed files with 17 additions and 8 deletions

View File

@ -43,7 +43,8 @@ const (
// TODO: expose copierPoolSize as tunable
type SyncHandler struct {
fromName, fromqName, toName string
from, fromq, to blobserver.Storage
from, fromq blobserver.Storage
to blobserver.BlobReceiver
idle bool // if true, the handler does nothing other than providing the discovery.
@ -63,14 +64,15 @@ func init() {
blobserver.RegisterHandlerConstructor("sync", newSyncFromConfig)
}
func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (h http.Handler, err error) {
func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) {
from := conf.RequiredString("from")
to := conf.RequiredString("to")
fullSync := conf.OptionalBool("fullSyncOnStart", false)
blockFullSync := conf.OptionalBool("blockingFullSyncOnStart", false)
idle := conf.OptionalBool("idle", false)
if err = conf.Validate(); err != nil {
return
queueDir := conf.OptionalString("queueDir", "")
if err := conf.Validate(); err != nil {
return nil, err
}
if idle {
synch, err := createIdleSyncHandler(from, to)
@ -81,19 +83,26 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (h http.Handle
}
fromBs, err := ld.GetStorage(from)
if err != nil {
return
return nil, err
}
toBs, err := ld.GetStorage(to)
if err != nil {
return
return nil, err
}
fromQsc, ok := fromBs.(blobserver.StorageQueueCreator)
if !ok {
if queueDir != "" {
// TODO(bradfitz): finish implementing.
// Should create a a localdisk target on the
// queueDir and setup sync from that. Be sure
// to remove item from top-level TODO too.
}
return nil, fmt.Errorf("Prefix %s (type %T) does not support being efficient replication source (queueing)", from, fromBs)
}
synch, err := createSyncHandler(from, to, fromQsc, toBs)
if err != nil {
return
return nil, err
}
if fullSync || blockFullSync {
@ -136,7 +145,7 @@ type timestampedError struct {
err error
}
func createSyncHandler(fromName, toName string, from blobserver.StorageQueueCreator, to blobserver.Storage) (*SyncHandler, error) {
func createSyncHandler(fromName, toName string, from blobserver.StorageQueueCreator, to blobserver.BlobReceiver) (*SyncHandler, error) {
h := &SyncHandler{
copierPoolSize: 3,
from: from,