mirror of https://github.com/perkeep/perkeep.git
blobserver/s3: add optional in-memory Fetch cache
Change-Id: I77e27555d28002ba01f1501e8a77eb4adbf701fe
parent c0a8aafbe0
commit 53be9298a5
TODO
@@ -4,10 +4,6 @@ There are two TODO lists. This file (good for airplanes) and the online bug trac
 
 Offline list:
 
-- put a memory.Storage cache in blobserver/s3, like blobserver/google/cloudstorage.
-  (for blobpacked being fast to re-GET a recent blob after an upload of a file,
-  without doing actual HTTP GETs to get the small blobs back)
-
 - reindexing:
   * review blob streaming interface & diskpacked implementation thereof.
     notably: should the continuation token come per-blob? then we could

pkg/blobserver/s3/fetch.go
@@ -26,6 +26,11 @@ func (sto *s3Storage) Fetch(blob blob.Ref) (file io.ReadCloser, size uint32, err
 	if faultGet.FailErr(&err) {
 		return
 	}
+	if sto.cache != nil {
+		if file, size, err = sto.cache.Fetch(blob); err == nil {
+			return
+		}
+	}
 	file, sz, err := sto.s3Client.Get(sto.bucket, blob.String())
 	return file, uint32(sz), err
 }
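The shape of that change: the in-memory cache is consulted first, and any error from it, a miss included, silently falls through to a real S3 GET. Pulled out of the s3 specifics, the pattern looks like the sketch below; cachedFetch and its parameter names are illustrative only, though blob.Fetcher and its Fetch signature are the package's real interface.

func cachedFetch(cache, origin blob.Fetcher, br blob.Ref) (io.ReadCloser, uint32, error) {
	// Serve from the local cache when it already holds the blob.
	if cache != nil {
		if rc, size, err := cache.Fetch(br); err == nil {
			return rc, size, nil
		}
	}
	// Otherwise fall back to the authoritative store.
	return origin.Fetch(br)
}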

pkg/blobserver/s3/receive.go
@@ -22,6 +22,7 @@ import (
 	"io"
 
 	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/blobserver"
 )
 
 func (sto *s3Storage) ReceiveBlob(b blob.Ref, source io.Reader) (sr blob.SizedRef, err error) {
@@ -41,5 +42,10 @@ func (sto *s3Storage) ReceiveBlob(b blob.Ref, source io.Reader) (sr blob.SizedRe
 	if err != nil {
 		return sr, err
 	}
+	if sto.cache != nil {
+		// NoHash because it's already verified if we read it
+		// without errors on the io.Copy above.
+		blobserver.ReceiveNoHash(sto.cache, b, bytes.NewReader(buf.Bytes()))
+	}
 	return blob.SizedRef{Ref: b, Size: uint32(size)}, nil
 }
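Two details worth spelling out here. The cache fill uses blobserver.ReceiveNoHash rather than blobserver.Receive because, as the added comment says, the bytes in buf were already verified against b on their way to S3, so hashing them a second time just to warm a cache would be wasted work. And the return values are discarded on purpose: a failed cache fill must not fail an otherwise successful upload. An illustrative contrast, written as fragments in ReceiveBlob's scope rather than lines from the commit:

_, _ = blobserver.Receive(sto.cache, b, bytes.NewReader(buf.Bytes()))       // re-hashes the bytes and checks them against b
_, _ = blobserver.ReceiveNoHash(sto.cache, b, bytes.NewReader(buf.Bytes())) // skips the re-hash; the caller vouches for b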

pkg/blobserver/s3/remove.go
@@ -24,6 +24,9 @@ import (
 var removeGate = syncutil.NewGate(20) // arbitrary
 
 func (sto *s3Storage) RemoveBlobs(blobs []blob.Ref) error {
+	if sto.cache != nil {
+		sto.cache.RemoveBlobs(blobs)
+	}
 	var wg syncutil.Group
 
 	for _, blob := range blobs {
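The ordering looks deliberate: cache entries are dropped before the S3 deletes go out, so a failed remote delete degrades into a harmless cache miss rather than the cache re-serving a blob that S3 has already removed. A sketch of that invariant outside the s3 type (removeEverywhere is a hypothetical name; blobserver.BlobRemover is the package's real interface):

func removeEverywhere(cache, origin blobserver.BlobRemover, blobs []blob.Ref) error {
	if cache != nil {
		// Best effort, as in the diff above: a cache purge error is ignored.
		cache.RemoveBlobs(blobs)
	}
	return origin.RemoveBlobs(blobs)
}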

pkg/blobserver/s3/s3.go
@@ -39,6 +39,7 @@ import (
 	"strings"
 
 	"camlistore.org/pkg/blobserver"
+	"camlistore.org/pkg/blobserver/memory"
 	"camlistore.org/pkg/fault"
 	"camlistore.org/pkg/jsonconfig"
 	"camlistore.org/pkg/misc/amazon/s3"
@@ -55,6 +56,7 @@ type s3Storage struct {
 	s3Client *s3.Client
 	bucket   string
 	hostname string
+	cache    *memory.Storage // or nil for no cache
 }
 
 func (s *s3Storage) String() string {
@@ -63,6 +65,7 @@ func (s *s3Storage) String() string {
 
 func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
 	hostname := config.OptionalString("hostname", "s3.amazonaws.com")
+	cacheSize := config.OptionalInt64("cacheSize", 32<<20)
 	client := &s3.Client{
 		Auth: &s3.Auth{
 			AccessKey: config.RequiredString("aws_access_key"),
@@ -79,6 +82,9 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
 	if err := config.Validate(); err != nil {
 		return nil, err
 	}
+	if cacheSize != 0 {
+		sto.cache = memory.NewCache(cacheSize)
+	}
 	if !skipStartupCheck {
 		_, err := client.ListBucket(sto.bucket, "", 1)
 		if serr, ok := err.(*s3.Error); ok {
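The config surface gains a single knob: cacheSize, read with OptionalInt64 and defaulting to 32<<20 bytes (32 MB). Zero is the off switch; newFromConfig only calls memory.NewCache for nonzero sizes, so sto.cache stays nil and every sto.cache != nil check in the other files short-circuits. A hypothetical helper making that policy explicit (cacheFor is not a name from the commit):

// cacheFor maps the config value onto the cache field: 0 means no cache
// at all (nil), anything else is a byte budget for the in-memory cache.
func cacheFor(cacheSize int64) *memory.Storage {
	if cacheSize == 0 {
		return nil
	}
	return memory.NewCache(cacheSize)
}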

pkg/blobserver/s3/stat.go
@@ -30,6 +30,7 @@ func (sto *s3Storage) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) (er
 	if faultStat.FailErr(&err) {
 		return
 	}
+	// TODO: use sto.cache
 	var wg syncutil.Group
 	for _, br := range blobs {
 		br := br
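The stat path only gains a TODO for now. One plausible way to act on it later, using nothing beyond the cache's Fetch method that fetch.go already relies on, is to answer from the cache and fan out to S3 only for the misses. The fragment below is a hypothetical sketch for the inside of StatBlobs, not part of this commit:

if sto.cache != nil {
	remain := blobs[:0] // filter in place, reusing the backing array
	for _, br := range blobs {
		if rc, size, err := sto.cache.Fetch(br); err == nil {
			rc.Close() // only the size matters for a stat
			dest <- blob.SizedRef{Ref: br, Size: size}
		} else {
			remain = append(remain, br)
		}
	}
	blobs = remain // S3 is only asked about the cache misses
}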