camput: replace kv with leveldb for stat/havecache

Fixes #382

Change-Id: I9600049395c706f84f284fd7c774c3101a8eaf6d
This commit is contained in:
mpl 2016-08-18 23:42:09 +02:00
parent 108f4b7f61
commit e321eb75d5
2 changed files with 30 additions and 51 deletions

View File

@ -24,6 +24,8 @@ import (
)
// A HaveCache tracks whether a remove blobserver has a blob or not.
// As NoteBlobExists overwrites any existing value, it is the responsibilty of
// the caller to stat before writing.
type HaveCache interface {
StatBlobCache(br blob.Ref) (size uint32, ok bool)
NoteBlobExists(br blob.Ref, size uint32)
@ -45,6 +47,8 @@ type UploadCache interface {
// was uploaded. If withPermanode, it means a planned permanode was created
// for this file when it was uploaded (with -filenodes), and the cache entry
// will reflect that.
// As AddCachedPutResult overwrites any existing value, it is the responsibilty of
// the caller to stat before writing.
AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool)
Close() error
}

View File

@ -33,15 +33,15 @@ import (
"camlistore.org/pkg/blob"
"camlistore.org/pkg/client"
"camlistore.org/pkg/kvutil"
"camlistore.org/pkg/osutil"
"github.com/cznic/kv"
"github.com/syndtr/goleveldb/leveldb"
)
var errCacheMiss = errors.New("not in cache")
// KvHaveCache is a HaveCache on top of a single
// mutable database file on disk using github.com/cznic/kv.
// mutable database file on disk using github.com/syndtr/goleveldb.
// It stores the blobref in binary as the key, and
// the blobsize in binary as the value.
// Access to the cache is restricted to one process
@ -49,13 +49,13 @@ var errCacheMiss = errors.New("not in cache")
// to remove the lock.
type KvHaveCache struct {
filename string
db *kv.DB
db *leveldb.DB
}
func NewKvHaveCache(gen string) *KvHaveCache {
cleanCacheDir()
fullPath := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)+".kv")
db, err := kvutil.Open(fullPath, nil)
fullPath := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)+".leveldb")
db, err := leveldb.OpenFile(fullPath, nil)
if err != nil {
log.Fatalf("Could not create/open new have cache at %v, %v", fullPath, err)
}
@ -76,14 +76,14 @@ func (c *KvHaveCache) StatBlobCache(br blob.Ref) (size uint32, ok bool) {
return
}
binBr, _ := br.MarshalBinary()
binVal, err := c.db.Get(nil, binBr)
binVal, err := c.db.Get(binBr, nil)
if err != nil {
if err == leveldb.ErrNotFound {
cachelog.Printf("have cache MISS on %v", br)
return
}
log.Fatalf("Could not query have cache %v for %v: %v", c.filename, br, err)
}
if binVal == nil {
cachelog.Printf("have cache MISS on %v", br)
return
}
val, err := strconv.ParseUint(string(binVal), 10, 32)
if err != nil {
log.Fatalf("Could not decode have cache binary value for %v: %v", br, err)
@ -105,21 +105,13 @@ func (c *KvHaveCache) NoteBlobExists(br blob.Ref, size uint32) {
binBr, _ := br.MarshalBinary()
binVal := []byte(strconv.Itoa(int(size)))
cachelog.Printf("Adding to have cache %v: %q", br, binVal)
_, _, err := c.db.Put(nil, binBr,
func(binBr, old []byte) ([]byte, bool, error) {
// We do not overwrite dups
if old != nil {
return nil, false, nil
}
return binVal, true, nil
})
if err != nil {
if err := c.db.Put(binBr, binVal, nil); err != nil {
log.Fatalf("Could not write %v in have cache: %v", br, err)
}
}
// KvStatCache is an UploadCache on top of a single
// mutable database file on disk using github.com/cznic/kv.
// mutable database file on disk using github.com/syndtr/goleveldb.
// It stores a binary combination of an os.FileInfo fingerprint and
// a client.Putresult as the key, and the blobsize in binary as
// the value.
@ -128,12 +120,12 @@ func (c *KvHaveCache) NoteBlobExists(br blob.Ref, size uint32) {
// to remove the lock.
type KvStatCache struct {
filename string
db *kv.DB
db *leveldb.DB
}
func NewKvStatCache(gen string) *KvStatCache {
fullPath := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)+".kv")
db, err := kvutil.Open(fullPath, nil)
fullPath := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)+".leveldb")
db, err := leveldb.OpenFile(fullPath, nil)
if err != nil {
log.Fatalf("Could not create/open new stat cache at %v, %v", fullPath, err)
}
@ -156,13 +148,13 @@ func (c *KvStatCache) CachedPutResult(pwd, filename string, fi os.FileInfo, with
Permanode: withPermanode,
}
binKey, err := cacheKey.marshalBinary()
binVal, err := c.db.Get(nil, binKey)
binVal, err := c.db.Get(binKey, nil)
if err != nil {
log.Fatalf("Could not query stat cache %v for %q: %v", binKey, fullPath, err)
}
if binVal == nil {
cachelog.Printf("stat cache MISS on %q", binKey)
return nil, errCacheMiss
if err == leveldb.ErrNotFound {
cachelog.Printf("stat cache MISS on %q", binKey)
return nil, errCacheMiss
}
log.Fatalf("Could not query stat cache %q for %v: %v", binKey, fullPath, err)
}
val := &statCacheValue{}
if err = val.unmarshalBinary(binVal); err != nil {
@ -194,15 +186,7 @@ func (c *KvStatCache) AddCachedPutResult(pwd, filename string, fi os.FileInfo, p
log.Fatalf("Could not add %q to stat cache: %v", binKey, err)
}
cachelog.Printf("Adding to stat cache %q: %q", binKey, binVal)
_, _, err = c.db.Put(nil, binKey,
func(binKey, old []byte) ([]byte, bool, error) {
// We do not overwrite dups
if old != nil {
return nil, false, nil
}
return binVal, true, nil
})
if err != nil {
if err := c.db.Put(binKey, binVal, nil); err != nil {
log.Fatalf("Could not add %q to stat cache: %v", binKey, err)
}
}
@ -339,8 +323,8 @@ func fileInfoToFingerprint(fi os.FileInfo) statFingerprint {
return statFingerprint(fmt.Sprintf("%dB/%dMOD/sys-%d", fi.Size(), fi.ModTime().UnixNano(), sysHash))
}
// Delete stranded lock files and all but the oldest 5
// havecache/statcache files, unless they're newer than 30 days.
// Delete all but the oldest 5 havecache/statcache dirs, unless they're newer
// than 30 days.
func cleanCacheDir() {
dir := osutil.CacheDir()
f, err := os.Open(dir)
@ -358,16 +342,7 @@ func cleanCacheDir() {
seen[fi.Name()] = true
}
for name := range seen {
if strings.HasSuffix(name, ".lock") && !seen[strings.TrimSuffix(name, ".lock")] {
os.Remove(filepath.Join(dir, name))
}
}
for _, fi := range fis {
if strings.HasSuffix(fi.Name(), ".lock") {
continue
}
if strings.HasPrefix(fi.Name(), "camput.havecache.") {
haveCache = append(haveCache, fi)
continue
@ -385,7 +360,7 @@ func cleanCacheDir() {
list = list[:len(list)-5]
for _, fi := range list {
if fi.ModTime().Before(time.Now().Add(-30 * 24 * time.Hour)) {
os.Remove(filepath.Join(dir, fi.Name()))
os.RemoveAll(filepath.Join(dir, fi.Name()))
}
}
}