blobserver/google/cloudstorage: add optional in-memory Fetch cache

So blobpacked can quickly re-fetch the many small blobs it reads back while
writing a zip file, without issuing an HTTP GET per blob.
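
The pattern, in brief: keep a small bounded memory.Storage next to the real
storage, check it on Fetch, and fall back to the network on a miss. A minimal
sketch using only names visible in the diff below (the wrapper type here is
illustrative, not part of this commit):

package cloudstorage

import (
	"io"

	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/blobserver/memory"
)

// cachedFetcher is an illustrative wrapper, not a type from this commit.
type cachedFetcher struct {
	cache  *memory.Storage // bounded in-memory blob cache, or nil
	origin blob.Fetcher    // the real storage, which does HTTP GETs
}

func (c *cachedFetcher) Fetch(br blob.Ref) (io.ReadCloser, uint32, error) {
	if c.cache != nil {
		if rc, size, err := c.cache.Fetch(br); err == nil {
			return rc, size, nil // cache hit: no network round trip
		}
	}
	return c.origin.Fetch(br) // cache miss: do the real GET
}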

Change-Id: Ibbab0483920f2ef3b70c60eeebcd0056a8e4364f
Author: Brad Fitzpatrick
Date:   2014-09-27 16:53:34 -07:00
Parent: f99a7a6fa9
Commit: 85ef3b89e9

2 changed files with 37 additions and 13 deletions

TODO

@@ -4,6 +4,10 @@ There are two TODO lists. This file (good for airplanes) and the online bug trac
 Offline list:
 
+-- put a memory.Storage cache in blobserver/s3, like blobserver/google/cloudstorage.
+   (for blobpacked being fast to re-GET a recent blob after an upload of a file,
+   without doing actual HTTP GETs to get the small blobs back)
+
 -- reindexing:
    * review blob streaming interface & diskpacked implementation thereof.
      notably: should the continuation token come per-blob? then we could
pkg/blobserver/google/cloudstorage/

@@ -31,6 +31,7 @@ import (
 	"camlistore.org/pkg/blob"
 	"camlistore.org/pkg/blobserver"
+	"camlistore.org/pkg/blobserver/memory"
 	"camlistore.org/pkg/constants"
 	"camlistore.org/pkg/context"
 	"camlistore.org/pkg/googlestorage"
@@ -41,6 +42,7 @@ import (
 type Storage struct {
 	bucket string // the gs bucket containing blobs
 	client *googlestorage.Client
+	cache  *memory.Storage // or nil for no cache
 
 	// For blobserver.Generationer:
 	genTime time.Time
@@ -61,8 +63,9 @@ func (gs *Storage) ResetStorageGeneration() error { return errors.New("not suppo
 func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
 	var (
-		auth   = config.RequiredObject("auth")
-		bucket = config.RequiredString("bucket")
+		auth      = config.RequiredObject("auth")
+		bucket    = config.RequiredString("bucket")
+		cacheSize = config.OptionalInt64("cacheSize", 32<<20)
 
 		clientID     = auth.RequiredString("client_id") // or "auto" for service accounts
 		clientSecret = auth.OptionalString("client_secret", "")
@@ -94,6 +97,10 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
 			clientID, clientSecret, refreshToken))
 	}
+
+	if cacheSize != 0 {
+		gs.cache = memory.NewCache(cacheSize)
+	}
 	bi, err := gs.client.BucketInfo(bucket)
 	if err != nil {
 		return nil, fmt.Errorf("error statting bucket %q: %v", bucket, err)
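
The new "cacheSize" knob defaults to 32 MB (32<<20) and 0 disables the cache,
per the OptionalInt64 call above. A hedged sketch of constructing the storage
with a bigger cache; the config keys mirror what newFromConfig reads, but the
bucket name and the hand-built jsonconfig.Obj are assumptions:

package cloudstorage

import "camlistore.org/pkg/jsonconfig"

// newWithBigCache is an illustrative helper, not part of the commit.
func newWithBigCache() error {
	cfg := jsonconfig.Obj{
		"auth": map[string]interface{}{
			"client_id": "auto", // "auto" for service accounts, per the comment above
		},
		"bucket":    "my-blobs",        // assumed bucket name
		"cacheSize": float64(64 << 20), // 64 MB instead of the 32 MB default; 0 disables
	}
	_, err := newFromConfig(nil, cfg) // the Loader argument is ignored
	return err
}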
@@ -127,26 +134,31 @@ func (gs *Storage) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRe
 	return nil
 }
 
-func (gs *Storage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, error) {
-	buf := &bytes.Buffer{}
-	size, err := io.Copy(buf, source)
+func (s *Storage) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, error) {
+	var buf bytes.Buffer
+	size, err := io.Copy(&buf, source)
 	if err != nil {
 		return blob.SizedRef{}, err
 	}
 	for tries, shouldRetry := 0, true; tries < 2 && shouldRetry; tries++ {
-		shouldRetry, err = gs.client.PutObject(
-			&googlestorage.Object{Bucket: gs.bucket, Key: br.String()},
+		shouldRetry, err = s.client.PutObject(
+			&googlestorage.Object{Bucket: s.bucket, Key: br.String()},
 			ioutil.NopCloser(bytes.NewReader(buf.Bytes())))
 	}
 	if err != nil {
 		return blob.SizedRef{}, err
 	}
+	if s.cache != nil {
+		// NoHash because it's already verified if we read it
+		// without errors on the io.Copy above.
+		blobserver.ReceiveNoHash(s.cache, br, bytes.NewReader(buf.Bytes()))
+	}
 	return blob.SizedRef{Ref: br, Size: uint32(size)}, nil
 }
 
 func (gs *Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
+	// TODO: use cache
 	var grp syncutil.Group
 	gate := syncutil.NewGate(20) // arbitrary cap
 	for i := range blobs {
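
The "// TODO: use cache" left in StatBlobs marks the one read path that still
always hits the network. One hedged way it could later be discharged, serving
stats for cached blobs from memory and only statting the misses remotely (this
helper is hypothetical, not part of the commit; it uses the file's existing
imports):

// statFromCache is hypothetical: answer what we can from gs.cache and
// return the refs that still need a network stat.
func (gs *Storage) statFromCache(dest chan<- blob.SizedRef, blobs []blob.Ref) (misses []blob.Ref) {
	if gs.cache == nil {
		return blobs
	}
	for _, br := range blobs {
		rc, size, err := gs.cache.Fetch(br)
		if err != nil {
			misses = append(misses, br)
			continue
		}
		rc.Close()
		dest <- blob.SizedRef{Ref: br, Size: size}
	}
	return misses
}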
@@ -172,16 +184,24 @@ func (gs *Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error
 	return grp.Err()
 }
 
-func (gs *Storage) Fetch(blob blob.Ref) (file io.ReadCloser, size uint32, err error) {
-	file, sz, err := gs.client.GetObject(&googlestorage.Object{Bucket: gs.bucket, Key: blob.String()})
+func (s *Storage) Fetch(br blob.Ref) (rc io.ReadCloser, size uint32, err error) {
+	if s.cache != nil {
+		if rc, size, err = s.cache.Fetch(br); err == nil {
+			return
+		}
+	}
+	rc, sz, err := s.client.GetObject(&googlestorage.Object{Bucket: s.bucket, Key: br.String()})
 	if err != nil && sz > constants.MaxBlobSize {
 		err = errors.New("object too big")
 	}
-	return file, uint32(sz), err
+	return rc, uint32(sz), err
 }
 
-func (gs *Storage) RemoveBlobs(blobs []blob.Ref) error {
+func (s *Storage) RemoveBlobs(blobs []blob.Ref) error {
+	if s.cache != nil {
+		s.cache.RemoveBlobs(blobs)
+	}
 	gate := syncutil.NewGate(50) // arbitrary
 	var grp syncutil.Group
 	for i := range blobs {
@@ -189,7 +209,7 @@ func (gs *Storage) RemoveBlobs(blobs []blob.Ref) error {
 		br := blobs[i]
 		grp.Go(func() error {
 			defer gate.Done()
-			return gs.client.DeleteObject(&googlestorage.Object{Bucket: gs.bucket, Key: br.String()})
+			return s.client.DeleteObject(&googlestorage.Object{Bucket: s.bucket, Key: br.String()})
 		})
 	}
 	return grp.Err()
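
Net effect: ReceiveBlob leaves a copy of each uploaded blob in RAM, so the
immediate re-GET that blobpacked does is served locally, and RemoveBlobs keeps
the cache coherent by dropping entries before deleting remotely. A hedged,
test-style sketch of the expected behavior (the helper name is assumed; sto is
a Storage built with a non-zero cacheSize):

package cloudstorage

import (
	"strings"
	"testing"

	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/blobserver"
)

// testReGetAfterUpload is illustrative, not a test from this commit.
func testReGetAfterUpload(t *testing.T, sto blobserver.Storage) {
	const contents = "some small blob"
	br := blob.SHA1FromString(contents)

	if _, err := blobserver.Receive(sto, br, strings.NewReader(contents)); err != nil {
		t.Fatal(err)
	}
	// With the cache enabled, this Fetch should be served from RAM,
	// with no HTTP GET against the bucket.
	rc, size, err := sto.Fetch(br)
	if err != nil {
		t.Fatal(err)
	}
	defer rc.Close()
	if size != uint32(len(contents)) {
		t.Errorf("size = %d; want %d", size, len(contents))
	}
}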