mirror of https://github.com/perkeep/perkeep.git
Merge "pkg/blobserver/s3: Replace s3-specific memory cache with a proxycache."
This commit is contained in:
commit
e64ed803e0
|
@ -27,11 +27,6 @@ func (sto *s3Storage) Fetch(ctx context.Context, blob blob.Ref) (file io.ReadClo
|
|||
if faultGet.FailErr(&err) {
|
||||
return
|
||||
}
|
||||
if sto.cache != nil {
|
||||
if file, size, err = sto.cache.Fetch(ctx, blob); err == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
file, sz, err := sto.s3Client.Get(ctx, sto.bucket, sto.dirPrefix+blob.String())
|
||||
return file, uint32(sz), err
|
||||
}
|
||||
|
|
|
@ -23,7 +23,6 @@ import (
|
|||
"io"
|
||||
|
||||
"perkeep.org/pkg/blob"
|
||||
"perkeep.org/pkg/blobserver"
|
||||
)
|
||||
|
||||
func (sto *s3Storage) ReceiveBlob(ctx context.Context, b blob.Ref, source io.Reader) (sr blob.SizedRef, err error) {
|
||||
|
@ -43,10 +42,5 @@ func (sto *s3Storage) ReceiveBlob(ctx context.Context, b blob.Ref, source io.Rea
|
|||
if err != nil {
|
||||
return sr, err
|
||||
}
|
||||
if sto.cache != nil {
|
||||
// NoHash because it's already verified if we read it
|
||||
// without errors on the io.Copy above.
|
||||
blobserver.ReceiveNoHash(ctx, sto.cache, b, bytes.NewReader(buf.Bytes()))
|
||||
}
|
||||
return blob.SizedRef{Ref: b, Size: uint32(size)}, nil
|
||||
}
|
||||
|
|
|
@ -27,9 +27,6 @@ import (
|
|||
var removeGate = syncutil.NewGate(20) // arbitrary
|
||||
|
||||
func (sto *s3Storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
|
||||
if sto.cache != nil {
|
||||
sto.cache.RemoveBlobs(ctx, blobs)
|
||||
}
|
||||
var wg syncutil.Group
|
||||
|
||||
for _, blob := range blobs {
|
||||
|
|
|
@ -43,6 +43,7 @@ import (
|
|||
"perkeep.org/pkg/blob"
|
||||
"perkeep.org/pkg/blobserver"
|
||||
"perkeep.org/pkg/blobserver/memory"
|
||||
"perkeep.org/pkg/blobserver/proxycache"
|
||||
|
||||
"go4.org/fault"
|
||||
"go4.org/jsonconfig"
|
||||
|
@ -73,7 +74,6 @@ type s3Storage struct {
|
|||
// slash.
|
||||
dirPrefix string
|
||||
hostname string
|
||||
cache *memory.Storage // or nil for no cache
|
||||
}
|
||||
|
||||
func (s *s3Storage) String() string {
|
||||
|
@ -113,9 +113,7 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
|
|||
if err := config.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if cacheSize != 0 {
|
||||
sto.cache = memory.NewCache(cacheSize)
|
||||
}
|
||||
|
||||
ctx := context.Background() // TODO: 5 min timeout or something?
|
||||
if !skipStartupCheck {
|
||||
_, err := client.ListBucket(ctx, sto.bucket, "", 1)
|
||||
|
@ -154,6 +152,13 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
|
|||
return nil, fmt.Errorf("Error listing bucket %s: %v", sto.bucket, err)
|
||||
}
|
||||
}
|
||||
|
||||
if cacheSize != 0 {
|
||||
// This has two layers of LRU caching (proxycache and memory).
|
||||
// We make the outer one 4x the size so that it doesn't evict from the
|
||||
// underlying one when it's about to perform its own eviction.
|
||||
return proxycache.New(cacheSize<<2, memory.NewCache(cacheSize), sto), nil
|
||||
}
|
||||
return sto, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -33,7 +33,6 @@ func (sto *s3Storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(b
|
|||
if faultStat.FailErr(&err) {
|
||||
return
|
||||
}
|
||||
// TODO: use sto.cache
|
||||
return blobserver.StatBlobsParallelHelper(ctx, blobs, fn, statGate, func(br blob.Ref) (sb blob.SizedRef, err error) {
|
||||
size, err := sto.s3Client.Stat(ctx, sto.dirPrefix+br.String(), sto.bucket)
|
||||
if err == nil {
|
||||
|
|
Loading…
Reference in New Issue