diff --git a/pkg/importer/gphotos/download.go b/pkg/importer/gphotos/download.go index 2418f61a5..b1b359557 100644 --- a/pkg/importer/gphotos/download.go +++ b/pkg/importer/gphotos/download.go @@ -19,6 +19,7 @@ package gphotos import ( "context" "io" + "log" "net/http" "strings" "time" @@ -41,7 +42,12 @@ const ( defaultRateLimit = rate.Limit(10) ) -// getUser helper function to return the user's email address. +// getUser returns the authenticated Google Drive user's User value, +// containing their name, email address, and "permission ID", +// which is the "The user's ID as visible in Permission resources" according +// to https://developers.google.com/drive/v3/reference/about#resource +// The permission ID becomes the "userID" (AcctAttrUserID) value on the +// account's "importerAccount" permanode. func getUser(ctx context.Context, client *http.Client) (*drive.User, error) { srv, err := drive.New(client) if err != nil { @@ -57,7 +63,7 @@ func getUser(ctx context.Context, client *http.Client) (*drive.User, error) { } type downloader struct { - // download rate limiter + // rate is the download rate limiter. rate *rate.Limiter *drive.Service @@ -90,9 +96,6 @@ func newDownloader(client *http.Client) (*downloader, error) { // Returns a new token to watch future changes. func (dl *downloader) photos(ctx context.Context, sinceToken string) (photosCh <-chan maybePhotos, nextToken string, err error) { - // reset the rate limiter - dl.rate.SetLimit(defaultRateLimit) - var sr *drive.StartPageToken if err := dl.rateLimit(ctx, func() error { var err error @@ -291,8 +294,7 @@ func (dl *downloader) fileAsPhoto(f *drive.File) photo { } // rateLimit calls f obeying the global Rate limit. -// On "Rate Limit Exceeded" error, the rate is throttled back, -// on success the limit is raised. +// On "Rate Limit Exceeded" error, it sleeps and tries later. func (dl *downloader) rateLimit(ctx context.Context, f func() error) error { const ( msgRateLimitExceeded = "Rate Limit Exceeded" @@ -300,42 +302,22 @@ func (dl *downloader) rateLimit(ctx context.Context, f func() error) error { msgUserRateLimitExceededShort = "userRateLimitExceeded" ) - var err error - first := true // Ensure a 1 minute try limit. - ctx, _ = context.WithTimeout(ctx, time.Minute) + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() for { - now := time.Now() if err := dl.rate.Wait(ctx); err != nil { + log.Printf("gphotos: rate limit failure: %v", err) return err } - // The scheduler may interrupt here, but we don't know anything better, so risk it. - dur := time.Since(now) - if err = f(); err == nil { - if first { - // If we're limited by the rate, then raise the limit! - lim := dl.rate.Limit() - // We're rate limited iff we wait at least the rate limit time: - if dur >= time.Duration(float64(time.Second)/float64(lim)) { - // to reach 1 again after a halving, it is - // * 7 steps with 1.1, - // * 3 steps with 1.25, - // * 2 steps with 1.5. - dl.rate.SetLimit(lim * 1.1) - } - } + err := f() + if err == nil { return nil } - first = false ge, ok := err.(*googleapi.Error) if !ok || ge.Code != 403 { return err } - if ge.Message != "" && - ge.Message != msgRateLimitExceeded && - ge.Message != msgUserRateLimitExceeded { - return err - } if ge.Message == "" { var ok bool for _, e := range ge.Errors { @@ -353,8 +335,9 @@ func (dl *downloader) rateLimit(ctx context.Context, f func() error) error { return err } } - - dl.rate.SetLimit(dl.rate.Limit() / 2) // halve the rate (exponential backoff) + // Some arbitrary sleep. + log.Printf("gphotos: sleeping for 5s after 403 error, presumably due to a rate limit") + time.Sleep(5 * time.Second) + log.Printf("gphotos: retrying after sleep...") } - return err } diff --git a/pkg/importer/gphotos/gphotos.go b/pkg/importer/gphotos/gphotos.go index 8d2a3f263..c0387be32 100644 --- a/pkg/importer/gphotos/gphotos.go +++ b/pkg/importer/gphotos/gphotos.go @@ -269,7 +269,7 @@ type run struct { incremental bool // whether we've completed a run in the past photoGate *syncutil.Gate setNextToken func(string) error - *downloader + dl *downloader } var forceFullImport, _ = strconv.ParseBool(os.Getenv("CAMLI_GPHOTOS_FULL_IMPORT")) @@ -312,7 +312,7 @@ func (imp) Run(rctx *importer.RunContext) error { incremental: !forceFullImport && acctNode.Attr(importer.AcctAttrCompletedVersion) == runCompleteVersion, photoGate: syncutil.NewGate(3), setNextToken: func(nextToken string) error { return acctNode.SetAttr(acctSinceToken, nextToken) }, - downloader: dl, + dl: dl, } if err := r.importPhotos(ctx, sinceToken); err != nil { return err @@ -326,12 +326,7 @@ func (imp) Run(rctx *importer.RunContext) error { } func (r *run) importPhotos(ctx context.Context, sinceToken string) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - photosCh, nextToken, err := r.downloader.photos(ctx, sinceToken) + photosCh, nextToken, err := r.dl.photos(ctx, sinceToken) if err != nil { return fmt.Errorf("gphotos importer: %v", err) } @@ -440,7 +435,7 @@ func (r *run) updatePhoto(ctx context.Context, parent *importer.Object, ph photo photoNode, err := parent.ChildPathObjectOrFunc(ph.ID, func() (*importer.Object, error) { h := blob.NewHash() - rc, err := r.downloader.openPhoto(ctx, ph) + rc, err := r.dl.openPhoto(ctx, ph) if err != nil { return nil, err } @@ -476,7 +471,7 @@ func (r *run) updatePhoto(ctx context.Context, parent *importer.Object, ph photo // been interrupted. So we check for an existing camliContent. if camliContent := photoNode.Attr(nodeattr.CamliContent); camliContent == "" { // looks like an incomplete node, so we need to re-download. - rc, err := r.downloader.openPhoto(ctx, ph) + rc, err := r.dl.openPhoto(ctx, ph) if err != nil { return err }