From fcd5ff14e99ca21a61075a296e88e9c18caaefba Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Wed, 20 Sep 2017 22:06:31 -0700 Subject: [PATCH] importer/gphotos: misc rate limit cleanups Remove dynamic rate limit adjustment for now. It was racy. No need to be super fast, anyway, as long as it catches up eventually. But we can make it smarter later. I wanted to get it correct first. Change-Id: Id5b5fc946546d8d9c0720f1c0ec2f341a17cdd01 --- pkg/importer/gphotos/download.go | 53 +++++++++++--------------------- pkg/importer/gphotos/gphotos.go | 15 +++------ 2 files changed, 23 insertions(+), 45 deletions(-) diff --git a/pkg/importer/gphotos/download.go b/pkg/importer/gphotos/download.go index 2418f61a5..b1b359557 100644 --- a/pkg/importer/gphotos/download.go +++ b/pkg/importer/gphotos/download.go @@ -19,6 +19,7 @@ package gphotos import ( "context" "io" + "log" "net/http" "strings" "time" @@ -41,7 +42,12 @@ const ( defaultRateLimit = rate.Limit(10) ) -// getUser helper function to return the user's email address. +// getUser returns the authenticated Google Drive user's User value, +// containing their name, email address, and "permission ID", +// which is the "The user's ID as visible in Permission resources" according +// to https://developers.google.com/drive/v3/reference/about#resource +// The permission ID becomes the "userID" (AcctAttrUserID) value on the +// account's "importerAccount" permanode. func getUser(ctx context.Context, client *http.Client) (*drive.User, error) { srv, err := drive.New(client) if err != nil { @@ -57,7 +63,7 @@ func getUser(ctx context.Context, client *http.Client) (*drive.User, error) { } type downloader struct { - // download rate limiter + // rate is the download rate limiter. rate *rate.Limiter *drive.Service @@ -90,9 +96,6 @@ func newDownloader(client *http.Client) (*downloader, error) { // Returns a new token to watch future changes. func (dl *downloader) photos(ctx context.Context, sinceToken string) (photosCh <-chan maybePhotos, nextToken string, err error) { - // reset the rate limiter - dl.rate.SetLimit(defaultRateLimit) - var sr *drive.StartPageToken if err := dl.rateLimit(ctx, func() error { var err error @@ -291,8 +294,7 @@ func (dl *downloader) fileAsPhoto(f *drive.File) photo { } // rateLimit calls f obeying the global Rate limit. -// On "Rate Limit Exceeded" error, the rate is throttled back, -// on success the limit is raised. +// On "Rate Limit Exceeded" error, it sleeps and tries later. func (dl *downloader) rateLimit(ctx context.Context, f func() error) error { const ( msgRateLimitExceeded = "Rate Limit Exceeded" @@ -300,42 +302,22 @@ func (dl *downloader) rateLimit(ctx context.Context, f func() error) error { msgUserRateLimitExceededShort = "userRateLimitExceeded" ) - var err error - first := true // Ensure a 1 minute try limit. - ctx, _ = context.WithTimeout(ctx, time.Minute) + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() for { - now := time.Now() if err := dl.rate.Wait(ctx); err != nil { + log.Printf("gphotos: rate limit failure: %v", err) return err } - // The scheduler may interrupt here, but we don't know anything better, so risk it. - dur := time.Since(now) - if err = f(); err == nil { - if first { - // If we're limited by the rate, then raise the limit! - lim := dl.rate.Limit() - // We're rate limited iff we wait at least the rate limit time: - if dur >= time.Duration(float64(time.Second)/float64(lim)) { - // to reach 1 again after a halving, it is - // * 7 steps with 1.1, - // * 3 steps with 1.25, - // * 2 steps with 1.5. - dl.rate.SetLimit(lim * 1.1) - } - } + err := f() + if err == nil { return nil } - first = false ge, ok := err.(*googleapi.Error) if !ok || ge.Code != 403 { return err } - if ge.Message != "" && - ge.Message != msgRateLimitExceeded && - ge.Message != msgUserRateLimitExceeded { - return err - } if ge.Message == "" { var ok bool for _, e := range ge.Errors { @@ -353,8 +335,9 @@ func (dl *downloader) rateLimit(ctx context.Context, f func() error) error { return err } } - - dl.rate.SetLimit(dl.rate.Limit() / 2) // halve the rate (exponential backoff) + // Some arbitrary sleep. + log.Printf("gphotos: sleeping for 5s after 403 error, presumably due to a rate limit") + time.Sleep(5 * time.Second) + log.Printf("gphotos: retrying after sleep...") } - return err } diff --git a/pkg/importer/gphotos/gphotos.go b/pkg/importer/gphotos/gphotos.go index 8d2a3f263..c0387be32 100644 --- a/pkg/importer/gphotos/gphotos.go +++ b/pkg/importer/gphotos/gphotos.go @@ -269,7 +269,7 @@ type run struct { incremental bool // whether we've completed a run in the past photoGate *syncutil.Gate setNextToken func(string) error - *downloader + dl *downloader } var forceFullImport, _ = strconv.ParseBool(os.Getenv("CAMLI_GPHOTOS_FULL_IMPORT")) @@ -312,7 +312,7 @@ func (imp) Run(rctx *importer.RunContext) error { incremental: !forceFullImport && acctNode.Attr(importer.AcctAttrCompletedVersion) == runCompleteVersion, photoGate: syncutil.NewGate(3), setNextToken: func(nextToken string) error { return acctNode.SetAttr(acctSinceToken, nextToken) }, - downloader: dl, + dl: dl, } if err := r.importPhotos(ctx, sinceToken); err != nil { return err @@ -326,12 +326,7 @@ func (imp) Run(rctx *importer.RunContext) error { } func (r *run) importPhotos(ctx context.Context, sinceToken string) error { - select { - case <-ctx.Done(): - return ctx.Err() - default: - } - photosCh, nextToken, err := r.downloader.photos(ctx, sinceToken) + photosCh, nextToken, err := r.dl.photos(ctx, sinceToken) if err != nil { return fmt.Errorf("gphotos importer: %v", err) } @@ -440,7 +435,7 @@ func (r *run) updatePhoto(ctx context.Context, parent *importer.Object, ph photo photoNode, err := parent.ChildPathObjectOrFunc(ph.ID, func() (*importer.Object, error) { h := blob.NewHash() - rc, err := r.downloader.openPhoto(ctx, ph) + rc, err := r.dl.openPhoto(ctx, ph) if err != nil { return nil, err } @@ -476,7 +471,7 @@ func (r *run) updatePhoto(ctx context.Context, parent *importer.Object, ph photo // been interrupted. So we check for an existing camliContent. if camliContent := photoNode.Attr(nodeattr.CamliContent); camliContent == "" { // looks like an incomplete node, so we need to re-download. - rc, err := r.downloader.openPhoto(ctx, ph) + rc, err := r.dl.openPhoto(ctx, ph) if err != nil { return err }