From 85ef3b89e9d2a503bd8d63a4cda6e77044386928 Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Sat, 27 Sep 2014 16:53:34 -0700
Subject: [PATCH] blobserver/google/cloudstorage: add optional in-memory Fetch
 cache

So blobpacked can get blobs back quickly, without HTTP GET requests,
when it does all of its small blob fetches while writing the zip file.

Change-Id: Ibbab0483920f2ef3b70c60eeebcd0056a8e4364f
---
 TODO                                          |  4 ++
 pkg/blobserver/google/cloudstorage/storage.go | 46 +++++++++++++------
 2 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/TODO b/TODO
index 926c83cb8..675146f33 100644
--- a/TODO
+++ b/TODO
@@ -4,6 +4,10 @@ There are two TODO lists. This file (good for airplanes) and the online bug trac
 Offline list:
 
+-- put a memory.Storage cache in blobserver/s3, like blobserver/google/cloudstorage.
+   (for blobpacked being fast to re-GET a recent blob after an upload of a file,
+   without doing actual HTTP GETs to get the small blobs back)
+
 -- reindexing:
    * review blob streaming interface & diskpacked implementation thereof.
      notably: should the continuation token come per-blob? then we could
diff --git a/pkg/blobserver/google/cloudstorage/storage.go b/pkg/blobserver/google/cloudstorage/storage.go
index 008469226..0b5859fa9 100644
--- a/pkg/blobserver/google/cloudstorage/storage.go
+++ b/pkg/blobserver/google/cloudstorage/storage.go
@@ -31,6 +31,7 @@ import (
 	"camlistore.org/pkg/blob"
 	"camlistore.org/pkg/blobserver"
+	"camlistore.org/pkg/blobserver/memory"
 	"camlistore.org/pkg/constants"
 	"camlistore.org/pkg/context"
 	"camlistore.org/pkg/googlestorage"
@@ -41,6 +42,7 @@ import (
 type Storage struct {
 	bucket string // the gs bucket containing blobs
 	client *googlestorage.Client
+	cache  *memory.Storage // or nil for no cache
 
 	// For blobserver.Generationer:
 	genTime time.Time
@@ -61,8 +63,9 @@ func (gs *Storage) ResetStorageGeneration() error { return errors.New("not suppo
 
 func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
 	var (
-		auth   = config.RequiredObject("auth")
-		bucket = config.RequiredString("bucket")
+		auth      = config.RequiredObject("auth")
+		bucket    = config.RequiredString("bucket")
+		cacheSize = config.OptionalInt64("cacheSize", 32<<20)
 
 		clientID     = auth.RequiredString("client_id") // or "auto" for service accounts
 		clientSecret = auth.OptionalString("client_secret", "")
@@ -94,6 +97,10 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
 			clientID, clientSecret, refreshToken))
 	}
 
+	if cacheSize != 0 {
+		gs.cache = memory.NewCache(cacheSize)
+	}
+
 	bi, err := gs.client.BucketInfo(bucket)
 	if err != nil {
 		return nil, fmt.Errorf("error statting bucket %q: %v", bucket, err)
@@ -127,26 +134,31 @@ func (gs *Storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRe
 	return nil
 }
 
-func (gs *Storage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, error) {
-	buf := &bytes.Buffer{}
-	size, err := io.Copy(buf, source)
+func (s *Storage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, error) {
+	var buf bytes.Buffer
+	size, err := io.Copy(&buf, source)
 	if err != nil {
 		return blob.SizedRef{}, err
 	}
 	for tries, shouldRetry := 0, true; tries < 2 && shouldRetry; tries++ {
-		shouldRetry, err = gs.client.PutObject(
-			&googlestorage.Object{Bucket: gs.bucket, Key: br.String()},
+		shouldRetry, err = s.client.PutObject(
+			&googlestorage.Object{Bucket: s.bucket, Key: br.String()},
 			ioutil.NopCloser(bytes.NewReader(buf.Bytes())))
 	}
 	if err != nil {
 		return blob.SizedRef{}, err
 	}
-
+	if s.cache != nil {
+		// NoHash because the blob is already verified if we read it
+		// without errors on the io.Copy above.
+		blobserver.ReceiveNoHash(s.cache, br, bytes.NewReader(buf.Bytes()))
+	}
 	return blob.SizedRef{Ref: br, Size: uint32(size)}, nil
 }
 
 func (gs *Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
+	// TODO: use cache
 	var grp syncutil.Group
 	gate := syncutil.NewGate(20) // arbitrary cap
 	for i := range blobs {
@@ -172,16 +184,24 @@ func (gs *Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error
 	return grp.Err()
 }
 
-func (gs *Storage) Fetch(blob blob.Ref) (file io.ReadCloser, size uint32, err error) {
-	file, sz, err := gs.client.GetObject(&googlestorage.Object{Bucket: gs.bucket, Key: blob.String()})
+func (s *Storage) Fetch(br blob.Ref) (rc io.ReadCloser, size uint32, err error) {
+	if s.cache != nil {
+		if rc, size, err = s.cache.Fetch(br); err == nil {
+			return
+		}
+	}
+	rc, sz, err := s.client.GetObject(&googlestorage.Object{Bucket: s.bucket, Key: br.String()})
 	if err == nil && sz > constants.MaxBlobSize {
 		err = errors.New("object too big")
 	}
-	return file, uint32(sz), err
+	return rc, uint32(sz), err
 }
 
-func (gs *Storage) RemoveBlobs(blobs []blob.Ref) error {
+func (s *Storage) RemoveBlobs(blobs []blob.Ref) error {
+	if s.cache != nil {
+		s.cache.RemoveBlobs(blobs)
+	}
 	gate := syncutil.NewGate(50) // arbitrary
 	var grp syncutil.Group
 	for i := range blobs {
@@ -189,7 +209,7 @@ func (gs *Storage) RemoveBlobs(blobs []blob.Ref) error {
 		br := blobs[i]
 		grp.Go(func() error {
 			defer gate.Done()
-			return gs.client.DeleteObject(&googlestorage.Object{Bucket: gs.bucket, Key: br.String()})
+			return s.client.DeleteObject(&googlestorage.Object{Bucket: s.bucket, Key: br.String()})
 		})
 	}
 	return grp.Err()
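
For illustration only (not part of the commit): the write-through /
read-through pattern this patch wires into the Google Cloud Storage
backend could also be factored out as a wrapper over any blob store,
which is roughly what the TODO entry added at the top asks for in
blobserver/s3. In the sketch below the package name cachewrap, the type
memCacheStorage, and the constructor New are hypothetical names; the
rest assumes only the camlistore.org APIs already visible in the diff
(blobserver.Storage, memory.NewCache, (*memory.Storage).Fetch and
RemoveBlobs, and blobserver.ReceiveNoHash).

package cachewrap

import (
	"bytes"
	"io"

	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/blobserver"
	"camlistore.org/pkg/blobserver/memory"
)

// memCacheStorage (hypothetical) fronts a real blob store with an
// in-memory cache so recently written blobs can be re-fetched without
// a round trip to the backing store.
type memCacheStorage struct {
	blobserver.Storage                 // the real (slow) store
	cache              *memory.Storage // nil means caching is disabled
}

// New wraps real with a cache of roughly cacheSize bytes; a size of 0
// disables caching, mirroring the patch's handling of the cacheSize
// config key.
func New(real blobserver.Storage, cacheSize int64) blobserver.Storage {
	s := memCacheStorage{Storage: real}
	if cacheSize != 0 {
		s.cache = memory.NewCache(cacheSize)
	}
	return s
}

func (s memCacheStorage) Fetch(br blob.Ref) (io.ReadCloser, uint32, error) {
	if s.cache != nil {
		if rc, size, err := s.cache.Fetch(br); err == nil {
			return rc, size, nil // cache hit: no HTTP GET needed
		}
	}
	return s.Storage.Fetch(br) // cache miss: fall through to the real store
}

func (s memCacheStorage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, error) {
	var buf bytes.Buffer
	// Tee the incoming bytes into a buffer so they can be replayed
	// into the cache once the real store has accepted them.
	sb, err := s.Storage.ReceiveBlob(br, io.TeeReader(source, &buf))
	if err != nil {
		return sb, err
	}
	if s.cache != nil {
		// NoHash: the underlying store already read and verified the bytes.
		blobserver.ReceiveNoHash(s.cache, br, bytes.NewReader(buf.Bytes()))
	}
	return sb, nil
}

func (s memCacheStorage) RemoveBlobs(blobs []blob.Ref) error {
	if s.cache != nil {
		s.cache.RemoveBlobs(blobs) // keep deleted blobs from being served from cache
	}
	return s.Storage.RemoveBlobs(blobs)
}

A wrapper along these lines would let the Google Cloud Storage and S3
backends share one cache implementation instead of each duplicating the
nil checks around every operation.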