pkg/sorted: log and skip on too large a key/value

So far we were returning an error, which appears to be a little too
strict, so we now just ignore when a key or value is too large, log it,
and go on.

Fixes #849

Change-Id: Iadd4eaab7459643e22ab3043d1f45e3eab662b30
This commit is contained in:
mpl 2016-08-31 17:51:43 +02:00
parent 2f4a94bca6
commit bcb5fca493
8 changed files with 47 additions and 28 deletions

View File

@ -22,6 +22,7 @@ package buffer // import "camlistore.org/pkg/sorted/buffer"
import ( import (
"fmt" "fmt"
"log"
"sync" "sync"
"camlistore.org/pkg/sorted" "camlistore.org/pkg/sorted"
@ -101,7 +102,8 @@ func (kv *KeyValue) Get(key string) (string, error) {
func (kv *KeyValue) Set(key, value string) error { func (kv *KeyValue) Set(key, value string) error {
if err := sorted.CheckSizes(key, value); err != nil { if err := sorted.CheckSizes(key, value); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", key, value, err)
return nil
} }
kv.mu.RLock() kv.mu.RLock()
err := kv.buf.Set(key, value) err := kv.buf.Set(key, value)
@ -163,7 +165,8 @@ func (kv *KeyValue) CommitBatch(bm sorted.BatchMutation) error {
continue continue
} else { } else {
if err := sorted.CheckSizes(m.key, m.value); err != nil { if err := sorted.CheckSizes(m.key, m.value); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", m.key, m.value, err)
continue
} }
} }
bmbuf.Set(m.key, m.value) bmbuf.Set(m.key, m.value)

View File

@ -93,7 +93,8 @@ func (is *kvis) Get(key string) (string, error) {
func (is *kvis) Set(key, value string) error { func (is *kvis) Set(key, value string) error {
if err := sorted.CheckSizes(key, value); err != nil { if err := sorted.CheckSizes(key, value); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", key, value, err)
return nil
} }
return is.db.Set([]byte(key), []byte(value)) return is.db.Set([]byte(key), []byte(value))
} }
@ -162,7 +163,8 @@ func (is *kvis) CommitBatch(bm sorted.BatchMutation) error {
} }
} else { } else {
if err := sorted.CheckSizes(m.Key(), m.Value()); err != nil { if err := sorted.CheckSizes(m.Key(), m.Value()); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", m.Key(), m.Value(), err)
continue
} }
if err := is.db.Set([]byte(m.Key()), []byte(m.Value())); err != nil { if err := is.db.Set([]byte(m.Key()), []byte(m.Value())); err != nil {
return err return err

View File

@ -167,11 +167,21 @@ func testInsertLarge(t *testing.T, kv sorted.KeyValue) {
func testInsertTooLarge(t *testing.T, kv sorted.KeyValue) { func testInsertTooLarge(t *testing.T, kv sorted.KeyValue) {
largeKey := make([]byte, sorted.MaxKeySize+1) largeKey := make([]byte, sorted.MaxKeySize+1)
largeValue := make([]byte, sorted.MaxValueSize+1) largeValue := make([]byte, sorted.MaxValueSize+1)
if err := kv.Set(string(largeKey), "whatever"); err == nil || err != sorted.ErrKeyTooLarge { err := kv.Set(string(largeKey), "whatever")
t.Fatalf("Insertion of too large a key should have failed, but err was %v", err) if err != nil {
t.Fatalf("Insertion of too large a key should have skipped with some logging, but err was %v", err)
} }
if err := kv.Set("whatever", string(largeValue)); err == nil || err != sorted.ErrValueTooLarge { _, err = kv.Get(string(largeKey))
t.Fatalf("Insertion of too large a value should have failed, but err was %v", err) if err == nil {
t.Fatal("large key should not have been inserted")
}
err = kv.Set("testInsertTooLarge", string(largeValue))
if err != nil {
t.Fatalf("Insertion of too large a value should have skipped with some logging, but err was %v", err)
}
_, err = kv.Get("testInsertTooLarge")
if err == nil {
t.Fatal("large value should not have been inserted")
} }
} }

View File

@ -22,6 +22,7 @@ package leveldb // import "camlistore.org/pkg/sorted/leveldb"
import ( import (
"errors" "errors"
"fmt" "fmt"
"log"
"os" "os"
"sync" "sync"
@ -109,7 +110,8 @@ func (is *kvis) Get(key string) (string, error) {
func (is *kvis) Set(key, value string) error { func (is *kvis) Set(key, value string) error {
if err := sorted.CheckSizes(key, value); err != nil { if err := sorted.CheckSizes(key, value); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", key, value, err)
return nil
} }
return is.db.Put([]byte(key), []byte(value), is.writeOpts) return is.db.Put([]byte(key), []byte(value), is.writeOpts)
} }
@ -172,11 +174,7 @@ func (lvb *lvbatch) Set(key, value string) {
return return
} }
if err := sorted.CheckSizes(key, value); err != nil { if err := sorted.CheckSizes(key, value); err != nil {
if err == sorted.ErrKeyTooLarge { log.Printf("Skipping storing (%q:%q): %v", key, value, err)
lvb.err = fmt.Errorf("%v: %v", err, key)
} else {
lvb.err = fmt.Errorf("%v: %v", err, value)
}
return return
} }
lvb.batch.Put([]byte(key), []byte(value)) lvb.batch.Put([]byte(key), []byte(value))

View File

@ -18,6 +18,7 @@ package sorted
import ( import (
"errors" "errors"
"log"
"sync" "sync"
"github.com/syndtr/goleveldb/leveldb/comparer" "github.com/syndtr/goleveldb/leveldb/comparer"
@ -111,7 +112,8 @@ func (mk *memKeys) Find(start, end string) Iterator {
func (mk *memKeys) Set(key, value string) error { func (mk *memKeys) Set(key, value string) error {
if err := CheckSizes(key, value); err != nil { if err := CheckSizes(key, value); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", key, value, err)
return nil
} }
mk.mu.Lock() mk.mu.Lock()
defer mk.mu.Unlock() defer mk.mu.Unlock()
@ -145,8 +147,11 @@ func (mk *memKeys) CommitBatch(bm BatchMutation) error {
return err return err
} }
} else { } else {
// TODO(mpl): we need to force a reindex when we have a proper solution
// for storing these too large attributes, if ever.
if err := CheckSizes(m.Key(), m.Value()); err != nil { if err := CheckSizes(m.Key(), m.Value()); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", m.Key(), m.Value(), err)
continue
} }
if err := mk.db.Put([]byte(m.Key()), []byte(m.Value())); err != nil { if err := mk.db.Put([]byte(m.Key()), []byte(m.Value())); err != nil {
return err return err

View File

@ -21,6 +21,7 @@ package mongo // import "camlistore.org/pkg/sorted/mongo"
import ( import (
"bytes" "bytes"
"errors" "errors"
"log"
"sync" "sync"
"time" "time"
@ -146,7 +147,8 @@ func (kv *keyValue) Find(start, end string) sorted.Iterator {
func (kv *keyValue) Set(key, value string) error { func (kv *keyValue) Set(key, value string) error {
if err := sorted.CheckSizes(key, value); err != nil { if err := sorted.CheckSizes(key, value); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", key, value, err)
return nil
} }
kv.mu.Lock() kv.mu.Lock()
defer kv.mu.Unlock() defer kv.mu.Unlock()
@ -196,7 +198,8 @@ func (kv *keyValue) CommitBatch(bm sorted.BatchMutation) error {
} }
} else { } else {
if err := sorted.CheckSizes(m.Key(), m.Value()); err != nil { if err := sorted.CheckSizes(m.Key(), m.Value()); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", m.Key(), m.Value(), err)
continue
} }
if _, err := kv.db.Upsert(&bson.M{mgoKey: m.Key()}, &bson.M{mgoKey: m.Key(), mgoValue: m.Value()}); err != nil { if _, err := kv.db.Upsert(&bson.M{mgoKey: m.Key()}, &bson.M{mgoKey: m.Key(), mgoValue: m.Value()}); err != nil {
return err return err

View File

@ -17,6 +17,8 @@ limitations under the License.
package mysql package mysql
import ( import (
"database/sql"
"errors"
"testing" "testing"
"time" "time"
@ -65,9 +67,8 @@ func TestRollback(t *testing.T) {
t.Fatalf("mysql.NewKeyValue = %v", err) t.Fatalf("mysql.NewKeyValue = %v", err)
} }
tooLargeAKey := make([]byte, sorted.MaxKeySize+10) kv.(*keyValue).KeyValue.BatchSetFunc = func(*sql.Tx, string, string) error {
for i := range tooLargeAKey { return errors.New("Forced failure to trigger a rollback")
tooLargeAKey[i] = 'L'
} }
nbConnections := 2 nbConnections := 2
@ -82,7 +83,7 @@ func TestRollback(t *testing.T) {
b := kv.BeginBatch() b := kv.BeginBatch()
// Making the transaction fail, to force a rollback // Making the transaction fail, to force a rollback
// -> this whole test fails before we introduce the rollback in CommitBatch. // -> this whole test fails before we introduce the rollback in CommitBatch.
b.Set(string(tooLargeAKey), "whatever") b.Set("foo", "bar")
if err := kv.CommitBatch(b); err == nil { if err := kv.CommitBatch(b); err == nil {
t.Fatal("wanted failed commit because too large a key") t.Fatal("wanted failed commit because too large a key")
} }

View File

@ -105,11 +105,7 @@ func (b *batchTx) Set(key, value string) {
return return
} }
if err := sorted.CheckSizes(key, value); err != nil { if err := sorted.CheckSizes(key, value); err != nil {
if err == sorted.ErrKeyTooLarge { log.Printf("Skipping storing (%q:%q): %v", key, value, err)
b.err = fmt.Errorf("%v: %v", err, key)
} else {
b.err = fmt.Errorf("%v: %v", err, value)
}
return return
} }
if b.kv.BatchSetFunc != nil { if b.kv.BatchSetFunc != nil {
@ -172,7 +168,8 @@ func (kv *KeyValue) Get(key string) (value string, err error) {
func (kv *KeyValue) Set(key, value string) error { func (kv *KeyValue) Set(key, value string) error {
if err := sorted.CheckSizes(key, value); err != nil { if err := sorted.CheckSizes(key, value); err != nil {
return err log.Printf("Skipping storing (%q:%q): %v", key, value, err)
return nil
} }
if kv.Gate != nil { if kv.Gate != nil {
kv.Gate.Start() kv.Gate.Start()