diff --git a/pkg/blobserver/blobpacked/blobpacked.go b/pkg/blobserver/blobpacked/blobpacked.go index fda0c9c12..9cf933007 100644 --- a/pkg/blobserver/blobpacked/blobpacked.go +++ b/pkg/blobserver/blobpacked/blobpacked.go @@ -96,7 +96,6 @@ import ( "sync" "time" - "go4.org/jsonconfig" "perkeep.org/internal/pools" "perkeep.org/pkg/blob" "perkeep.org/pkg/blobserver" @@ -105,6 +104,7 @@ import ( "perkeep.org/pkg/schema" "perkeep.org/pkg/sorted" + "go4.org/jsonconfig" "go4.org/strutil" "go4.org/syncutil" ) @@ -139,27 +139,34 @@ const ( zipManifestPath = "camlistore/camlistore-pack-manifest.json" ) +// RecoveryMode is the mode in which the blobpacked server starts. +type RecoveryMode int + +const ( + // NoRecovery means blobpacked does not attempt to repair its index on startup. + // It is the default. + NoRecovery RecoveryMode = iota + // FastRecovery populates the blobpacked index, without erasing any existing one. + FastRecovery + // FullRecovery erases the existing blobpacked index, then rebuilds it. + FullRecovery +) + var ( recoveryMu sync.Mutex - recovery bool + recovery = NoRecovery ) // TODO(mpl): make SetRecovery a method of type storage if we ever export it. -// SetRecovery notes that the user ran the camlistored binary with the --recovery flag. -// It means that any blobpacked storage subsequently initialized will -// automatically start with rebuilding its meta index of zip files. -func SetRecovery() { +// SetRecovery sets the recovery mode for the blobpacked package. +// If set to one of the modes other than NoRecovery, it means that any +// blobpacked storage subsequently initialized will automatically start with +// rebuilding its meta index of zip files, in accordance with the selected mode. +func SetRecovery(mode RecoveryMode) { recoveryMu.Lock() defer recoveryMu.Unlock() - recovery = true -} - -// UnsetRecovery resets the recovery state to false if set by SetRecovery. -func UnsetRecovery() { - recoveryMu.Lock() - defer recoveryMu.Unlock() - recovery = false + recovery = mode } type subFetcherStorage interface { @@ -173,7 +180,7 @@ type storage struct { // meta key -> value rows are: // - // For logical blobs packed within a large blog, "b:" prefix: + // For logical blobs packed within a large blob, "b:" prefix: // b:sha1-xxxx -> " " // // For wholerefs: (wholeMetaPrefix) @@ -275,31 +282,106 @@ func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storag recoveryMu.Lock() defer recoveryMu.Unlock() - if recovery { + var newKv func() (sorted.KeyValue, error) + switch recovery { + case FastRecovery: + newKv = func() (sorted.KeyValue, error) { + return sorted.NewKeyValue(metaConf) + } + case FullRecovery: + newKv = func() (sorted.KeyValue, error) { + kv, err := sorted.NewKeyValue(metaConf) + if err != nil { + return nil, err + } + wiper, ok := kv.(sorted.Wiper) + if !ok { + return nil, fmt.Errorf("blobpacked meta index of type %T needs to be wiped, but does not support automatic wiping. It should be removed manually.", kv) + } + if err := wiper.Wipe(); err != nil { + return nil, fmt.Errorf("blobpacked meta index of type %T could not be wiped: %v", kv, err) + } + return kv, nil + } + } + if newKv != nil { + // i.e. we're in one of the recovery modes log.Print("Starting recovery of blobpacked index") if err := meta.Close(); err != nil { return nil, err } - if err := sto.reindex(context.TODO(), func() (sorted.KeyValue, error) { - return sorted.NewKeyValue(metaConf) - }); err != nil { + if err := sto.reindex(context.TODO(), newKv); err != nil { return nil, err } return sto, nil } + // Check for a weird state: zip files exist, but no metadata about them // is recorded. This is probably a corrupt state, and the user likely // wants to recover. if !sto.anyMeta() && sto.anyZipPacks() { if env.OnGCE() { - log.Fatal("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please switch to recovery mode: add the \"camlistore-recovery = true\" key/value to the Custom metadata of your instance. And restart the instance.") + // TODO(mpl): make web UI page/mode that informs about this error. + log.Fatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", FastRecovery) } - log.Fatal("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please re-start in recovery mode with -recovery.") + log.Fatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please re-start in recovery mode with -recovery=%d", FastRecovery) + } + + if mode, err := sto.checkLargeIntegrity(); err != nil { + if mode <= NoRecovery { + log.Fatal(err) + } + if env.OnGCE() { + // TODO(mpl): make web UI page/mode that informs about this error. + log.Fatalf("Error: %v. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", err, mode) + } + log.Fatalf("Error: %v. Please re-start in recovery mode with -recovery=%d", err, mode) } return sto, nil } +// checkLargeIntegrity verifies that all large blobs in the large storage are +// indexed in meta, and vice-versa, that all rows in meta referring to a large blob +// correspond to an existing large blob in the large storage. If any of the above +// is not true, it returns the recovery mode that should be used to fix the +// problem, as well as the error detailing the problem. It does not perform any +// check about the contents of the large blobs themselves. +func (s *storage) checkLargeIntegrity() (RecoveryMode, error) { + t := s.meta.Find(blobMetaPrefix, blobMetaPrefixLimit) + inMeta := make(map[blob.Ref]bool) + for t.Next() { + meta, err := parseMetaRow(t.ValueBytes()) + if err != nil { + t.Close() + return FullRecovery, fmt.Errorf("corrupted blobpacked meta: %v") + } + inMeta[meta.largeRef] = true + } + if err := t.Close(); err != nil { + return NoRecovery, err + } + inLarge := make(map[blob.Ref]bool) + if err := blobserver.EnumerateAllFrom(context.Background(), s.large, "", func(sb blob.SizedRef) error { + if _, ok := inMeta[sb.Ref]; !ok { + return fmt.Errorf("packed blobRef %v is not in meta for blobpacked storage", sb.Ref) + } + inLarge[sb.Ref] = true + delete(inMeta, sb.Ref) + return nil + }); err != nil { + return FastRecovery, err + } + // we could simply check whether len(inMeta) > 0, but this below allows us to be + // more precise about which blob is problematic. + for k, _ := range inMeta { + if _, ok := inLarge[k]; !ok { + return FullRecovery, fmt.Errorf("packed blobRef %v in blobpacked meta does not actually exist in large storage", k) + } + } + return NoRecovery, nil +} + // wholeMetaPrefixInfo is the info needed to write the wholeMetaPrefix entries // when Reindexing. For a given file, spread over several zips, each zip has a // corresponding wholeMetaPrefixInfo. The wholeMetaPrefix entries pertaining to a diff --git a/server/camlistored/camlistored.go b/server/camlistored/camlistored.go index c6da21fe8..dbdc9ee02 100644 --- a/server/camlistored/camlistored.go +++ b/server/camlistored/camlistored.go @@ -109,7 +109,7 @@ var ( flagListen = flag.String("listen", "", "host:port to listen on, or :0 to auto-select. If blank, the value in the config will be used instead.") flagOpenBrowser = flag.Bool("openbrowser", true, "Launches the UI on startup") flagReindex = flag.Bool("reindex", false, "Reindex all blobs on startup") - flagRecovery = flag.Bool("recovery", false, "Recovery mode: rebuild the blobpacked meta index. The tasks performed by the recovery mode might change in the future.") + flagRecovery = flag.Int("recovery", 0, "Recovery mode: it corresponds for now to the recovery modes of the blobpacked package. Which means: 0 does nothing, 1 rebuilds the blobpacked index without erasing it, and 2 wipes the blobpacked index before rebuilding it.") flagSyslog = flag.Bool("syslog", false, "Log everything only to syslog. It is an error to use this flag on windows.") flagPollParent bool ) @@ -676,8 +676,8 @@ func setupLogging() io.Closer { } func checkRecovery() { - if *flagRecovery { - blobpacked.SetRecovery() + if blobpacked.RecoveryMode(*flagRecovery) > blobpacked.NoRecovery { + blobpacked.SetRecovery(blobpacked.RecoveryMode(*flagRecovery)) return } if !env.OnGCE() { @@ -693,12 +693,12 @@ func checkRecovery() { if recovery == "" { return } - doRecovery, err := strconv.ParseBool(recovery) + mode, err := strconv.Atoi(recovery) if err != nil { - log.Printf("invalid bool value for \"camlistore-recovery\": %v", err) + log.Printf("invalid int value for \"camlistore-recovery\": %v", err) } - if doRecovery { - blobpacked.SetRecovery() + if blobpacked.RecoveryMode(mode) > blobpacked.NoRecovery { + blobpacked.SetRecovery(blobpacked.RecoveryMode(mode)) } }