blobserver/memory: add cache mode, where old entries are evicted

Change-Id: Id09f981afda8ab55971b5491ff488e696e6b5ae2
This commit is contained in:
Brad Fitzpatrick 2014-09-26 16:41:31 -07:00
parent 43bfb2b01d
commit f99a7a6fa9
2 changed files with 86 additions and 21 deletions

View File

@ -32,6 +32,7 @@ import (
"camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver"
"camlistore.org/pkg/context" "camlistore.org/pkg/context"
"camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/jsonconfig"
"camlistore.org/pkg/lru"
"camlistore.org/pkg/types" "camlistore.org/pkg/types"
) )
@ -39,9 +40,15 @@ import (
// interface. It also includes other convenience methods used by // interface. It also includes other convenience methods used by
// tests. // tests.
type Storage struct { type Storage struct {
maxSize int64 // or zero if no limit
mu sync.RWMutex // guards following 2 fields. mu sync.RWMutex // guards following 2 fields.
m map[blob.Ref][]byte // maps blob ref to its contents m map[blob.Ref][]byte // maps blob ref to its contents
sorted []string // blobrefs sorted size int64 // sum of len(values(m))
// lru is non-nil if we're in cache mode.
// Else it maps blobref.String() to a nil value.
lru *lru.Cache
blobsFetched int64 // atomic blobsFetched int64 // atomic
bytesFetched int64 // atomic bytesFetched int64 // atomic
@ -58,9 +65,21 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
return &Storage{}, nil return &Storage{}, nil
} }
// NewCache returns a cache that won't store more than size bytes.
// Blobs are evicted in LRU order.
func NewCache(size int64) *Storage {
return &Storage{
maxSize: size,
lru: lru.New(1 << 31), // ~infinite items; we evict by size, not count
}
}
func (s *Storage) Fetch(ref blob.Ref) (file io.ReadCloser, size uint32, err error) { func (s *Storage) Fetch(ref blob.Ref) (file io.ReadCloser, size uint32, err error) {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
if s.lru != nil {
s.lru.Get(ref.String()) // force to head
}
if s.m == nil { if s.m == nil {
err = os.ErrNotExist err = os.ErrNotExist
return return
@ -126,8 +145,17 @@ func (s *Storage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, err
_, had := s.m[br] _, had := s.m[br]
if !had { if !had {
s.m[br] = all s.m[br] = all
s.sorted = append(s.sorted, br.String()) if s.lru != nil {
sort.Strings(s.sorted) s.lru.Add(br.String(), nil)
}
s.size += int64(len(all))
for s.maxSize != 0 && s.size > s.maxSize {
if key, _ := s.lru.RemoveOldest(); key != "" {
s.removeBlobLocked(blob.MustParse(key))
} else {
break // shouldn't happen
}
}
} }
return blob.SizedRef{br, uint32(len(all))}, nil return blob.SizedRef{br, uint32(len(all))}, nil
} }
@ -148,12 +176,24 @@ func (s *Storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef
defer close(dest) defer close(dest)
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
// TODO(bradfitz): care about keeping this sorted like we used
// to? I think it was more expensive than it was worth before,
// since maintaining it was more costly than how often it was
// used. But perhaps it'd make sense to maintain it lazily:
// construct it on EnumerateBlobs but invalidate it everywhere
// else. Probably doesn't matter much.
sorted := make([]blob.Ref, 0, len(s.m))
for br := range s.m {
sorted = append(sorted, br)
}
sort.Sort(blob.ByRef(sorted))
n := 0 n := 0
for _, k := range s.sorted { for _, br := range sorted {
if k <= after { if after != "" && br.String() <= after {
continue continue
} }
br := blob.MustParse(k)
select { select {
case dest <- blob.SizedRef{br, uint32(len(s.m[br]))}: case dest <- blob.SizedRef{br, uint32(len(s.m[br]))}:
case <-ctx.Done(): case <-ctx.Done():
@ -171,16 +211,20 @@ func (s *Storage) RemoveBlobs(blobs []blob.Ref) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
for _, br := range blobs { for _, br := range blobs {
delete(s.m, br) s.removeBlobLocked(br)
} }
s.sorted = s.sorted[:0]
for k := range s.m {
s.sorted = append(s.sorted, k.String())
}
sort.Strings(s.sorted)
return nil return nil
} }
func (s *Storage) removeBlobLocked(br blob.Ref) {
v, had := s.m[br]
if !had {
return
}
s.size -= int64(len(v))
delete(s.m, br)
}
// BlobContents returns as a string the contents of the blob br. // BlobContents returns as a string the contents of the blob br.
func (s *Storage) BlobContents(br blob.Ref) (contents string, ok bool) { func (s *Storage) BlobContents(br blob.Ref) (contents string, ok bool) {
s.mu.RLock() s.mu.RLock()
@ -203,19 +247,18 @@ func (s *Storage) NumBlobs() int {
func (s *Storage) SumBlobSize() int64 { func (s *Storage) SumBlobSize() int64 {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
var n int64 return s.size
for _, b := range s.m {
n += int64(len(b))
}
return n
} }
// BlobrefStrings returns the sorted stringified blobrefs stored in s. // BlobrefStrings returns the sorted stringified blobrefs stored in s.
func (s *Storage) BlobrefStrings() []string { func (s *Storage) BlobrefStrings() []string {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() defer s.mu.RUnlock()
sorted := make([]string, len(s.sorted)) sorted := make([]string, 0, len(s.m))
copy(sorted, s.sorted) for br := range s.m {
sorted = append(sorted, br.String())
}
sort.Strings(sorted)
return sorted return sorted
} }

View File

@ -17,11 +17,13 @@ limitations under the License.
package memory_test package memory_test
import ( import (
"strings"
"testing" "testing"
"camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/memory" "camlistore.org/pkg/blobserver/memory"
"camlistore.org/pkg/blobserver/storagetest" "camlistore.org/pkg/blobserver/storagetest"
"camlistore.org/pkg/test"
) )
// TestMemoryStorage tests against an in-memory blobserver. // TestMemoryStorage tests against an in-memory blobserver.
@ -31,3 +33,23 @@ func TestMemoryStorage(t *testing.T) {
return &memory.Storage{}, func() {} return &memory.Storage{}, func() {}
}) })
} }
func TestCache(t *testing.T) {
c := memory.NewCache(1024)
(&test.Blob{"foo"}).MustUpload(t, c)
if got, want := c.SumBlobSize(), int64(3); got != want {
t.Errorf("size = %d; want %d", got, want)
}
(&test.Blob{"bar"}).MustUpload(t, c)
if got, want := c.SumBlobSize(), int64(6); got != want {
t.Errorf("size = %d; want %d", got, want)
}
(&test.Blob{strings.Repeat("x", 1020)}).MustUpload(t, c)
if got, want := c.SumBlobSize(), int64(1023); got != want {
t.Errorf("size = %d; want %d", got, want)
}
(&test.Blob{"five!"}).MustUpload(t, c)
if got, want := c.SumBlobSize(), int64(5); got != want {
t.Errorf("size = %d; want %d", got, want)
}
}