pkg/cacher: unify the cammount and camget disk caching code

Change-Id: I8e53d00f1f5459856a4e2b863d74c2b7c3f515bc
This commit is contained in:
Brad Fitzpatrick 2013-01-20 09:52:03 -08:00
parent d4e4653ae3
commit 7c319ca284
4 changed files with 66 additions and 39 deletions

4
TODO
View File

@ -5,10 +5,6 @@
latest efforts to revive it. The place to start looking is:
server/camlistored/ui/fileembed_appengine.go
-- unify cammount and camget's blob caching. move their init code to
pkg/cacher, and add an LRU and optional upper bound size limit in
pkg/cacher too.
-- should a "share" claim be not a claim but its own permanode, so it
can be rescinded? right now you can't really unshare a "haveref"
claim. or rather, TODO: verify we support "delete" claims to

View File

@ -41,7 +41,6 @@ import (
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"net"
"net/http"
@ -49,7 +48,6 @@ import (
"path/filepath"
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/blobserver/localdisk" // used for the blob cache
"camlistore.org/pkg/cacher"
"camlistore.org/pkg/client"
"camlistore.org/pkg/httputil"
@ -116,27 +114,18 @@ func main() {
}
cl.SetHTTPClient(&http.Client{Transport: httpStats})
// Put a local disk cache in front of the HTTP client.
// TODO: this could be better about proactively cleaning things.
// Fetching 2 TB shouldn't write 2 TB to /tmp before it's done.
// Maybe the cache needs an LRU/size cap.
cacheDir, err := ioutil.TempDir("", "camlicache")
if err != nil {
log.Fatalf("Error creating temp cache directory: %v\n", err)
}
defer os.RemoveAll(cacheDir)
diskcache, err := localdisk.New(cacheDir)
diskCacheFetcher, err := cacher.NewDiskCache(cl)
if err != nil {
log.Fatalf("Error setting up local disk cache: %v", err)
}
defer diskCacheFetcher.Clean()
if *flagVerbose {
log.Printf("Using temp blob cache directory %s", cacheDir)
log.Printf("Using temp blob cache directory %s", diskCacheFetcher.Root)
}
fetcher := cacher.NewCachingFetcher(diskcache, cl)
for _, br := range items {
if *flagGraph {
printGraph(fetcher, br)
printGraph(diskCacheFetcher, br)
return
}
if *flagCheck {
@ -148,13 +137,12 @@ func main() {
var rc io.ReadCloser
var err error
if *flagContents {
seekFetcher := blobref.SeekerFromStreamingFetcher(fetcher)
rc, err = schema.NewFileReader(seekFetcher, br)
rc, err = schema.NewFileReader(diskCacheFetcher, br)
if err == nil {
rc.(*schema.FileReader).LoadAllChunks()
}
} else {
rc, err = fetch(fetcher, br)
rc, err = fetch(diskCacheFetcher, br)
}
if err != nil {
log.Fatal(err)
@ -164,7 +152,7 @@ func main() {
log.Fatalf("Failed reading %q: %v", br, err)
}
} else {
if err := smartFetch(fetcher, *flagOutput, br); err != nil {
if err := smartFetch(diskCacheFetcher, *flagOutput, br); err != nil {
log.Fatal(err)
}
}

View File

@ -19,14 +19,12 @@ package main
import (
"flag"
"fmt"
"io/ioutil"
"log"
"os"
"os/signal"
"syscall"
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/blobserver/localdisk" // used for the blob cache
"camlistore.org/pkg/cacher"
"camlistore.org/pkg/client"
"camlistore.org/pkg/fs"
@ -53,16 +51,11 @@ func main() {
client := client.NewOrFail() // automatic from flags
cacheDir, err := ioutil.TempDir("", "camlicache")
if err != nil {
errorf("Error creating temp cache directory: %v\n", err)
}
defer os.RemoveAll(cacheDir)
diskcache, err := localdisk.New(cacheDir)
diskCacheFetcher, err := cacher.NewDiskCache(client)
if err != nil {
errorf("Error setting up local disk cache: %v", err)
}
fetcher := cacher.NewCachingFetcher(diskcache, client)
defer diskCacheFetcher.Clean()
var camfs *fs.CamliFileSystem
if flag.NArg() == 2 {
@ -71,12 +64,12 @@ func main() {
errorf("Error parsing root blobref: %q\n", root)
}
var err error
camfs, err = fs.NewRootedCamliFileSystem(fetcher, root)
camfs, err = fs.NewRootedCamliFileSystem(diskCacheFetcher, root)
if err != nil {
errorf("Error creating root with %v: %v", root, err)
}
} else {
camfs = fs.NewCamliFileSystem(fetcher)
camfs = fs.NewCamliFileSystem(diskCacheFetcher)
log.Printf("starting with fs %#v", camfs)
}

View File

@ -14,20 +14,27 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
// Package cacher provides various blobref fetching caching mechanisms.
package cacher
import (
"io"
"io/ioutil"
"os"
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/localdisk"
"camlistore.org/pkg/singleflight"
)
func NewCachingFetcher(cacheTarget blobserver.Cache, sfetcher blobref.StreamingFetcher) *CachingFetcher {
return &CachingFetcher{c: cacheTarget, sf: sfetcher}
// NewCachingFetcher returns a CachingFetcher that fetches from
// fetcher and writes to and serves from cache.
func NewCachingFetcher(cache blobserver.Cache, fetcher blobref.StreamingFetcher) *CachingFetcher {
return &CachingFetcher{c: cache, sf: fetcher}
}
// A CachingFetcher is a blobref.StreamingFetcher and a blobref.SeekFetcher.
type CachingFetcher struct {
c blobserver.Cache
sf blobref.StreamingFetcher
@ -35,9 +42,6 @@ type CachingFetcher struct {
g singleflight.Group
}
var _ blobref.StreamingFetcher = (*CachingFetcher)(nil)
var _ blobref.SeekFetcher = (*CachingFetcher)(nil)
func (cf *CachingFetcher) FetchStreaming(br *blobref.BlobRef) (file io.ReadCloser, size int64, err error) {
file, size, err = cf.c.Fetch(br)
if err == nil {
@ -72,3 +76,49 @@ func (cf *CachingFetcher) faultIn(br *blobref.BlobRef) error {
})
return err
}
// A DiskCache is a blobref.StreamingFetcher and blobref.SeekFetcher
// that serves from a local temp directory and is backed by a another
// blobref.StreamingFetcher (usually the pkg/client HTTP client).
type DiskCache struct {
*CachingFetcher
// Root is the temp directory being used to store files.
// It is available mostly for debug printing.
Root string
}
// NewDiskCache returns a new DiskCache from a StreamingFetcher, which
// is usually the pkg/client HTTP client (which typically has much
// higher latency and lower bandwidth than local disk).
func NewDiskCache(fetcher blobref.StreamingFetcher) (*DiskCache, error) {
// TODO: max disk size, keep LRU of access, smarter cleaning,
// persistent directory per-user, etc.
cacheDir, err := ioutil.TempDir("", "camlicache")
if err != nil {
return nil, err
}
diskcache, err := localdisk.New(cacheDir)
if err != nil {
return nil, err
}
dc := &DiskCache{
CachingFetcher: NewCachingFetcher(diskcache, fetcher),
Root: cacheDir,
}
return dc, nil
}
// Clean cleans some or all of the DiskCache.
func (dc *DiskCache) Clean() {
// TODO: something less aggressive?
os.RemoveAll(dc.Root)
}
var (
_ blobref.StreamingFetcher = (*CachingFetcher)(nil)
_ blobref.SeekFetcher = (*CachingFetcher)(nil)
_ blobref.StreamingFetcher = (*DiskCache)(nil)
_ blobref.SeekFetcher = (*DiskCache)(nil)
)