importer/gphotos: misc rate limit cleanups

Remove dynamic rate limit adjustment for now. It was racy.

No need to be super fast, anyway, as long as it catches up eventually.

But we can make it smarter later. I wanted to get it correct first.

Change-Id: Id5b5fc946546d8d9c0720f1c0ec2f341a17cdd01
Author: Brad Fitzpatrick
Date: 2017-09-20 22:06:31 -07:00
parent 286dbacc62
commit fcd5ff14e9
2 changed files with 23 additions and 45 deletions
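The commit message above boils down to: keep a fixed request rate and, when the Drive API still returns a 403 rate-limit error, just sleep and retry within a bounded window. Below is a minimal standalone sketch of that pattern, assuming golang.org/x/time/rate and a stand-in sentinel error instead of the real *googleapi.Error; the callWithRetry name and errRateLimited value are illustrative, not from the commit.

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"golang.org/x/time/rate"
)

// errRateLimited stands in for the Drive API's 403 "Rate Limit Exceeded"
// responses; the real code inspects *googleapi.Error instead.
var errRateLimited = errors.New("rate limit exceeded")

// callWithRetry waits for a token from a fixed-rate limiter, runs f, and
// on a rate-limit error sleeps briefly and tries again until ctx expires.
func callWithRetry(ctx context.Context, lim *rate.Limiter, f func() error) error {
	// Bound the retries to one minute, as the diff below does.
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()
	for {
		if err := lim.Wait(ctx); err != nil {
			return err // deadline exceeded or canceled
		}
		err := f()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errRateLimited) {
			return err // a real failure, not throttling
		}
		log.Printf("rate limited; sleeping 5s before retrying")
		time.Sleep(5 * time.Second)
	}
}

func main() {
	lim := rate.NewLimiter(rate.Limit(10), 1) // a fixed 10 calls per second
	err := callWithRetry(context.Background(), lim, func() error {
		// A real Drive API call would go here.
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}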

@@ -19,6 +19,7 @@ package gphotos
import (
"context"
"io"
"log"
"net/http"
"strings"
"time"
@@ -41,7 +42,12 @@ const (
defaultRateLimit = rate.Limit(10)
)
// getUser helper function to return the user's email address.
// getUser returns the authenticated Google Drive user's User value,
// containing their name, email address, and "permission ID",
// which is the "The user's ID as visible in Permission resources" according
// to https://developers.google.com/drive/v3/reference/about#resource
// The permission ID becomes the "userID" (AcctAttrUserID) value on the
// account's "importerAccount" permanode.
func getUser(ctx context.Context, client *http.Client) (*drive.User, error) {
srv, err := drive.New(client)
if err != nil {
@@ -57,7 +63,7 @@ func getUser(ctx context.Context, client *http.Client) (*drive.User, error) {
}
type downloader struct {
// download rate limiter
// rate is the download rate limiter.
rate *rate.Limiter
*drive.Service
@@ -90,9 +96,6 @@ func newDownloader(client *http.Client) (*downloader, error) {
// Returns a new token to watch future changes.
func (dl *downloader) photos(ctx context.Context, sinceToken string) (photosCh <-chan maybePhotos, nextToken string, err error) {
// reset the rate limiter
dl.rate.SetLimit(defaultRateLimit)
var sr *drive.StartPageToken
if err := dl.rateLimit(ctx, func() error {
var err error
@@ -291,8 +294,7 @@ func (dl *downloader) fileAsPhoto(f *drive.File) photo {
}
// rateLimit calls f obeying the global Rate limit.
// On "Rate Limit Exceeded" error, the rate is throttled back,
// on success the limit is raised.
// On "Rate Limit Exceeded" error, it sleeps and tries later.
func (dl *downloader) rateLimit(ctx context.Context, f func() error) error {
const (
msgRateLimitExceeded = "Rate Limit Exceeded"
@@ -300,42 +302,22 @@ func (dl *downloader) rateLimit(ctx context.Context, f func() error) error {
msgUserRateLimitExceededShort = "userRateLimitExceeded"
)
var err error
first := true
// Ensure a 1 minute try limit.
ctx, _ = context.WithTimeout(ctx, time.Minute)
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()
for {
now := time.Now()
if err := dl.rate.Wait(ctx); err != nil {
log.Printf("gphotos: rate limit failure: %v", err)
return err
}
// The scheduler may interrupt here, but we don't know anything better, so risk it.
dur := time.Since(now)
if err = f(); err == nil {
if first {
// If we're limited by the rate, then raise the limit!
lim := dl.rate.Limit()
// We're rate limited iff we wait at least the rate limit time:
if dur >= time.Duration(float64(time.Second)/float64(lim)) {
// to reach 1 again after a halving, it is
// * 7 steps with 1.1,
// * 3 steps with 1.25,
// * 2 steps with 1.5.
dl.rate.SetLimit(lim * 1.1)
}
}
err := f()
if err == nil {
return nil
}
first = false
ge, ok := err.(*googleapi.Error)
if !ok || ge.Code != 403 {
return err
}
if ge.Message != "" &&
ge.Message != msgRateLimitExceeded &&
ge.Message != msgUserRateLimitExceeded {
return err
}
if ge.Message == "" {
var ok bool
for _, e := range ge.Errors {
@@ -353,8 +335,9 @@ func (dl *downloader) rateLimit(ctx context.Context, f func() error) error {
return err
}
}
dl.rate.SetLimit(dl.rate.Limit() / 2) // halve the rate (exponential backoff)
// Some arbitrary sleep.
log.Printf("gphotos: sleeping for 5s after 403 error, presumably due to a rate limit")
time.Sleep(5 * time.Second)
log.Printf("gphotos: retrying after sleep...")
}
return err
}
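For reference, the 403 handling above hinges on the structured error type from google.golang.org/api/googleapi. Here is a small sketch of that shape of check, with the error value built by hand purely for illustration; isRateLimited is not a function in the commit.

package main

import (
	"fmt"

	"google.golang.org/api/googleapi"
)

// isRateLimited reports whether err looks like a Drive "rate limit
// exceeded" 403, either by its top-level message or by the reason
// attached to one of its error items.
func isRateLimited(err error) bool {
	ge, ok := err.(*googleapi.Error)
	if !ok || ge.Code != 403 {
		return false
	}
	if ge.Message == "Rate Limit Exceeded" || ge.Message == "User Rate Limit Exceeded" {
		return true
	}
	for _, e := range ge.Errors {
		if e.Reason == "userRateLimitExceeded" {
			return true
		}
	}
	return false
}

func main() {
	// A hand-built error standing in for a real API response.
	err := &googleapi.Error{
		Code:   403,
		Errors: []googleapi.ErrorItem{{Reason: "userRateLimitExceeded"}},
	}
	fmt.Println(isRateLimited(err)) // true
}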

@@ -269,7 +269,7 @@ type run struct {
incremental bool // whether we've completed a run in the past
photoGate *syncutil.Gate
setNextToken func(string) error
*downloader
dl *downloader
}
var forceFullImport, _ = strconv.ParseBool(os.Getenv("CAMLI_GPHOTOS_FULL_IMPORT"))
@@ -312,7 +312,7 @@ func (imp) Run(rctx *importer.RunContext) error {
incremental: !forceFullImport && acctNode.Attr(importer.AcctAttrCompletedVersion) == runCompleteVersion,
photoGate: syncutil.NewGate(3),
setNextToken: func(nextToken string) error { return acctNode.SetAttr(acctSinceToken, nextToken) },
downloader: dl,
dl: dl,
}
if err := r.importPhotos(ctx, sinceToken); err != nil {
return err
@@ -326,12 +326,7 @@ func (imp) Run(rctx *importer.RunContext) error {
}
func (r *run) importPhotos(ctx context.Context, sinceToken string) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
photosCh, nextToken, err := r.downloader.photos(ctx, sinceToken)
photosCh, nextToken, err := r.dl.photos(ctx, sinceToken)
if err != nil {
return fmt.Errorf("gphotos importer: %v", err)
}
@@ -440,7 +435,7 @@ func (r *run) updatePhoto(ctx context.Context, parent *importer.Object, ph photo
photoNode, err := parent.ChildPathObjectOrFunc(ph.ID, func() (*importer.Object, error) {
h := blob.NewHash()
rc, err := r.downloader.openPhoto(ctx, ph)
rc, err := r.dl.openPhoto(ctx, ph)
if err != nil {
return nil, err
}
@@ -476,7 +471,7 @@ func (r *run) updatePhoto(ctx context.Context, parent *importer.Object, ph photo
// been interrupted. So we check for an existing camliContent.
if camliContent := photoNode.Attr(nodeattr.CamliContent); camliContent == "" {
// looks like an incomplete node, so we need to re-download.
rc, err := r.downloader.openPhoto(ctx, ph)
rc, err := r.dl.openPhoto(ctx, ph)
if err != nil {
return err
}
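The second file's change is mechanical: run now holds the downloader in a named dl field instead of embedding *downloader, so call sites read r.dl.photos(...) rather than relying on the embedded field. A toy sketch of the difference between the two layouts follows; the types here are hypothetical, not the importer's.

package main

import "fmt"

type downloader struct{}

func (d *downloader) photos() string { return "some photos" }

// Embedding promotes downloader's methods onto run: r.photos() compiles,
// and the field is addressed by its type name, r.downloader.
type runEmbedded struct {
	*downloader
}

// A named field drops the promotion and makes every call site explicit:
// r.dl.photos().
type runNamed struct {
	dl *downloader
}

func main() {
	re := runEmbedded{downloader: &downloader{}}
	rn := runNamed{dl: &downloader{}}
	fmt.Println(re.photos(), rn.dl.photos())
}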