This commit is contained in:
Brad Fitzpatrick 2014-06-26 11:36:56 -07:00
commit 1cddedd7d8
4 changed files with 279 additions and 40 deletions

View File

@ -24,6 +24,7 @@ import (
"io" "io"
"math" "math"
"net/http" "net/http"
"os"
"camlistore.org/third_party/code.google.com/p/goauth2/oauth" "camlistore.org/third_party/code.google.com/p/goauth2/oauth"
client "camlistore.org/third_party/code.google.com/p/google-api-go-client/drive/v2" client "camlistore.org/third_party/code.google.com/p/google-api-go-client/drive/v2"
@ -42,28 +43,38 @@ type DriveService struct {
parentId string parentId string
} }
// New initiates a new DriveService. // New initiates a new DriveService. parentId is the ID of the directory
// that will be used as the current directory in methods on the returned
// DriveService (such as Get). If empty, it defaults to the root of the
// drive.
func New(transport *oauth.Transport, parentId string) (*DriveService, error) { func New(transport *oauth.Transport, parentId string) (*DriveService, error) {
apiservice, err := client.New(transport.Client()) apiservice, err := client.New(transport.Client())
if err != nil { if err != nil {
return nil, err return nil, err
} }
if parentId == "" {
// because "root" is a special alias for the root directory in Drive.
parentId = "root"
}
service := &DriveService{transport: transport, apiservice: apiservice, parentId: parentId} service := &DriveService{transport: transport, apiservice: apiservice, parentId: parentId}
return service, err return service, err
} }
// Get retrieves a file with its title // Get retrieves a file with its title equal to the provided id and a child of
// the parentId as given to New. If not found, os.ErrNotExist is returned.
func (s *DriveService) Get(id string) (*client.File, error) { func (s *DriveService) Get(id string) (*client.File, error) {
req := s.apiservice.Files.List() req := s.apiservice.Files.List()
// TODO: use field selectors // TODO: use field selectors
query := fmt.Sprintf("'%s' in parents and title = '%s'", s.parentId, id) query := fmt.Sprintf("'%s' in parents and title = '%s'", s.parentId, id)
req.Q(query) req.Q(query)
files, err := req.Do() files, err := req.Do()
if err != nil {
if err != nil || len(files.Items) < 1 {
return nil, err return nil, err
} }
return files.Items[0], err if len(files.Items) < 1 {
return nil, os.ErrNotExist
}
return files.Items[0], nil
} }
// Lists the folder identified by parentId. // Lists the folder identified by parentId.
@ -104,14 +115,25 @@ func (s *DriveService) Upsert(id string, data io.Reader) (file *client.File, err
return s.apiservice.Files.Update(file.Id, file).Media(data).Do() return s.apiservice.Files.Update(file.Id, file).Media(data).Do()
} }
var errNoDownload = errors.New("file can not be downloaded directly (conversion needed?)")
// Fetch retrieves the metadata and contents of a file. // Fetch retrieves the metadata and contents of a file.
func (s *DriveService) Fetch(id string) (body io.ReadCloser, size uint32, err error) { func (s *DriveService) Fetch(id string) (body io.ReadCloser, size uint32, err error) {
file, err := s.Get(id) file, err := s.Get(id)
if err != nil {
return
}
// TODO: maybe in the case of no download link, remove the file. // TODO: maybe in the case of no download link, remove the file.
// The file may have been malformed or converted to a Docs file // The file may have been malformed or converted to a Docs file
// unwantedly. // unwantedly.
if err != nil || file == nil || file.DownloadUrl != "" { // TODO(mpl): I do not think the above comment is accurate. It
// looks like at least one case where we do not get a DownloadUrl is when
// the UI would make you pick a conversion format first (spreadsheet,
// doc, etc). -> we should see if the API offers the possibility to do
// that conversion. and we could pass the type(s) we want (pdf, xls, doc...)
// as arguments (in an options struct) to Fetch.
if file.DownloadUrl == "" {
err = errNoDownload
return return
} }

View File

@ -104,6 +104,9 @@ type Corpus struct {
mediaTags map[blob.Ref]map[string]string // wholeref -> "album" -> "foo" mediaTags map[blob.Ref]map[string]string // wholeref -> "album" -> "foo"
permanodesByTime *lazySortedPermanodes // cache of permanodes sorted by creation time.
permanodesByModtime *lazySortedPermanodes // cache of permanodes sorted by modtime.
// scratch string slice // scratch string slice
ss []string ss []string
} }
@ -146,7 +149,7 @@ type PermanodeMeta struct {
} }
func newCorpus() *Corpus { func newCorpus() *Corpus {
return &Corpus{ c := &Corpus{
blobs: make(map[blob.Ref]*camtypes.BlobMeta), blobs: make(map[blob.Ref]*camtypes.BlobMeta),
camBlobs: make(map[string]map[blob.Ref]*camtypes.BlobMeta), camBlobs: make(map[string]map[blob.Ref]*camtypes.BlobMeta),
files: make(map[blob.Ref]camtypes.FileInfo), files: make(map[blob.Ref]camtypes.FileInfo),
@ -161,6 +164,15 @@ func newCorpus() *Corpus {
deletes: make(map[blob.Ref][]deletion), deletes: make(map[blob.Ref][]deletion),
claimBack: make(map[blob.Ref][]*camtypes.Claim), claimBack: make(map[blob.Ref][]*camtypes.Claim),
} }
c.permanodesByModtime = &lazySortedPermanodes{
c: c,
pnTime: c.PermanodeModtimeLocked,
}
c.permanodesByTime = &lazySortedPermanodes{
c: c,
pnTime: c.PermanodeAnyTimeLocked,
}
return c
} }
func NewCorpusFromStorage(s sorted.KeyValue) (*Corpus, error) { func NewCorpusFromStorage(s sorted.KeyValue) (*Corpus, error) {
@ -708,16 +720,73 @@ func (s byPermanodeTime) Less(i, j int) bool {
return s[i].t.Before(s[j].t) return s[i].t.Before(s[j].t)
} }
func (c *Corpus) permanodesByModtimeLocked() []pnAndTime { type lazySortedPermanodes struct {
pns := make([]pnAndTime, 0, len(c.permanodes)) c *Corpus
for pn := range c.permanodes { pnTime func(blob.Ref) (time.Time, bool) // returns permanode's time (if any) to sort on
if c.IsDeletedLocked(pn) {
continue mu sync.Mutex // guards sortedCache and ofGen
sortedCache []pnAndTime // nil if invalidated
sortedCacheReversed []pnAndTime // nil if invalidated
ofGen int64 // the Corpus.gen from which sortedCache was built
}
func reversedCopy(original []pnAndTime) []pnAndTime {
l := len(original)
reversed := make([]pnAndTime, l)
for k, v := range original {
reversed[l-1-k] = v
}
return reversed
}
// The Corpus must already be locked with RLock.
func (lsp *lazySortedPermanodes) sorted(reverse bool) []pnAndTime {
lsp.mu.Lock()
defer lsp.mu.Unlock()
if lsp.ofGen == lsp.c.gen {
// corpus hasn't changed -> caches are still valid, if they exist.
if reverse {
if lsp.sortedCacheReversed != nil {
return lsp.sortedCacheReversed
}
if lsp.sortedCache != nil {
// using sortedCache to quickly build sortedCacheReversed
lsp.sortedCacheReversed = reversedCopy(lsp.sortedCache)
return lsp.sortedCacheReversed
}
} }
if modt, ok := c.PermanodeModtimeLocked(pn); ok { if !reverse {
pns = append(pns, pnAndTime{pn, modt}) if lsp.sortedCache != nil {
return lsp.sortedCache
}
if lsp.sortedCacheReversed != nil {
// using sortedCacheReversed to quickly build sortedCache
lsp.sortedCache = reversedCopy(lsp.sortedCacheReversed)
return lsp.sortedCache
}
} }
} }
// invalidate the caches
lsp.sortedCache = nil
lsp.sortedCacheReversed = nil
pns := make([]pnAndTime, 0, len(lsp.c.permanodes))
for pn := range lsp.c.permanodes {
if lsp.c.IsDeletedLocked(pn) {
continue
}
if pt, ok := lsp.pnTime(pn); ok {
pns = append(pns, pnAndTime{pn, pt})
}
}
// and rebuild one of them
if reverse {
sort.Sort(sort.Reverse(byPermanodeTime(pns)))
lsp.sortedCacheReversed = pns
} else {
sort.Sort(byPermanodeTime(pns))
lsp.sortedCache = pns
}
lsp.ofGen = lsp.c.gen
return pns return pns
} }
@ -745,23 +814,7 @@ func (c *Corpus) sendPermanodes(ctx *context.Context, ch chan<- camtypes.BlobMet
func (c *Corpus) EnumeratePermanodesLastModifiedLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta) error { func (c *Corpus) EnumeratePermanodesLastModifiedLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta) error {
defer close(ch) defer close(ch)
pns := c.permanodesByModtimeLocked() return c.sendPermanodes(ctx, ch, c.permanodesByModtime.sorted(true))
sort.Sort(sort.Reverse(byPermanodeTime(pns)))
return c.sendPermanodes(ctx, ch, pns)
}
func (c *Corpus) permanodesByTimeLocked() []pnAndTime {
// TODO: cache this
pns := make([]pnAndTime, 0, len(c.permanodes))
for pn := range c.permanodes {
if c.IsDeletedLocked(pn) {
continue
}
if pt, ok := c.PermanodeAnyTimeLocked(pn); ok {
pns = append(pns, pnAndTime{pn, pt})
}
}
return pns
} }
// EnumeratePermanodesCreatedLocked sends all permanodes to ch, or until ctx is done. // EnumeratePermanodesCreatedLocked sends all permanodes to ch, or until ctx is done.
@ -772,14 +825,7 @@ func (c *Corpus) permanodesByTimeLocked() []pnAndTime {
func (c *Corpus) EnumeratePermanodesCreatedLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta, newestFirst bool) error { func (c *Corpus) EnumeratePermanodesCreatedLocked(ctx *context.Context, ch chan<- camtypes.BlobMeta, newestFirst bool) error {
defer close(ch) defer close(ch)
pns := c.permanodesByTimeLocked() return c.sendPermanodes(ctx, ch, c.permanodesByTime.sorted(newestFirst))
if newestFirst {
sort.Sort(sort.Reverse(byPermanodeTime(pns)))
} else {
sort.Sort(byPermanodeTime(pns))
}
return c.sendPermanodes(ctx, ch, pns)
} }
func (c *Corpus) GetBlobMeta(br blob.Ref) (camtypes.BlobMeta, error) { func (c *Corpus) GetBlobMeta(br blob.Ref) (camtypes.BlobMeta, error) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package index_test package index_test
import ( import (
"fmt"
"reflect" "reflect"
"testing" "testing"
"time" "time"
@ -339,3 +340,146 @@ func testEnumerateOrder(t *testing.T,
} }
} }
} }
// should be run with -race
func TestCacheSortedPermanodes_ModtimeRace(t *testing.T) {
testCacheSortedPermanodesRace(t,
func(c *index.Corpus, ctx *context.Context, ch chan<- camtypes.BlobMeta) error {
return c.EnumeratePermanodesLastModifiedLocked(ctx, ch)
},
)
}
// should be run with -race
func TestCacheSortedPermanodes_CreateTimeRace(t *testing.T) {
testCacheSortedPermanodesRace(t,
func(c *index.Corpus, ctx *context.Context, ch chan<- camtypes.BlobMeta) error {
return c.EnumeratePermanodesCreatedLocked(ctx, ch, true)
},
)
}
func testCacheSortedPermanodesRace(t *testing.T,
enumFunc func(*index.Corpus, *context.Context, chan<- camtypes.BlobMeta) error) {
idx := index.NewMemoryIndex()
idxd := indextest.NewIndexDeps(idx)
idxd.Fataler = t
c, err := idxd.Index.KeepInMemory()
if err != nil {
t.Fatalf("error slurping index to memory: %v", err)
}
donec := make(chan struct{})
go func() {
for i := 0; i < 100; i++ {
nth := fmt.Sprintf("%d", i)
pn := idxd.NewPlannedPermanode(nth)
idxd.SetAttribute(pn, "tag", nth)
}
donec <- struct{}{}
}()
go func() {
for i := 0; i < 10; i++ {
ch := make(chan camtypes.BlobMeta, 10)
errc := make(chan error, 1)
c.RLock()
go func() { errc <- enumFunc(c, context.TODO(), ch) }()
for _ = range ch {
}
err := <-errc
c.RUnlock()
if err != nil {
t.Fatalf("Could not enumerate permanodes: %v", err)
}
}
donec <- struct{}{}
}()
<-donec
<-donec
}
func TestLazySortedPermanodes(t *testing.T) {
idx := index.NewMemoryIndex()
idxd := indextest.NewIndexDeps(idx)
idxd.Fataler = t
c, err := idxd.Index.KeepInMemory()
if err != nil {
t.Fatalf("error slurping index to memory: %v", err)
}
lsp := c.Exp_LSPByTime(false)
if len(lsp) != 0 {
t.Fatal("LazySortedPermanodes cache should be empty on startup")
}
pn := idxd.NewPlannedPermanode("one")
idxd.SetAttribute(pn, "tag", "one")
enum := func(reverse bool) {
ch := make(chan camtypes.BlobMeta, 10)
errc := make(chan error, 1)
c.RLock()
go func() { errc <- c.EnumeratePermanodesCreatedLocked(context.TODO(), ch, reverse) }()
for _ = range ch {
}
err := <-errc
c.RUnlock()
if err != nil {
t.Fatalf("Could not enumerate permanodes: %v", err)
}
}
enum(false)
lsp = c.Exp_LSPByTime(false)
if len(lsp) != 1 {
t.Fatalf("LazySortedPermanodes after 1st enum: got %v items, wanted 1", len(lsp))
}
lsp = c.Exp_LSPByTime(true)
if len(lsp) != 0 {
t.Fatalf("LazySortedPermanodes reversed after 1st enum: got %v items, wanted 0", len(lsp))
}
enum(true)
lsp = c.Exp_LSPByTime(false)
if len(lsp) != 1 {
t.Fatalf("LazySortedPermanodes after 2nd enum: got %v items, wanted 1", len(lsp))
}
lsp = c.Exp_LSPByTime(true)
if len(lsp) != 1 {
t.Fatalf("LazySortedPermanodes reversed after 2nd enum: got %v items, wanted 1", len(lsp))
}
pn = idxd.NewPlannedPermanode("two")
idxd.SetAttribute(pn, "tag", "two")
enum(true)
lsp = c.Exp_LSPByTime(false)
if len(lsp) != 0 {
t.Fatalf("LazySortedPermanodes after 2nd permanode: got %v items, wanted 0 because of cache invalidation", len(lsp))
}
lsp = c.Exp_LSPByTime(true)
if len(lsp) != 2 {
t.Fatalf("LazySortedPermanodes reversed after 2nd permanode: got %v items, wanted 2", len(lsp))
}
pn = idxd.NewPlannedPermanode("three")
idxd.SetAttribute(pn, "tag", "three")
enum(false)
lsp = c.Exp_LSPByTime(true)
if len(lsp) != 0 {
t.Fatalf("LazySortedPermanodes reversed after 3rd permanode: got %v items, wanted 0 because of cache invalidation", len(lsp))
}
lsp = c.Exp_LSPByTime(false)
if len(lsp) != 3 {
t.Fatalf("LazySortedPermanodes after 3rd permanode: got %v items, wanted 3", len(lsp))
}
enum(true)
lsp = c.Exp_LSPByTime(false)
if len(lsp) != 3 {
t.Fatalf("LazySortedPermanodes after 5th enum: got %v items, wanted 3", len(lsp))
}
lsp = c.Exp_LSPByTime(true)
if len(lsp) != 3 {
t.Fatalf("LazySortedPermanodes reversed after 5th enum: got %v items, wanted 3", len(lsp))
}
}

View File

@ -79,3 +79,30 @@ func (x *Index) Exp_AwaitReindexing(t *testing.T) {
} }
t.Fatal("timeout waiting for readyReindex to drain") t.Fatal("timeout waiting for readyReindex to drain")
} }
type ExpPnAndTime pnAndTime
// Exp_LSPByTime returns the sorted cache of the lazySortedPermanodes for
// permanodesByTime (or the reverse-sorted cache, if reverse is true).
func (c *Corpus) Exp_LSPByTime(reverse bool) []ExpPnAndTime {
if c.permanodesByTime == nil {
return nil
}
var pn []ExpPnAndTime
if reverse {
if c.permanodesByTime.sortedCacheReversed != nil {
for _, v := range c.permanodesByTime.sortedCacheReversed {
pn = append(pn, ExpPnAndTime(v))
}
return pn
}
} else {
if c.permanodesByTime.sortedCache != nil {
for _, v := range c.permanodesByTime.sortedCache {
pn = append(pn, ExpPnAndTime(v))
}
return pn
}
}
return nil
}