localdisk: fix enumeration bug, walking into partitions.

This commit is contained in:
Brad Fitzpatrick 2011-03-12 21:11:03 -08:00
parent 12e3730294
commit ca92bc3903
2 changed files with 70 additions and 1 deletions

View File

@ -61,7 +61,9 @@ func readBlobs(opts readBlobRequest) os.Error {
if *opts.remain == 0 { if *opts.remain == 0 {
return nil return nil
} }
if name == "partition" {
continue
}
fullPath := dirFullPath + "/" + name fullPath := dirFullPath + "/" + name
fi, err := os.Stat(fullPath) fi, err := os.Stat(fullPath)
if err != nil { if err != nil {

View File

@ -20,7 +20,11 @@ import (
"camli/blobref" "camli/blobref"
"camli/blobserver" "camli/blobserver"
. "camli/testing" . "camli/testing"
"fmt"
"io/ioutil"
"os" "os"
"sort"
"testing" "testing"
"time" "time"
) )
@ -124,3 +128,66 @@ func TestEnumerateEmptyLongPoll(t *testing.T) {
ExpectNil(t, <-errCh, "EnumerateBlobs return value") ExpectNil(t, <-errCh, "EnumerateBlobs return value")
} }
type SortedSizedBlobs []*blobref.SizedBlobRef
func (sb SortedSizedBlobs) Len() int {
return len(sb)
}
func (sb SortedSizedBlobs) Less(i, j int) bool {
return sb[i].BlobRef.String() < sb[j].BlobRef.String()
}
func (sb SortedSizedBlobs) Swap(i, j int) {
panic("not needed")
}
func TestEnumerateIsSorted(t *testing.T) {
ds := NewStorage(t)
defer cleanUp(ds)
const blobsToMake = 250
t.Logf("Uploading test blobs...")
for i := 0; i < blobsToMake; i++ {
blob := &testBlob{fmt.Sprintf("blob-%d", i)}
blob.ExpectUploadBlob(t, ds)
}
// Make some fake blobs in other partitions to confuse the
// enumerate code.
fakeDir := ds.root + "/partition/queue-indexer/sha1/1f0/710"
ExpectNil(t, os.MkdirAll(fakeDir, 0755), "creating fakeDir")
ExpectNil(t, ioutil.WriteFile(fakeDir + "/sha1-1f07105465650aa243cfc1b1bbb1c68ea95c6812.dat",
[]byte("fake file"), 0644), "writing fake blob")
var tests = []struct { limit int; after string; }{
{ 200, "" },
{ blobsToMake, "" },
{ 200, "sha1-2" },
{ 200, "sha1-3" },
{ 200, "sha1-4" },
{ 200, "sha1-5" },
{ 200, "sha1-e" },
{ 200, "sha1-f" },
{ 200, "sha1-ff" },
}
for _, test := range tests {
limit := uint(test.limit)
ch := make(chan *blobref.SizedBlobRef)
errCh := make(chan os.Error)
go func() {
errCh <- ds.EnumerateBlobs(ch, defaultPartition, test.after, limit, 0)
}()
var got = make([]*blobref.SizedBlobRef, 0, blobsToMake)
for {
sb := <-ch
if sb == nil {
break
}
got = append(got, sb)
}
if !sort.IsSorted(SortedSizedBlobs(got)) {
t.Errorf("expected sorted; offset=%q, limit=%d", test.after, limit)
}
}
}