enumerate: tests & long poll support

This commit is contained in:
Brad Fitzpatrick 2011-02-27 17:42:20 -08:00
parent 2e2fea784b
commit b3f85acd47
4 changed files with 180 additions and 19 deletions

View File

@ -50,6 +50,10 @@ type BlobEnumerator interface {
// EnumerateBobs sends at most limit SizedBlobRef into dest,
// sorted, as long as they are lexigraphically greater than
// after (if provided).
// waitSeconds is the max time to wait for any blobs to exist
// in the given partition, or 0 for no delay.
// EnumerateBlobs doesn't close the channel at the end but
// sends a nil. TODO: change this later to close?
EnumerateBlobs(dest chan *blobref.SizedBlobRef,
partition Partition,
after string,

View File

@ -23,6 +23,7 @@ import (
"os"
"sort"
"strings"
"time"
)
type readBlobRequest struct {
@ -104,9 +105,6 @@ func readBlobs(opts readBlobRequest) os.Error {
}
}
if opts.pathInto == "" {
opts.ch <- nil
}
return nil
}
@ -126,10 +124,32 @@ func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition
})
}
doScan()
if err == nil && limitMutable == limit && waitSeconds > 0 {
// TODO: sleep w/ blobhub + select
doScan()
return nil
// The not waiting case:
if err != nil || limitMutable != limit || waitSeconds == 0 {
dest <- nil
return err
}
// The case where we have to wait for waitSeconds for any blob
// to possibly appear.
hub := ds.GetBlobHub(partition)
ch := make(chan *blobref.BlobRef, 1)
hub.RegisterListener(ch)
defer hub.UnregisterListener(ch)
timer := time.NewTimer(int64(waitSeconds) * 1e9)
defer timer.Stop()
select {
case <-timer.C:
// Done waiting.
return nil
case <-ch:
// Don't actually care what it is, but _something_
// arrived. We can just re-scan.
// TODO: might be better to just stat this one item
// so there's no race? But this is easier:
doScan()
}
dest <- nil
return err
}

View File

@ -0,0 +1,125 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package localdisk
import (
"camli/blobref"
"camli/blobserver"
. "camli/testing"
"os"
"testing"
"time"
)
func TestEnumerate(t *testing.T) {
ds := NewStorage(t)
defer cleanUp(ds)
// For test simplicity foo, bar, and baz all have ascending
// sha1s and lengths.
foo := &testBlob{"foo"} // 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
bar := &testBlob{"baar"} // b23361951dde70cb3eca44c0c674181673a129dc
baz := &testBlob{"bazzz"} // e0eb17003ce1c2812ca8f19089fff44ca32b3710
foo.ExpectUploadBlob(t, ds)
bar.ExpectUploadBlob(t, ds)
baz.ExpectUploadBlob(t, ds)
limit := uint(5000)
waitSeconds := 0
ch := make(chan *blobref.SizedBlobRef)
errCh := make(chan os.Error)
go func() {
errCh <- ds.EnumerateBlobs(ch, blobserver.DefaultPartition,
"", limit, waitSeconds)
}()
var sb *blobref.SizedBlobRef
sb = <-ch
Assert(t, sb != nil, "got 1st blob")
ExpectInt(t, 3, int(sb.Size), "1st blob size")
sb = <-ch
Assert(t, sb != nil, "got 2nd blob")
ExpectInt(t, 4, int(sb.Size), "2nd blob size")
sb = <-ch
Assert(t, sb != nil, "got 3rd blob")
ExpectInt(t, 5, int(sb.Size), "3rd blob size")
sb = <-ch
Assert(t, sb == nil, "got final nil")
ExpectNil(t, <-errCh, "EnumerateBlobs return value")
// Now again, but skipping foo's blob
go func() {
errCh <- ds.EnumerateBlobs(ch, blobserver.DefaultPartition,
foo.BlobRef().String(),
limit, waitSeconds)
}()
sb = <-ch
Assert(t, sb != nil, "got 1st blob, skipping foo")
ExpectInt(t, 4, int(sb.Size), "blob size")
sb = <-ch
Assert(t, sb != nil, "got 2nd blob, skipping foo")
ExpectInt(t, 5, int(sb.Size), "blob size")
sb = <-ch
Assert(t, sb == nil, "got final nil")
ExpectNil(t, <-errCh, "EnumerateBlobs return value")
}
func TestEnumerateEmpty(t *testing.T) {
ds := NewStorage(t)
defer cleanUp(ds)
limit := uint(5000)
waitSeconds := 0
ch := make(chan *blobref.SizedBlobRef)
errCh := make(chan os.Error)
go func() {
errCh <- ds.EnumerateBlobs(ch, blobserver.DefaultPartition,
"", limit, waitSeconds)
}()
Expect(t, (<-ch) == nil, "no first blob")
ExpectNil(t, <-errCh, "EnumerateBlobs return value")
}
func TestEnumerateEmptyLongPoll(t *testing.T) {
ds := NewStorage(t)
defer cleanUp(ds)
limit := uint(5000)
waitSeconds := 1
ch := make(chan *blobref.SizedBlobRef)
errCh := make(chan os.Error)
go func() {
errCh <- ds.EnumerateBlobs(ch, blobserver.DefaultPartition,
"", limit, waitSeconds)
}()
foo := &testBlob{"foo"} // 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33
go func() {
time.Sleep(100e6) // 100 ms
foo.ExpectUploadBlob(t, ds)
}()
sb := <-ch
Assert(t, sb != nil, "got a blob")
ExpectInt(t, 3, int(sb.Size), "blob size")
ExpectString(t, "sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", sb.BlobRef.String(), "got the right blob")
Expect(t, (<-ch) == nil, "only one blob returned")
ExpectNil(t, <-errCh, "EnumerateBlobs return value")
}

View File

@ -26,30 +26,42 @@ func Expect(t *testing.T, got bool, what string) {
}
}
func ExpectBool(t *testing.T, expect, got bool, what string) {
if expect != got {
t.Errorf("%s: got %v; expected %v", what, got, expect)
}
}
func ExpectInt(t *testing.T, expect, got int, what string) {
if expect != got {
t.Errorf("%s: got %d; expected %d", what, got, expect)
}
}
func Assert(t *testing.T, got bool, what string) {
if !got {
t.Fatalf("%s: got %v; expected %v", what, got, true)
}
}
func ExpectString(t *testing.T, expect, got string, what string) {
if expect != got {
t.Errorf("%s: got %v; expected %v", what, got, expect)
}
}
func AssertString(t *testing.T, expect, got string, what string) {
if expect != got {
t.Fatalf("%s: got %v; expected %v", what, got, expect)
}
}
func ExpectBool(t *testing.T, expect, got bool, what string) {
if expect != got {
t.Errorf("%s: got %v; expected %v", what, got, expect)
}
}
func AssertBool(t *testing.T, expect, got bool, what string) {
if expect != got {
t.Fatalf("%s: got %v; expected %v", what, got, expect)
}
}
func ExpectInt(t *testing.T, expect, got int, what string) {
if expect != got {
t.Errorf("%s: got %d; expected %d", what, got, expect)
}
}
func AssertInt(t *testing.T, expect, got int, what string) {
if expect != got {
t.Fatalf("%s: got %d; expected %d", what, got, expect)