From e321eb75d57df544cbf193bd4cfc28f45b824f31 Mon Sep 17 00:00:00 2001 From: mpl Date: Thu, 18 Aug 2016 23:42:09 +0200 Subject: [PATCH] camput: replace kv with leveldb for stat/havecache Fixes #382 Change-Id: I9600049395c706f84f284fd7c774c3101a8eaf6d --- cmd/camput/cache.go | 4 +++ cmd/camput/kvcache.go | 77 +++++++++++++++---------------------------- 2 files changed, 30 insertions(+), 51 deletions(-) diff --git a/cmd/camput/cache.go b/cmd/camput/cache.go index fc446dca0..c616bbd16 100644 --- a/cmd/camput/cache.go +++ b/cmd/camput/cache.go @@ -24,6 +24,8 @@ import ( ) // A HaveCache tracks whether a remove blobserver has a blob or not. +// As NoteBlobExists overwrites any existing value, it is the responsibilty of +// the caller to stat before writing. type HaveCache interface { StatBlobCache(br blob.Ref) (size uint32, ok bool) NoteBlobExists(br blob.Ref, size uint32) @@ -45,6 +47,8 @@ type UploadCache interface { // was uploaded. If withPermanode, it means a planned permanode was created // for this file when it was uploaded (with -filenodes), and the cache entry // will reflect that. + // As AddCachedPutResult overwrites any existing value, it is the responsibilty of + // the caller to stat before writing. AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool) Close() error } diff --git a/cmd/camput/kvcache.go b/cmd/camput/kvcache.go index 3688dae32..86b018a27 100644 --- a/cmd/camput/kvcache.go +++ b/cmd/camput/kvcache.go @@ -33,15 +33,15 @@ import ( "camlistore.org/pkg/blob" "camlistore.org/pkg/client" - "camlistore.org/pkg/kvutil" "camlistore.org/pkg/osutil" - "github.com/cznic/kv" + + "github.com/syndtr/goleveldb/leveldb" ) var errCacheMiss = errors.New("not in cache") // KvHaveCache is a HaveCache on top of a single -// mutable database file on disk using github.com/cznic/kv. +// mutable database file on disk using github.com/syndtr/goleveldb. // It stores the blobref in binary as the key, and // the blobsize in binary as the value. // Access to the cache is restricted to one process @@ -49,13 +49,13 @@ var errCacheMiss = errors.New("not in cache") // to remove the lock. type KvHaveCache struct { filename string - db *kv.DB + db *leveldb.DB } func NewKvHaveCache(gen string) *KvHaveCache { cleanCacheDir() - fullPath := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)+".kv") - db, err := kvutil.Open(fullPath, nil) + fullPath := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)+".leveldb") + db, err := leveldb.OpenFile(fullPath, nil) if err != nil { log.Fatalf("Could not create/open new have cache at %v, %v", fullPath, err) } @@ -76,14 +76,14 @@ func (c *KvHaveCache) StatBlobCache(br blob.Ref) (size uint32, ok bool) { return } binBr, _ := br.MarshalBinary() - binVal, err := c.db.Get(nil, binBr) + binVal, err := c.db.Get(binBr, nil) if err != nil { + if err == leveldb.ErrNotFound { + cachelog.Printf("have cache MISS on %v", br) + return + } log.Fatalf("Could not query have cache %v for %v: %v", c.filename, br, err) } - if binVal == nil { - cachelog.Printf("have cache MISS on %v", br) - return - } val, err := strconv.ParseUint(string(binVal), 10, 32) if err != nil { log.Fatalf("Could not decode have cache binary value for %v: %v", br, err) @@ -105,21 +105,13 @@ func (c *KvHaveCache) NoteBlobExists(br blob.Ref, size uint32) { binBr, _ := br.MarshalBinary() binVal := []byte(strconv.Itoa(int(size))) cachelog.Printf("Adding to have cache %v: %q", br, binVal) - _, _, err := c.db.Put(nil, binBr, - func(binBr, old []byte) ([]byte, bool, error) { - // We do not overwrite dups - if old != nil { - return nil, false, nil - } - return binVal, true, nil - }) - if err != nil { + if err := c.db.Put(binBr, binVal, nil); err != nil { log.Fatalf("Could not write %v in have cache: %v", br, err) } } // KvStatCache is an UploadCache on top of a single -// mutable database file on disk using github.com/cznic/kv. +// mutable database file on disk using github.com/syndtr/goleveldb. // It stores a binary combination of an os.FileInfo fingerprint and // a client.Putresult as the key, and the blobsize in binary as // the value. @@ -128,12 +120,12 @@ func (c *KvHaveCache) NoteBlobExists(br blob.Ref, size uint32) { // to remove the lock. type KvStatCache struct { filename string - db *kv.DB + db *leveldb.DB } func NewKvStatCache(gen string) *KvStatCache { - fullPath := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)+".kv") - db, err := kvutil.Open(fullPath, nil) + fullPath := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)+".leveldb") + db, err := leveldb.OpenFile(fullPath, nil) if err != nil { log.Fatalf("Could not create/open new stat cache at %v, %v", fullPath, err) } @@ -156,13 +148,13 @@ func (c *KvStatCache) CachedPutResult(pwd, filename string, fi os.FileInfo, with Permanode: withPermanode, } binKey, err := cacheKey.marshalBinary() - binVal, err := c.db.Get(nil, binKey) + binVal, err := c.db.Get(binKey, nil) if err != nil { - log.Fatalf("Could not query stat cache %v for %q: %v", binKey, fullPath, err) - } - if binVal == nil { - cachelog.Printf("stat cache MISS on %q", binKey) - return nil, errCacheMiss + if err == leveldb.ErrNotFound { + cachelog.Printf("stat cache MISS on %q", binKey) + return nil, errCacheMiss + } + log.Fatalf("Could not query stat cache %q for %v: %v", binKey, fullPath, err) } val := &statCacheValue{} if err = val.unmarshalBinary(binVal); err != nil { @@ -194,15 +186,7 @@ func (c *KvStatCache) AddCachedPutResult(pwd, filename string, fi os.FileInfo, p log.Fatalf("Could not add %q to stat cache: %v", binKey, err) } cachelog.Printf("Adding to stat cache %q: %q", binKey, binVal) - _, _, err = c.db.Put(nil, binKey, - func(binKey, old []byte) ([]byte, bool, error) { - // We do not overwrite dups - if old != nil { - return nil, false, nil - } - return binVal, true, nil - }) - if err != nil { + if err := c.db.Put(binKey, binVal, nil); err != nil { log.Fatalf("Could not add %q to stat cache: %v", binKey, err) } } @@ -339,8 +323,8 @@ func fileInfoToFingerprint(fi os.FileInfo) statFingerprint { return statFingerprint(fmt.Sprintf("%dB/%dMOD/sys-%d", fi.Size(), fi.ModTime().UnixNano(), sysHash)) } -// Delete stranded lock files and all but the oldest 5 -// havecache/statcache files, unless they're newer than 30 days. +// Delete all but the oldest 5 havecache/statcache dirs, unless they're newer +// than 30 days. func cleanCacheDir() { dir := osutil.CacheDir() f, err := os.Open(dir) @@ -358,16 +342,7 @@ func cleanCacheDir() { seen[fi.Name()] = true } - for name := range seen { - if strings.HasSuffix(name, ".lock") && !seen[strings.TrimSuffix(name, ".lock")] { - os.Remove(filepath.Join(dir, name)) - } - } - for _, fi := range fis { - if strings.HasSuffix(fi.Name(), ".lock") { - continue - } if strings.HasPrefix(fi.Name(), "camput.havecache.") { haveCache = append(haveCache, fi) continue @@ -385,7 +360,7 @@ func cleanCacheDir() { list = list[:len(list)-5] for _, fi := range list { if fi.ModTime().Before(time.Now().Add(-30 * 24 * time.Hour)) { - os.Remove(filepath.Join(dir, fi.Name())) + os.RemoveAll(filepath.Join(dir, fi.Name())) } } }