Diskpacked: revert append on index.Set error
Seek back and truncate the file to its original size if index.Set returns an error. Since we return index.Set's error, such appends are treated as failed and the application (or the user) will retry; without the rollback, every failed attempt would leave a duplicately stored blob behind, which only complicates recovery. The other part of this CL makes the reindex-diskpacked command report such duplicately stored blobs as warnings, not errors.

Change-Id: I39f049cd136a2efd840e76a2cfa5d06016a189c3
parent 312f7a2aa6
commit d9f41b747d
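Before the diff itself: the pattern the append() change implements, as a minimal standalone sketch. The kv interface, appendBlob, and its parameters are illustrative stand-ins, not the real diskpacked API; the actual code tracks s.size instead of seeking to the end, but the rollback logic is the same. The key property is that a failed index update leaves the pack file exactly as it was, so a retried Receive starts from a clean state.

package sketch

import (
	"fmt"
	"io"
	"os"
)

// kv is a hypothetical, minimal stand-in for the sorted.KeyValue index.
type kv interface {
	Set(key, value string) error
}

// appendBlob appends data to f and then records it in idx. If the
// index update fails, the file is rolled back to its original size,
// so the failed append leaves no duplicate blob behind for a retry.
func appendBlob(f *os.File, idx kv, key, meta string, data []byte) error {
	// remember where the blob starts, to be able to undo the append
	origOffset, err := f.Seek(0, io.SeekEnd)
	if err != nil {
		return err
	}
	if _, err := f.Write(data); err != nil {
		return err
	}
	if err := idx.Set(key, meta); err != nil {
		// undo the append: seek back and truncate to the original size
		if _, seekErr := f.Seek(origOffset, io.SeekStart); seekErr != nil {
			return fmt.Errorf("%v (seeking back also failed: %v)", err, seekErr)
		}
		if truncErr := f.Truncate(origOffset); truncErr != nil {
			return fmt.Errorf("%v (truncate also failed: %v)", err, truncErr)
		}
		return err
	}
	return nil
}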
pkg/blobserver/diskpacked/diskpacked.go

@@ -649,6 +649,8 @@ func (s *storage) append(br blob.SizedRef, r io.Reader) error {
 	if s.closed {
 		return errors.New("diskpacked: write to closed storage")
 	}
+	// to be able to undo the append
+	origOffset := s.size
 
 	fn := s.writer.Name()
 	n, err := fmt.Fprintf(s.writer, "[%v %v]", br.Ref.String(), br.Size)
@@ -690,7 +692,17 @@ func (s *storage) append(br blob.SizedRef, r io.Reader) error {
 			return err
 		}
 	}
-	return s.index.Set(br.Ref.String(), blobMeta{packIdx, offset, br.Size}.String())
+	err = s.index.Set(br.Ref.String(), blobMeta{packIdx, offset, br.Size}.String())
+	if err != nil {
+		if _, seekErr := s.writer.Seek(origOffset, os.SEEK_SET); seekErr != nil {
+			log.Printf("ERROR seeking back to the original offset: %v", seekErr)
+		} else if truncErr := s.writer.Truncate(origOffset); truncErr != nil {
+			log.Printf("ERROR truncating file after index error: %v", truncErr)
+		} else {
+			s.size = origOffset
+		}
+	}
+	return err
 }
 
 // meta fetches the metadata for the specified blob from the index.
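A side note on the rollback above: os.SEEK_SET was the idiomatic whence constant when this change landed; since Go 1.7 the same call is spelled with io.SeekStart, and os.SEEK_SET is deprecated. A hedged equivalent of the two-step undo, written against a plain *os.File rather than the real s.writer:

package sketch

import (
	"io"
	"os"
)

// rollback undoes an append by seeking back and truncating f to
// origOffset, the same operation as the diff's
// Seek(origOffset, os.SEEK_SET) followed by Truncate, in the
// post-Go-1.7 spelling.
func rollback(f *os.File, origOffset int64) error {
	if _, err := f.Seek(origOffset, io.SeekStart); err != nil {
		return err
	}
	return f.Truncate(origOffset)
}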
pkg/blobserver/diskpacked/diskpacked_test.go

@@ -17,6 +17,7 @@ limitations under the License.
 package diskpacked
 
 import (
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -27,6 +28,7 @@ import (
 	"camlistore.org/pkg/blobserver"
 	"camlistore.org/pkg/blobserver/storagetest"
 	"camlistore.org/pkg/jsonconfig"
+	"camlistore.org/pkg/sorted"
 	"camlistore.org/pkg/test"
 )
@@ -196,3 +198,57 @@ func TestDelete(t *testing.T) {
 		}
 	}
 }
+
+var dummyErr = errors.New("dummy fail")
+
+func TestDoubleReceiveFailingIndex(t *testing.T) {
+	sto, cleanup := newTempDiskpacked(t)
+	defer cleanup()
+
+	sto.(*storage).index = &failingIndex{KeyValue: sto.(*storage).index}
+
+	size := func(n int) int64 {
+		path := sto.(*storage).filename(n)
+		fi, err := os.Stat(path)
+		if err != nil {
+			t.Fatal(err)
+		}
+		return fi.Size()
+	}
+
+	const blobSize = 5 << 10
+	b := &test.Blob{Contents: strings.Repeat("a", blobSize)}
+	br := b.BlobRef()
+
+	_, err := blobserver.Receive(sto, br, b.Reader())
+	if err != nil {
+		if err != dummyErr {
+			t.Fatal(err)
+		}
+		t.Logf("dummy fail")
+	}
+	if size(0) >= blobSize {
+		t.Fatalf("size = %d; want zero (at most %d)", size(0), blobSize-1)
+	}
+
+	_, err = blobserver.Receive(sto, br, b.Reader())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if size(0) < blobSize {
+		t.Fatalf("size = %d; want at least %d", size(0), blobSize)
+	}
+}
+
+type failingIndex struct {
+	sorted.KeyValue
+	setCount int
+}
+
+func (idx *failingIndex) Set(key string, value string) error {
+	idx.setCount++
+	if idx.setCount == 1 { // fail the first time
+		return dummyErr
+	}
+	return idx.KeyValue.Set(key, value)
+}
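The failingIndex type at the end of the test hunk relies on interface embedding: only Set is overridden, and every other sorted.KeyValue method is promoted from the wrapped index. The same fault-injection trick works for any interface; a small sketch under assumed names (KV, failNth, and errInjected are illustrative, not part of the Camlistore API). Wrapping with &failNth{KV: idx, n: 1} fails the first Set, just as failingIndex does.

package sketch

import "errors"

// KV is a minimal stand-in for sorted.KeyValue (illustrative only).
type KV interface {
	Get(key string) (string, error)
	Set(key, value string) error
}

var errInjected = errors.New("injected failure")

// failNth wraps a KV and fails the nth call to Set; everything else,
// including Get, is promoted from the embedded implementation.
type failNth struct {
	KV
	n, calls int
}

func (f *failNth) Set(key, value string) error {
	f.calls++
	if f.calls == f.n {
		return errInjected
	}
	return f.KV.Set(key, value)
}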
pkg/blobserver/diskpacked/reindex.go

@@ -81,6 +81,7 @@ func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwr
 
 	// TODO(tgulacsi): proper verbose from context
 	verbose := camliDebug
+	misses := make(map[blob.Ref]string, 8)
 	err := s.walkPack(verbose, packID,
 		func(packID int, ref blob.Ref, offset int64, size uint32) error {
 			if !ref.Valid() {
@@ -92,17 +93,25 @@ func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwr
 			meta := blobMeta{packID, offset, size}.String()
 			if overwrite && batch != nil {
 				batch.Set(ref.String(), meta)
-			} else {
-				if old, err := index.Get(ref.String()); err != nil {
-					allOk = false
-					if err == sorted.ErrNotFound {
-						log.Println(ref.String() + ": cannot find in index!")
-					} else {
-						log.Println(ref.String()+": error getting from index: ", err.Error())
-					}
-				} else if old != meta {
-					allOk = false
-					log.Printf("%s: index mismatch - index=%s data=%s", ref.String(), old, meta)
-				}
+				return nil
+			}
+			if _, ok := misses[ref]; ok { // maybe this is the last of this blob.
+				delete(misses, ref)
+			}
+			if old, err := index.Get(ref.String()); err != nil {
+				allOk = false
+				if err == sorted.ErrNotFound {
+					log.Println(ref.String() + ": cannot find in index!")
+				} else {
+					log.Println(ref.String()+": error getting from index: ", err.Error())
+				}
+			} else if old != meta {
+				if old > meta {
+					misses[ref] = meta
+					log.Printf("WARN: possible duplicate blob %s", ref.String())
+				} else {
+					allOk = false
+					log.Printf("ERROR: index mismatch for %s - index=%s, meta=%s!", ref.String(), old, meta)
+				}
 			}
 			return nil
@@ -111,6 +120,11 @@ func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwr
 		return err
 	}
 
+	for ref, meta := range misses {
+		log.Printf("ERROR: index mismatch for %s (%s)!", ref.String(), meta)
+		allOk = false
+	}
+
 	if overwrite && batch != nil {
 		log.Printf("overwriting %s from %d", index, packID)
 		if err = index.CommitBatch(batch); err != nil {
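Why the reindex changes turn duplicates into warnings rather than errors: walkPack visits blobs in pack order, so when an earlier copy disagrees with the index (old > meta is a plain string comparison of the blobMeta encodings), it may simply be a superseded duplicate, and it is parked in misses with a WARN. When the same ref shows up again later in the walk, the pending miss is cleared first; if that copy matches the index entry, nothing more is logged, and only refs still sitting in misses after the walk are reported as hard mismatches. A toy simulation of that bookkeeping (the ref and meta strings below are made up, not the real blobMeta format):

package main

import "fmt"

func main() {
	// What the index records for each ref (made-up metas in a
	// "packID offset size" style).
	index := map[string]string{"sha1-aaa": "0 5000 10"}

	// Walk order: the same blob stored twice, first at offset 100,
	// then at offset 5000 (the copy the index points at).
	walk := []struct{ ref, meta string }{
		{"sha1-aaa", "0 100 10"},
		{"sha1-aaa", "0 5000 10"},
	}

	misses := map[string]string{}
	for _, b := range walk {
		if _, ok := misses[b.ref]; ok { // maybe this is the last of this blob
			delete(misses, b.ref)
		}
		if old := index[b.ref]; old != b.meta {
			if old > b.meta {
				// the index points past this copy: possible duplicate
				misses[b.ref] = b.meta
				fmt.Println("WARN: possible duplicate blob", b.ref)
			} else {
				fmt.Println("ERROR: index mismatch for", b.ref)
			}
		}
	}
	// anything left here never matched the index: a real mismatch
	for ref := range misses {
		fmt.Println("ERROR: index mismatch for", ref)
	}
}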