From bf8e52a88936935884c71f9444348e02c30c7ae1 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Wed, 14 May 2014 10:21:13 -0700 Subject: [PATCH] importer: periodic import Change-Id: Id324890833cb8f5672a0fa3a8df342dc05e6298c --- pkg/importer/importer.go | 47 ++++++++++++++++++++++++++++++---------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/pkg/importer/importer.go b/pkg/importer/importer.go index 6b9726676..c376a77ae 100644 --- a/pkg/importer/importer.go +++ b/pkg/importer/importer.go @@ -433,7 +433,7 @@ func (h *Host) startPeriodicImporters() { continue } attrs := desBlob.Permanode.Attr - if attrs.Get("camliNodeType") != "importerAccount" { + if attrs.Get(attrNodeType) != nodeTypeImporterAccount { panic("Search result returned non-importerAccount") } impType := attrs.Get("importerType") @@ -441,21 +441,41 @@ func (h *Host) startPeriodicImporters() { if !ok { continue } - duration, err := time.ParseDuration(attrs.Get(attrImportAuto)) - if duration == 0 || err != nil { - continue - } ia, err := imp.account(blob) if err != nil { log.Printf("Can't load importer account %v for regular importing: %v", blob, err) continue } - log.Printf("Starting regular periodic %v import for account %v: %v", impType, blob, ia) - go ia.start() - // TODO: do it more than once on start-up. + go ia.maybeStart() } } +func (ia *importerAcct) maybeStart() { + acctObj, err := ia.im.host.ObjectFromRef(ia.acct.PermanodeRef()) + if err != nil { + log.Printf("Error maybe starting %v: %v", ia.acct.PermanodeRef(), err) + return + } + duration, err := time.ParseDuration(acctObj.Attr(attrImportAuto)) + if duration == 0 || err != nil { + return + } + ia.mu.Lock() + defer ia.mu.Unlock() + if ia.current != nil { + return + } + if ia.lastRunDone.After(time.Now().Add(-duration)) { + sleepFor := ia.lastRunDone.Add(duration).Sub(time.Now()) + log.Printf("%v ran recently enough. Sleeping for %v.", ia, sleepFor) + time.AfterFunc(sleepFor, ia.maybeStart) + return + } + + log.Printf("Starting regular periodic import for %v", ia) + go ia.start() +} + // BaseURL returns the root of the whole server, without trailing // slash. func (h *Host) BaseURL() string { @@ -731,6 +751,10 @@ type importerAcct struct { lastRunDone time.Time } +func (ia *importerAcct) String() string { + return fmt.Sprintf("%v importer account, %v", ia.im.name, ia.acct.PermanodeRef()) +} + func (ia *importerAcct) delete() error { if err := ia.acct.SetAttrs( attrNodeType, nodeTypeImporter+"-deleted", @@ -873,12 +897,12 @@ func (ia *importerAcct) start() { ia.stopped = false ia.lastRunStart = time.Now() go func() { - log.Printf("Starting importer %s: %s", ia.im.name, ia.AccountLinkSummary()) + log.Printf("Starting %v: %s", ia, ia.AccountLinkSummary()) err := ia.im.impl.Run(rc) if err != nil { - log.Printf("Importer %s error: %v", ia.im.name, err) + log.Printf("%v error: %v", ia, err) } else { - log.Printf("Importer %s finished.", ia.im.name) + log.Printf("%v finished.", ia) } ia.mu.Lock() defer ia.mu.Unlock() @@ -886,6 +910,7 @@ func (ia *importerAcct) start() { ia.stopped = false ia.lastRunDone = time.Now() ia.lastRunErr = err + go ia.maybeStart() }() }