mirror of https://github.com/perkeep/perkeep.git
all: make log lines a bit more consistent
Change-Id: I06c5bbe072c9857ca3afbf97d14146b9cd96a49e
This commit is contained in:
parent
892dd7b0f3
commit
38d0075c3a
|
@ -277,7 +277,7 @@ func (im *imp) LongPoll(rctx *importer.RunContext) error {
|
|||
req.URL.RawQuery = form.Encode()
|
||||
req.Cancel = rctx.Context().Done()
|
||||
|
||||
log.Printf("Beginning twitter long poll...")
|
||||
log.Printf("twitter: beginning long poll, awaiting new tweets...")
|
||||
res, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -292,17 +292,17 @@ func (im *imp) LongPoll(rctx *importer.RunContext) error {
|
|||
if line == "" || strings.HasPrefix(line, `{"friends`) {
|
||||
continue
|
||||
}
|
||||
log.Printf("Twitter long poll saw a tweet: %s", line)
|
||||
log.Printf("twitter: long poll saw activity")
|
||||
return nil
|
||||
}
|
||||
if err := bs.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
return errors.New("got EOF without a tweet")
|
||||
return errors.New("twitter: got EOF without a tweet")
|
||||
}
|
||||
|
||||
func (r *run) errorf(format string, args ...interface{}) {
|
||||
log.Printf(format, args...)
|
||||
log.Printf("twitter: "+format, args...)
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.anyErr = true
|
||||
|
@ -336,7 +336,7 @@ func (r *run) importTweets(userID string) error {
|
|||
for continueRequests {
|
||||
select {
|
||||
case <-r.Context().Done():
|
||||
r.errorf("Twitter importer: interrupted")
|
||||
r.errorf("interrupted")
|
||||
return r.Context().Err()
|
||||
default:
|
||||
}
|
||||
|
@ -344,10 +344,10 @@ func (r *run) importTweets(userID string) error {
|
|||
var resp []*apiTweetItem
|
||||
var err error
|
||||
if maxId == "" {
|
||||
log.Printf("Fetching tweets for userid %s", userID)
|
||||
log.Printf("twitter: fetching tweets for userid %s", userID)
|
||||
err = r.doAPI(&resp, userTimeLineAPIPath, attrs...)
|
||||
} else {
|
||||
log.Printf("Fetching tweets for userid %s with max ID %s", userID, maxId)
|
||||
log.Printf("twitter: fetching tweets for userid %s with max ID %s", userID, maxId)
|
||||
err = r.doAPI(&resp, userTimeLineAPIPath,
|
||||
append(attrs, "max_id", maxId)...)
|
||||
}
|
||||
|
@ -383,7 +383,7 @@ func (r *run) importTweets(userID string) error {
|
|||
allDupMu.Unlock()
|
||||
}
|
||||
if err != nil {
|
||||
r.errorf("Twitter importer: error importing tweet %s %v", tweet.Id, err)
|
||||
r.errorf("error importing tweet %s %v", tweet.Id, err)
|
||||
}
|
||||
return err
|
||||
})
|
||||
|
@ -392,14 +392,14 @@ func (r *run) importTweets(userID string) error {
|
|||
return err
|
||||
}
|
||||
numTweets += newThisBatch
|
||||
log.Printf("Imported %d tweets this batch; %d total.", newThisBatch, numTweets)
|
||||
log.Printf("twitter: imported %d tweets this batch; %d total.", newThisBatch, numTweets)
|
||||
if r.incremental && allDups {
|
||||
log.Printf("twitter incremental import found end batch")
|
||||
log.Printf("twitter: incremental import found end batch")
|
||||
break
|
||||
}
|
||||
continueRequests = newThisBatch > 0
|
||||
}
|
||||
log.Printf("Successfully did full run of importing %d tweets", numTweets)
|
||||
log.Printf("twitter: successfully did full run of importing %d tweets", numTweets)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -425,7 +425,7 @@ func tweetsFromZipFile(zf *zip.File) (tweets []*zipTweetItem, err error) {
|
|||
}
|
||||
|
||||
func (r *run) importTweetsFromZip(userID string, zr *zip.Reader) error {
|
||||
log.Printf("Processing zip file with %d files", len(zr.File))
|
||||
log.Printf("twitter: processing zip file with %d files", len(zr.File))
|
||||
|
||||
tweetsNode, err := r.getTopLevelNode("tweets")
|
||||
if err != nil {
|
||||
|
@ -548,7 +548,7 @@ func (r *run) importTweet(parent *importer.Object, tweet tweetItem, viaAPI bool)
|
|||
return false, fmt.Errorf("HTTP status %d fetching %s for tweet %s", res.StatusCode, mediaURL, url)
|
||||
}
|
||||
if !viaAPI {
|
||||
log.Printf("For zip tweet %s, reading %v", url, mediaURL)
|
||||
log.Printf("twitter: for zip tweet %s, reading %v", url, mediaURL)
|
||||
}
|
||||
fileRef, err := schema.WriteFileFromReader(r.Context(), r.Host.Target(), filename, res.Body)
|
||||
res.Body.Close()
|
||||
|
@ -559,7 +559,7 @@ func (r *run) importTweet(parent *importer.Object, tweet tweetItem, viaAPI bool)
|
|||
if i == 0 {
|
||||
attrs = append(attrs, "camliContentImage", fileRef.String())
|
||||
}
|
||||
log.Printf("Slurped %s as %s for tweet %s (%v)", mediaURL, fileRef.String(), url, tweetNode.PermanodeRef())
|
||||
log.Printf("twitter: slurped %s as %s for tweet %s (%v)", mediaURL, fileRef.String(), url, tweetNode.PermanodeRef())
|
||||
gotMedia = true
|
||||
break
|
||||
}
|
||||
|
@ -570,7 +570,7 @@ func (r *run) importTweet(parent *importer.Object, tweet tweetItem, viaAPI bool)
|
|||
|
||||
changes, err := tweetNode.SetAttrs2(attrs...)
|
||||
if err == nil && changes {
|
||||
log.Printf("Imported tweet %s", url)
|
||||
log.Printf("twitter: imported tweet %s", url)
|
||||
}
|
||||
return !changes, err
|
||||
}
|
||||
|
@ -650,7 +650,7 @@ func (im *imp) ServeCallback(w http.ResponseWriter, r *http.Request, ctx *import
|
|||
return
|
||||
}
|
||||
if tempToken != r.FormValue("oauth_token") {
|
||||
log.Printf("unexpected oauth_token: got %v, want %v", r.FormValue("oauth_token"), tempToken)
|
||||
log.Printf("twitter: unexpected oauth_token: got %v, want %v", r.FormValue("oauth_token"), tempToken)
|
||||
httputil.BadRequestError(w, "unexpected oauth_token")
|
||||
return
|
||||
}
|
||||
|
|
|
@ -110,6 +110,10 @@ type Corpus struct {
|
|||
ss []string
|
||||
}
|
||||
|
||||
func (c *Corpus) logf(format string, args ...interface{}) {
|
||||
log.Printf("index/corpus: "+format, args...)
|
||||
}
|
||||
|
||||
// blobMatches reports whether br is in the set.
|
||||
func (srs SignerRefSet) blobMatches(br blob.Ref) bool {
|
||||
for _, v := range srs {
|
||||
|
@ -427,8 +431,8 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
|
|||
var ms0 *runtime.MemStats
|
||||
if logCorpusStats {
|
||||
ms0 = memstats()
|
||||
log.Printf("Slurping corpus to memory from index...")
|
||||
log.Printf("Slurping corpus to memory from index... (1/%d: meta rows)", len(slurpPrefixes))
|
||||
c.logf("loading into memory...")
|
||||
c.logf("loading into memory... (1/%d: meta rows)", len(slurpPrefixes))
|
||||
}
|
||||
|
||||
scanmu := new(sync.Mutex)
|
||||
|
@ -452,7 +456,7 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
|
|||
var grp syncutil.Group
|
||||
for i, prefix := range slurpPrefixes[2:] {
|
||||
if logCorpusStats {
|
||||
log.Printf("Slurping corpus to memory from index... (%d/%d: prefix %q)", i+2, len(slurpPrefixes),
|
||||
c.logf("loading into memory... (%d/%d: prefix %q)", i+2, len(slurpPrefixes),
|
||||
prefix[:len(prefix)-1])
|
||||
}
|
||||
prefix := prefix
|
||||
|
@ -492,7 +496,7 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
|
|||
if ms1.Alloc < ms0.Alloc {
|
||||
memUsed = 0
|
||||
}
|
||||
log.Printf("Corpus stats: %.3f MiB mem: %d blobs (%.3f GiB) (%d schema (%d permanode, %d file (%d image), ...)",
|
||||
c.logf("stats: %.3f MiB mem: %d blobs (%.3f GiB) (%d schema (%d permanode, %d file (%d image), ...)",
|
||||
float64(memUsed)/(1<<20),
|
||||
len(c.blobs),
|
||||
float64(c.sumBlobBytes)/(1<<30),
|
||||
|
@ -500,7 +504,7 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error {
|
|||
len(c.permanodes),
|
||||
len(c.files),
|
||||
len(c.imageInfo))
|
||||
log.Printf("Corpus scanning CPU usage: %v", cpu)
|
||||
c.logf("scanning CPU usage: %v", cpu)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
@ -552,7 +556,7 @@ func (c *Corpus) scanPrefix(mu *sync.Mutex, s sorted.KeyValue, prefix string) (e
|
|||
if typeKey == keySignerKeyID.name {
|
||||
signerBlobRef, ok := blob.Parse(strings.TrimPrefix(it.Key(), keySignerKeyID.name+":"))
|
||||
if !ok {
|
||||
log.Printf("Bogus signer blob in %v row: %q", keySignerKeyID.name, it.Key())
|
||||
c.logf("WARNING: bogus signer blob in %v row: %q", keySignerKeyID.name, it.Key())
|
||||
continue
|
||||
}
|
||||
if err := c.addKeyID(&mutationMap{
|
||||
|
@ -569,7 +573,7 @@ func (c *Corpus) scanPrefix(mu *sync.Mutex, s sorted.KeyValue, prefix string) (e
|
|||
}
|
||||
if logCorpusStats {
|
||||
d := time.Since(t0)
|
||||
log.Printf("Scanned prefix %q: %d rows, %v", prefix[:len(prefix)-1], n, d)
|
||||
c.logf("scanned prefix %q: %d rows, %v", prefix[:len(prefix)-1], n, d)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -98,6 +98,10 @@ var (
|
|||
_ Interface = (*Index)(nil)
|
||||
)
|
||||
|
||||
func (x *Index) logf(format string, args ...interface{}) {
|
||||
log.Printf("index: "+format, args...)
|
||||
}
|
||||
|
||||
var aboutToReindex = false
|
||||
|
||||
// SignerRefSet is the set of all blob Refs (of different hashes) that represent
|
||||
|
@ -241,13 +245,13 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
|
|||
if x.schemaVersion() != 4 || requiredSchemaVersion != 5 {
|
||||
panic("fixMissingWholeRef should only be used when upgrading from v4 to v5 of the index schema")
|
||||
}
|
||||
log.Println("index: fixing the missing wholeRef in the fileInfo rows...")
|
||||
x.logf("fixing the missing wholeRef in the fileInfo rows...")
|
||||
defer func() {
|
||||
if err != nil {
|
||||
log.Printf("index: fixing the fileInfo rows failed: %v", err)
|
||||
x.logf("fixing the fileInfo rows failed: %v", err)
|
||||
return
|
||||
}
|
||||
log.Print("index: successfully fixed wholeRef in FileInfo rows.")
|
||||
x.logf("successfully fixed wholeRef in FileInfo rows.")
|
||||
}()
|
||||
|
||||
// first build a reverted keyWholeToFileRef map, so we can get the wholeRef from the fileRef easily.
|
||||
|
@ -284,7 +288,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
|
|||
for it.Next() {
|
||||
select {
|
||||
case <-t.C:
|
||||
log.Printf("Recorded %d missing wholeRef that we'll try to fix, and %d that we can't fix.", fixedEntries, missedEntries)
|
||||
x.logf("recorded %d missing wholeRef that we'll try to fix, and %d that we can't fix.", fixedEntries, missedEntries)
|
||||
default:
|
||||
}
|
||||
br, ok := blob.ParseBytes(it.KeyBytes()[len(keyPrefix):])
|
||||
|
@ -294,7 +298,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
|
|||
wholeRef, ok := fileRefToWholeRef[br]
|
||||
if !ok {
|
||||
missedEntries++
|
||||
log.Printf("WARNING: wholeRef for %v not found in index. You should probably rebuild the whole index.", br)
|
||||
x.logf("WARNING: wholeRef for %v not found in index. You should probably rebuild the whole index.", br)
|
||||
continue
|
||||
}
|
||||
valPart := strutil.AppendSplitN(valA[:0], it.Value(), "|", 3)
|
||||
|
@ -310,7 +314,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
|
|||
// For the "production" migrations between 0.8 and 0.9, the index should not have any wholeRef
|
||||
// in the keyFileInfo entries. So if something goes wrong and is somehow linked to that happening,
|
||||
// I'd like to know about it, hence the logging.
|
||||
log.Printf("%v: %v already has a wholeRef, not fixing it", it.Key(), it.Value())
|
||||
x.logf("%v: %v already has a wholeRef, not fixing it", it.Key(), it.Value())
|
||||
continue
|
||||
}
|
||||
size, err := strconv.Atoi(size_s)
|
||||
|
@ -323,7 +327,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
|
|||
if err := it.Close(); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("Starting to commit the missing wholeRef fixes (%d entries) now, this can take a while.", fixedEntries)
|
||||
x.logf("starting to commit the missing wholeRef fixes (%d entries) now, this can take a while.", fixedEntries)
|
||||
bm := x.s.BeginBatch()
|
||||
for k, v := range mutations {
|
||||
bm.Set(k, v)
|
||||
|
@ -333,7 +337,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) {
|
|||
return err
|
||||
}
|
||||
if missedEntries > 0 {
|
||||
log.Printf("Some missing wholeRef entries were not fixed (%d), you should do a full reindex.", missedEntries)
|
||||
x.logf("some missing wholeRef entries were not fixed (%d), you should do a full reindex.", missedEntries)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -547,8 +551,11 @@ func (x *Index) Reindex() error {
|
|||
// if any of them is not found. It only returns an error if something went wrong
|
||||
// during the enumeration.
|
||||
func (x *Index) integrityCheck(timeout time.Duration) error {
|
||||
log.Print("Starting index integrity check.")
|
||||
defer log.Print("Index integrity check done.")
|
||||
t0 := time.Now()
|
||||
x.logf("starting integrity check...")
|
||||
defer func() {
|
||||
x.logf("integrity check done (after %v)", time.Since(t0).Round(10*time.Millisecond))
|
||||
}()
|
||||
if x.blobSource == nil {
|
||||
return errors.New("index: can't check sanity of index: no blobSource")
|
||||
}
|
||||
|
@ -581,7 +588,7 @@ func (x *Index) integrityCheck(timeout time.Duration) error {
|
|||
}
|
||||
if len(notFound) > 0 {
|
||||
// TODO(mpl): at least on GCE, display that message and maybe more on a web UI page as well.
|
||||
log.Printf("WARNING: sanity checking of the index found %d non-indexed blobs out of %d tested blobs. Reindexing is advised.", len(notFound), len(notFound)+len(seen))
|
||||
x.logf("WARNING: sanity checking of the index found %d non-indexed blobs out of %d tested blobs. Reindexing is advised.", len(notFound), len(notFound)+len(seen))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -772,11 +779,11 @@ func (x *Index) GetRecentPermanodes(ctx context.Context, dest chan<- camtypes.Re
|
|||
|
||||
keyId, err := x.KeyId(ctx, owner)
|
||||
if err == sorted.ErrNotFound {
|
||||
log.Printf("No recent permanodes because keyId for owner %v not found", owner)
|
||||
x.logf("no recent permanodes because keyId for owner %v not found", owner)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
log.Printf("Error fetching keyId for owner %v: %v", owner, err)
|
||||
x.logf("error fetching keyId for owner %v: %v", owner, err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -1360,7 +1367,7 @@ func (x *Index) GetFileInfo(ctx context.Context, fileRef blob.Ref) (camtypes.Fil
|
|||
}
|
||||
valPart := strings.Split(iv, "|")
|
||||
if len(valPart) < 3 {
|
||||
log.Printf("index: bogus key %q = %q", ikey, iv)
|
||||
x.logf("bogus key %q = %q", ikey, iv)
|
||||
return camtypes.FileInfo{}, os.ErrNotExist
|
||||
}
|
||||
var wholeRef blob.Ref
|
||||
|
@ -1369,7 +1376,7 @@ func (x *Index) GetFileInfo(ctx context.Context, fileRef blob.Ref) (camtypes.Fil
|
|||
}
|
||||
size, err := strconv.ParseInt(valPart[0], 10, 64)
|
||||
if err != nil {
|
||||
log.Printf("index: bogus integer at position 0 in key %q = %q", ikey, iv)
|
||||
x.logf("bogus integer at position 0 in key %q = %q", ikey, iv)
|
||||
return camtypes.FileInfo{}, os.ErrNotExist
|
||||
}
|
||||
fileName := urld(valPart[1])
|
||||
|
|
|
@ -116,8 +116,12 @@ func (sh *SyncHandler) String() string {
|
|||
return fmt.Sprintf("[SyncHandler %v -> %v]", sh.fromName, sh.toName)
|
||||
}
|
||||
|
||||
func (sh *SyncHandler) fromToString() string {
|
||||
return fmt.Sprintf("%v -> %v", sh.fromName, sh.toName)
|
||||
}
|
||||
|
||||
func (sh *SyncHandler) logf(format string, args ...interface{}) {
|
||||
log.Printf(sh.String()+" "+format, args...)
|
||||
log.Printf("sync: "+sh.fromToString()+": "+format, args...)
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -353,7 +357,7 @@ func (sh *SyncHandler) readQueueToMemory() error {
|
|||
sh.addBlobToCopy(sb)
|
||||
n++
|
||||
}
|
||||
sh.logf("Added %d pending blobs from sync queue to pending list", n)
|
||||
sh.logf("added %d pending blobs from sync queue to pending list", n)
|
||||
return <-errc
|
||||
}
|
||||
|
||||
|
@ -730,7 +734,7 @@ func (sh *SyncHandler) startFullValidation() {
|
|||
}
|
||||
sh.mu.Unlock()
|
||||
|
||||
sh.logf("Running full validation; determining validation shards...")
|
||||
sh.logf("running full validation; determining validation shards...")
|
||||
shards := sh.shardPrefixes()
|
||||
|
||||
sh.mu.Lock()
|
||||
|
@ -752,7 +756,7 @@ func (sh *SyncHandler) runFullValidation() {
|
|||
wg.Add(len(shards))
|
||||
sh.mu.Unlock()
|
||||
|
||||
sh.logf("full validation beginning with %d shards", len(shards))
|
||||
sh.logf("full validation beginning with %d shards...", len(shards))
|
||||
|
||||
const maxShardWorkers = 30 // arbitrary
|
||||
gate := syncutil.NewGate(maxShardWorkers)
|
||||
|
@ -767,7 +771,7 @@ func (sh *SyncHandler) runFullValidation() {
|
|||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
sh.logf("Validation complete")
|
||||
sh.logf("validation complete")
|
||||
}
|
||||
|
||||
func (sh *SyncHandler) validateShardPrefix(pfx string) (err error) {
|
||||
|
@ -1048,22 +1052,22 @@ func storageDesc(v interface{}) string {
|
|||
// For now, don't implement them. Wait until we need them.
|
||||
|
||||
func (sh *SyncHandler) Fetch(context.Context, blob.Ref) (file io.ReadCloser, size uint32, err error) {
|
||||
panic("Unimplemented blobserver.Fetch called")
|
||||
panic("unimplemented blobserver.Fetch called")
|
||||
}
|
||||
|
||||
func (sh *SyncHandler) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error {
|
||||
sh.logf("Unexpected StatBlobs call")
|
||||
sh.logf("unexpected StatBlobs call")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sh *SyncHandler) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
|
||||
defer close(dest)
|
||||
sh.logf("Unexpected EnumerateBlobs call")
|
||||
sh.logf("unexpected EnumerateBlobs call")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (sh *SyncHandler) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
|
||||
panic("Unimplemented RemoveBlobs")
|
||||
panic("unimplemented RemoveBlobs")
|
||||
}
|
||||
|
||||
var errStopEnumerating = errors.New("sentinel error: reached the hourly compare quota")
|
||||
|
|
|
@ -243,12 +243,12 @@ func setupTLS(ws *webserver.Server, config *serverinit.Config, hostname string)
|
|||
HostPolicy: autocert.HostWhitelist(hostname),
|
||||
Cache: autocert.DirCache(osutil.DefaultLetsEncryptCache()),
|
||||
}
|
||||
log.Printf("Starting to listen on http://0.0.0.0:80 (for Let's Encrypt challenges)")
|
||||
// TODO(mpl): let the http-01 port be configurable, for when behind a proxy
|
||||
go func() {
|
||||
log.Fatalf("Could not start server for http-01 challenge: %v",
|
||||
log.Fatalf("Could not start ACME http-014 challenge server: %v",
|
||||
http.ListenAndServe(":http", m.HTTPHandler(nil)))
|
||||
}()
|
||||
log.Printf("TLS enabled, with Let's Encrypt for %v", hostname)
|
||||
ws.SetTLS(webserver.TLSSetup{
|
||||
CertManager: m.GetCertificate,
|
||||
})
|
||||
|
@ -817,7 +817,7 @@ func Main(up chan<- struct{}, down <-chan struct{}) {
|
|||
log.Printf("Could not reach app %v: %v", appName, err)
|
||||
}
|
||||
}
|
||||
log.Printf("Available on %s", urlToOpen)
|
||||
log.Printf("server: available at %s", urlToOpen)
|
||||
|
||||
// Block forever, except during tests.
|
||||
up <- struct{}{}
|
||||
|
|
Loading…
Reference in New Issue