importer/pinboard: add incremental import

Enable "SupportsIncremental"

Fixes #1182

Change-Id: I75202fba95f5e481f68cfd95f20e40d32e33b377
This commit is contained in:
Niklas Merz 2018-06-01 11:06:55 +02:00
parent d6c616c615
commit 2e1b075acf
No known key found for this signature in database
GPG Key ID: 9D0402820FB84764
1 changed files with 70 additions and 24 deletions

View File

@ -50,6 +50,7 @@ import (
"log"
"net/http"
"strings"
"sync"
"time"
"perkeep.org/internal/httputil"
@ -68,6 +69,14 @@ func init() {
const (
fetchUrl = "https://api.pinboard.in/v1/posts/all?auth_token=%s&format=json&results=%d&todt=%s"
// runCompleteVersion is a cache-busting version number of the
// importer code. It should be incremented whenever the
// behavior of this importer is updated enough to warrant a
// complete run. Otherwise, if the importer runs to
// completion, this version number is recorded on the account
// permanode and subsequent importers can stop early.
runCompleteVersion = "1"
timeFormat = "2006-01-02T15:04:05Z"
// pauseInterval is the time we wait between fetching batches (for
@ -80,6 +89,11 @@ const (
attrAuthToken = "authToken"
// attrPostMeta is the attribute to store the meta tag of a pinboard post.
// It is used as the signal for duplicate detection, as the meta tag
// changes whenever a post is mutated.
attrPostMeta = "pinboard.in:meta"
// StatusTooManyRequests is the http status code returned by
// pinboard servers if we have made too many requests for a
// particular user. If we receive this status code, we should
@ -105,7 +119,7 @@ func (imp) Properties() importer.Properties {
return importer.Properties{
Title: "Pinboard",
Description: "import your pinboard.in posts",
SupportsIncremental: false,
SupportsIncremental: true,
NeedsAPIKey: false,
}
}
@ -165,16 +179,20 @@ func (im imp) ServeCallback(w http.ResponseWriter, r *http.Request, ctx *importe
func (im imp) Run(ctx *importer.RunContext) (err error) {
log.Printf("pinboard: Running importer.")
r := &run{
RunContext: ctx,
im: im,
postGate: syncutil.NewGate(3),
nextCursor: time.Now().Format(timeFormat),
nextAfter: time.Now(),
lastPause: pauseInterval,
RunContext: ctx,
im: im,
postGate: syncutil.NewGate(3),
nextCursor: time.Now().Format(timeFormat),
nextAfter: time.Now(),
lastPause: pauseInterval,
incremental: ctx.AccountNode().Attr(importer.AcctAttrCompletedVersion) == runCompleteVersion,
}
_, err = r.importPosts()
err = r.importPosts()
log.Printf("pinboard: Importer returned %v.", err)
return
if err != nil {
return err
}
return r.AccountNode().SetAttrs(importer.AcctAttrCompletedVersion, runCompleteVersion)
}
func (im imp) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@ -186,7 +204,8 @@ type run struct {
im imp
postGate *syncutil.Gate
// Return only bookmarks created before this time (exclusive bound)
// nextCursor is the exclusive bound, to fetch only bookmarks created before this time.
// The pinboard API returns most recent posts first.
nextCursor string
// We should not fetch the next batch until this time (exclusive bound)
@ -197,6 +216,8 @@ type run struct {
// pinboard, it gets doubled. It will be used to calculate the
// next time we fetch from pinboard.
lastPause time.Duration
incremental bool // whether we've completed a run in the past
}
func (r *run) getPostsNode() (*importer.Object, error) {
@ -215,22 +236,22 @@ func (r *run) getPostsNode() (*importer.Object, error) {
return obj, obj.SetAttr(nodeattr.Title, title)
}
func (r *run) importPosts() (*importer.Object, error) {
func (r *run) importPosts() error {
authToken := r.AccountNode().Attr(attrAuthToken)
parent, err := r.getPostsNode()
if err != nil {
return nil, err
return err
}
keepTrying := true
for keepTrying {
keepTrying, err = r.importBatch(authToken, parent)
if err != nil {
return nil, err
return err
}
}
return parent, nil
return nil
}
// Used to parse json
@ -294,7 +315,11 @@ func (r *run) importBatch(authToken string, parent *importer.Object) (keepTrying
}
log.Printf("pinboard: Importing %d posts...", postCount)
var grp syncutil.Group
var (
allDupMu sync.Mutex
allDups = true
grp syncutil.Group
)
for _, post := range postBatch {
select {
case <-r.Context().Done():
@ -307,28 +332,47 @@ func (r *run) importBatch(authToken string, parent *importer.Object) (keepTrying
r.postGate.Start()
grp.Go(func() error {
defer r.postGate.Done()
return r.importPost(&post, parent)
dup, err := r.importPost(&post, parent)
if !dup {
allDupMu.Lock()
allDups = false
allDupMu.Unlock()
}
return err
})
}
if err := grp.Err(); err != nil {
return false, err
}
log.Printf("pinboard: Imported batch of %d posts in %s.", postCount, time.Since(start))
if r.incremental && allDups {
log.Printf("pinboard: incremental import found end batch")
return false, nil
}
r.nextCursor = postBatch[postCount-1].Time
r.lastPause = pauseInterval
r.nextAfter = time.Now().Add(pauseInterval)
tryAgain := postCount == batchLimit
return tryAgain, grp.Err()
return tryAgain, nil
}
func (r *run) importPost(post *apiPost, parent *importer.Object) error {
func (r *run) importPost(post *apiPost, parent *importer.Object) (dup bool, err error) {
postNode, err := parent.ChildPathObject(post.Hash)
if err != nil {
return err
return false, err
}
//Check for duplicates
if post.Meta != "" && postNode.Attr(attrPostMeta) == post.Meta {
return true, nil
}
t, err := time.Parse(timeFormat, post.Time)
if err != nil {
return err
return false, err
}
attrs := []string{
@ -338,16 +382,18 @@ func (r *run) importPost(post *apiPost, parent *importer.Object) error {
nodeattr.Title, post.Description,
nodeattr.URL, post.Href,
"pinboard.in:extended", post.Extended,
"pinboard.in:meta", post.Meta,
"pinboard.in:shared", post.Shared,
"pinboard.in:toread", post.ToRead,
}
if err = postNode.SetAttrs(attrs...); err != nil {
return err
return false, err
}
if err = postNode.SetAttrValues("tag", strings.Split(post.Tags, " ")); err != nil {
return err
return false, err
}
if err = postNode.SetAttr(attrPostMeta, post.Meta); err != nil {
return false, err
}
return nil
return false, nil
}