diff --git a/pkg/sorted/buffer/buffer.go b/pkg/sorted/buffer/buffer.go index df940ec36..34c1a6d75 100644 --- a/pkg/sorted/buffer/buffer.go +++ b/pkg/sorted/buffer/buffer.go @@ -33,7 +33,7 @@ import ( // maxBufferBytes <= 0, no automatic flushing is performed. func New(buffer, backing sorted.KeyValue, maxBufferBytes int64) *KeyValue { return &KeyValue{ - buffer: buffer, + buf: buffer, back: backing, maxBuffer: maxBufferBytes, } @@ -42,19 +42,52 @@ func New(buffer, backing sorted.KeyValue, maxBufferBytes int64) *KeyValue { var _ sorted.KeyValue = (*KeyValue)(nil) type KeyValue struct { - buffer, back sorted.KeyValue - maxBuffer int64 + buf, back sorted.KeyValue + maxBuffer int64 - mu sync.Mutex + bufMu sync.Mutex buffered int64 + + // This read lock should be held during Set/Get/Delete/BatchCommit, + // and the write lock should be held during Flush. + mu sync.RWMutex } func (kv *KeyValue) Flush() error { - panic("TODO: implement Flush") + kv.mu.Lock() + defer kv.mu.Unlock() + var ( + bmback = kv.back.BeginBatch() + bmbuf = kv.buf.BeginBatch() + commit = false + it = kv.buf.Find("", "") + ) + for it.Next() { + bmback.Set(it.Key(), it.Value()) + bmbuf.Delete(it.Key()) + commit = true + } + if err := it.Close(); err != nil { + return err + } + if commit { + if err := kv.back.CommitBatch(bmback); err != nil { + return err + } + if err := kv.buf.CommitBatch(bmbuf); err != nil { + return err + } + kv.bufMu.Lock() + kv.buffered = 0 + kv.bufMu.Unlock() + } + return nil } func (kv *KeyValue) Get(key string) (string, error) { - v, err := kv.buffer.Get(key) + kv.mu.RLock() + defer kv.mu.RUnlock() + v, err := kv.buf.Get(key) switch err { case sorted.ErrNotFound: break @@ -67,17 +100,31 @@ func (kv *KeyValue) Get(key string) (string, error) { } func (kv *KeyValue) Set(key, value string) error { - return kv.buffer.Set(key, value) + kv.mu.RLock() + err := kv.buf.Set(key, value) + kv.mu.RUnlock() + if err == nil { + kv.bufMu.Lock() + kv.buffered += int64(len(key) + len(value)) + doFlush := kv.buffered > kv.maxBuffer + kv.bufMu.Unlock() + if doFlush { + err = kv.Flush() + } + } + return err } func (kv *KeyValue) Delete(key string) error { + kv.mu.RLock() + defer kv.mu.RUnlock() // This isn't an ideal implementation, since it synchronously // deletes from the backing store. But deletes aren't really // used, so ignoring for now. // Could also use a syncutil.Group to do these in parallel, // but the buffer should be an in-memory implementation // anyway, so should be fast. - err1 := kv.buffer.Delete(key) + err1 := kv.buf.Delete(key) err2 := kv.back.Delete(key) if err1 != nil { return err1 @@ -90,12 +137,37 @@ func (kv *KeyValue) BeginBatch() sorted.BatchMutation { } func (kv *KeyValue) CommitBatch(bm sorted.BatchMutation) error { + kv.mu.RLock() + defer kv.mu.RUnlock() b, ok := bm.(*batch) if !ok { return fmt.Errorf("unexpected BatchMutation type %T", bm) } - _ = b - panic("TODO") + var ( + // A batch mutation for applying this mutation to the buffer. + bmbuf = kv.buf.BeginBatch() + // A lazily created batch mutation for deleting from the backing + // storage; this should be rare. (See Delete above.) + bmback sorted.BatchMutation + ) + for _, m := range b.mods { + if m.isDelete { + bmbuf.Delete(m.key) + if bmback == nil { + bmback = kv.back.BeginBatch() + } + bmback.Delete(m.key) + continue + } + bmbuf.Set(m.key, m.value) + } + if err := kv.buf.CommitBatch(bmbuf); err != nil { + return err + } + if bmback != nil { + return kv.back.CommitBatch(bmback) + } + return nil } func (kv *KeyValue) Close() error { @@ -106,11 +178,12 @@ func (kv *KeyValue) Close() error { } func (kv *KeyValue) Find(start, end string) sorted.Iterator { - ibuf := kv.buffer.Find(start, end) + // TODO(adg): hold read lock while iterating? seems complicated + ibuf := kv.buf.Find(start, end) iback := kv.back.Find(start, end) return &iter{ - ibuf: ibuf, - iback: iback, + buf: subIter{Iterator: ibuf}, + back: subIter{Iterator: iback}, } } @@ -137,23 +210,103 @@ func (b *batch) Delete(key string) { } type iter struct { - ibuf sorted.Iterator - iback sorted.Iterator - bufEOF bool - backEOF bool + buf, back subIter } -func (it *iter) Key() string { panic("TODO") } -func (it *iter) Value() string { panic("TODO") } -func (it *iter) KeyBytes() []byte { panic("TODO") } -func (it *iter) ValueBytes() []byte { panic("TODO") } -func (it *iter) Next() bool { panic("TODO") } +func (it *iter) current() *subIter { + switch { + case it.back.eof: + return &it.buf + case it.buf.eof: + return &it.back + case it.buf.key <= it.back.key: + return &it.buf + default: + return &it.back + } +} + +func (it *iter) Next() bool { + // Call Next on both iterators for the first time, if we haven't + // already, so that the key comparisons below are valid. + start := false + if it.buf.key == "" && !it.buf.eof { + start = it.buf.next() + } + if it.back.key == "" && !it.buf.eof { + start = it.back.next() || start + } + if start { + // We started iterating with at least one value. + return true + } + // Bail if both iterators are done. + if it.buf.eof && it.back.eof { + return false + } + // If one iterator is done, advance the other. + if it.buf.eof { + return it.back.next() + } + if it.back.eof { + return it.buf.next() + } + // If both iterators still going, + // advance the one that is further behind, + // or both simultaneously if they point to the same key. + switch { + case it.buf.key < it.back.key: + it.buf.next() + case it.buf.key > it.back.key: + it.back.next() + case it.buf.key == it.back.key: + n1, n2 := it.buf.next(), it.back.next() + if !n1 && !n2 { + // Both finished simultaneously. + return false + } + } + return true +} + +func (it *iter) Key() string { + return it.current().key +} + +func (it *iter) Value() string { + return it.current().Value() +} + +func (it *iter) KeyBytes() []byte { + return it.current().KeyBytes() +} + +func (it *iter) ValueBytes() []byte { + return it.current().ValueBytes() +} func (it *iter) Close() error { - err1 := it.ibuf.Close() - err2 := it.iback.Close() + err1 := it.buf.Close() + err2 := it.back.Close() if err1 != nil { return err1 } return err2 } + +// subIter is an iterator (either the backing storage or the buffer) that +// keeps track of the current key and whether it has reached EOF. +type subIter struct { + sorted.Iterator + key string + eof bool +} + +func (it *subIter) next() bool { + if it.Next() { + it.key = it.Key() + return true + } + it.eof = true + return false +} diff --git a/pkg/sorted/buffer/buffer_test.go b/pkg/sorted/buffer/buffer_test.go new file mode 100644 index 000000000..dbb646485 --- /dev/null +++ b/pkg/sorted/buffer/buffer_test.go @@ -0,0 +1,104 @@ +/* +Copyright 2014 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package buffer + +import ( + "testing" + + "camlistore.org/pkg/sorted" +) + +// TODO(adg): test batch mutations +// TODO(adg): test auto-flush behavior + +func TestBuffer(t *testing.T) { + var ( + toBack = []mod{ + {false, "b", "b1"}, + {false, "d", "d1"}, + {false, "f", "f1"}, + } + toBuf = []mod{ + {false, "a", "a2"}, + {false, "b", "b2"}, + {false, "c", "c2"}, + {false, "e", "e2"}, + {true, "f", ""}, + {false, "g", "g2"}, + } + backBeforeFlush = []mod{ + {false, "b", "b1"}, + {false, "d", "d1"}, + // f deleted + } + want = []mod{ + {false, "a", "a2"}, + {false, "b", "b2"}, + {false, "c", "c2"}, + {false, "d", "d1"}, + {false, "e", "e2"}, + // f deleted + {false, "g", "g2"}, + } + ) + + // Populate backing storage. + backing := sorted.NewMemoryKeyValue() + for _, m := range toBack { + backing.Set(m.key, m.value) + } + // Wrap with buffered storage, populate. + buf := New(sorted.NewMemoryKeyValue(), backing, 1<<20) + for _, m := range toBuf { + if m.isDelete { + buf.Delete(m.key) + } else { + buf.Set(m.key, m.value) + } + } + + // Check contents of buffered storage. + check(t, buf, "buffered", want) + check(t, backing, "backing before flush", backBeforeFlush) + + // Flush. + if err := buf.Flush(); err != nil { + t.Fatal("flush error: ", err) + } + + // Check contents of backing storage. + check(t, backing, "backing after flush", want) +} + +func check(t *testing.T, kv sorted.KeyValue, prefix string, want []mod) { + it := kv.Find("", "") + for i, m := range want { + if !it.Next() { + t.Fatalf("%v: unexpected it.Next == false on iteration %d", prefix, i) + } + if k, v := it.Key(), it.Value(); k != m.key || v != m.value { + t.Errorf("%v: got key == %q value == %q, want key == %q value == %q on iteration %d", + prefix, k, v, m.key, m.value, i) + } + } + if it.Next() { + t.Errorf("%v: unexpected it.Next == true after complete iteration", prefix) + } + if err := it.Close(); err != nil { + t.Errorf("%v: error closing iterator: %v", prefix, err) + } +}