2011-03-24 23:47:51 +00:00
|
|
|
/*
|
|
|
|
Copyright 2011 Google Inc.
|
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
|
|
|
package test
|
|
|
|
|
|
|
|
import (
|
2012-12-25 20:17:45 +00:00
|
|
|
"fmt"
|
2011-05-06 04:45:24 +00:00
|
|
|
"io"
|
2012-12-25 20:17:45 +00:00
|
|
|
"io/ioutil"
|
2011-03-24 23:47:51 +00:00
|
|
|
"os"
|
2013-07-07 20:02:42 +00:00
|
|
|
"sort"
|
2012-12-26 01:27:52 +00:00
|
|
|
"strings"
|
2011-03-24 23:47:51 +00:00
|
|
|
"sync"
|
2011-05-06 04:45:24 +00:00
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
"camlistore.org/pkg/blob"
|
2013-07-07 20:02:42 +00:00
|
|
|
"camlistore.org/pkg/blobserver"
|
2013-12-02 21:20:51 +00:00
|
|
|
"camlistore.org/pkg/context"
|
2013-07-29 03:08:55 +00:00
|
|
|
"camlistore.org/pkg/types"
|
2011-03-24 23:47:51 +00:00
|
|
|
)
|
|
|
|
|
2013-07-07 20:02:42 +00:00
|
|
|
// Fetcher is an in-memory implementation of the blobserver Storage
// interface. It started as just a fetcher and grew. It also includes
// other convenience methods for testing.
//
// The methods in this file create the map on first AddBlob and take the
// mutex themselves before touching the stored blobs.
type Fetcher struct {
	l      sync.Mutex       // held by the methods while they access m and sorted
	m      map[string]*Blob // keyed by blobref string
	sorted []string         // blobref strings (the keys of m), kept in sorted order

	// FetchErr, if non-nil, is called at the start of every Fetch call
	// (and therefore every FetchStreaming call, which delegates to Fetch).
	// If it returns a non-nil error, that error is returned to the caller
	// and no lookup is done. If it returns nil, the fetch proceeds as normal.
	FetchErr func() error
}
|
|
|
|
|
2013-07-07 20:02:42 +00:00
|
|
|
// Compile-time check that *Fetcher satisfies blobserver.Storage.
var _ blobserver.Storage = (*Fetcher)(nil)
|
|
|
|
|
2011-03-24 23:47:51 +00:00
|
|
|
func (tf *Fetcher) AddBlob(b *Blob) {
|
|
|
|
tf.l.Lock()
|
|
|
|
defer tf.l.Unlock()
|
|
|
|
if tf.m == nil {
|
|
|
|
tf.m = make(map[string]*Blob)
|
|
|
|
}
|
2013-07-07 20:02:42 +00:00
|
|
|
key := b.BlobRef().String()
|
|
|
|
tf.m[key] = b
|
|
|
|
tf.sorted = append(tf.sorted, key)
|
|
|
|
sort.Strings(tf.sorted)
|
2011-03-24 23:47:51 +00:00
|
|
|
}
|
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
func (tf *Fetcher) FetchStreaming(ref blob.Ref) (file io.ReadCloser, size int64, err error) {
|
2011-05-06 04:45:24 +00:00
|
|
|
return tf.Fetch(ref)
|
|
|
|
}
|
|
|
|
|
2012-12-26 01:27:52 +00:00
|
|
|
// dummyCloser supplies the Close method of the readers returned by Fetch.
// NOTE(review): it wraps a nil reader, so it is only suitable for Close;
// confirm nothing ever reads through it.
var dummyCloser = ioutil.NopCloser(nil)
|
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
func (tf *Fetcher) Fetch(ref blob.Ref) (file types.ReadSeekCloser, size int64, err error) {
|
2013-09-22 12:10:16 +00:00
|
|
|
if tf.FetchErr != nil {
|
|
|
|
if err = tf.FetchErr(); err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
2011-03-24 23:47:51 +00:00
|
|
|
tf.l.Lock()
|
|
|
|
defer tf.l.Unlock()
|
|
|
|
if tf.m == nil {
|
2012-02-20 12:32:46 +00:00
|
|
|
err = os.ErrNotExist
|
2011-03-24 23:47:51 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
tb, ok := tf.m[ref.String()]
|
|
|
|
if !ok {
|
2012-02-20 12:32:46 +00:00
|
|
|
err = os.ErrNotExist
|
2011-03-24 23:47:51 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
size = int64(len(tb.Contents))
|
2012-12-26 01:27:52 +00:00
|
|
|
return struct {
|
|
|
|
*io.SectionReader
|
|
|
|
io.Closer
|
|
|
|
}{
|
|
|
|
io.NewSectionReader(strings.NewReader(tb.Contents), 0, size),
|
|
|
|
dummyCloser,
|
|
|
|
}, size, nil
|
2011-03-24 23:47:51 +00:00
|
|
|
}
|
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
func (tf *Fetcher) BlobContents(br blob.Ref) (contents string, ok bool) {
|
2012-12-25 20:17:45 +00:00
|
|
|
tf.l.Lock()
|
2013-07-07 20:02:42 +00:00
|
|
|
defer tf.l.Unlock()
|
2012-12-25 20:17:45 +00:00
|
|
|
b, ok := tf.m[br.String()]
|
|
|
|
if !ok {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
return b.Contents, true
|
|
|
|
}
|
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
func (tf *Fetcher) ReceiveBlob(br blob.Ref, source io.Reader) (blob.SizedRef, error) {
|
|
|
|
sb := blob.SizedRef{}
|
2012-12-25 20:17:45 +00:00
|
|
|
h := br.Hash()
|
|
|
|
if h == nil {
|
|
|
|
return sb, fmt.Errorf("Unsupported blobref hash for %s", br)
|
|
|
|
}
|
|
|
|
all, err := ioutil.ReadAll(io.TeeReader(source, h))
|
|
|
|
if err != nil {
|
|
|
|
return sb, err
|
|
|
|
}
|
|
|
|
if !br.HashMatches(h) {
|
2013-08-21 20:57:28 +00:00
|
|
|
// This is a somewhat redundant check, since
|
2013-09-22 12:10:16 +00:00
|
|
|
// blobserver.Receive now does it. But for testing code,
|
|
|
|
// it's worth the cost.
|
2012-12-25 20:17:45 +00:00
|
|
|
return sb, fmt.Errorf("Hash mismatch receiving blob %s", br)
|
|
|
|
}
|
2013-08-04 02:54:30 +00:00
|
|
|
b := &Blob{Contents: string(all)}
|
|
|
|
tf.AddBlob(b)
|
|
|
|
return blob.SizedRef{br, int64(len(all))}, nil
|
2012-12-25 20:17:45 +00:00
|
|
|
}
|
|
|
|
|
2013-08-21 20:57:28 +00:00
|
|
|
func (tf *Fetcher) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
|
2012-12-25 20:17:45 +00:00
|
|
|
for _, br := range blobs {
|
|
|
|
tf.l.Lock()
|
|
|
|
b, ok := tf.m[br.String()]
|
|
|
|
tf.l.Unlock()
|
|
|
|
if ok {
|
2013-08-04 02:54:30 +00:00
|
|
|
dest <- blob.SizedRef{br, int64(len(b.Contents))}
|
2012-12-25 20:17:45 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
2013-07-07 20:02:42 +00:00
|
|
|
|
|
|
|
// BlobrefStrings returns the sorted stringified blobrefs stored in this fetcher.
|
|
|
|
func (tf *Fetcher) BlobrefStrings() []string {
|
|
|
|
tf.l.Lock()
|
|
|
|
defer tf.l.Unlock()
|
|
|
|
s := make([]string, len(tf.sorted))
|
|
|
|
copy(s, tf.sorted)
|
|
|
|
return s
|
|
|
|
}
|
|
|
|
|
2013-12-02 21:20:51 +00:00
|
|
|
func (tf *Fetcher) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
|
2013-07-07 20:02:42 +00:00
|
|
|
defer close(dest)
|
|
|
|
tf.l.Lock()
|
|
|
|
defer tf.l.Unlock()
|
|
|
|
n := 0
|
|
|
|
for _, k := range tf.sorted {
|
|
|
|
if k <= after {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
b := tf.m[k]
|
2013-12-02 21:20:51 +00:00
|
|
|
select {
|
|
|
|
case dest <- blob.SizedRef{b.BlobRef(), b.Size()}:
|
|
|
|
case <-ctx.Done():
|
|
|
|
return context.ErrCanceled
|
|
|
|
}
|
2013-07-07 20:02:42 +00:00
|
|
|
n++
|
|
|
|
if limit > 0 && n == limit {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2013-08-04 02:54:30 +00:00
|
|
|
func (tf *Fetcher) RemoveBlobs(blobs []blob.Ref) error {
|
2013-07-07 20:02:42 +00:00
|
|
|
tf.l.Lock()
|
|
|
|
defer tf.l.Unlock()
|
|
|
|
for _, br := range blobs {
|
|
|
|
delete(tf.m, br.String())
|
|
|
|
}
|
|
|
|
tf.sorted = tf.sorted[:0]
|
|
|
|
for k := range tf.m {
|
|
|
|
tf.sorted = append(tf.sorted, k)
|
|
|
|
}
|
|
|
|
sort.Strings(tf.sorted)
|
|
|
|
return nil
|
|
|
|
}
|