diskpacked: allow removing non-existing blob

Let double-delete test pass.
Make leveldb and memdb Deletes go on for ErrNotFound.

Add test for batch deleting nonexisting key, and partial batch delete
errors - fix it for mongo.

Fixes #760.

Change-Id: I0a0e28836a723d245564f3dabaf328bf73bf463a
This commit is contained in:
Tamás Gulácsi 2016-05-10 09:55:30 +02:00
parent ae7b4b5d77
commit 78f21f5e15
4 changed files with 41 additions and 3 deletions

View File

@ -408,7 +408,7 @@ func (s *storage) RemoveBlobs(blobs []blob.Ref) error {
batch.Delete(br.String())
wg.Go(func() error {
defer removeGate.Done()
if err := s.delete(br); err != nil {
if err := s.delete(br); err != nil && err != os.ErrNotExist {
return err
}
return nil

View File

@ -76,6 +76,44 @@ func TestSorted(t *testing.T, kv sorted.KeyValue) {
testInsertTooLarge(t, kv)
// TODO: test batch commits
// Deleting a non-existent item in a batch should not be an error
testDeleteNotFoundBatch(t, kv)
testDeletePartialNotFoundBatch(t, kv)
}
// Do not ever insert that key, as it used for testing deletion of non existing entries
const (
notExistKey = "I do not exist"
butIExistKey = "But I do exist"
)
func testDeleteNotFoundBatch(t *testing.T, kv sorted.KeyValue) {
b := kv.BeginBatch()
b.Delete(notExistKey)
if err := kv.CommitBatch(b); err != nil {
t.Fatalf("Batch deletion of non existing key returned an error: %v", err)
}
}
func testDeleteNotFound(t *testing.T, kv sorted.KeyValue) {
if err := kv.Delete(notExistKey); err != nil {
t.Fatalf("Deletion of non existing key returned an error: %v", err)
}
}
func testDeletePartialNotFoundBatch(t *testing.T, kv sorted.KeyValue) {
if err := kv.Set(butIExistKey, "whatever"); err != nil {
t.Fatal(err)
}
b := kv.BeginBatch()
b.Delete(notExistKey)
b.Delete(butIExistKey)
if err := kv.CommitBatch(b); err != nil {
t.Fatalf("Batch deletion with one non existing key returned an error: %v", err)
}
if val, err := kv.Get(butIExistKey); err != sorted.ErrNotFound || val != "" {
t.Fatalf("Key %q should have been batch deleted", butIExistKey)
}
}
func testInsertLarge(t *testing.T, kv sorted.KeyValue) {

View File

@ -141,7 +141,7 @@ func (mk *memKeys) CommitBatch(bm BatchMutation) error {
defer mk.mu.Unlock()
for _, m := range b.Mutations() {
if m.IsDelete() {
if err := mk.db.Delete([]byte(m.Key())); err != nil {
if err := mk.db.Delete([]byte(m.Key())); err != nil && err != memdb.ErrNotFound {
return err
}
} else {

View File

@ -191,7 +191,7 @@ func (kv *keyValue) CommitBatch(bm sorted.BatchMutation) error {
defer kv.mu.Unlock()
for _, m := range b.Mutations() {
if m.IsDelete() {
if err := kv.db.Remove(bson.M{mgoKey: m.Key()}); err != nil {
if err := kv.db.Remove(bson.M{mgoKey: m.Key()}); err != nil && err != mgo.ErrNotFound {
return err
}
} else {