mirror of https://github.com/perkeep/perkeep.git
blobpacked: more streaming tests
Updates #532 Change-Id: I4cf75d81dfc842296cf4efa728e67bd0ad3e3b40
This commit is contained in:
parent
bb11d07720
commit
f493ad54f4
|
@ -456,7 +456,8 @@ func TestStreamBlobs(t *testing.T) {
|
|||
s.init()
|
||||
|
||||
all := map[blob.Ref]bool{}
|
||||
for i := 0; i < 10; i++ {
|
||||
const nBlobs = 10
|
||||
for i := 0; i < nBlobs; i++ {
|
||||
b := &test.Blob{strconv.Itoa(i)}
|
||||
b.MustUpload(t, small)
|
||||
all[b.BlobRef()] = true
|
||||
|
@ -482,6 +483,36 @@ func TestStreamBlobs(t *testing.T) {
|
|||
if !reflect.DeepEqual(got, all) {
|
||||
t.Errorf("Got blobs %v; want %v", got, all)
|
||||
}
|
||||
storagetest.TestStreamer(t, s, storagetest.WantN(nBlobs))
|
||||
}
|
||||
|
||||
func TestStreamBlobs_Loose_Enumerate(t *testing.T) {
|
||||
testStreamBlobsLoose(t, false)
|
||||
}
|
||||
|
||||
func TestStreamBlobs_Loose_Streamed(t *testing.T) {
|
||||
testStreamBlobsLoose(t, true)
|
||||
}
|
||||
|
||||
func testStreamBlobsLoose(t *testing.T, streamed bool) {
|
||||
var small blobserver.Storage = new(test.Fetcher)
|
||||
if !streamed {
|
||||
// Hide the BlobStreamer interface impl.
|
||||
small = struct{ blobserver.Storage }{small}
|
||||
|
||||
}
|
||||
s := &storage{
|
||||
small: small,
|
||||
large: new(test.Fetcher), // unused
|
||||
meta: sorted.NewMemoryKeyValue(),
|
||||
log: test.NewLogger(t, "blobpacked: "),
|
||||
}
|
||||
s.init()
|
||||
const nBlobs = 10
|
||||
for i := 0; i < nBlobs; i++ {
|
||||
(&test.Blob{strconv.Itoa(i)}).MustUpload(t, small)
|
||||
}
|
||||
storagetest.TestStreamer(t, s, storagetest.WantN(nBlobs))
|
||||
}
|
||||
|
||||
func TestForeachZipBlob(t *testing.T) {
|
||||
|
|
|
@ -63,54 +63,3 @@ func (st largeBlobStreamer) StreamBlobs(ctx *context.Context, dest chan<- blobse
|
|||
// TODO(bradfitz): implement
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO: move some ofthis old pre-NewMultiBlobStreamer code into
|
||||
// blobserver. in particular, transparently using enumerate for
|
||||
// BlobStreamer when the storage doesn't support it should be provided
|
||||
// by the blobserver package. inevitably others will want that.
|
||||
/*
|
||||
enumErrc := make(chan error, 1)
|
||||
go func() {
|
||||
defer close(sbc)
|
||||
enumErrc <- blobserver.EnumerateAllFrom(enumCtx, s.small, strings.TrimPrefix(contToken, "after:"), func(sb blob.SizedRef) error {
|
||||
select {
|
||||
case sbc <- sb:
|
||||
return nil
|
||||
case <-enumDone:
|
||||
return context.ErrCanceled
|
||||
}
|
||||
})
|
||||
}()
|
||||
var sent int64
|
||||
var lastRef blob.Ref
|
||||
for {
|
||||
sb, ok := <-sbc
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
opener := func() types.ReadSeekCloser {
|
||||
return blob.NewLazyReadSeekCloser(s.small, sb.Ref)
|
||||
}
|
||||
select {
|
||||
case dest <- blob.NewBlob(sb.Ref, sb.Size, opener):
|
||||
lastRef = sb.Ref
|
||||
sent += int64(sb.Size)
|
||||
case <-ctx.Done():
|
||||
return "", context.ErrCanceled
|
||||
}
|
||||
}
|
||||
|
||||
enumCtx.Cancel() // redundant if sbc was already closed, but harmless.
|
||||
enumErr := <-enumErrc
|
||||
if enumErr == nil {
|
||||
return "l:", nil
|
||||
}
|
||||
|
||||
// See if we didn't send anything due to enumeration errors.
|
||||
if sent == 0 {
|
||||
enumCtx.Cancel()
|
||||
return "l:", enumErr
|
||||
}
|
||||
return "s:after:" + lastRef.String(), nil
|
||||
}
|
||||
*/
|
||||
|
|
Loading…
Reference in New Issue