diff --git a/pkg/importer/twitter/twitter.go b/pkg/importer/twitter/twitter.go index 376193344..b6b9b72da 100644 --- a/pkg/importer/twitter/twitter.go +++ b/pkg/importer/twitter/twitter.go @@ -277,7 +277,7 @@ func (im *imp) LongPoll(rctx *importer.RunContext) error { req.URL.RawQuery = form.Encode() req.Cancel = rctx.Context().Done() - log.Printf("Beginning twitter long poll...") + log.Printf("twitter: beginning long poll, awaiting new tweets...") res, err := http.DefaultClient.Do(req) if err != nil { return err @@ -292,17 +292,17 @@ func (im *imp) LongPoll(rctx *importer.RunContext) error { if line == "" || strings.HasPrefix(line, `{"friends`) { continue } - log.Printf("Twitter long poll saw a tweet: %s", line) + log.Printf("twitter: long poll saw activity") return nil } if err := bs.Err(); err != nil { return err } - return errors.New("got EOF without a tweet") + return errors.New("twitter: got EOF without a tweet") } func (r *run) errorf(format string, args ...interface{}) { - log.Printf(format, args...) + log.Printf("twitter: "+format, args...) r.mu.Lock() defer r.mu.Unlock() r.anyErr = true @@ -336,7 +336,7 @@ func (r *run) importTweets(userID string) error { for continueRequests { select { case <-r.Context().Done(): - r.errorf("Twitter importer: interrupted") + r.errorf("interrupted") return r.Context().Err() default: } @@ -344,10 +344,10 @@ func (r *run) importTweets(userID string) error { var resp []*apiTweetItem var err error if maxId == "" { - log.Printf("Fetching tweets for userid %s", userID) + log.Printf("twitter: fetching tweets for userid %s", userID) err = r.doAPI(&resp, userTimeLineAPIPath, attrs...) } else { - log.Printf("Fetching tweets for userid %s with max ID %s", userID, maxId) + log.Printf("twitter: fetching tweets for userid %s with max ID %s", userID, maxId) err = r.doAPI(&resp, userTimeLineAPIPath, append(attrs, "max_id", maxId)...) } @@ -383,7 +383,7 @@ func (r *run) importTweets(userID string) error { allDupMu.Unlock() } if err != nil { - r.errorf("Twitter importer: error importing tweet %s %v", tweet.Id, err) + r.errorf("error importing tweet %s %v", tweet.Id, err) } return err }) @@ -392,14 +392,14 @@ func (r *run) importTweets(userID string) error { return err } numTweets += newThisBatch - log.Printf("Imported %d tweets this batch; %d total.", newThisBatch, numTweets) + log.Printf("twitter: imported %d tweets this batch; %d total.", newThisBatch, numTweets) if r.incremental && allDups { - log.Printf("twitter incremental import found end batch") + log.Printf("twitter: incremental import found end batch") break } continueRequests = newThisBatch > 0 } - log.Printf("Successfully did full run of importing %d tweets", numTweets) + log.Printf("twitter: successfully did full run of importing %d tweets", numTweets) return nil } @@ -425,7 +425,7 @@ func tweetsFromZipFile(zf *zip.File) (tweets []*zipTweetItem, err error) { } func (r *run) importTweetsFromZip(userID string, zr *zip.Reader) error { - log.Printf("Processing zip file with %d files", len(zr.File)) + log.Printf("twitter: processing zip file with %d files", len(zr.File)) tweetsNode, err := r.getTopLevelNode("tweets") if err != nil { @@ -548,7 +548,7 @@ func (r *run) importTweet(parent *importer.Object, tweet tweetItem, viaAPI bool) return false, fmt.Errorf("HTTP status %d fetching %s for tweet %s", res.StatusCode, mediaURL, url) } if !viaAPI { - log.Printf("For zip tweet %s, reading %v", url, mediaURL) + log.Printf("twitter: for zip tweet %s, reading %v", url, mediaURL) } fileRef, err := schema.WriteFileFromReader(r.Context(), r.Host.Target(), filename, res.Body) res.Body.Close() @@ -559,7 +559,7 @@ func (r *run) importTweet(parent *importer.Object, tweet tweetItem, viaAPI bool) if i == 0 { attrs = append(attrs, "camliContentImage", fileRef.String()) } - log.Printf("Slurped %s as %s for tweet %s (%v)", mediaURL, fileRef.String(), url, tweetNode.PermanodeRef()) + log.Printf("twitter: slurped %s as %s for tweet %s (%v)", mediaURL, fileRef.String(), url, tweetNode.PermanodeRef()) gotMedia = true break } @@ -570,7 +570,7 @@ func (r *run) importTweet(parent *importer.Object, tweet tweetItem, viaAPI bool) changes, err := tweetNode.SetAttrs2(attrs...) if err == nil && changes { - log.Printf("Imported tweet %s", url) + log.Printf("twitter: imported tweet %s", url) } return !changes, err } @@ -650,7 +650,7 @@ func (im *imp) ServeCallback(w http.ResponseWriter, r *http.Request, ctx *import return } if tempToken != r.FormValue("oauth_token") { - log.Printf("unexpected oauth_token: got %v, want %v", r.FormValue("oauth_token"), tempToken) + log.Printf("twitter: unexpected oauth_token: got %v, want %v", r.FormValue("oauth_token"), tempToken) httputil.BadRequestError(w, "unexpected oauth_token") return } diff --git a/pkg/index/corpus.go b/pkg/index/corpus.go index 7aef6d1a2..4938786cf 100644 --- a/pkg/index/corpus.go +++ b/pkg/index/corpus.go @@ -110,6 +110,10 @@ type Corpus struct { ss []string } +func (c *Corpus) logf(format string, args ...interface{}) { + log.Printf("index/corpus: "+format, args...) +} + // blobMatches reports whether br is in the set. func (srs SignerRefSet) blobMatches(br blob.Ref) bool { for _, v := range srs { @@ -427,8 +431,8 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error { var ms0 *runtime.MemStats if logCorpusStats { ms0 = memstats() - log.Printf("Slurping corpus to memory from index...") - log.Printf("Slurping corpus to memory from index... (1/%d: meta rows)", len(slurpPrefixes)) + c.logf("loading into memory...") + c.logf("loading into memory... (1/%d: meta rows)", len(slurpPrefixes)) } scanmu := new(sync.Mutex) @@ -452,7 +456,7 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error { var grp syncutil.Group for i, prefix := range slurpPrefixes[2:] { if logCorpusStats { - log.Printf("Slurping corpus to memory from index... (%d/%d: prefix %q)", i+2, len(slurpPrefixes), + c.logf("loading into memory... (%d/%d: prefix %q)", i+2, len(slurpPrefixes), prefix[:len(prefix)-1]) } prefix := prefix @@ -492,7 +496,7 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error { if ms1.Alloc < ms0.Alloc { memUsed = 0 } - log.Printf("Corpus stats: %.3f MiB mem: %d blobs (%.3f GiB) (%d schema (%d permanode, %d file (%d image), ...)", + c.logf("stats: %.3f MiB mem: %d blobs (%.3f GiB) (%d schema (%d permanode, %d file (%d image), ...)", float64(memUsed)/(1<<20), len(c.blobs), float64(c.sumBlobBytes)/(1<<30), @@ -500,7 +504,7 @@ func (c *Corpus) scanFromStorage(s sorted.KeyValue) error { len(c.permanodes), len(c.files), len(c.imageInfo)) - log.Printf("Corpus scanning CPU usage: %v", cpu) + c.logf("scanning CPU usage: %v", cpu) } return nil @@ -552,7 +556,7 @@ func (c *Corpus) scanPrefix(mu *sync.Mutex, s sorted.KeyValue, prefix string) (e if typeKey == keySignerKeyID.name { signerBlobRef, ok := blob.Parse(strings.TrimPrefix(it.Key(), keySignerKeyID.name+":")) if !ok { - log.Printf("Bogus signer blob in %v row: %q", keySignerKeyID.name, it.Key()) + c.logf("WARNING: bogus signer blob in %v row: %q", keySignerKeyID.name, it.Key()) continue } if err := c.addKeyID(&mutationMap{ @@ -569,7 +573,7 @@ func (c *Corpus) scanPrefix(mu *sync.Mutex, s sorted.KeyValue, prefix string) (e } if logCorpusStats { d := time.Since(t0) - log.Printf("Scanned prefix %q: %d rows, %v", prefix[:len(prefix)-1], n, d) + c.logf("scanned prefix %q: %d rows, %v", prefix[:len(prefix)-1], n, d) } return nil } diff --git a/pkg/index/index.go b/pkg/index/index.go index 3b32593c8..86d794010 100644 --- a/pkg/index/index.go +++ b/pkg/index/index.go @@ -98,6 +98,10 @@ var ( _ Interface = (*Index)(nil) ) +func (x *Index) logf(format string, args ...interface{}) { + log.Printf("index: "+format, args...) +} + var aboutToReindex = false // SignerRefSet is the set of all blob Refs (of different hashes) that represent @@ -241,13 +245,13 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) { if x.schemaVersion() != 4 || requiredSchemaVersion != 5 { panic("fixMissingWholeRef should only be used when upgrading from v4 to v5 of the index schema") } - log.Println("index: fixing the missing wholeRef in the fileInfo rows...") + x.logf("fixing the missing wholeRef in the fileInfo rows...") defer func() { if err != nil { - log.Printf("index: fixing the fileInfo rows failed: %v", err) + x.logf("fixing the fileInfo rows failed: %v", err) return } - log.Print("index: successfully fixed wholeRef in FileInfo rows.") + x.logf("successfully fixed wholeRef in FileInfo rows.") }() // first build a reverted keyWholeToFileRef map, so we can get the wholeRef from the fileRef easily. @@ -284,7 +288,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) { for it.Next() { select { case <-t.C: - log.Printf("Recorded %d missing wholeRef that we'll try to fix, and %d that we can't fix.", fixedEntries, missedEntries) + x.logf("recorded %d missing wholeRef that we'll try to fix, and %d that we can't fix.", fixedEntries, missedEntries) default: } br, ok := blob.ParseBytes(it.KeyBytes()[len(keyPrefix):]) @@ -294,7 +298,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) { wholeRef, ok := fileRefToWholeRef[br] if !ok { missedEntries++ - log.Printf("WARNING: wholeRef for %v not found in index. You should probably rebuild the whole index.", br) + x.logf("WARNING: wholeRef for %v not found in index. You should probably rebuild the whole index.", br) continue } valPart := strutil.AppendSplitN(valA[:0], it.Value(), "|", 3) @@ -310,7 +314,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) { // For the "production" migrations between 0.8 and 0.9, the index should not have any wholeRef // in the keyFileInfo entries. So if something goes wrong and is somehow linked to that happening, // I'd like to know about it, hence the logging. - log.Printf("%v: %v already has a wholeRef, not fixing it", it.Key(), it.Value()) + x.logf("%v: %v already has a wholeRef, not fixing it", it.Key(), it.Value()) continue } size, err := strconv.Atoi(size_s) @@ -323,7 +327,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) { if err := it.Close(); err != nil { return err } - log.Printf("Starting to commit the missing wholeRef fixes (%d entries) now, this can take a while.", fixedEntries) + x.logf("starting to commit the missing wholeRef fixes (%d entries) now, this can take a while.", fixedEntries) bm := x.s.BeginBatch() for k, v := range mutations { bm.Set(k, v) @@ -333,7 +337,7 @@ func (x *Index) fixMissingWholeRef(fetcher blob.Fetcher) (err error) { return err } if missedEntries > 0 { - log.Printf("Some missing wholeRef entries were not fixed (%d), you should do a full reindex.", missedEntries) + x.logf("some missing wholeRef entries were not fixed (%d), you should do a full reindex.", missedEntries) } return nil } @@ -547,8 +551,11 @@ func (x *Index) Reindex() error { // if any of them is not found. It only returns an error if something went wrong // during the enumeration. func (x *Index) integrityCheck(timeout time.Duration) error { - log.Print("Starting index integrity check.") - defer log.Print("Index integrity check done.") + t0 := time.Now() + x.logf("starting integrity check...") + defer func() { + x.logf("integrity check done (after %v)", time.Since(t0).Round(10*time.Millisecond)) + }() if x.blobSource == nil { return errors.New("index: can't check sanity of index: no blobSource") } @@ -581,7 +588,7 @@ func (x *Index) integrityCheck(timeout time.Duration) error { } if len(notFound) > 0 { // TODO(mpl): at least on GCE, display that message and maybe more on a web UI page as well. - log.Printf("WARNING: sanity checking of the index found %d non-indexed blobs out of %d tested blobs. Reindexing is advised.", len(notFound), len(notFound)+len(seen)) + x.logf("WARNING: sanity checking of the index found %d non-indexed blobs out of %d tested blobs. Reindexing is advised.", len(notFound), len(notFound)+len(seen)) } return nil } @@ -772,11 +779,11 @@ func (x *Index) GetRecentPermanodes(ctx context.Context, dest chan<- camtypes.Re keyId, err := x.KeyId(ctx, owner) if err == sorted.ErrNotFound { - log.Printf("No recent permanodes because keyId for owner %v not found", owner) + x.logf("no recent permanodes because keyId for owner %v not found", owner) return nil } if err != nil { - log.Printf("Error fetching keyId for owner %v: %v", owner, err) + x.logf("error fetching keyId for owner %v: %v", owner, err) return err } @@ -1360,7 +1367,7 @@ func (x *Index) GetFileInfo(ctx context.Context, fileRef blob.Ref) (camtypes.Fil } valPart := strings.Split(iv, "|") if len(valPart) < 3 { - log.Printf("index: bogus key %q = %q", ikey, iv) + x.logf("bogus key %q = %q", ikey, iv) return camtypes.FileInfo{}, os.ErrNotExist } var wholeRef blob.Ref @@ -1369,7 +1376,7 @@ func (x *Index) GetFileInfo(ctx context.Context, fileRef blob.Ref) (camtypes.Fil } size, err := strconv.ParseInt(valPart[0], 10, 64) if err != nil { - log.Printf("index: bogus integer at position 0 in key %q = %q", ikey, iv) + x.logf("bogus integer at position 0 in key %q = %q", ikey, iv) return camtypes.FileInfo{}, os.ErrNotExist } fileName := urld(valPart[1]) diff --git a/pkg/server/sync.go b/pkg/server/sync.go index 54a4ca591..c13761e50 100644 --- a/pkg/server/sync.go +++ b/pkg/server/sync.go @@ -116,8 +116,12 @@ func (sh *SyncHandler) String() string { return fmt.Sprintf("[SyncHandler %v -> %v]", sh.fromName, sh.toName) } +func (sh *SyncHandler) fromToString() string { + return fmt.Sprintf("%v -> %v", sh.fromName, sh.toName) +} + func (sh *SyncHandler) logf(format string, args ...interface{}) { - log.Printf(sh.String()+" "+format, args...) + log.Printf("sync: "+sh.fromToString()+": "+format, args...) } func init() { @@ -353,7 +357,7 @@ func (sh *SyncHandler) readQueueToMemory() error { sh.addBlobToCopy(sb) n++ } - sh.logf("Added %d pending blobs from sync queue to pending list", n) + sh.logf("added %d pending blobs from sync queue to pending list", n) return <-errc } @@ -730,7 +734,7 @@ func (sh *SyncHandler) startFullValidation() { } sh.mu.Unlock() - sh.logf("Running full validation; determining validation shards...") + sh.logf("running full validation; determining validation shards...") shards := sh.shardPrefixes() sh.mu.Lock() @@ -752,7 +756,7 @@ func (sh *SyncHandler) runFullValidation() { wg.Add(len(shards)) sh.mu.Unlock() - sh.logf("full validation beginning with %d shards", len(shards)) + sh.logf("full validation beginning with %d shards...", len(shards)) const maxShardWorkers = 30 // arbitrary gate := syncutil.NewGate(maxShardWorkers) @@ -767,7 +771,7 @@ func (sh *SyncHandler) runFullValidation() { }() } wg.Wait() - sh.logf("Validation complete") + sh.logf("validation complete") } func (sh *SyncHandler) validateShardPrefix(pfx string) (err error) { @@ -1048,22 +1052,22 @@ func storageDesc(v interface{}) string { // For now, don't implement them. Wait until we need them. func (sh *SyncHandler) Fetch(context.Context, blob.Ref) (file io.ReadCloser, size uint32, err error) { - panic("Unimplemented blobserver.Fetch called") + panic("unimplemented blobserver.Fetch called") } func (sh *SyncHandler) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.SizedRef) error) error { - sh.logf("Unexpected StatBlobs call") + sh.logf("unexpected StatBlobs call") return nil } func (sh *SyncHandler) EnumerateBlobs(ctx context.Context, dest chan<- blob.SizedRef, after string, limit int) error { defer close(dest) - sh.logf("Unexpected EnumerateBlobs call") + sh.logf("unexpected EnumerateBlobs call") return nil } func (sh *SyncHandler) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error { - panic("Unimplemented RemoveBlobs") + panic("unimplemented RemoveBlobs") } var errStopEnumerating = errors.New("sentinel error: reached the hourly compare quota") diff --git a/server/perkeepd/camlistored.go b/server/perkeepd/camlistored.go index 58ea970fe..a40b21abe 100644 --- a/server/perkeepd/camlistored.go +++ b/server/perkeepd/camlistored.go @@ -243,12 +243,12 @@ func setupTLS(ws *webserver.Server, config *serverinit.Config, hostname string) HostPolicy: autocert.HostWhitelist(hostname), Cache: autocert.DirCache(osutil.DefaultLetsEncryptCache()), } + log.Printf("Starting to listen on http://0.0.0.0:80 (for Let's Encrypt challenges)") // TODO(mpl): let the http-01 port be configurable, for when behind a proxy go func() { - log.Fatalf("Could not start server for http-01 challenge: %v", + log.Fatalf("Could not start ACME http-014 challenge server: %v", http.ListenAndServe(":http", m.HTTPHandler(nil))) }() - log.Printf("TLS enabled, with Let's Encrypt for %v", hostname) ws.SetTLS(webserver.TLSSetup{ CertManager: m.GetCertificate, }) @@ -817,7 +817,7 @@ func Main(up chan<- struct{}, down <-chan struct{}) { log.Printf("Could not reach app %v: %v", appName, err) } } - log.Printf("Available on %s", urlToOpen) + log.Printf("server: available at %s", urlToOpen) // Block forever, except during tests. up <- struct{}{}