From ccf33926c83be16d063408e5fdf758761fd34ee9 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 22 May 2011 21:22:21 -0700 Subject: [PATCH] Replica blob handler for live replication. --- build.pl | 1 + config/dev-server-config.json | 29 +++ dev-server | 22 +- lib/go/camli/blobserver/replica/replica.go | 228 +++++++++++++++++++++ server/go/camlistored/camlistored.go | 1 + 5 files changed, 273 insertions(+), 8 deletions(-) create mode 100644 lib/go/camli/blobserver/replica/replica.go diff --git a/build.pl b/build.pl index d3bd421d2..0bee9426a 100755 --- a/build.pl +++ b/build.pl @@ -424,6 +424,7 @@ TARGET: lib/go/camli/blobserver TARGET: lib/go/camli/blobserver/handlers TARGET: lib/go/camli/blobserver/localdisk TARGET: lib/go/camli/blobserver/remote +TARGET: lib/go/camli/blobserver/replica TARGET: lib/go/camli/blobserver/shard TARGET: lib/go/camli/blobserver/s3 TARGET: lib/go/camli/client diff --git a/config/dev-server-config.json b/config/dev-server-config.json index 5b992bc20..1f93e5051 100644 --- a/config/dev-server-config.json +++ b/config/dev-server-config.json @@ -61,6 +61,35 @@ } }, + "/repl/": { + "handler": "replica", + "handlerArgs": { + "backends": ["/r1/", "/r2/", "/r3/"], + "minWritesForSuccess": 2 + } + }, + + "/r1/": { + "handler": "filesystem", + "handlerArgs": { + "path": ["_env", "${CAMLI_ROOT_REPLICA1}"] + } + }, + + "/r2/": { + "handler": "filesystem", + "handlerArgs": { + "path": ["_env", "${CAMLI_ROOT_REPLICA2}"] + } + }, + + "/r3/": { + "handler": "filesystem", + "handlerArgs": { + "path": ["_env", "${CAMLI_ROOT_REPLICA3}"] + } + }, + "/indexer/": { "handler": "mysqlindexer", "handlerArgs": { diff --git a/dev-server b/dev-server index 442ff60f4..f0f7308bd 100755 --- a/dev-server +++ b/dev-server @@ -27,12 +27,15 @@ if ($opt_wipe && -d $root) { print "Wiping $root\n"; system("rm", "-rf", $root) and die "Failed to wipe $root.\n"; } -my ($bsroot, $s1root, $s2root) = ("$root/bs", "$root/s1", "$root/s2"); -for my $d ($bsroot, $s1root, $s2root) { - unless (-d $d) { - system("mkdir", "-p", $d) and die "Failed to create $d.\n"; + +my $suffixdir = sub { + my $suffix = shift; + my $root = "$root/$suffix"; + unless (-d $root) { + system("mkdir", "-p", $root) and die "Failed to create $root.\n"; } -} + return $root; +}; my $DBNAME = "devcamlistore"; my @opts; @@ -62,9 +65,12 @@ print "Starting dev server on $base/ui/ with password \"pass$port\"\n"; $ENV{CAMLI_BASEURL} = $base; $ENV{CAMLI_PASSWORD} = "pass$port"; -$ENV{CAMLI_ROOT} = $bsroot; -$ENV{CAMLI_ROOT_SHARD1} = $s1root; -$ENV{CAMLI_ROOT_SHARD2} = $s2root; +$ENV{CAMLI_ROOT} = $suffixdir->("bs"); +$ENV{CAMLI_ROOT_SHARD1} = $suffixdir->("s1"); +$ENV{CAMLI_ROOT_SHARD2} = $suffixdir->("s2"); +$ENV{CAMLI_ROOT_REPLICA1} = $suffixdir->("r1"); +$ENV{CAMLI_ROOT_REPLICA2} = $suffixdir->("r2"); +$ENV{CAMLI_ROOT_REPLICA3} = $suffixdir->("r3"); $ENV{CAMLI_PORT} = $port; $ENV{CAMLI_SECRET_RING} = "$Bin/lib/go/camli/jsonsign/testdata/test-secring.gpg"; exec("$FindBin::Bin/server/go/camlistored/camlistored", diff --git a/lib/go/camli/blobserver/replica/replica.go b/lib/go/camli/blobserver/replica/replica.go new file mode 100644 index 000000000..f4689bd05 --- /dev/null +++ b/lib/go/camli/blobserver/replica/replica.go @@ -0,0 +1,228 @@ +/* +Copyright 2011 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package replica
+
+import (
+	"fmt"
+	"io"
+	"log"
+	"os"
+
+	"camli/blobref"
+	"camli/blobserver"
+	"camli/jsonconfig"
+)
+
+var _ = log.Printf
+
+const buffered = 8
+
+type replicaStorage struct {
+	*blobserver.SimpleBlobHubPartitionMap
+
+	replicaPrefixes []string
+	replicas        []blobserver.Storage
+
+	// Minimum number of writes that must succeed before
+	// acknowledging success to the client.
+	minWritesForSuccess int
+}
+
+func (sto *replicaStorage) GetBlobHub() blobserver.BlobHub {
+	return sto.SimpleBlobHubPartitionMap.GetBlobHub()
+}
+
+func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err os.Error) {
+	sto := &replicaStorage{
+		SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
+	}
+	sto.replicaPrefixes = config.RequiredList("backends")
+	nReplicas := len(sto.replicaPrefixes)
+	sto.minWritesForSuccess = config.OptionalInt("minWritesForSuccess", nReplicas)
+	if err := config.Validate(); err != nil {
+		return nil, err
+	}
+	if nReplicas == 0 {
+		return nil, os.NewError("replica: need at least one replica")
+	}
+	if sto.minWritesForSuccess == 0 {
+		sto.minWritesForSuccess = nReplicas
+	}
+	sto.replicas = make([]blobserver.Storage, nReplicas)
+	for i, prefix := range sto.replicaPrefixes {
+		replicaSto, err := ld.GetStorage(prefix)
+		if err != nil {
+			return nil, err
+		}
+		sto.replicas[i] = replicaSto
+	}
+	return sto, nil
+}
+
+func (sto *replicaStorage) weightedRandomReplicas() []blobserver.Storage {
+	// TODO: implement something actually weighted or random.
+	return sto.replicas
+}
+
+func (sto *replicaStorage) FetchStreaming(b *blobref.BlobRef) (file io.ReadCloser, size int64, err os.Error) {
+	for _, replica := range sto.weightedRandomReplicas() {
+		file, size, err = replica.FetchStreaming(b)
+		if err == nil {
+			return
+		}
+	}
+	return
+}
+
+func (sto *replicaStorage) Stat(dest chan<- blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
+	if waitSeconds > 0 {
+		// TODO: handle waitSeconds in-memory, waiting on the blobhub, not going
+		// to the replicas?
+	}
+
+	need := make(map[string]*blobref.BlobRef)
+	for _, br := range blobs {
+		need[br.String()] = br
+	}
+
+	ch := make(chan blobref.SizedBlobRef, buffered)
+	donechan := make(chan bool)
+
+	go func() {
+		for sb := range ch {
+			bstr := sb.BlobRef.String()
+			if _, needed := need[bstr]; needed {
+				dest <- sb
+				need[bstr] = nil, false
+			}
+		}
+		donechan <- true
+	}()
+
+	errch := make(chan os.Error, buffered)
+	statReplica := func(s blobserver.Storage) {
+		errch <- s.Stat(ch, blobs, waitSeconds)
+	}
+
+	for _, replica := range sto.replicas {
+		go statReplica(replica)
+	}
+
+	var retErr os.Error
+	for _ = range sto.replicas {
+		if err := <-errch; err != nil {
+			retErr = err
+		}
+	}
+	close(ch)
+	<-donechan
+
+	// Safe to access the need map now; the helper goroutine is
+	// done with it.
+	if len(need) == 0 {
+		return nil
+	}
+	return retErr
+}
+
+type sizedBlobAndError struct {
+	sb  blobref.SizedBlobRef
+	err os.Error
+}
+
+// TODO-GO: s/xxgo/_/ once Go issue 1802 is fixed
+func (sto *replicaStorage) ReceiveBlob(b *blobref.BlobRef, source io.Reader) (xxgo blobref.SizedBlobRef, err os.Error) {
+	nReplicas := len(sto.replicas)
+	rpipe, wpipe, writer := make([]*io.PipeReader, nReplicas), make([]*io.PipeWriter, nReplicas), make([]io.Writer, nReplicas)
+	for idx := range sto.replicas {
+		rpipe[idx], wpipe[idx] = io.Pipe()
+		writer[idx] = wpipe[idx]
+		// TODO: deal with slow/hung clients. this scheme of pipes +
+		// multiwriter (even with a bufio.Writer thrown in) isn't
+		// sufficient to guarantee forward progress. perhaps something
+		// like &MoveOrDieWriter{Writer: wpipe[idx], HeartbeatSec: 10}
+	}
+	upResult := make(chan sizedBlobAndError, nReplicas)
+	uploadToReplica := func(source io.Reader, s blobserver.Storage) {
+		sb, err := s.ReceiveBlob(b, source)
+		upResult <- sizedBlobAndError{sb, err}
+	}
+	for idx, replica := range sto.replicas {
+		go uploadToReplica(rpipe[idx], replica)
+	}
+	size, err := io.Copy(io.MultiWriter(writer...), source)
+	if err != nil {
+		return
+	}
+	for idx := range sto.replicas {
+		wpipe[idx].Close()
+	}
+	nSuccess := 0
+	for _ = range sto.replicas {
+		switch res := <-upResult; {
+		case res.err == nil && res.sb.Size == size:
+			nSuccess++
+			if nSuccess == sto.minWritesForSuccess {
+				return res.sb, nil
+			}
+		case res.err == nil:
+			err = fmt.Errorf("replica: upload shard reported size %d, expected %d", res.sb.Size, size)
+		default:
+			err = res.err
+		}
+	}
+	return
+}
+
+func (sto *replicaStorage) Remove(blobs []*blobref.BlobRef) os.Error {
+	errch := make(chan os.Error, buffered)
+	removeFrom := func(s blobserver.Storage) {
+		errch <- s.Remove(blobs)
+	}
+	for _, replica := range sto.replicas {
+		go removeFrom(replica)
+	}
+	var reterr os.Error
+	nSuccess := 0
+	for _ = range sto.replicas {
+		if err := <-errch; err != nil {
+			reterr = err
+		} else {
+			nSuccess++
+		}
+	}
+	if nSuccess > 0 {
+		// TODO: decide on the return value. for now this is best
+		// effort and we return nil if any of the blobservers said
+		// success. maybe a bit weird, though.
+		return nil
+	}
+	return reterr
+}
+
+
+func (sto *replicaStorage) EnumerateBlobs(dest chan<- blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error {
+	// TODO: option to enumerate from one or from all merged. for
+	// now we'll just do all, even though it's kind of a waste. at
+	// least then we don't miss anything if a certain node is
+	// missing some blobs temporarily.
+	return blobserver.MergedEnumerate(dest, sto.replicas, after, limit, waitSeconds)
+}
+
+func init() {
+	blobserver.RegisterStorageConstructor("replica", blobserver.StorageConstructor(newFromConfig))
+}
diff --git a/server/go/camlistored/camlistored.go b/server/go/camlistored/camlistored.go
index cbe60931e..b0cd842b0 100644
--- a/server/go/camlistored/camlistored.go
+++ b/server/go/camlistored/camlistored.go
@@ -40,6 +40,7 @@ import (
 	// Storage options:
 	_ "camli/blobserver/localdisk"
 	_ "camli/blobserver/remote"
+	_ "camli/blobserver/replica"
 	_ "camli/blobserver/s3"
 	_ "camli/blobserver/shard"
 	_ "camli/mysqlindexer" // indexer, but uses storage interface
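
For anyone wanting to poke at the write path above in isolation, here is a
minimal standalone sketch of the pipe-plus-MultiWriter fan-out that
ReceiveBlob uses, in the same pre-Go1 dialect as the rest of the tree. The
fanOut helper, its ReadAll sink stand-ins, and all names below are
illustrative only, not part of this patch:

package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"os"
)

// fanOut copies source once into nSinks concurrent readers via
// io.Pipe + io.MultiWriter, then reports success if at least
// minWrites sinks consumed the stream without error.
func fanOut(source io.Reader, nSinks, minWrites int) os.Error {
	wpipe := make([]*io.PipeWriter, nSinks)
	writers := make([]io.Writer, nSinks)
	results := make(chan os.Error, nSinks)
	for i := 0; i < nSinks; i++ {
		var rpipe *io.PipeReader
		rpipe, wpipe[i] = io.Pipe()
		writers[i] = wpipe[i]
		go func(r io.Reader) {
			// Stand-in for a real sink such as Storage.ReceiveBlob.
			_, err := ioutil.ReadAll(r)
			results <- err
		}(rpipe)
	}
	// One Copy feeds every sink. MultiWriter writes sequentially, so
	// a stalled sink stalls all of them; that's the forward-progress
	// hazard the TODO in ReceiveBlob calls out.
	_, err := io.Copy(io.MultiWriter(writers...), source)
	for i := 0; i < nSinks; i++ {
		if err != nil {
			// Propagate the source failure so no sink mistakes a
			// truncated stream for a complete one.
			wpipe[i].CloseWithError(err)
		} else {
			wpipe[i].Close()
		}
	}
	nSuccess := 0
	for i := 0; i < nSinks; i++ {
		if e := <-results; e != nil {
			err = e
		} else {
			nSuccess++
		}
	}
	if nSuccess >= minWrites {
		return nil
	}
	return err
}

func main() {
	err := fanOut(bytes.NewBufferString("some blob bytes"), 3, 2)
	fmt.Println("fan-out err:", err)
}

Unlike ReceiveBlob, the sketch has no expected blob size to compare against,
so it closes the pipes with the copy error instead, and it waits for every
sink rather than returning early at the minWrites threshold the way
ReceiveBlob does.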