From 53be9298a5a606b3619f49a6054f880d3289dd3d Mon Sep 17 00:00:00 2001 From: mpl Date: Sat, 18 Oct 2014 19:22:35 +0200 Subject: [PATCH] blobserver/s3: add optional in-memory Fetch cache Change-Id: I77e27555d28002ba01f1501e8a77eb4adbf701fe --- TODO | 4 ---- pkg/blobserver/s3/fetch.go | 5 +++++ pkg/blobserver/s3/receive.go | 6 ++++++ pkg/blobserver/s3/remove.go | 3 +++ pkg/blobserver/s3/s3.go | 6 ++++++ pkg/blobserver/s3/stat.go | 1 + 6 files changed, 21 insertions(+), 4 deletions(-) diff --git a/TODO b/TODO index 675146f33..926c83cb8 100644 --- a/TODO +++ b/TODO @@ -4,10 +4,6 @@ There are two TODO lists. This file (good for airplanes) and the online bug trac Offline list: --- put a memory.Storage cache in blobserver/s3, like blobserver/google/cloudstorage. - (for blobpacked being fast to re-GET a recent blob after an upload of a file, - without doing actual HTTP GETs to get the small blobs back) - -- reindexing: * review blob streaming interface & diskpacked implementation thereof. notably: should the continuation token come per-blob? then we could diff --git a/pkg/blobserver/s3/fetch.go b/pkg/blobserver/s3/fetch.go index ddbb20e34..b3a6b0883 100644 --- a/pkg/blobserver/s3/fetch.go +++ b/pkg/blobserver/s3/fetch.go @@ -26,6 +26,11 @@ func (sto *s3Storage) Fetch(blob blob.Ref) (file io.ReadCloser, size uint32, err if faultGet.FailErr(&err) { return } + if sto.cache != nil { + if file, size, err = sto.cache.Fetch(blob); err == nil { + return + } + } file, sz, err := sto.s3Client.Get(sto.bucket, blob.String()) return file, uint32(sz), err } diff --git a/pkg/blobserver/s3/receive.go b/pkg/blobserver/s3/receive.go index afd970a55..695bfc22a 100644 --- a/pkg/blobserver/s3/receive.go +++ b/pkg/blobserver/s3/receive.go @@ -22,6 +22,7 @@ import ( "io" "camlistore.org/pkg/blob" + "camlistore.org/pkg/blobserver" ) func (sto *s3Storage) ReceiveBlob(b blob.Ref, source io.Reader) (sr blob.SizedRef, err error) { @@ -41,5 +42,10 @@ func (sto *s3Storage) ReceiveBlob(b blob.Ref, source io.Reader) (sr blob.SizedRe if err != nil { return sr, err } + if sto.cache != nil { + // NoHash because it's already verified if we read it + // without errors on the io.Copy above. + blobserver.ReceiveNoHash(sto.cache, b, bytes.NewReader(buf.Bytes())) + } return blob.SizedRef{Ref: b, Size: uint32(size)}, nil } diff --git a/pkg/blobserver/s3/remove.go b/pkg/blobserver/s3/remove.go index 3fd727bd5..0ff7b4c3c 100644 --- a/pkg/blobserver/s3/remove.go +++ b/pkg/blobserver/s3/remove.go @@ -24,6 +24,9 @@ import ( var removeGate = syncutil.NewGate(20) // arbitrary func (sto *s3Storage) RemoveBlobs(blobs []blob.Ref) error { + if sto.cache != nil { + sto.cache.RemoveBlobs(blobs) + } var wg syncutil.Group for _, blob := range blobs { diff --git a/pkg/blobserver/s3/s3.go b/pkg/blobserver/s3/s3.go index 29cef36af..02cdbcf6b 100644 --- a/pkg/blobserver/s3/s3.go +++ b/pkg/blobserver/s3/s3.go @@ -39,6 +39,7 @@ import ( "strings" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/blobserver/memory" "camlistore.org/pkg/fault" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/misc/amazon/s3" @@ -55,6 +56,7 @@ type s3Storage struct { s3Client *s3.Client bucket string hostname string + cache *memory.Storage // or nil for no cache } func (s *s3Storage) String() string { @@ -63,6 +65,7 @@ func (s *s3Storage) String() string { func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) { hostname := config.OptionalString("hostname", "s3.amazonaws.com") + cacheSize := config.OptionalInt64("cacheSize", 32<<20) client := &s3.Client{ Auth: &s3.Auth{ AccessKey: config.RequiredString("aws_access_key"), @@ -79,6 +82,9 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora if err := config.Validate(); err != nil { return nil, err } + if cacheSize != 0 { + sto.cache = memory.NewCache(cacheSize) + } if !skipStartupCheck { _, err := client.ListBucket(sto.bucket, "", 1) if serr, ok := err.(*s3.Error); ok { diff --git a/pkg/blobserver/s3/stat.go b/pkg/blobserver/s3/stat.go index 6a2b3a3fc..b7beb7b28 100644 --- a/pkg/blobserver/s3/stat.go +++ b/pkg/blobserver/s3/stat.go @@ -30,6 +30,7 @@ func (sto *s3Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) (er if faultStat.FailErr(&err) { return } + // TODO: use sto.cache var wg syncutil.Group for _, br := range blobs { br := br