From 13a21a61df57dbc90ac0e15d538dd924074afe29 Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Sun, 8 Sep 2013 16:35:56 -0700
Subject: [PATCH] replica: allow optional readBackends config option.

Default is same as write backends.

Change-Id: I8aae98f76449c41d937469cf91d6788c348807ed
---
 pkg/blobserver/replica/replica.go | 41 ++++++++++++++++++++-----------
 1 file changed, 27 insertions(+), 14 deletions(-)

diff --git a/pkg/blobserver/replica/replica.go b/pkg/blobserver/replica/replica.go
index 1d840eceb..63d7b9dc0 100644
--- a/pkg/blobserver/replica/replica.go
+++ b/pkg/blobserver/replica/replica.go
@@ -50,9 +50,14 @@ import (
 const buffered = 8
 
 type replicaStorage struct {
+	// Replicas for writing:
 	replicaPrefixes []string
 	replicas        []blobserver.Storage
 
+	// Replicas for reading:
+	readPrefixes []string
+	readReplicas []blobserver.Storage
+
 	// Minimum number of writes that must succeed before
 	// acknowledging success to the client.
 	minWritesForSuccess int
@@ -63,6 +68,7 @@ type replicaStorage struct {
 func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
 	sto := &replicaStorage{
 		replicaPrefixes: config.RequiredList("backends"),
+		readPrefixes:    config.OptionalList("readBackends"),
 	}
 	nReplicas := len(sto.replicaPrefixes)
 	sto.minWritesForSuccess = config.OptionalInt("minWritesForSuccess", nReplicas)
@@ -75,19 +81,31 @@ func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobser
 	if sto.minWritesForSuccess == 0 {
 		sto.minWritesForSuccess = nReplicas
 	}
-	sto.replicas = make([]blobserver.Storage, nReplicas)
-	for i, prefix := range sto.replicaPrefixes {
-		replicaSto, err := ld.GetStorage(prefix)
+	// readPrefixes defaults to the write prefixes.
+	if len(sto.readPrefixes) == 0 {
+		sto.readPrefixes = sto.replicaPrefixes
+	}
+
+	for _, prefix := range sto.replicaPrefixes {
+		s, err := ld.GetStorage(prefix)
 		if err != nil {
 			return nil, err
 		}
-		sto.replicas[i] = replicaSto
+		sto.replicas = append(sto.replicas, s)
+	}
+	for _, prefix := range sto.readPrefixes {
+		s, err := ld.GetStorage(prefix)
+		if err != nil {
+			return nil, err
+		}
+		sto.readReplicas = append(sto.readReplicas, s)
 	}
 	return sto, nil
 }
 
 func (sto *replicaStorage) FetchStreaming(b blob.Ref) (file io.ReadCloser, size int64, err error) {
-	for _, replica := range sto.replicas {
+	// TODO: race these? first to respond?
+	for _, replica := range sto.readReplicas {
 		file, size, err = replica.FetchStreaming(b)
 		if err == nil {
 			return
@@ -96,8 +114,7 @@ func (sto *replicaStorage) FetchStreaming(b blob.Ref) (file io.ReadCloser, size
 	return
 }
 
-// StatBlobs stats all replicas.
-// See TODO on EnumerateBlobs.
+// StatBlobs stats all read replicas.
 func (sto *replicaStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
 	need := make(map[blob.Ref]bool)
 	for _, br := range blobs {
@@ -122,12 +139,12 @@ func (sto *replicaStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref
 		errc <- s.StatBlobs(ch, blobs)
 	}
 
-	for _, replica := range sto.replicas {
+	for _, replica := range sto.readReplicas {
 		go statReplica(replica)
 	}
 
 	var retErr error
-	for _ = range sto.replicas {
+	for _ = range sto.readReplicas {
 		if err := <-errc; err != nil {
 			retErr = err
 		}
@@ -231,11 +248,7 @@ func (sto *replicaStorage) RemoveBlobs(blobs []blob.Ref) error {
 }
 
 func (sto *replicaStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
-	// TODO: option to enumerate from one or from all merged.  for
-	// now we'll just do all, even though it's kinda a waste.  at
-	// least then we don't miss anything if a certain node is
-	// missing some blobs temporarily
-	return blobserver.MergedEnumerate(dest, sto.readReplicas, after, limit)
+	return blobserver.MergedEnumerate(dest, sto.readReplicas, after, limit)
 }
 
 func init() {
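
For context, a hypothetical low-level config sketch of how the new option might be
used; the "/bs/" and "/sto-*/" prefixes and the "storage-replica" handler name here
are illustrative only, not taken from this change. Writes fan out to every prefix in
"backends", while fetches, stats, and enumerates are served from "readBackends",
which defaults to "backends" when omitted:

    "/bs/": {
        "handler": "storage-replica",
        "handlerArgs": {
            "backends": ["/sto-a/", "/sto-b/", "/sto-c/"],
            "readBackends": ["/sto-a/"],
            "minWritesForSuccess": 2
        }
    }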