diff --git a/pkg/blobserver/replica/replica.go b/pkg/blobserver/replica/replica.go index 6e76aa76f..0785a8b99 100644 --- a/pkg/blobserver/replica/replica.go +++ b/pkg/blobserver/replica/replica.go @@ -39,9 +39,7 @@ import ( "errors" "fmt" "io" - "io/ioutil" "log" - "net/http" "time" "camlistore.org/pkg/blob" @@ -66,8 +64,6 @@ type replicaStorage struct { // Minimum number of writes that must succeed before // acknowledging success to the client. minWritesForSuccess int - - ctx *http.Request // optional per-request context } func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (storage blobserver.Storage, err error) { @@ -172,45 +168,35 @@ func (sto *replicaStorage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref } type sizedBlobAndError struct { + idx int sb blob.SizedRef err error } -func (sto *replicaStorage) ReceiveBlob(b blob.Ref, source io.Reader) (_ blob.SizedRef, err error) { - nReplicas := len(sto.replicas) - rpipe, wpipe, writer := make([]*io.PipeReader, nReplicas), make([]*io.PipeWriter, nReplicas), make([]io.Writer, nReplicas) - for idx := range sto.replicas { - rpipe[idx], wpipe[idx] = io.Pipe() - writer[idx] = wpipe[idx] - // TODO: deal with slow/hung clients. this scheme of pipes + - // multiwriter (even with a bufio.Writer thrown in) isn't - // sufficient to guarantee forward progress. perhaps something - // like &MoveOrDieWriter{Writer: wpipe[idx], HeartbeatSec: 10} - } - upResult := make(chan sizedBlobAndError, nReplicas) - uploadToReplica := func(source io.Reader, dst blobserver.BlobReceiver) { - sb, err := blobserver.Receive(dst, b, source) - if err != nil { - io.Copy(ioutil.Discard, source) - } - upResult <- sizedBlobAndError{sb, err} - } - for idx, replica := range sto.replicas { - go uploadToReplica(rpipe[idx], replica) - } - size, err := io.Copy(io.MultiWriter(writer...), source) +func (sto *replicaStorage) ReceiveBlob(br blob.Ref, src io.Reader) (_ blob.SizedRef, err error) { + // Slurp the whole blob before replicating. Bounded by 16 MB anyway. + var buf bytes.Buffer + size, err := io.Copy(&buf, src) if err != nil { - for i := range wpipe { - wpipe[i].CloseWithError(err) - } return } - for idx := range sto.replicas { - wpipe[idx].Close() + + nReplicas := len(sto.replicas) + resc := make(chan sizedBlobAndError, nReplicas) + uploadToReplica := func(idx int, dst blobserver.BlobReceiver) { + // Using ReceiveNoHash because it's already been + // verified implicitly by the io.Copy above: + sb, err := blobserver.ReceiveNoHash(dst, br, bytes.NewReader(buf.Bytes())) + resc <- sizedBlobAndError{idx, sb, err} } - nSuccess, nFailures := 0, 0 + for idx, replica := range sto.replicas { + go uploadToReplica(idx, replica) + } + + nSuccess := 0 + var fails []sizedBlobAndError for _ = range sto.replicas { - res := <-upResult + res := <-resc switch { case res.err == nil && res.sb.Size == size: nSuccess++ @@ -218,16 +204,19 @@ func (sto *replicaStorage) ReceiveBlob(b blob.Ref, source io.Reader) (_ blob.Siz return res.sb, nil } case res.err == nil: - nFailures++ err = fmt.Errorf("replica: upload shard reported size %d, expected %d", res.sb.Size, size) + res.err = err + fails = append(fails, res) default: - nFailures++ err = res.err + fails = append(fails, res) } } - if nFailures > 0 { - log.Printf("replica: receiving blob, %d successes, %d failures; last error = %v", - nSuccess, nFailures, err) + for _, res := range fails { + log.Printf("replica: receiving blob %v, %d successes, %d failures; backend %s reported: %v", + br, + nSuccess, len(fails), + sto.replicaPrefixes[res.idx], res.err) } return } diff --git a/pkg/blobserver/replica/replica_test.go b/pkg/blobserver/replica/replica_test.go new file mode 100644 index 000000000..71d7afcba --- /dev/null +++ b/pkg/blobserver/replica/replica_test.go @@ -0,0 +1,63 @@ +/* +Copyright 2014 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package replica + +import ( + "testing" + + "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/jsonconfig" + "camlistore.org/pkg/test" +) + +func newReplica(t *testing.T, config jsonconfig.Obj) *replicaStorage { + ld := test.NewLoader() + sto, err := newFromConfig(ld, config) + if err != nil { + t.Fatalf("Invalid config: %v", err) + } + return sto.(*replicaStorage) +} + +func TestReceive(t *testing.T) { + sto := newReplica(t, map[string]interface{}{ + "backends": []interface{}{"/good-1/", "/good-2/"}, + }) + tb := &test.Blob{Contents: "stuff"} + tbRef := tb.BlobRef() + sb, err := blobserver.Receive(sto, tbRef, tb.Reader()) + if err != nil { + t.Fatalf("Receive: %v", err) + } + if int(sb.Size) != len(tb.Contents) { + t.Fatalf("size = %d; want %d", sb.Size, len(tb.Contents)) + } + if sb.Ref != tbRef { + t.Fatal("wrong blob received") + } + if len(sto.replicas) != 2 { + t.Fatalf("replicas = %d; want 2", len(sto.replicas)) + } + for i, rep := range sto.replicas { + got, err := blobserver.StatBlob(rep, sb.Ref) + if err != nil { + t.Errorf("Replica %s got stat error %v", sto.replicaPrefixes[i], err) + } else if got != sb { + t.Errorf("Replica %s got %+v; want %+v", sto.replicaPrefixes[i], got, sb) + } + } +} diff --git a/pkg/test/loader.go b/pkg/test/loader.go new file mode 100644 index 000000000..e6a1c6fda --- /dev/null +++ b/pkg/test/loader.go @@ -0,0 +1,62 @@ +/* +Copyright 2014 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package test + +import ( + "errors" + "log" + "strings" + "sync" + + "camlistore.org/pkg/blobserver" +) + +// NewLoader +func NewLoader() *Loader { + return &Loader{} +} + +type Loader struct { + mu sync.Mutex +} + +var _ blobserver.Loader = (*Loader)(nil) + +func (ld *Loader) FindHandlerByType(handlerType string) (prefix string, handler interface{}, err error) { + panic("NOIMPL") +} + +func (ld *Loader) MyPrefix() string { + return "/lies/" +} + +func (ld *Loader) GetHandlerType(prefix string) string { + log.Printf("test.Loader: GetHandlerType called but not implemented.") + return "" +} + +func (ld *Loader) GetHandler(prefix string) (interface{}, error) { + log.Printf("test.Loader: GetHandler called but not implemented.") + return nil, errors.New("doesn't exist") +} + +func (ld *Loader) GetStorage(prefix string) (blobserver.Storage, error) { + if strings.HasPrefix(prefix, "/good") { + return &Fetcher{}, nil + } + panic("test.Loader.GetStorage: unrecognized prefix type") +}