diff --git a/clients/go/camput/camput.go b/clients/go/camput/camput.go index 4345f2143..9de341914 100644 --- a/clients/go/camput/camput.go +++ b/clients/go/camput/camput.go @@ -45,24 +45,36 @@ var flagName = flag.String("name", "", "Optional name attribute to set on perman var flagTag = flag.String("tag", "", "Optional tag attribute to set on permanode when using -permanode and -file. Single value or comma separated ones.") var flagVerbose = flag.Bool("verbose", false, "be verbose") + var flagUseStatCache = flag.Bool("statcache", false, "Use the stat cache, assuming unchanged files already uploaded in the past are still there. Fast, but potentially dangerous.") +var flagUseHaveCache = flag.Bool("havecache", false, "Use the 'have cache', a cache keeping track of what blobs the remote server should already have from previous uploads.") var flagSetAttr = flag.Bool("set-attr", false, "set (replace) an attribute") var flagAddAttr = flag.Bool("add-attr", false, "add an attribute, additional if one already exists") var wereErrors = false +// UploadCache is the "stat cache" for regular files. Given a current +// working directory, possibly relative filename, and stat info, +// returns what the ultimate put result (the top-level "file" schema +// blob) for that regular file was. type UploadCache interface { CachedPutResult(pwd, filename string, fi *os.FileInfo) (*client.PutResult, os.Error) AddCachedPutResult(pwd, filename string, fi *os.FileInfo, pr *client.PutResult) } +type HaveCache interface { + BlobExists(br *blobref.BlobRef) bool + NoteBlobExists(br *blobref.BlobRef) +} + type Uploader struct { *client.Client entityFetcher jsonsign.EntityFetcher - pwd string - cache UploadCache + pwd string + statCache UploadCache + haveCache HaveCache filecapc chan bool } @@ -123,19 +135,17 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr return nil, err } - if up.cache != nil { - cachedRes, err := up.cache.CachedPutResult(up.pwd, filename, fi) + if up.statCache != nil && fi.IsRegular() { + cachedRes, err := up.statCache.CachedPutResult(up.pwd, filename, fi) if err == nil { vprintf("Cache HIT on %q -> %v", filename, cachedRes) return cachedRes, nil } - if fi.IsRegular() { - defer func() { - if respr != nil && outerr == nil { - up.cache.AddCachedPutResult(up.pwd, filename, fi, respr) - } - }() - } + defer func() { + if respr != nil && outerr == nil { + up.statCache.AddCachedPutResult(up.pwd, filename, fi, respr) + } + }() } m := schema.NewCommonFileMap(filename, fi) @@ -203,17 +213,6 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr return mappr, err } -func (up *Uploader) UploadMap(m map[string]interface{}) (*client.PutResult, os.Error) { - json, err := schema.MapToCamliJson(m) - if err != nil { - return nil, err - } - if *flagVerbose { - fmt.Printf("json: %s\n", json) - } - return up.Upload(client.NewUploadHandleFromString(json)) -} - func (up *Uploader) SignMap(m map[string]interface{}) (string, os.Error) { camliSigBlobref := up.Client.SignerPublicKeyBlobref() if camliSigBlobref == nil { @@ -234,12 +233,34 @@ func (up *Uploader) SignMap(m map[string]interface{}) (string, os.Error) { return sr.Sign() } +func (up *Uploader) UploadMap(m map[string]interface{}) (*client.PutResult, os.Error) { + json, err := schema.MapToCamliJson(m) + if err != nil { + return nil, err + } + vprintf("json: %s\n", json) + return up.uploadString(json) +} + func (up *Uploader) UploadAndSignMap(m map[string]interface{}) (*client.PutResult, os.Error) { signed, err := up.SignMap(m) if err != nil { return nil, err } - return up.Upload(client.NewUploadHandleFromString(signed)) + return up.uploadString(signed) +} + +func (up *Uploader) uploadString(s string) (*client.PutResult, os.Error) { + uh := client.NewUploadHandleFromString(s) + if c := up.haveCache; c != nil && c.BlobExists(uh.BlobRef) { + vprintf("HaveCache HIT for %s / %d", uh.BlobRef, uh.Size) + return &client.PutResult{BlobRef: uh.BlobRef, Size: uh.Size, Skipped: true}, nil + } + pr, err := up.Upload(uh) + if err == nil && up.haveCache != nil { + up.haveCache.NoteBlobExists(uh.BlobRef) + } + return pr, err } func (up *Uploader) UploadNewPermanode() (*client.PutResult, os.Error) { @@ -324,9 +345,14 @@ func main() { } if *flagUseStatCache { - cache := NewFlatCache() + cache := NewFlatStatCache() defer cache.Save() - up.cache = cache + up.statCache = cache + } + if *flagUseHaveCache { + cache := NewFlatHaveCache() + defer cache.Save() + up.haveCache = cache } switch { diff --git a/clients/go/camput/flatcache.go b/clients/go/camput/flatcache.go index cb58db360..904278e50 100644 --- a/clients/go/camput/flatcache.go +++ b/clients/go/camput/flatcache.go @@ -24,6 +24,7 @@ import ( "reflect" "sync" + "camli/blobref" "camli/client" "camli/osutil" ) @@ -33,18 +34,18 @@ type fileInfoPutRes struct { Pr client.PutResult } -// FlatCache is an ugly hack, until leveldb-go is ready +// FlatStatCache is an ugly hack, until leveldb-go is ready // (http://code.google.com/p/leveldb-go/) -type FlatCache struct { +type FlatStatCache struct { mu sync.Mutex filename string m map[string]fileInfoPutRes dirty map[string]fileInfoPutRes } -func NewFlatCache() *FlatCache { - filename := filepath.Join(osutil.CacheDir(), "camput.cache") - fc := &FlatCache{ +func NewFlatStatCache() *FlatStatCache { + filename := filepath.Join(osutil.CacheDir(), "camput.statcache") + fc := &FlatStatCache{ filename: filename, m: make(map[string]fileInfoPutRes), dirty: make(map[string]fileInfoPutRes), @@ -68,7 +69,7 @@ func NewFlatCache() *FlatCache { return fc } -var _ UploadCache = (*FlatCache)(nil) +var _ UploadCache = (*FlatStatCache)(nil) var ErrCacheMiss = os.NewError("not in cache") @@ -78,7 +79,7 @@ func cacheKey(pwd, filename string) string { return filepath.Clean(pwd) + "\x00" + filepath.Clean(filename) } -func (c *FlatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) (*client.PutResult, os.Error) { +func (c *FlatStatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) (*client.PutResult, os.Error) { c.mu.Lock() defer c.mu.Unlock() @@ -96,7 +97,7 @@ func (c *FlatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) (*cli return &pr, nil } -func (c *FlatCache) AddCachedPutResult(pwd, filename string, fi *os.FileInfo, pr *client.PutResult) { +func (c *FlatStatCache) AddCachedPutResult(pwd, filename string, fi *os.FileInfo, pr *client.PutResult) { c.mu.Lock() defer c.mu.Unlock() key := cacheKey(pwd, filename) @@ -108,17 +109,17 @@ func (c *FlatCache) AddCachedPutResult(pwd, filename string, fi *os.FileInfo, pr c.m[key] = val } -func (c *FlatCache) Save() { +func (c *FlatStatCache) Save() { c.mu.Lock() defer c.mu.Unlock() if len(c.dirty) == 0 { - vprintf("FlatCache: Save, but nothing dirty") + vprintf("FlatStatCache: Save, but nothing dirty") return } f, err := os.OpenFile(c.filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600) if err != nil { - log.Fatalf("FlatCache OpenFile: %v", err) + log.Fatalf("FlatStatCache OpenFile: %v", err) } defer f.Close() e := gob.NewEncoder(f) @@ -132,5 +133,73 @@ func (c *FlatCache) Save() { write(v) } c.dirty = make(map[string]fileInfoPutRes) - log.Printf("FlatCache: saved") + log.Printf("FlatStatCache: saved") +} + +type FlatHaveCache struct { + mu sync.Mutex + filename string + m map[string]bool + dirty map[string]bool +} + +func NewFlatHaveCache() *FlatHaveCache { + filename := filepath.Join(osutil.CacheDir(), "camput.havecache") + c := &FlatHaveCache{ + filename: filename, + m: make(map[string]bool), + dirty: make(map[string]bool), + } + if f, err := os.Open(filename); err == nil { + defer f.Close() + d := gob.NewDecoder(f) + for { + var key string + if d.Decode(&key) != nil { + break + } + c.m[key] = true + } + } + return c +} + +func (c *FlatHaveCache) BlobExists(br *blobref.BlobRef) bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.m[br.String()] +} + +func (c *FlatHaveCache) NoteBlobExists(br *blobref.BlobRef) { + c.mu.Lock() + defer c.mu.Unlock() + k := br.String() + c.m[k] = true + c.dirty[k] = true +} + +func (c *FlatHaveCache) Save() { + c.mu.Lock() + defer c.mu.Unlock() + if len(c.dirty) == 0 { + vprintf("FlatHaveCache: Save, but nothing dirty") + return + } + + f, err := os.OpenFile(c.filename, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0600) + if err != nil { + log.Fatalf("FlatHaveCache OpenFile: %v", err) + } + defer f.Close() + e := gob.NewEncoder(f) + write := func(v interface{}) { + if err := e.Encode(v); err != nil { + panic("Encode: " + err.String()) + } + } + for k, _ := range c.dirty { + write(k) + } + c.dirty = make(map[string]bool) + log.Printf("FlatHaveCache: saved") }