From 7c319ca28477830881e4a534c3d0dc55d6bb91fd Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sun, 20 Jan 2013 09:52:03 -0800 Subject: [PATCH] pkg/cacher: unify the cammount and camget disk caching code Change-Id: I8e53d00f1f5459856a4e2b863d74c2b7c3f515bc --- TODO | 4 --- cmd/camget/camget.go | 26 +++++------------ cmd/cammount/cammount.go | 15 +++------- pkg/cacher/cacher.go | 60 ++++++++++++++++++++++++++++++++++++---- 4 files changed, 66 insertions(+), 39 deletions(-) diff --git a/TODO b/TODO index 2493c315c..954fee6b3 100644 --- a/TODO +++ b/TODO @@ -5,10 +5,6 @@ latest efforts to revive it. The place to start looking is: server/camlistored/ui/fileembed_appengine.go --- unify cammount and camget's blob caching. move their init code to - pkg/cacher, and add an LRU and optional upper bound size limit in - pkg/cacher too. - -- should a "share" claim be not a claim but its own permanode, so it can be rescinded? right now you can't really unshare a "haveref" claim. or rather, TODO: verify we support "delete" claims to diff --git a/cmd/camget/camget.go b/cmd/camget/camget.go index 739f34a90..c5baf22cf 100644 --- a/cmd/camget/camget.go +++ b/cmd/camget/camget.go @@ -41,7 +41,6 @@ import ( "flag" "fmt" "io" - "io/ioutil" "log" "net" "net/http" @@ -49,7 +48,6 @@ import ( "path/filepath" "camlistore.org/pkg/blobref" - "camlistore.org/pkg/blobserver/localdisk" // used for the blob cache "camlistore.org/pkg/cacher" "camlistore.org/pkg/client" "camlistore.org/pkg/httputil" @@ -116,27 +114,18 @@ func main() { } cl.SetHTTPClient(&http.Client{Transport: httpStats}) - // Put a local disk cache in front of the HTTP client. - // TODO: this could be better about proactively cleaning things. - // Fetching 2 TB shouldn't write 2 TB to /tmp before it's done. - // Maybe the cache needs an LRU/size cap. 
- cacheDir, err := ioutil.TempDir("", "camlicache") - if err != nil { - log.Fatalf("Error creating temp cache directory: %v\n", err) - } - defer os.RemoveAll(cacheDir) - diskcache, err := localdisk.New(cacheDir) + diskCacheFetcher, err := cacher.NewDiskCache(cl) if err != nil { log.Fatalf("Error setting up local disk cache: %v", err) } + defer diskCacheFetcher.Clean() if *flagVerbose { - log.Printf("Using temp blob cache directory %s", cacheDir) + log.Printf("Using temp blob cache directory %s", diskCacheFetcher.Root) } - fetcher := cacher.NewCachingFetcher(diskcache, cl) for _, br := range items { if *flagGraph { - printGraph(fetcher, br) + printGraph(diskCacheFetcher, br) return } if *flagCheck { @@ -148,13 +137,12 @@ func main() { var rc io.ReadCloser var err error if *flagContents { - seekFetcher := blobref.SeekerFromStreamingFetcher(fetcher) - rc, err = schema.NewFileReader(seekFetcher, br) + rc, err = schema.NewFileReader(diskCacheFetcher, br) if err == nil { rc.(*schema.FileReader).LoadAllChunks() } } else { - rc, err = fetch(fetcher, br) + rc, err = fetch(diskCacheFetcher, br) } if err != nil { log.Fatal(err) @@ -164,7 +152,7 @@ func main() { log.Fatalf("Failed reading %q: %v", br, err) } } else { - if err := smartFetch(fetcher, *flagOutput, br); err != nil { + if err := smartFetch(diskCacheFetcher, *flagOutput, br); err != nil { log.Fatal(err) } } diff --git a/cmd/cammount/cammount.go b/cmd/cammount/cammount.go index 6c13bd7c6..66ccbbae8 100644 --- a/cmd/cammount/cammount.go +++ b/cmd/cammount/cammount.go @@ -19,14 +19,12 @@ package main import ( "flag" "fmt" - "io/ioutil" "log" "os" "os/signal" "syscall" "camlistore.org/pkg/blobref" - "camlistore.org/pkg/blobserver/localdisk" // used for the blob cache "camlistore.org/pkg/cacher" "camlistore.org/pkg/client" "camlistore.org/pkg/fs" @@ -53,16 +51,11 @@ func main() { client := client.NewOrFail() // automatic from flags - cacheDir, err := ioutil.TempDir("", "camlicache") - if err != nil { - errorf("Error 
creating temp cache directory: %v\n", err) - } - defer os.RemoveAll(cacheDir) - diskcache, err := localdisk.New(cacheDir) + diskCacheFetcher, err := cacher.NewDiskCache(client) if err != nil { errorf("Error setting up local disk cache: %v", err) } - fetcher := cacher.NewCachingFetcher(diskcache, client) + defer diskCacheFetcher.Clean() var camfs *fs.CamliFileSystem if flag.NArg() == 2 { @@ -71,12 +64,12 @@ func main() { errorf("Error parsing root blobref: %q\n", root) } var err error - camfs, err = fs.NewRootedCamliFileSystem(fetcher, root) + camfs, err = fs.NewRootedCamliFileSystem(diskCacheFetcher, root) if err != nil { errorf("Error creating root with %v: %v", root, err) } } else { - camfs = fs.NewCamliFileSystem(fetcher) + camfs = fs.NewCamliFileSystem(diskCacheFetcher) log.Printf("starting with fs %#v", camfs) } diff --git a/pkg/cacher/cacher.go b/pkg/cacher/cacher.go index 5db69a24f..48dfba455 100644 --- a/pkg/cacher/cacher.go +++ b/pkg/cacher/cacher.go @@ -14,20 +14,27 @@ See the License for the specific language governing permissions and limitations under the License. */ +// Package cacher provides various blobref fetching caching mechanisms. package cacher import ( "io" + "io/ioutil" + "os" "camlistore.org/pkg/blobref" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/blobserver/localdisk" "camlistore.org/pkg/singleflight" ) -func NewCachingFetcher(cacheTarget blobserver.Cache, sfetcher blobref.StreamingFetcher) *CachingFetcher { - return &CachingFetcher{c: cacheTarget, sf: sfetcher} +// NewCachingFetcher returns a CachingFetcher that fetches from +// fetcher and writes to and serves from cache. +func NewCachingFetcher(cache blobserver.Cache, fetcher blobref.StreamingFetcher) *CachingFetcher { + return &CachingFetcher{c: cache, sf: fetcher} } +// A CachingFetcher is a blobref.StreamingFetcher and a blobref.SeekFetcher. 
type CachingFetcher struct { c blobserver.Cache sf blobref.StreamingFetcher @@ -35,9 +42,6 @@ type CachingFetcher struct { g singleflight.Group } -var _ blobref.StreamingFetcher = (*CachingFetcher)(nil) -var _ blobref.SeekFetcher = (*CachingFetcher)(nil) - func (cf *CachingFetcher) FetchStreaming(br *blobref.BlobRef) (file io.ReadCloser, size int64, err error) { file, size, err = cf.c.Fetch(br) if err == nil { @@ -72,3 +76,49 @@ func (cf *CachingFetcher) faultIn(br *blobref.BlobRef) error { }) return err } + +// A DiskCache is a blobref.StreamingFetcher and blobref.SeekFetcher +// that serves from a local temp directory and is backed by another +// blobref.StreamingFetcher (usually the pkg/client HTTP client). +type DiskCache struct { + *CachingFetcher + + // Root is the temp directory being used to store files. + // It is available mostly for debug printing. + Root string +} + +// NewDiskCache returns a new DiskCache from a StreamingFetcher, which +// is usually the pkg/client HTTP client (which typically has much +// higher latency and lower bandwidth than local disk). +func NewDiskCache(fetcher blobref.StreamingFetcher) (*DiskCache, error) { + // TODO: max disk size, keep LRU of access, smarter cleaning, + // persistent directory per-user, etc. + + cacheDir, err := ioutil.TempDir("", "camlicache") + if err != nil { + return nil, err + } + diskcache, err := localdisk.New(cacheDir) + if err != nil { + return nil, err + } + dc := &DiskCache{ + CachingFetcher: NewCachingFetcher(diskcache, fetcher), + Root: cacheDir, + } + return dc, nil +} + +// Clean cleans some or all of the DiskCache. +func (dc *DiskCache) Clean() { + // TODO: something less aggressive? + os.RemoveAll(dc.Root) +} + +var ( + _ blobref.StreamingFetcher = (*CachingFetcher)(nil) + _ blobref.SeekFetcher = (*CachingFetcher)(nil) + _ blobref.StreamingFetcher = (*DiskCache)(nil) + _ blobref.SeekFetcher = (*DiskCache)(nil) +)