diff --git a/pkg/blobserver/google/drive/service/service.go b/pkg/blobserver/google/drive/service/service.go index 8cbd5ca70..2ea062ecc 100644 --- a/pkg/blobserver/google/drive/service/service.go +++ b/pkg/blobserver/google/drive/service/service.go @@ -24,6 +24,7 @@ import ( "io" "math" "net/http" + "os" "camlistore.org/third_party/code.google.com/p/goauth2/oauth" client "camlistore.org/third_party/code.google.com/p/google-api-go-client/drive/v2" @@ -42,28 +43,38 @@ type DriveService struct { parentId string } -// New initiates a new DriveService. +// New initiates a new DriveService. parentId is the ID of the directory +// that will be used as the current directory in methods on the returned +// DriveService (such as Get). If empty, it defaults to the root of the +// drive. func New(transport *oauth.Transport, parentId string) (*DriveService, error) { apiservice, err := client.New(transport.Client()) if err != nil { return nil, err } + if parentId == "" { + // because "root" is known as a special alias for the root directory in drive. + parentId = "root" + } service := &DriveService{transport: transport, apiservice: apiservice, parentId: parentId} return service, err } -// Get retrieves a file with its title +// Get retrieves a file with its title equal to the provided id and a child of +// the parentId as given to New. If not found, os.ErrNotExist is returned. func (s *DriveService) Get(id string) (*client.File, error) { req := s.apiservice.Files.List() // TODO: use field selectors query := fmt.Sprintf("'%s' in parents and title = '%s'", s.parentId, id) req.Q(query) files, err := req.Do() - - if err != nil || len(files.Items) < 1 { + if err != nil { return nil, err } - return files.Items[0], err + if len(files.Items) < 1 { + return nil, os.ErrNotExist + } + return files.Items[0], nil } // Lists the folder identified by parentId. @@ -104,14 +115,25 @@ func (s *DriveService) Upsert(id string, data io.Reader) (file *client.File, err return s.apiservice.Files.Update(file.Id, file).Media(data).Do() } +var errNoDownload = errors.New("file can not be downloaded directly (conversion needed?)") + // Fetch retrieves the metadata and contents of a file. func (s *DriveService) Fetch(id string) (body io.ReadCloser, size uint32, err error) { file, err := s.Get(id) - + if err != nil { + return + } // TODO: maybe in the case of no download link, remove the file. // The file should have malformed or converted to a Docs file // unwantedly. - if err != nil || file == nil || file.DownloadUrl != "" { + // TODO(mpl): I do not think the above comment is accurate. It + // looks like at least one case we do not get a DownloadUrl is when + // the UI would make you pick a conversion format first (spreadsheet, + // doc, etc). -> we should see if the API offers the possibility to do + // that conversion. and we could pass the type(s) we want (pdf, xls, doc...) + // as arguments (in an options struct) to Fetch. + if file.DownloadUrl == "" { + err = errNoDownload return } diff --git a/pkg/index/corpus.go b/pkg/index/corpus.go index a5476b9b9..779dbd81b 100644 --- a/pkg/index/corpus.go +++ b/pkg/index/corpus.go @@ -104,6 +104,9 @@ type Corpus struct { mediaTags map[blob.Ref]map[string]string // wholeref -> "album" -> "foo" + permanodesByTime *lazySortedPermanodes // cache of permanodes sorted by creation time. + permanodesByModtime *lazySortedPermanodes // cache of permanodes sorted by modtime. + // scratch string slice ss []string } @@ -146,7 +149,7 @@ type PermanodeMeta struct { } func newCorpus() *Corpus { - return &Corpus{ + c := &Corpus{ blobs: make(map[blob.Ref]*camtypes.BlobMeta), camBlobs: make(map[string]map[blob.Ref]*camtypes.BlobMeta), files: make(map[blob.Ref]camtypes.FileInfo), @@ -161,6 +164,15 @@ func newCorpus() *Corpus { deletes: make(map[blob.Ref][]deletion), claimBack: make(map[blob.Ref][]*camtypes.Claim), } + c.permanodesByModtime = &lazySortedPermanodes{ + c: c, + pnTime: c.PermanodeModtimeLocked, + } + c.permanodesByTime = &lazySortedPermanodes{ + c: c, + pnTime: c.PermanodeAnyTimeLocked, + } + return c } func NewCorpusFromStorage(s sorted.KeyValue) (*Corpus, error) { @@ -708,16 +720,73 @@ func (s byPermanodeTime) Less(i, j int) bool { return s[i].t.Before(s[j].t) } -func (c *Corpus) permanodesByModtimeLocked() []pnAndTime { - pns := make([]pnAndTime, 0, len(c.permanodes)) - for pn := range c.permanodes { - if c.IsDeletedLocked(pn) { - continue +type lazySortedPermanodes struct { + c *Corpus + pnTime func(blob.Ref) (time.Time, bool) // returns permanode's time (if any) to sort on + + mu sync.Mutex // guards sortedCache and ofGen + sortedCache []pnAndTime // nil if invalidated + sortedCacheReversed []pnAndTime // nil if invalidated + ofGen int64 // the Corpus.gen from which sortedCache was built +} + +func reversedCopy(original []pnAndTime) []pnAndTime { + l := len(original) + reversed := make([]pnAndTime, l) + for k, v := range original { + reversed[l-1-k] = v + } + return reversed +} + +// The Corpus must already be locked with RLock. +func (lsp *lazySortedPermanodes) sorted(reverse bool) []pnAndTime { + lsp.mu.Lock() + defer lsp.mu.Unlock() + if lsp.ofGen == lsp.c.gen { + // corpus hasn't changed -> caches are still valid, if they exist. + if reverse { + if lsp.sortedCacheReversed != nil { + return lsp.sortedCacheReversed + } + if lsp.sortedCache != nil { + // using sortedCache to quickly build sortedCacheReversed + lsp.sortedCacheReversed = reversedCopy(lsp.sortedCache) + return lsp.sortedCacheReversed + } } - if modt, ok := c.PermanodeModtimeLocked(pn); ok { - pns = append(pns, pnAndTime{pn, modt}) + if !reverse { + if lsp.sortedCache != nil { + return lsp.sortedCache + } + if lsp.sortedCacheReversed != nil { + // using sortedCacheReversed to quickly build sortedCache + lsp.sortedCache = reversedCopy(lsp.sortedCacheReversed) + return lsp.sortedCache + } } } + // invalidate the caches + lsp.sortedCache = nil + lsp.sortedCacheReversed = nil + pns := make([]pnAndTime, 0, len(lsp.c.permanodes)) + for pn := range lsp.c.permanodes { + if lsp.c.IsDeletedLocked(pn) { + continue + } + if pt, ok := lsp.pnTime(pn); ok { + pns = append(pns, pnAndTime{pn, pt}) + } + } + // and rebuild one of them + if reverse { + sort.Sort(sort.Reverse(byPermanodeTime(pns))) + lsp.sortedCacheReversed = pns + } else { + sort.Sort(byPermanodeTime(pns)) + lsp.sortedCache = pns + } + lsp.ofGen = lsp.c.gen return pns } @@ -745,23 +814,7 @@ func (c *Corpus) sendPermanodes(ctx *context.Context, ch chan<- camtypes.BlobMet func (c *Corpus) EnumeratePermanodesLastModifiedLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta) error { defer close(ch) - pns := c.permanodesByModtimeLocked() - sort.Sort(sort.Reverse(byPermanodeTime(pns))) - return c.sendPermanodes(ctx, ch, pns) -} - -func (c *Corpus) permanodesByTimeLocked() []pnAndTime { - // TODO: cache this - pns := make([]pnAndTime, 0, len(c.permanodes)) - for pn := range c.permanodes { - if c.IsDeletedLocked(pn) { - continue - } - if pt, ok := c.PermanodeAnyTimeLocked(pn); ok { - pns = append(pns, pnAndTime{pn, pt}) - } - } - return pns + return c.sendPermanodes(ctx, ch, c.permanodesByModtime.sorted(true)) } // EnumeratePermanodesCreatedLocked sends all permanodes to ch, or until ctx is done. @@ -772,14 +825,7 @@ func (c *Corpus) permanodesByTimeLocked() []pnAndTime { func (c *Corpus) EnumeratePermanodesCreatedLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta, newestFirst bool) error { defer close(ch) - pns := c.permanodesByTimeLocked() - if newestFirst { - sort.Sort(sort.Reverse(byPermanodeTime(pns))) - } else { - sort.Sort(byPermanodeTime(pns)) - } - - return c.sendPermanodes(ctx, ch, pns) + return c.sendPermanodes(ctx, ch, c.permanodesByTime.sorted(newestFirst)) } func (c *Corpus) GetBlobMeta(br blob.Ref) (camtypes.BlobMeta, error) { diff --git a/pkg/index/corpus_test.go b/pkg/index/corpus_test.go index 2b91d150d..8227a0aff 100644 --- a/pkg/index/corpus_test.go +++ b/pkg/index/corpus_test.go @@ -17,6 +17,7 @@ limitations under the License. package index_test import ( + "fmt" "reflect" "testing" "time" @@ -339,3 +340,146 @@ func testEnumerateOrder(t *testing.T, } } } + +// should be run with -race +func TestCacheSortedPermanodes_ModtimeRace(t *testing.T) { + testCacheSortedPermanodesRace(t, + func(c *index.Corpus, ctx *context.Context, ch chan<- camtypes.BlobMeta) error { + return c.EnumeratePermanodesLastModifiedLocked(ctx, ch) + }, + ) +} + +// should be run with -race +func TestCacheSortedPermanodes_CreateTimeRace(t *testing.T) { + testCacheSortedPermanodesRace(t, + func(c *index.Corpus, ctx *context.Context, ch chan<- camtypes.BlobMeta) error { + return c.EnumeratePermanodesCreatedLocked(ctx, ch, true) + }, + ) +} + +func testCacheSortedPermanodesRace(t *testing.T, + enumFunc func(*index.Corpus, *context.Context, chan<- camtypes.BlobMeta) error) { + idx := index.NewMemoryIndex() + idxd := indextest.NewIndexDeps(idx) + idxd.Fataler = t + c, err := idxd.Index.KeepInMemory() + if err != nil { + t.Fatalf("error slurping index to memory: %v", err) + } + donec := make(chan struct{}) + go func() { + for i := 0; i < 100; i++ { + nth := fmt.Sprintf("%d", i) + pn := idxd.NewPlannedPermanode(nth) + idxd.SetAttribute(pn, "tag", nth) + } + donec <- struct{}{} + }() + go func() { + for i := 0; i < 10; i++ { + ch := make(chan camtypes.BlobMeta, 10) + errc := make(chan error, 1) + c.RLock() + go func() { errc <- enumFunc(c, context.TODO(), ch) }() + for _ = range ch { + } + err := <-errc + c.RUnlock() + if err != nil { + t.Fatalf("Could not enumerate permanodes: %v", err) + } + } + donec <- struct{}{} + }() + <-donec + <-donec +} + +func TestLazySortedPermanodes(t *testing.T) { + idx := index.NewMemoryIndex() + idxd := indextest.NewIndexDeps(idx) + idxd.Fataler = t + c, err := idxd.Index.KeepInMemory() + if err != nil { + t.Fatalf("error slurping index to memory: %v", err) + } + + lsp := c.Exp_LSPByTime(false) + if len(lsp) != 0 { + t.Fatal("LazySortedPermanodes cache should be empty on startup") + } + + pn := idxd.NewPlannedPermanode("one") + idxd.SetAttribute(pn, "tag", "one") + + enum := func(reverse bool) { + ch := make(chan camtypes.BlobMeta, 10) + errc := make(chan error, 1) + c.RLock() + go func() { errc <- c.EnumeratePermanodesCreatedLocked(context.TODO(), ch, reverse) }() + for _ = range ch { + } + err := <-errc + c.RUnlock() + if err != nil { + t.Fatalf("Could not enumerate permanodes: %v", err) + } + } + enum(false) + lsp = c.Exp_LSPByTime(false) + if len(lsp) != 1 { + t.Fatalf("LazySortedPermanodes after 1st enum: got %v items, wanted 1", len(lsp)) + } + lsp = c.Exp_LSPByTime(true) + if len(lsp) != 0 { + t.Fatalf("LazySortedPermanodes reversed after 1st enum: got %v items, wanted 0", len(lsp)) + } + + enum(true) + lsp = c.Exp_LSPByTime(false) + if len(lsp) != 1 { + t.Fatalf("LazySortedPermanodes after 2nd enum: got %v items, wanted 1", len(lsp)) + } + lsp = c.Exp_LSPByTime(true) + if len(lsp) != 1 { + t.Fatalf("LazySortedPermanodes reversed after 2nd enum: got %v items, wanted 1", len(lsp)) + } + + pn = idxd.NewPlannedPermanode("two") + idxd.SetAttribute(pn, "tag", "two") + + enum(true) + lsp = c.Exp_LSPByTime(false) + if len(lsp) != 0 { + t.Fatalf("LazySortedPermanodes after 2nd permanode: got %v items, wanted 0 because of cache invalidation", len(lsp)) + } + lsp = c.Exp_LSPByTime(true) + if len(lsp) != 2 { + t.Fatalf("LazySortedPermanodes reversed after 2nd permanode: got %v items, wanted 2", len(lsp)) + } + + pn = idxd.NewPlannedPermanode("three") + idxd.SetAttribute(pn, "tag", "three") + + enum(false) + lsp = c.Exp_LSPByTime(true) + if len(lsp) != 0 { + t.Fatalf("LazySortedPermanodes reversed after 3rd permanode: got %v items, wanted 0 because of cache invalidation", len(lsp)) + } + lsp = c.Exp_LSPByTime(false) + if len(lsp) != 3 { + t.Fatalf("LazySortedPermanodes after 3rd permanode: got %v items, wanted 3", len(lsp)) + } + + enum(true) + lsp = c.Exp_LSPByTime(false) + if len(lsp) != 3 { + t.Fatalf("LazySortedPermanodes after 5th enum: got %v items, wanted 3", len(lsp)) + } + lsp = c.Exp_LSPByTime(true) + if len(lsp) != 3 { + t.Fatalf("LazySortedPermanodes reversed after 5th enum: got %v items, wanted 3", len(lsp)) + } +} diff --git a/pkg/index/export_test.go b/pkg/index/export_test.go index 2da0c2f35..46ab15b8e 100644 --- a/pkg/index/export_test.go +++ b/pkg/index/export_test.go @@ -79,3 +79,30 @@ func (x *Index) Exp_AwaitReindexing(t *testing.T) { } t.Fatal("timeout waiting for readyReindex to drain") } + +type ExpPnAndTime pnAndTime + +// Exp_LSPByTime returns the sorted cache lazySortedPermanodes for +// permanodesByTime (or the reverse sorted one). +func (c *Corpus) Exp_LSPByTime(reverse bool) []ExpPnAndTime { + if c.permanodesByTime == nil { + return nil + } + var pn []ExpPnAndTime + if reverse { + if c.permanodesByTime.sortedCacheReversed != nil { + for _, v := range c.permanodesByTime.sortedCacheReversed { + pn = append(pn, ExpPnAndTime(v)) + } + return pn + } + } else { + if c.permanodesByTime.sortedCache != nil { + for _, v := range c.permanodesByTime.sortedCache { + pn = append(pn, ExpPnAndTime(v)) + } + return pn + } + } + return nil +}