Replica blob handler for live replication.

This commit is contained in:
Brad Fitzpatrick 2011-05-22 21:22:21 -07:00
parent 11c959caab
commit ccf33926c8
5 changed files with 273 additions and 8 deletions

View File

@ -424,6 +424,7 @@ TARGET: lib/go/camli/blobserver
TARGET: lib/go/camli/blobserver/handlers
TARGET: lib/go/camli/blobserver/localdisk
TARGET: lib/go/camli/blobserver/remote
TARGET: lib/go/camli/blobserver/replica
TARGET: lib/go/camli/blobserver/shard
TARGET: lib/go/camli/blobserver/s3
TARGET: lib/go/camli/client

View File

@ -61,6 +61,35 @@
}
},
"/repl/": {
"handler": "replica",
"handlerArgs": {
"backends": ["/r1/", "/r2/", "/r3/"],
"minWritesForSuccess": 2
}
},
"/r1/": {
"handler": "filesystem",
"handlerArgs": {
"path": ["_env", "${CAMLI_ROOT_REPLICA1}"]
}
},
"/r2/": {
"handler": "filesystem",
"handlerArgs": {
"path": ["_env", "${CAMLI_ROOT_REPLICA2}"]
}
},
"/r3/": {
"handler": "filesystem",
"handlerArgs": {
"path": ["_env", "${CAMLI_ROOT_REPLICA3}"]
}
},
"/indexer/": {
"handler": "mysqlindexer",
"handlerArgs": {

View File

@ -27,12 +27,15 @@ if ($opt_wipe && -d $root) {
print "Wiping $root\n";
system("rm", "-rf", $root) and die "Failed to wipe $root.\n";
}
my ($bsroot, $s1root, $s2root) = ("$root/bs", "$root/s1", "$root/s2");
for my $d ($bsroot, $s1root, $s2root) {
unless (-d $d) {
system("mkdir", "-p", $d) and die "Failed to create $d.\n";
my $suffixdir = sub {
my $suffix = shift;
my $root = "$root/$suffix";
unless (-d $root) {
system("mkdir", "-p", $root) and die "Failed to create $root.\n";
}
}
return $root;
};
my $DBNAME = "devcamlistore";
my @opts;
@ -62,9 +65,12 @@ print "Starting dev server on $base/ui/ with password \"pass$port\"\n";
$ENV{CAMLI_BASEURL} = $base;
$ENV{CAMLI_PASSWORD} = "pass$port";
$ENV{CAMLI_ROOT} = $bsroot;
$ENV{CAMLI_ROOT_SHARD1} = $s1root;
$ENV{CAMLI_ROOT_SHARD2} = $s2root;
$ENV{CAMLI_ROOT} = $suffixdir->("bs");
$ENV{CAMLI_ROOT_SHARD1} = $suffixdir->("s1");
$ENV{CAMLI_ROOT_SHARD2} = $suffixdir->("s2");
$ENV{CAMLI_ROOT_REPLICA1} = $suffixdir->("r1");
$ENV{CAMLI_ROOT_REPLICA2} = $suffixdir->("r2");
$ENV{CAMLI_ROOT_REPLICA3} = $suffixdir->("r3");
$ENV{CAMLI_PORT} = $port;
$ENV{CAMLI_SECRET_RING} = "$Bin/lib/go/camli/jsonsign/testdata/test-secring.gpg";
exec("$FindBin::Bin/server/go/camlistored/camlistored",

View File

@ -0,0 +1,228 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replica
import (
"fmt"
"io"
"log"
"os"
"camli/blobref"
"camli/blobserver"
"camli/jsonconfig"
)
var _ = log.Printf
const buffered = 8
type replicaStorage struct {
*blobserver.SimpleBlobHubPartitionMap
replicaPrefixes []string
replicas []blobserver.Storage
// Minimum number of writes that must succeed before
// acknowledging success to the client.
minWritesForSuccess int
}
func (sto *replicaStorage) GetBlobHub() blobserver.BlobHub {
return sto.SimpleBlobHubPartitionMap.GetBlobHub()
}
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err os.Error) {
sto := &replicaStorage{
SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
}
sto.replicaPrefixes = config.RequiredList("backends")
nReplicas := len(sto.replicaPrefixes)
sto.minWritesForSuccess = config.OptionalInt("minWritesForSuccess", nReplicas)
if err := config.Validate(); err != nil {
return nil, err
}
if nReplicas == 0 {
return nil, os.NewError("replica: need at least one replica")
}
if sto.minWritesForSuccess == 0 {
sto.minWritesForSuccess = nReplicas
}
sto.replicas = make([]blobserver.Storage, nReplicas)
for i, prefix := range sto.replicaPrefixes {
replicaSto, err := ld.GetStorage(prefix)
if err != nil {
return nil, err
}
sto.replicas[i] = replicaSto
}
return sto, nil
}
func (sto *replicaStorage) weightedRandomReplicas() []blobserver.Storage {
// TODO: implement something actually weighted or random.
return sto.replicas
}
func (sto *replicaStorage) FetchStreaming(b *blobref.BlobRef) (file io.ReadCloser, size int64, err os.Error) {
for _, replica := range sto.weightedRandomReplicas() {
file, size, err = replica.FetchStreaming(b)
if err == nil {
return
}
}
return
}
func (sto *replicaStorage) Stat(dest chan<- blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error {
if waitSeconds > 0 {
// TODO: handle waitSeconds in-memory, waiting on the blobhub, not going
// to the replicas?
}
need := make(map[string]*blobref.BlobRef)
for _, br := range blobs {
need[br.String()] = br
}
ch := make(chan blobref.SizedBlobRef, buffered)
donechan := make(chan bool)
go func() {
for sb := range ch {
bstr := sb.BlobRef.String()
if _, needed := need[bstr]; needed {
dest <- sb
need[bstr] = nil, false
}
}
donechan <- true
}()
errch := make(chan os.Error, buffered)
statReplica := func(s blobserver.Storage) {
errch <- s.Stat(ch, blobs, waitSeconds)
}
for _, replica := range sto.replicas {
go statReplica(replica)
}
var retErr os.Error
for _ = range sto.replicas {
if err := <-errch; err != nil {
retErr = err
}
}
close(ch)
<-donechan
// Safe to access need map now; as helper goroutine is
// done with it.
if len(need) == 0 {
return nil
}
return retErr
}
type sizedBlobAndError struct {
sb blobref.SizedBlobRef
err os.Error
}
// TODO-GO: s/xxgo/_/ once Go issue 1802 is fixd
func (sto *replicaStorage) ReceiveBlob(b *blobref.BlobRef, source io.Reader) (xxgo blobref.SizedBlobRef, err os.Error) {
nReplicas := len(sto.replicas)
rpipe, wpipe, writer := make([]*io.PipeReader, nReplicas), make([]*io.PipeWriter, nReplicas), make([]io.Writer, nReplicas)
for idx := range sto.replicas {
rpipe[idx], wpipe[idx] = io.Pipe()
writer[idx] = wpipe[idx]
// TODO: deal with slow/hung clients. this scheme of pipes +
// multiwriter (even with a bufio.Writer thrown in) isn't
// sufficient to guarantee forward progress. perhaps something
// like &MoveOrDieWriter{Writer: wpipe[idx], HeartbeatSec: 10}
}
upResult := make(chan sizedBlobAndError, nReplicas)
uploadToReplica := func(source io.Reader, s blobserver.Storage) {
sb, err := s.ReceiveBlob(b, source)
upResult <- sizedBlobAndError{sb, err}
}
for idx, replica := range sto.replicas {
go uploadToReplica(rpipe[idx], replica)
}
size, err := io.Copy(io.MultiWriter(writer...), source)
if err != nil {
return
}
for idx := range sto.replicas {
wpipe[idx].Close()
}
nSuccess := 0
for _ = range sto.replicas {
switch res := <-upResult; {
case res.err == nil && res.sb.Size == size:
nSuccess++
if nSuccess == sto.minWritesForSuccess {
return res.sb, nil
}
case res.err == nil:
err = fmt.Errorf("replica: upload shard reported size %d, expected %d", res.sb.Size, size)
default:
err = res.err
}
}
return
}
func (sto *replicaStorage) Remove(blobs []*blobref.BlobRef) os.Error {
errch := make(chan os.Error, buffered)
removeFrom := func(s blobserver.Storage) {
errch <- s.Remove(blobs)
}
for _, replica := range sto.replicas {
go removeFrom(replica)
}
var reterr os.Error
nSuccess := 0
for _ = range errch {
if err := <-errch; err != nil {
reterr = err
} else {
nSuccess++
}
}
if nSuccess > 0 {
// TODO: decide on the return value. for now this is best
// effort and we return nil if any of the blobservers said
// success. maybe a bit weird, though.
return nil
}
return reterr
}
func (sto *replicaStorage) EnumerateBlobs(dest chan<- blobref.SizedBlobRef, after string, limit uint, waitSeconds int) os.Error {
// TODO: option to enumerate from one or from all merged. for
// now we'll just do all, even though it's kinda a waste. at
// least then we don't miss anything if a certain node is
// missing some blobs temporarily
return blobserver.MergedEnumerate(dest, sto.replicas, after, limit, waitSeconds)
}
func init() {
blobserver.RegisterStorageConstructor("replica", blobserver.StorageConstructor(newFromConfig))
}

View File

@ -40,6 +40,7 @@ import (
// Storage options:
_ "camli/blobserver/localdisk"
_ "camli/blobserver/remote"
_ "camli/blobserver/replica"
_ "camli/blobserver/s3"
_ "camli/blobserver/shard"
_ "camli/mysqlindexer" // indexer, but uses storage interface