replica: change receive strategy, fixing a TODO, and start of tests

Change-Id: I74c1206ce4bb34ddffe2601cf95fce393116c303
This commit is contained in:
Brad Fitzpatrick 2014-01-18 21:16:24 -08:00
parent cbed03b187
commit cfeeda0fa9
3 changed files with 153 additions and 39 deletions

View File

@ -39,9 +39,7 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"time"
"camlistore.org/pkg/blob"
@ -66,8 +64,6 @@ type replicaStorage struct {
// Minimum number of writes that must succeed before
// acknowledging success to the client.
minWritesForSuccess int
ctx *http.Request // optional per-request context
}
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) {
@ -172,45 +168,35 @@ func (sto *replicaStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref
}
type sizedBlobAndError struct {
idx int
sb blob.SizedRef
err error
}
func (sto *replicaStorage) ReceiveBlob(b blob.Ref, source io.Reader) (_ blob.SizedRef, err error) {
nReplicas := len(sto.replicas)
rpipe, wpipe, writer := make([]*io.PipeReader, nReplicas), make([]*io.PipeWriter, nReplicas), make([]io.Writer, nReplicas)
for idx := range sto.replicas {
rpipe[idx], wpipe[idx] = io.Pipe()
writer[idx] = wpipe[idx]
// TODO: deal with slow/hung clients. this scheme of pipes +
// multiwriter (even with a bufio.Writer thrown in) isn't
// sufficient to guarantee forward progress. perhaps something
// like &MoveOrDieWriter{Writer: wpipe[idx], HeartbeatSec: 10}
}
upResult := make(chan sizedBlobAndError, nReplicas)
uploadToReplica := func(source io.Reader, dst blobserver.BlobReceiver) {
sb, err := blobserver.Receive(dst, b, source)
if err != nil {
io.Copy(ioutil.Discard, source)
}
upResult <- sizedBlobAndError{sb, err}
}
for idx, replica := range sto.replicas {
go uploadToReplica(rpipe[idx], replica)
}
size, err := io.Copy(io.MultiWriter(writer...), source)
func (sto *replicaStorage) ReceiveBlob(br blob.Ref, src io.Reader) (_ blob.SizedRef, err error) {
// Slurp the whole blob before replicating. Bounded by 16 MB anyway.
var buf bytes.Buffer
size, err := io.Copy(&buf, src)
if err != nil {
for i := range wpipe {
wpipe[i].CloseWithError(err)
}
return
}
for idx := range sto.replicas {
wpipe[idx].Close()
nReplicas := len(sto.replicas)
resc := make(chan sizedBlobAndError, nReplicas)
uploadToReplica := func(idx int, dst blobserver.BlobReceiver) {
// Using ReceiveNoHash because it's already been
// verified implicitly by the io.Copy above:
sb, err := blobserver.ReceiveNoHash(dst, br, bytes.NewReader(buf.Bytes()))
resc <- sizedBlobAndError{idx, sb, err}
}
nSuccess, nFailures := 0, 0
for idx, replica := range sto.replicas {
go uploadToReplica(idx, replica)
}
nSuccess := 0
var fails []sizedBlobAndError
for _ = range sto.replicas {
res := <-upResult
res := <-resc
switch {
case res.err == nil && res.sb.Size == size:
nSuccess++
@ -218,16 +204,19 @@ func (sto *replicaStorage) ReceiveBlob(b blob.Ref, source io.Reader) (_ blob.Siz
return res.sb, nil
}
case res.err == nil:
nFailures++
err = fmt.Errorf("replica: upload shard reported size %d, expected %d", res.sb.Size, size)
res.err = err
fails = append(fails, res)
default:
nFailures++
err = res.err
fails = append(fails, res)
}
}
if nFailures > 0 {
log.Printf("replica: receiving blob, %d successes, %d failures; last error = %v",
nSuccess, nFailures, err)
for _, res := range fails {
log.Printf("replica: receiving blob %v, %d successes, %d failures; backend %s reported: %v",
br,
nSuccess, len(fails),
sto.replicaPrefixes[res.idx], res.err)
}
return
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2014 The Camlistore Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package replica
import (
"testing"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/test"
)
func newReplica(t *testing.T, config jsonconfig.Obj) *replicaStorage {
ld := test.NewLoader()
sto, err := newFromConfig(ld, config)
if err != nil {
t.Fatalf("Invalid config: %v", err)
}
return sto.(*replicaStorage)
}
func TestReceive(t *testing.T) {
sto := newReplica(t, map[string]interface{}{
"backends": []interface{}{"/good-1/", "/good-2/"},
})
tb := &test.Blob{Contents: "stuff"}
tbRef := tb.BlobRef()
sb, err := blobserver.Receive(sto, tbRef, tb.Reader())
if err != nil {
t.Fatalf("Receive: %v", err)
}
if int(sb.Size) != len(tb.Contents) {
t.Fatalf("size = %d; want %d", sb.Size, len(tb.Contents))
}
if sb.Ref != tbRef {
t.Fatal("wrong blob received")
}
if len(sto.replicas) != 2 {
t.Fatalf("replicas = %d; want 2", len(sto.replicas))
}
for i, rep := range sto.replicas {
got, err := blobserver.StatBlob(rep, sb.Ref)
if err != nil {
t.Errorf("Replica %s got stat error %v", sto.replicaPrefixes[i], err)
} else if got != sb {
t.Errorf("Replica %s got %+v; want %+v", sto.replicaPrefixes[i], got, sb)
}
}
}

62
pkg/test/loader.go Normal file
View File

@ -0,0 +1,62 @@
/*
Copyright 2014 The Camlistore Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package test
import (
"errors"
"log"
"strings"
"sync"
"camlistore.org/pkg/blobserver"
)
// NewLoader
func NewLoader() *Loader {
return &Loader{}
}
type Loader struct {
mu sync.Mutex
}
var _ blobserver.Loader = (*Loader)(nil)
func (ld *Loader) FindHandlerByType(handlerType string) (prefix string, handler interface{}, err error) {
panic("NOIMPL")
}
func (ld *Loader) MyPrefix() string {
return "/lies/"
}
func (ld *Loader) GetHandlerType(prefix string) string {
log.Printf("test.Loader: GetHandlerType called but not implemented.")
return ""
}
func (ld *Loader) GetHandler(prefix string) (interface{}, error) {
log.Printf("test.Loader: GetHandler called but not implemented.")
return nil, errors.New("doesn't exist")
}
func (ld *Loader) GetStorage(prefix string) (blobserver.Storage, error) {
if strings.HasPrefix(prefix, "/good") {
return &Fetcher{}, nil
}
panic("test.Loader.GetStorage: unrecognized prefix type")
}