diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go index 01ea73feb..6803d99bb 100644 --- a/pkg/blobserver/diskpacked/diskpacked.go +++ b/pkg/blobserver/diskpacked/diskpacked.go @@ -649,6 +649,8 @@ func (s *storage) append(br blob.SizedRef, r io.Reader) error { if s.closed { return errors.New("diskpacked: write to closed storage") } + // to be able to undo the append + origOffset := s.size fn := s.writer.Name() n, err := fmt.Fprintf(s.writer, "[%v %v]", br.Ref.String(), br.Size) @@ -690,7 +692,17 @@ func (s *storage) append(br blob.SizedRef, r io.Reader) error { return err } } - return s.index.Set(br.Ref.String(), blobMeta{packIdx, offset, br.Size}.String()) + err = s.index.Set(br.Ref.String(), blobMeta{packIdx, offset, br.Size}.String()) + if err != nil { + if _, seekErr := s.writer.Seek(origOffset, os.SEEK_SET); seekErr != nil { + log.Printf("ERROR seeking back to the original offset: %v", seekErr) + } else if truncErr := s.writer.Truncate(origOffset); truncErr != nil { + log.Printf("ERROR truncating file after index error: %v", truncErr) + } else { + s.size = origOffset + } + } + return err } // meta fetches the metadata for the specified blob from the index. diff --git a/pkg/blobserver/diskpacked/diskpacked_test.go b/pkg/blobserver/diskpacked/diskpacked_test.go index 8326c27ae..5d22303ff 100644 --- a/pkg/blobserver/diskpacked/diskpacked_test.go +++ b/pkg/blobserver/diskpacked/diskpacked_test.go @@ -17,6 +17,7 @@ limitations under the License. package diskpacked import ( + "errors" "fmt" "io/ioutil" "os" @@ -27,6 +28,7 @@ import ( "camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver/storagetest" "camlistore.org/pkg/jsonconfig" + "camlistore.org/pkg/sorted" "camlistore.org/pkg/test" ) @@ -196,3 +198,57 @@ func TestDelete(t *testing.T) { } } } + +var dummyErr = errors.New("dummy fail") + +func TestDoubleReceiveFailingIndex(t *testing.T) { + sto, cleanup := newTempDiskpacked(t) + defer cleanup() + + sto.(*storage).index = &failingIndex{KeyValue: sto.(*storage).index} + + size := func(n int) int64 { + path := sto.(*storage).filename(n) + fi, err := os.Stat(path) + if err != nil { + t.Fatal(err) + } + return fi.Size() + } + + const blobSize = 5 << 10 + b := &test.Blob{Contents: strings.Repeat("a", blobSize)} + br := b.BlobRef() + + _, err := blobserver.Receive(sto, br, b.Reader()) + if err != nil { + if err != dummyErr { + t.Fatal(err) + } + t.Logf("dummy fail") + } + if size(0) >= blobSize { + t.Fatalf("size = %d; want zero (at most %d)", size(0), blobSize-1) + } + + _, err = blobserver.Receive(sto, br, b.Reader()) + if err != nil { + t.Fatal(err) + } + if size(0) < blobSize { + t.Fatalf("size = %d; want at least %d", size(0), blobSize) + } +} + +type failingIndex struct { + sorted.KeyValue + setCount int +} + +func (idx *failingIndex) Set(key string, value string) error { + idx.setCount++ + if idx.setCount == 1 { // fail the first time + return dummyErr + } + return idx.KeyValue.Set(key, value) +} diff --git a/pkg/blobserver/diskpacked/reindex.go b/pkg/blobserver/diskpacked/reindex.go index f3efe6ee9..f83a83613 100644 --- a/pkg/blobserver/diskpacked/reindex.go +++ b/pkg/blobserver/diskpacked/reindex.go @@ -81,6 +81,7 @@ func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwr // TODO(tgulacsi): proper verbose from context verbose := camliDebug + misses := make(map[blob.Ref]string, 8) err := s.walkPack(verbose, packID, func(packID int, ref blob.Ref, offset int64, size uint32) error { if !ref.Valid() { @@ -92,17 +93,25 @@ func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwr meta := blobMeta{packID, offset, size}.String() if overwrite && batch != nil { batch.Set(ref.String(), meta) - } else { - if old, err := index.Get(ref.String()); err != nil { + return nil + } + if _, ok := misses[ref]; ok { // maybe this is the last of this blob. + delete(misses, ref) + } + if old, err := index.Get(ref.String()); err != nil { + allOk = false + if err == sorted.ErrNotFound { + log.Println(ref.String() + ": cannot find in index!") + } else { + log.Println(ref.String()+": error getting from index: ", err.Error()) + } + } else if old != meta { + if old > meta { + misses[ref] = meta + log.Printf("WARN: possible duplicate blob %s", ref.String()) + } else { allOk = false - if err == sorted.ErrNotFound { - log.Println(ref.String() + ": cannot find in index!") - } else { - log.Println(ref.String()+": error getting from index: ", err.Error()) - } - } else if old != meta { - allOk = false - log.Printf("%s: index mismatch - index=%s data=%s", ref.String(), old, meta) + log.Printf("ERROR: index mismatch for %s - index=%s, meta=%s!", ref.String(), old, meta) } } return nil @@ -111,6 +120,11 @@ func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwr return err } + for ref, meta := range misses { + log.Printf("ERROR: index mismatch for %s (%s)!", ref.String(), meta) + allOk = false + } + if overwrite && batch != nil { log.Printf("overwriting %s from %d", index, packID) if err = index.CommitBatch(batch); err != nil {