Merge "pkg/sorted/buffer: flesh out, add test"

This commit is contained in:
Andrew Gerrand 2014-07-14 23:49:59 +00:00 committed by Gerrit Code Review
commit 83a4b39f10
2 changed files with 281 additions and 24 deletions

View File

@ -33,7 +33,7 @@ import (
// maxBufferBytes <= 0, no automatic flushing is performed.
func New(buffer, backing sorted.KeyValue, maxBufferBytes int64) *KeyValue {
return &KeyValue{
buffer: buffer,
buf: buffer,
back: backing,
maxBuffer: maxBufferBytes,
}
@ -42,19 +42,52 @@ func New(buffer, backing sorted.KeyValue, maxBufferBytes int64) *KeyValue {
var _ sorted.KeyValue = (*KeyValue)(nil)
type KeyValue struct {
buffer, back sorted.KeyValue
buf, back sorted.KeyValue
maxBuffer int64
mu sync.Mutex
bufMu sync.Mutex
buffered int64
// This read lock should be held during Set/Get/Delete/BatchCommit,
// and the write lock should be held during Flush.
mu sync.RWMutex
}
func (kv *KeyValue) Flush() error {
panic("TODO: implement Flush")
kv.mu.Lock()
defer kv.mu.Unlock()
var (
bmback = kv.back.BeginBatch()
bmbuf = kv.buf.BeginBatch()
commit = false
it = kv.buf.Find("", "")
)
for it.Next() {
bmback.Set(it.Key(), it.Value())
bmbuf.Delete(it.Key())
commit = true
}
if err := it.Close(); err != nil {
return err
}
if commit {
if err := kv.back.CommitBatch(bmback); err != nil {
return err
}
if err := kv.buf.CommitBatch(bmbuf); err != nil {
return err
}
kv.bufMu.Lock()
kv.buffered = 0
kv.bufMu.Unlock()
}
return nil
}
func (kv *KeyValue) Get(key string) (string, error) {
v, err := kv.buffer.Get(key)
kv.mu.RLock()
defer kv.mu.RUnlock()
v, err := kv.buf.Get(key)
switch err {
case sorted.ErrNotFound:
break
@ -67,17 +100,31 @@ func (kv *KeyValue) Get(key string) (string, error) {
}
func (kv *KeyValue) Set(key, value string) error {
return kv.buffer.Set(key, value)
kv.mu.RLock()
err := kv.buf.Set(key, value)
kv.mu.RUnlock()
if err == nil {
kv.bufMu.Lock()
kv.buffered += int64(len(key) + len(value))
doFlush := kv.buffered > kv.maxBuffer
kv.bufMu.Unlock()
if doFlush {
err = kv.Flush()
}
}
return err
}
func (kv *KeyValue) Delete(key string) error {
kv.mu.RLock()
defer kv.mu.RUnlock()
// This isn't an ideal implementation, since it synchronously
// deletes from the backing store. But deletes aren't really
// used, so ignoring for now.
// Could also use a syncutil.Group to do these in parallel,
// but the buffer should be an in-memory implementation
// anyway, so should be fast.
err1 := kv.buffer.Delete(key)
err1 := kv.buf.Delete(key)
err2 := kv.back.Delete(key)
if err1 != nil {
return err1
@ -90,12 +137,37 @@ func (kv *KeyValue) BeginBatch() sorted.BatchMutation {
}
func (kv *KeyValue) CommitBatch(bm sorted.BatchMutation) error {
kv.mu.RLock()
defer kv.mu.RUnlock()
b, ok := bm.(*batch)
if !ok {
return fmt.Errorf("unexpected BatchMutation type %T", bm)
}
_ = b
panic("TODO")
var (
// A batch mutation for applying this mutation to the buffer.
bmbuf = kv.buf.BeginBatch()
// A lazily created batch mutation for deleting from the backing
// storage; this should be rare. (See Delete above.)
bmback sorted.BatchMutation
)
for _, m := range b.mods {
if m.isDelete {
bmbuf.Delete(m.key)
if bmback == nil {
bmback = kv.back.BeginBatch()
}
bmback.Delete(m.key)
continue
}
bmbuf.Set(m.key, m.value)
}
if err := kv.buf.CommitBatch(bmbuf); err != nil {
return err
}
if bmback != nil {
return kv.back.CommitBatch(bmback)
}
return nil
}
func (kv *KeyValue) Close() error {
@ -106,11 +178,12 @@ func (kv *KeyValue) Close() error {
}
func (kv *KeyValue) Find(start, end string) sorted.Iterator {
ibuf := kv.buffer.Find(start, end)
// TODO(adg): hold read lock while iterating? seems complicated
ibuf := kv.buf.Find(start, end)
iback := kv.back.Find(start, end)
return &iter{
ibuf: ibuf,
iback: iback,
buf: subIter{Iterator: ibuf},
back: subIter{Iterator: iback},
}
}
@ -137,23 +210,103 @@ func (b *batch) Delete(key string) {
}
type iter struct {
ibuf sorted.Iterator
iback sorted.Iterator
bufEOF bool
backEOF bool
buf, back subIter
}
func (it *iter) Key() string { panic("TODO") }
func (it *iter) Value() string { panic("TODO") }
func (it *iter) KeyBytes() []byte { panic("TODO") }
func (it *iter) ValueBytes() []byte { panic("TODO") }
func (it *iter) Next() bool { panic("TODO") }
func (it *iter) current() *subIter {
switch {
case it.back.eof:
return &it.buf
case it.buf.eof:
return &it.back
case it.buf.key <= it.back.key:
return &it.buf
default:
return &it.back
}
}
func (it *iter) Next() bool {
// Call Next on both iterators for the first time, if we haven't
// already, so that the key comparisons below are valid.
start := false
if it.buf.key == "" && !it.buf.eof {
start = it.buf.next()
}
if it.back.key == "" && !it.buf.eof {
start = it.back.next() || start
}
if start {
// We started iterating with at least one value.
return true
}
// Bail if both iterators are done.
if it.buf.eof && it.back.eof {
return false
}
// If one iterator is done, advance the other.
if it.buf.eof {
return it.back.next()
}
if it.back.eof {
return it.buf.next()
}
// If both iterators still going,
// advance the one that is further behind,
// or both simultaneously if they point to the same key.
switch {
case it.buf.key < it.back.key:
it.buf.next()
case it.buf.key > it.back.key:
it.back.next()
case it.buf.key == it.back.key:
n1, n2 := it.buf.next(), it.back.next()
if !n1 && !n2 {
// Both finished simultaneously.
return false
}
}
return true
}
func (it *iter) Key() string {
return it.current().key
}
func (it *iter) Value() string {
return it.current().Value()
}
func (it *iter) KeyBytes() []byte {
return it.current().KeyBytes()
}
func (it *iter) ValueBytes() []byte {
return it.current().ValueBytes()
}
func (it *iter) Close() error {
err1 := it.ibuf.Close()
err2 := it.iback.Close()
err1 := it.buf.Close()
err2 := it.back.Close()
if err1 != nil {
return err1
}
return err2
}
// subIter is an iterator (either the backing storage or the buffer) that
// keeps track of the current key and whether it has reached EOF.
type subIter struct {
sorted.Iterator
key string
eof bool
}
func (it *subIter) next() bool {
if it.Next() {
it.key = it.Key()
return true
}
it.eof = true
return false
}

View File

@ -0,0 +1,104 @@
/*
Copyright 2014 The Camlistore Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package buffer
import (
"testing"
"camlistore.org/pkg/sorted"
)
// TODO(adg): test batch mutations
// TODO(adg): test auto-flush behavior
func TestBuffer(t *testing.T) {
var (
toBack = []mod{
{false, "b", "b1"},
{false, "d", "d1"},
{false, "f", "f1"},
}
toBuf = []mod{
{false, "a", "a2"},
{false, "b", "b2"},
{false, "c", "c2"},
{false, "e", "e2"},
{true, "f", ""},
{false, "g", "g2"},
}
backBeforeFlush = []mod{
{false, "b", "b1"},
{false, "d", "d1"},
// f deleted
}
want = []mod{
{false, "a", "a2"},
{false, "b", "b2"},
{false, "c", "c2"},
{false, "d", "d1"},
{false, "e", "e2"},
// f deleted
{false, "g", "g2"},
}
)
// Populate backing storage.
backing := sorted.NewMemoryKeyValue()
for _, m := range toBack {
backing.Set(m.key, m.value)
}
// Wrap with buffered storage, populate.
buf := New(sorted.NewMemoryKeyValue(), backing, 1<<20)
for _, m := range toBuf {
if m.isDelete {
buf.Delete(m.key)
} else {
buf.Set(m.key, m.value)
}
}
// Check contents of buffered storage.
check(t, buf, "buffered", want)
check(t, backing, "backing before flush", backBeforeFlush)
// Flush.
if err := buf.Flush(); err != nil {
t.Fatal("flush error: ", err)
}
// Check contents of backing storage.
check(t, backing, "backing after flush", want)
}
func check(t *testing.T, kv sorted.KeyValue, prefix string, want []mod) {
it := kv.Find("", "")
for i, m := range want {
if !it.Next() {
t.Fatalf("%v: unexpected it.Next == false on iteration %d", prefix, i)
}
if k, v := it.Key(), it.Value(); k != m.key || v != m.value {
t.Errorf("%v: got key == %q value == %q, want key == %q value == %q on iteration %d",
prefix, k, v, m.key, m.value, i)
}
}
if it.Next() {
t.Errorf("%v: unexpected it.Next == true after complete iteration", prefix)
}
if err := it.Close(); err != nil {
t.Errorf("%v: error closing iterator: %v", prefix, err)
}
}