camput: add a 'havecache', like brackup's inventory cache

Change-Id: Ie5399fb08febf685f72d17a51d65962da9a3d32d
Brad Fitzpatrick 2011-09-17 16:59:04 -07:00
parent 6f91469776
commit 71ed6f8165
2 changed files with 132 additions and 37 deletions
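
For orientation, here is a condensed sketch of what the commit adds, assembled from the diff below (no new API; pre-Go1 style, matching this tree). The have cache remembers, by blobref, which blobs earlier camput runs already uploaded, so small schema blobs can skip the network round trip entirely:

type HaveCache interface {
	BlobExists(br *blobref.BlobRef) bool // uploaded in a previous run?
	NoteBlobExists(br *blobref.BlobRef)  // remember a successful upload
}

// uploadString is the single choke point for schema blobs: consult the
// have cache first, upload on a miss, then record the blob as present.
func (up *Uploader) uploadString(s string) (*client.PutResult, os.Error) {
	uh := client.NewUploadHandleFromString(s)
	if c := up.haveCache; c != nil && c.BlobExists(uh.BlobRef) {
		return &client.PutResult{BlobRef: uh.BlobRef, Size: uh.Size, Skipped: true}, nil
	}
	pr, err := up.Upload(uh)
	if err == nil && up.haveCache != nil {
		up.haveCache.NoteBlobExists(uh.BlobRef)
	}
	return pr, err
}

The -havecache flag wires in FlatHaveCache, which persists the blobrefs as gob-encoded strings appended to camput.havecache under the cache directory, mirroring the existing stat cache file.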

View File

@@ -45,24 +45,36 @@ var flagName = flag.String("name", "", "Optional name attribute to set on perman
 var flagTag = flag.String("tag", "", "Optional tag attribute to set on permanode when using -permanode and -file. Single value or comma separated ones.")
 var flagVerbose = flag.Bool("verbose", false, "be verbose")
 var flagUseStatCache = flag.Bool("statcache", false, "Use the stat cache, assuming unchanged files already uploaded in the past are still there. Fast, but potentially dangerous.")
+var flagUseHaveCache = flag.Bool("havecache", false, "Use the 'have cache', a cache keeping track of what blobs the remote server should already have from previous uploads.")
 var flagSetAttr = flag.Bool("set-attr", false, "set (replace) an attribute")
 var flagAddAttr = flag.Bool("add-attr", false, "add an attribute, additional if one already exists")
 
 var wereErrors = false
 
+// UploadCache is the "stat cache" for regular files. Given a current
+// working directory, possibly relative filename, and stat info,
+// returns what the ultimate put result (the top-level "file" schema
+// blob) for that regular file was.
 type UploadCache interface {
 	CachedPutResult(pwd, filename string, fi *os.FileInfo) (*client.PutResult, os.Error)
 	AddCachedPutResult(pwd, filename string, fi *os.FileInfo, pr *client.PutResult)
 }
 
+type HaveCache interface {
+	BlobExists(br *blobref.BlobRef) bool
+	NoteBlobExists(br *blobref.BlobRef)
+}
+
 type Uploader struct {
 	*client.Client
 	entityFetcher jsonsign.EntityFetcher
 
 	pwd       string
-	cache     UploadCache
+	statCache UploadCache
+	haveCache HaveCache
 
 	filecapc chan bool
 }
@@ -123,19 +135,17 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr
 		return nil, err
 	}
-	if up.cache != nil {
-		cachedRes, err := up.cache.CachedPutResult(up.pwd, filename, fi)
+	if up.statCache != nil && fi.IsRegular() {
+		cachedRes, err := up.statCache.CachedPutResult(up.pwd, filename, fi)
 		if err == nil {
 			vprintf("Cache HIT on %q -> %v", filename, cachedRes)
 			return cachedRes, nil
 		}
-		if fi.IsRegular() {
-			defer func() {
-				if respr != nil && outerr == nil {
-					up.cache.AddCachedPutResult(up.pwd, filename, fi, respr)
-				}
-			}()
-		}
+		defer func() {
+			if respr != nil && outerr == nil {
+				up.statCache.AddCachedPutResult(up.pwd, filename, fi, respr)
+			}
+		}()
 	}
 
 	m := schema.NewCommonFileMap(filename, fi)
@@ -203,17 +213,6 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr
 	return mappr, err
 }
 
-func (up *Uploader) UploadMap(m map[string]interface{}) (*client.PutResult, os.Error) {
-	json, err := schema.MapToCamliJson(m)
-	if err != nil {
-		return nil, err
-	}
-	if *flagVerbose {
-		fmt.Printf("json: %s\n", json)
-	}
-	return up.Upload(client.NewUploadHandleFromString(json))
-}
-
 func (up *Uploader) SignMap(m map[string]interface{}) (string, os.Error) {
 	camliSigBlobref := up.Client.SignerPublicKeyBlobref()
 	if camliSigBlobref == nil {
@@ -234,12 +233,34 @@ func (up *Uploader) SignMap(m map[string]interface{}) (string, os.Error) {
 	return sr.Sign()
 }
 
+func (up *Uploader) UploadMap(m map[string]interface{}) (*client.PutResult, os.Error) {
+	json, err := schema.MapToCamliJson(m)
+	if err != nil {
+		return nil, err
+	}
+	vprintf("json: %s\n", json)
+	return up.uploadString(json)
+}
+
 func (up *Uploader) UploadAndSignMap(m map[string]interface{}) (*client.PutResult, os.Error) {
 	signed, err := up.SignMap(m)
 	if err != nil {
 		return nil, err
 	}
-	return up.Upload(client.NewUploadHandleFromString(signed))
+	return up.uploadString(signed)
+}
+
+func (up *Uploader) uploadString(s string) (*client.PutResult, os.Error) {
+	uh := client.NewUploadHandleFromString(s)
+	if c := up.haveCache; c != nil && c.BlobExists(uh.BlobRef) {
+		vprintf("HaveCache HIT for %s / %d", uh.BlobRef, uh.Size)
+		return &client.PutResult{BlobRef: uh.BlobRef, Size: uh.Size, Skipped: true}, nil
+	}
+	pr, err := up.Upload(uh)
+	if err == nil && up.haveCache != nil {
+		up.haveCache.NoteBlobExists(uh.BlobRef)
+	}
+	return pr, err
 }
 
 func (up *Uploader) UploadNewPermanode() (*client.PutResult, os.Error) {
@@ -324,9 +345,14 @@ func main() {
 	}
 
 	if *flagUseStatCache {
-		cache := NewFlatCache()
+		cache := NewFlatStatCache()
 		defer cache.Save()
-		up.cache = cache
+		up.statCache = cache
+	}
+	if *flagUseHaveCache {
+		cache := NewFlatHaveCache()
+		defer cache.Save()
+		up.haveCache = cache
 	}
 
 	switch {

View File

@@ -24,6 +24,7 @@ import (
 	"reflect"
 	"sync"
 
+	"camli/blobref"
 	"camli/client"
 	"camli/osutil"
 )
@@ -33,18 +34,18 @@ type fileInfoPutRes struct {
 	Pr client.PutResult
 }
 
-// FlatCache is an ugly hack, until leveldb-go is ready
+// FlatStatCache is an ugly hack, until leveldb-go is ready
 // (http://code.google.com/p/leveldb-go/)
-type FlatCache struct {
+type FlatStatCache struct {
 	mu       sync.Mutex
 	filename string
 	m        map[string]fileInfoPutRes
 	dirty    map[string]fileInfoPutRes
 }
 
-func NewFlatCache() *FlatCache {
-	filename := filepath.Join(osutil.CacheDir(), "camput.cache")
-	fc := &FlatCache{
+func NewFlatStatCache() *FlatStatCache {
+	filename := filepath.Join(osutil.CacheDir(), "camput.statcache")
+	fc := &FlatStatCache{
 		filename: filename,
 		m:        make(map[string]fileInfoPutRes),
 		dirty:    make(map[string]fileInfoPutRes),
@@ -68,7 +69,7 @@ func NewFlatCache() *FlatCache {
 	return fc
 }
 
-var _ UploadCache = (*FlatCache)(nil)
+var _ UploadCache = (*FlatStatCache)(nil)
 
 var ErrCacheMiss = os.NewError("not in cache")
@@ -78,7 +79,7 @@ func cacheKey(pwd, filename string) string {
 	return filepath.Clean(pwd) + "\x00" + filepath.Clean(filename)
 }
 
-func (c *FlatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) (*client.PutResult, os.Error) {
+func (c *FlatStatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) (*client.PutResult, os.Error) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -96,7 +97,7 @@ func (c *FlatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) (*cli
 	return &pr, nil
 }
 
-func (c *FlatCache) AddCachedPutResult(pwd, filename string, fi *os.FileInfo, pr *client.PutResult) {
+func (c *FlatStatCache) AddCachedPutResult(pwd, filename string, fi *os.FileInfo, pr *client.PutResult) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	key := cacheKey(pwd, filename)
@@ -108,17 +109,17 @@ func (c *FlatCache) AddCachedPutResult(pwd, filename string, fi *os.FileInfo, pr
 	c.m[key] = val
 }
 
-func (c *FlatCache) Save() {
+func (c *FlatStatCache) Save() {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	if len(c.dirty) == 0 {
-		vprintf("FlatCache: Save, but nothing dirty")
+		vprintf("FlatStatCache: Save, but nothing dirty")
 		return
 	}
 
 	f, err := os.OpenFile(c.filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
 	if err != nil {
-		log.Fatalf("FlatCache OpenFile: %v", err)
+		log.Fatalf("FlatStatCache OpenFile: %v", err)
 	}
 	defer f.Close()
 	e := gob.NewEncoder(f)
@@ -132,5 +133,73 @@ func (c *FlatCache) Save() {
 		write(v)
 	}
 	c.dirty = make(map[string]fileInfoPutRes)
-	log.Printf("FlatCache: saved")
+	log.Printf("FlatStatCache: saved")
+}
+
+type FlatHaveCache struct {
+	mu       sync.Mutex
+	filename string
+	m        map[string]bool
+	dirty    map[string]bool
+}
+
+func NewFlatHaveCache() *FlatHaveCache {
+	filename := filepath.Join(osutil.CacheDir(), "camput.havecache")
+	c := &FlatHaveCache{
+		filename: filename,
+		m:        make(map[string]bool),
+		dirty:    make(map[string]bool),
+	}
+	if f, err := os.Open(filename); err == nil {
+		defer f.Close()
+		d := gob.NewDecoder(f)
+		for {
+			var key string
+			if d.Decode(&key) != nil {
+				break
+			}
+			c.m[key] = true
+		}
+	}
+	return c
+}
+
+func (c *FlatHaveCache) BlobExists(br *blobref.BlobRef) bool {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	return c.m[br.String()]
+}
+
+func (c *FlatHaveCache) NoteBlobExists(br *blobref.BlobRef) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	k := br.String()
+	c.m[k] = true
+	c.dirty[k] = true
+}
+
+func (c *FlatHaveCache) Save() {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	if len(c.dirty) == 0 {
+		vprintf("FlatHaveCache: Save, but nothing dirty")
+		return
+	}
+	f, err := os.OpenFile(c.filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600)
+	if err != nil {
+		log.Fatalf("FlatHaveCache OpenFile: %v", err)
+	}
+	defer f.Close()
+	e := gob.NewEncoder(f)
+	write := func(v interface{}) {
+		if err := e.Encode(v); err != nil {
+			panic("Encode: " + err.String())
+		}
+	}
+	for k, _ := range c.dirty {
+		write(k)
+	}
+	c.dirty = make(map[string]bool)
+	log.Printf("FlatHaveCache: saved")
 }