replica: allow optional readBackends config option. Default is the same as the write backends.

Change-Id: I8aae98f76449c41d937469cf91d6788c348807ed
Author: Brad Fitzpatrick
Date:   2013-09-08 16:35:56 -07:00
parent  97fab00c71
commit  13a21a61df
1 changed file with 27 additions and 14 deletions
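For context, the new option is read from the replica handler's JSON config: "backends" lists where writes go, and the new "readBackends" lists where reads come from. Below is a minimal sketch of a config that fans writes out to two stores but serves reads from the local one only. The prefix names and the "storage-replica" handler layout are illustrative assumptions, not part of this commit:

    "/bs/": {
        "handler": "storage-replica",
        "handlerArgs": {
            "backends": ["/local-disk/", "/remote-mirror/"],
            "readBackends": ["/local-disk/"],
            "minWritesForSuccess": 1
        }
    }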


@@ -50,9 +50,14 @@ import (
 const buffered = 8
 
 type replicaStorage struct {
+	// Replicas for writing:
 	replicaPrefixes []string
 	replicas        []blobserver.Storage
 
+	// Replicas for reading:
+	readPrefixes []string
+	readReplicas []blobserver.Storage
+
 	// Minimum number of writes that must succeed before
 	// acknowledging success to the client.
 	minWritesForSuccess int
@@ -63,6 +68,7 @@ type replicaStorage struct {
 func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
 	sto := &replicaStorage{
 		replicaPrefixes: config.RequiredList("backends"),
+		readPrefixes:    config.OptionalList("readBackends"),
 	}
 	nReplicas := len(sto.replicaPrefixes)
 	sto.minWritesForSuccess = config.OptionalInt("minWritesForSuccess", nReplicas)
@@ -75,19 +81,31 @@ func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
 	if sto.minWritesForSuccess == 0 {
 		sto.minWritesForSuccess = nReplicas
 	}
-	sto.replicas = make([]blobserver.Storage, nReplicas)
-	for i, prefix := range sto.replicaPrefixes {
-		replicaSto, err := ld.GetStorage(prefix)
+	// readPrefixes defaults to the write prefixes.
+	if len(sto.readPrefixes) == 0 {
+		sto.readPrefixes = sto.replicaPrefixes
+	}
+
+	for _, prefix := range sto.replicaPrefixes {
+		s, err := ld.GetStorage(prefix)
 		if err != nil {
 			return nil, err
 		}
-		sto.replicas[i] = replicaSto
+		sto.replicas = append(sto.replicas, s)
 	}
+	for _, prefix := range sto.readPrefixes {
+		s, err := ld.GetStorage(prefix)
+		if err != nil {
+			return nil, err
+		}
+		sto.readReplicas = append(sto.readReplicas, s)
+	}
 	return sto, nil
 }
 
 func (sto *replicaStorage) FetchStreaming(b blob.Ref) (file io.ReadCloser, size int64, err error) {
-	for _, replica := range sto.replicas {
+	// TODO: race these? first to respond?
+	for _, replica := range sto.readReplicas {
 		file, size, err = replica.FetchStreaming(b)
 		if err == nil {
 			return
@@ -96,8 +114,7 @@ func (sto *replicaStorage) FetchStreaming(b blob.Ref) (file io.ReadCloser, size int64, err error) {
 	return
 }
 
-// StatBlobs stats all replicas.
-// See TODO on EnumerateBlobs.
+// StatBlobs stats all read replicas.
 func (sto *replicaStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
 	need := make(map[blob.Ref]bool)
 	for _, br := range blobs {
@@ -122,12 +139,12 @@ func (sto *replicaStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
 		errc <- s.StatBlobs(ch, blobs)
 	}
 
-	for _, replica := range sto.replicas {
+	for _, replica := range sto.readReplicas {
 		go statReplica(replica)
 	}
 
 	var retErr error
-	for _ = range sto.replicas {
+	for _ = range sto.readReplicas {
 		if err := <-errc; err != nil {
 			retErr = err
 		}
@@ -231,11 +248,7 @@ func (sto *replicaStorage) RemoveBlobs(blobs []blob.Ref) error {
 }
 
 func (sto *replicaStorage) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
-	// TODO: option to enumerate from one or from all merged. for
-	// now we'll just do all, even though it's kinda a waste. at
-	// least then we don't miss anything if a certain node is
-	// missing some blobs temporarily
-	return blobserver.MergedEnumerate(dest, sto.replicas, after, limit)
+	return blobserver.MergedEnumerate(dest, sto.readReplicas, after, limit)
 }
 
 func init() {
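The new TODO in FetchStreaming asks whether reads should race the replicas rather than try them in order. As a rough illustration, here is one way such a race could look in the same package. This is a hypothetical sketch, not part of the commit: the name raceFetch is invented, and a real version would have to drain and close the readers from replicas that succeed after the winner.

    // raceFetch asks every read replica for the blob concurrently and
    // returns the first successful answer. Sketch only: a successful
    // ReadCloser arriving after the winner leaks here; a production
    // version would drain the channel and close the losers.
    func (sto *replicaStorage) raceFetch(b blob.Ref) (io.ReadCloser, int64, error) {
    	type result struct {
    		rc   io.ReadCloser
    		size int64
    		err  error
    	}
    	// Buffered so that losing goroutines never block on send.
    	results := make(chan result, len(sto.readReplicas))
    	for _, replica := range sto.readReplicas {
    		go func(s blobserver.Storage) {
    			rc, size, err := s.FetchStreaming(b)
    			results <- result{rc, size, err}
    		}(replica)
    	}
    	var lastErr error
    	for _ = range sto.readReplicas {
    		res := <-results
    		if res.err == nil {
    			return res.rc, res.size, nil
    		}
    		lastErr = res.err
    	}
    	return nil, 0, lastErr
    }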