importer: periodic import

Change-Id: Id324890833cb8f5672a0fa3a8df342dc05e6298c
This commit is contained in:
Brad Fitzpatrick 2014-05-14 10:21:13 -07:00
parent bf618b030d
commit bf8e52a889
1 changed files with 36 additions and 11 deletions

View File

@ -433,7 +433,7 @@ func (h *Host) startPeriodicImporters() {
continue continue
} }
attrs := desBlob.Permanode.Attr attrs := desBlob.Permanode.Attr
if attrs.Get("camliNodeType") != "importerAccount" { if attrs.Get(attrNodeType) != nodeTypeImporterAccount {
panic("Search result returned non-importerAccount") panic("Search result returned non-importerAccount")
} }
impType := attrs.Get("importerType") impType := attrs.Get("importerType")
@ -441,21 +441,41 @@ func (h *Host) startPeriodicImporters() {
if !ok { if !ok {
continue continue
} }
duration, err := time.ParseDuration(attrs.Get(attrImportAuto))
if duration == 0 || err != nil {
continue
}
ia, err := imp.account(blob) ia, err := imp.account(blob)
if err != nil { if err != nil {
log.Printf("Can't load importer account %v for regular importing: %v", blob, err) log.Printf("Can't load importer account %v for regular importing: %v", blob, err)
continue continue
} }
log.Printf("Starting regular periodic %v import for account %v: %v", impType, blob, ia) go ia.maybeStart()
go ia.start()
// TODO: do it more than once on start-up.
} }
} }
func (ia *importerAcct) maybeStart() {
acctObj, err := ia.im.host.ObjectFromRef(ia.acct.PermanodeRef())
if err != nil {
log.Printf("Error maybe starting %v: %v", ia.acct.PermanodeRef(), err)
return
}
duration, err := time.ParseDuration(acctObj.Attr(attrImportAuto))
if duration == 0 || err != nil {
return
}
ia.mu.Lock()
defer ia.mu.Unlock()
if ia.current != nil {
return
}
if ia.lastRunDone.After(time.Now().Add(-duration)) {
sleepFor := ia.lastRunDone.Add(duration).Sub(time.Now())
log.Printf("%v ran recently enough. Sleeping for %v.", ia, sleepFor)
time.AfterFunc(sleepFor, ia.maybeStart)
return
}
log.Printf("Starting regular periodic import for %v", ia)
go ia.start()
}
// BaseURL returns the root of the whole server, without trailing // BaseURL returns the root of the whole server, without trailing
// slash. // slash.
func (h *Host) BaseURL() string { func (h *Host) BaseURL() string {
@ -731,6 +751,10 @@ type importerAcct struct {
lastRunDone time.Time lastRunDone time.Time
} }
func (ia *importerAcct) String() string {
return fmt.Sprintf("%v importer account, %v", ia.im.name, ia.acct.PermanodeRef())
}
func (ia *importerAcct) delete() error { func (ia *importerAcct) delete() error {
if err := ia.acct.SetAttrs( if err := ia.acct.SetAttrs(
attrNodeType, nodeTypeImporter+"-deleted", attrNodeType, nodeTypeImporter+"-deleted",
@ -873,12 +897,12 @@ func (ia *importerAcct) start() {
ia.stopped = false ia.stopped = false
ia.lastRunStart = time.Now() ia.lastRunStart = time.Now()
go func() { go func() {
log.Printf("Starting importer %s: %s", ia.im.name, ia.AccountLinkSummary()) log.Printf("Starting %v: %s", ia, ia.AccountLinkSummary())
err := ia.im.impl.Run(rc) err := ia.im.impl.Run(rc)
if err != nil { if err != nil {
log.Printf("Importer %s error: %v", ia.im.name, err) log.Printf("%v error: %v", ia, err)
} else { } else {
log.Printf("Importer %s finished.", ia.im.name) log.Printf("%v finished.", ia)
} }
ia.mu.Lock() ia.mu.Lock()
defer ia.mu.Unlock() defer ia.mu.Unlock()
@ -886,6 +910,7 @@ func (ia *importerAcct) start() {
ia.stopped = false ia.stopped = false
ia.lastRunDone = time.Now() ia.lastRunDone = time.Now()
ia.lastRunErr = err ia.lastRunErr = err
go ia.maybeStart()
}() }()
} }