diff --git a/lib/go/blobserver/interface.go b/lib/go/blobserver/interface.go index 5ca40009a..8f24eefe3 100644 --- a/lib/go/blobserver/interface.go +++ b/lib/go/blobserver/interface.go @@ -50,6 +50,10 @@ type BlobEnumerator interface { // EnumerateBobs sends at most limit SizedBlobRef into dest, // sorted, as long as they are lexigraphically greater than // after (if provided). + // waitSeconds is the max time to wait for any blobs to exist + // in the given partition, or 0 for no delay. + // EnumerateBlobs doesn't close the channel at the end but + // sends a nil. TODO: change this later to close? EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition Partition, after string, diff --git a/lib/go/blobserver/localdisk/enumerate.go b/lib/go/blobserver/localdisk/enumerate.go index 2b4e08c6b..9a0625840 100644 --- a/lib/go/blobserver/localdisk/enumerate.go +++ b/lib/go/blobserver/localdisk/enumerate.go @@ -23,6 +23,7 @@ import ( "os" "sort" "strings" + "time" ) type readBlobRequest struct { @@ -104,9 +105,6 @@ func readBlobs(opts readBlobRequest) os.Error { } } - if opts.pathInto == "" { - opts.ch <- nil - } return nil } @@ -126,10 +124,32 @@ func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition }) } doScan() - if err == nil && limitMutable == limit && waitSeconds > 0 { - // TODO: sleep w/ blobhub + select - doScan() - return nil + + // The not waiting case: + if err != nil || limitMutable != limit || waitSeconds == 0 { + dest <- nil + return err } + + // The case where we have to wait for waitSeconds for any blob + // to possibly appear. + hub := ds.GetBlobHub(partition) + ch := make(chan *blobref.BlobRef, 1) + hub.RegisterListener(ch) + defer hub.UnregisterListener(ch) + timer := time.NewTimer(int64(waitSeconds) * 1e9) + defer timer.Stop() + select { + case <-timer.C: + // Done waiting. + return nil + case <-ch: + // Don't actually care what it is, but _something_ + // arrived. We can just re-scan. + // TODO: might be better to just stat this one item + // so there's no race? But this is easier: + doScan() + } + dest <- nil return err } diff --git a/lib/go/blobserver/localdisk/enumerate_test.go b/lib/go/blobserver/localdisk/enumerate_test.go new file mode 100644 index 000000000..94a677e99 --- /dev/null +++ b/lib/go/blobserver/localdisk/enumerate_test.go @@ -0,0 +1,125 @@ +/* +Copyright 2011 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package localdisk + +import ( + "camli/blobref" + "camli/blobserver" + . "camli/testing" + "os" + "testing" + "time" +) + +func TestEnumerate(t *testing.T) { + ds := NewStorage(t) + defer cleanUp(ds) + + // For test simplicity foo, bar, and baz all have ascending + // sha1s and lengths. + foo := &testBlob{"foo"} // 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 + bar := &testBlob{"baar"} // b23361951dde70cb3eca44c0c674181673a129dc + baz := &testBlob{"bazzz"} // e0eb17003ce1c2812ca8f19089fff44ca32b3710 + foo.ExpectUploadBlob(t, ds) + bar.ExpectUploadBlob(t, ds) + baz.ExpectUploadBlob(t, ds) + + limit := uint(5000) + waitSeconds := 0 + ch := make(chan *blobref.SizedBlobRef) + errCh := make(chan os.Error) + go func() { + errCh <- ds.EnumerateBlobs(ch, blobserver.DefaultPartition, + "", limit, waitSeconds) + }() + + var sb *blobref.SizedBlobRef + sb = <-ch + Assert(t, sb != nil, "got 1st blob") + ExpectInt(t, 3, int(sb.Size), "1st blob size") + sb = <-ch + Assert(t, sb != nil, "got 2nd blob") + ExpectInt(t, 4, int(sb.Size), "2nd blob size") + sb = <-ch + Assert(t, sb != nil, "got 3rd blob") + ExpectInt(t, 5, int(sb.Size), "3rd blob size") + sb = <-ch + Assert(t, sb == nil, "got final nil") + ExpectNil(t, <-errCh, "EnumerateBlobs return value") + + // Now again, but skipping foo's blob + go func() { + errCh <- ds.EnumerateBlobs(ch, blobserver.DefaultPartition, + foo.BlobRef().String(), + limit, waitSeconds) + }() + sb = <-ch + Assert(t, sb != nil, "got 1st blob, skipping foo") + ExpectInt(t, 4, int(sb.Size), "blob size") + sb = <-ch + Assert(t, sb != nil, "got 2nd blob, skipping foo") + ExpectInt(t, 5, int(sb.Size), "blob size") + sb = <-ch + Assert(t, sb == nil, "got final nil") + ExpectNil(t, <-errCh, "EnumerateBlobs return value") +} + +func TestEnumerateEmpty(t *testing.T) { + ds := NewStorage(t) + defer cleanUp(ds) + + limit := uint(5000) + waitSeconds := 0 + ch := make(chan *blobref.SizedBlobRef) + errCh := make(chan os.Error) + go func() { + errCh <- ds.EnumerateBlobs(ch, blobserver.DefaultPartition, + "", limit, waitSeconds) + }() + + Expect(t, (<-ch) == nil, "no first blob") + ExpectNil(t, <-errCh, "EnumerateBlobs return value") +} + +func TestEnumerateEmptyLongPoll(t *testing.T) { + ds := NewStorage(t) + defer cleanUp(ds) + + limit := uint(5000) + waitSeconds := 1 + ch := make(chan *blobref.SizedBlobRef) + errCh := make(chan os.Error) + go func() { + errCh <- ds.EnumerateBlobs(ch, blobserver.DefaultPartition, + "", limit, waitSeconds) + }() + + foo := &testBlob{"foo"} // 0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33 + go func() { + time.Sleep(100e6) // 100 ms + foo.ExpectUploadBlob(t, ds) + }() + + sb := <-ch + Assert(t, sb != nil, "got a blob") + ExpectInt(t, 3, int(sb.Size), "blob size") + ExpectString(t, "sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", sb.BlobRef.String(), "got the right blob") + + Expect(t, (<-ch) == nil, "only one blob returned") + ExpectNil(t, <-errCh, "EnumerateBlobs return value") +} + diff --git a/lib/go/testing/testing.go b/lib/go/testing/testing.go index d120c8514..1607c9702 100644 --- a/lib/go/testing/testing.go +++ b/lib/go/testing/testing.go @@ -26,30 +26,42 @@ func Expect(t *testing.T, got bool, what string) { } } -func ExpectBool(t *testing.T, expect, got bool, what string) { - if expect != got { - t.Errorf("%s: got %v; expected %v", what, got, expect) - } -} - -func ExpectInt(t *testing.T, expect, got int, what string) { - if expect != got { - t.Errorf("%s: got %d; expected %d", what, got, expect) - } -} - func Assert(t *testing.T, got bool, what string) { if !got { t.Fatalf("%s: got %v; expected %v", what, got, true) } } +func ExpectString(t *testing.T, expect, got string, what string) { + if expect != got { + t.Errorf("%s: got %v; expected %v", what, got, expect) + } +} + +func AssertString(t *testing.T, expect, got string, what string) { + if expect != got { + t.Fatalf("%s: got %v; expected %v", what, got, expect) + } +} + +func ExpectBool(t *testing.T, expect, got bool, what string) { + if expect != got { + t.Errorf("%s: got %v; expected %v", what, got, expect) + } +} + func AssertBool(t *testing.T, expect, got bool, what string) { if expect != got { t.Fatalf("%s: got %v; expected %v", what, got, expect) } } +func ExpectInt(t *testing.T, expect, got int, what string) { + if expect != got { + t.Errorf("%s: got %d; expected %d", what, got, expect) + } +} + func AssertInt(t *testing.T, expect, got int, what string) { if expect != got { t.Fatalf("%s: got %d; expected %d", what, got, expect)