mirror of https://github.com/perkeep/perkeep.git
pk/blobserver/blobpacked: check large storage integrity
Check that all blobpacked zips are in the blobpacked meta, and vice-versa (that all entries in meta do exist as a zip in large storage). As the current recovery code would not fix the case of stale entries (large blobRefs in the blobpacked index of large blobs that don't exist anymore), this change also adds a new recovery mode, which wipes the existing blobpacked index, before rebuilding it. In doing so, the recovery var in blobpacked pkg, as well as the flagRecovery in camlistored.go have been changed to ints instead of bools, to take into account that we now have several modes of operation for recovery. Fixes #946 Change-Id: I1fe76b805af34933e362d70c9f27bfd5403e3f3a
This commit is contained in:
parent
ecfd0851f5
commit
0d96057201
|
@ -96,7 +96,6 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"go4.org/jsonconfig"
|
||||
"perkeep.org/internal/pools"
|
||||
"perkeep.org/pkg/blob"
|
||||
"perkeep.org/pkg/blobserver"
|
||||
|
@ -105,6 +104,7 @@ import (
|
|||
"perkeep.org/pkg/schema"
|
||||
"perkeep.org/pkg/sorted"
|
||||
|
||||
"go4.org/jsonconfig"
|
||||
"go4.org/strutil"
|
||||
"go4.org/syncutil"
|
||||
)
|
||||
|
@ -139,27 +139,34 @@ const (
|
|||
zipManifestPath = "camlistore/camlistore-pack-manifest.json"
|
||||
)
|
||||
|
||||
// RecoveryMode is the mode in which the blobpacked server starts.
|
||||
type RecoveryMode int
|
||||
|
||||
const (
|
||||
// NoRecovery means blobpacked does not attempt to repair its index on startup.
|
||||
// It is the default.
|
||||
NoRecovery RecoveryMode = iota
|
||||
// FastRecovery populates the blobpacked index, without erasing any existing one.
|
||||
FastRecovery
|
||||
// FullRecovery erases the existing blobpacked index, then rebuilds it.
|
||||
FullRecovery
|
||||
)
|
||||
|
||||
var (
|
||||
recoveryMu sync.Mutex
|
||||
recovery bool
|
||||
recovery = NoRecovery
|
||||
)
|
||||
|
||||
// TODO(mpl): make SetRecovery a method of type storage if we ever export it.
|
||||
|
||||
// SetRecovery notes that the user ran the camlistored binary with the --recovery flag.
|
||||
// It means that any blobpacked storage subsequently initialized will
|
||||
// automatically start with rebuilding its meta index of zip files.
|
||||
func SetRecovery() {
|
||||
// SetRecovery sets the recovery mode for the blobpacked package.
|
||||
// If set to one of the modes other than NoRecovery, it means that any
|
||||
// blobpacked storage subsequently initialized will automatically start with
|
||||
// rebuilding its meta index of zip files, in accordance with the selected mode.
|
||||
func SetRecovery(mode RecoveryMode) {
|
||||
recoveryMu.Lock()
|
||||
defer recoveryMu.Unlock()
|
||||
recovery = true
|
||||
}
|
||||
|
||||
// UnsetRecovery resets the recovery state to false if set by SetRecovery.
|
||||
func UnsetRecovery() {
|
||||
recoveryMu.Lock()
|
||||
defer recoveryMu.Unlock()
|
||||
recovery = false
|
||||
recovery = mode
|
||||
}
|
||||
|
||||
type subFetcherStorage interface {
|
||||
|
@ -173,7 +180,7 @@ type storage struct {
|
|||
|
||||
// meta key -> value rows are:
|
||||
//
|
||||
// For logical blobs packed within a large blog, "b:" prefix:
|
||||
// For logical blobs packed within a large blob, "b:" prefix:
|
||||
// b:sha1-xxxx -> "<size> <big-blobref> <offset_u32>"
|
||||
//
|
||||
// For wholerefs: (wholeMetaPrefix)
|
||||
|
@ -275,31 +282,106 @@ func newFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (blobserver.Storag
|
|||
|
||||
recoveryMu.Lock()
|
||||
defer recoveryMu.Unlock()
|
||||
if recovery {
|
||||
var newKv func() (sorted.KeyValue, error)
|
||||
switch recovery {
|
||||
case FastRecovery:
|
||||
newKv = func() (sorted.KeyValue, error) {
|
||||
return sorted.NewKeyValue(metaConf)
|
||||
}
|
||||
case FullRecovery:
|
||||
newKv = func() (sorted.KeyValue, error) {
|
||||
kv, err := sorted.NewKeyValue(metaConf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
wiper, ok := kv.(sorted.Wiper)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("blobpacked meta index of type %T needs to be wiped, but does not support automatic wiping. It should be removed manually.", kv)
|
||||
}
|
||||
if err := wiper.Wipe(); err != nil {
|
||||
return nil, fmt.Errorf("blobpacked meta index of type %T could not be wiped: %v", kv, err)
|
||||
}
|
||||
return kv, nil
|
||||
}
|
||||
}
|
||||
if newKv != nil {
|
||||
// i.e. we're in one of the recovery modes
|
||||
log.Print("Starting recovery of blobpacked index")
|
||||
if err := meta.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := sto.reindex(context.TODO(), func() (sorted.KeyValue, error) {
|
||||
return sorted.NewKeyValue(metaConf)
|
||||
}); err != nil {
|
||||
if err := sto.reindex(context.TODO(), newKv); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return sto, nil
|
||||
}
|
||||
|
||||
// Check for a weird state: zip files exist, but no metadata about them
|
||||
// is recorded. This is probably a corrupt state, and the user likely
|
||||
// wants to recover.
|
||||
if !sto.anyMeta() && sto.anyZipPacks() {
|
||||
if env.OnGCE() {
|
||||
log.Fatal("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please switch to recovery mode: add the \"camlistore-recovery = true\" key/value to the Custom metadata of your instance. And restart the instance.")
|
||||
// TODO(mpl): make web UI page/mode that informs about this error.
|
||||
log.Fatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", FastRecovery)
|
||||
}
|
||||
log.Fatal("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please re-start in recovery mode with -recovery.")
|
||||
log.Fatalf("Error: blobpacked storage detects non-zero packed zips, but no metadata. Please re-start in recovery mode with -recovery=%d", FastRecovery)
|
||||
}
|
||||
|
||||
if mode, err := sto.checkLargeIntegrity(); err != nil {
|
||||
if mode <= NoRecovery {
|
||||
log.Fatal(err)
|
||||
}
|
||||
if env.OnGCE() {
|
||||
// TODO(mpl): make web UI page/mode that informs about this error.
|
||||
log.Fatalf("Error: %v. Please switch to recovery mode: add the \"camlistore-recovery = %d\" key/value to the Custom metadata of your instance. And restart the instance.", err, mode)
|
||||
}
|
||||
log.Fatalf("Error: %v. Please re-start in recovery mode with -recovery=%d", err, mode)
|
||||
}
|
||||
|
||||
return sto, nil
|
||||
}
|
||||
|
||||
// checkLargeIntegrity verifies that all large blobs in the large storage are
|
||||
// indexed in meta, and vice-versa, that all rows in meta referring to a large blob
|
||||
// correspond to an existing large blob in the large storage. If any of the above
|
||||
// is not true, it returns the recovery mode that should be used to fix the
|
||||
// problem, as well as the error detailing the problem. It does not perform any
|
||||
// check about the contents of the large blobs themselves.
|
||||
func (s *storage) checkLargeIntegrity() (RecoveryMode, error) {
|
||||
t := s.meta.Find(blobMetaPrefix, blobMetaPrefixLimit)
|
||||
inMeta := make(map[blob.Ref]bool)
|
||||
for t.Next() {
|
||||
meta, err := parseMetaRow(t.ValueBytes())
|
||||
if err != nil {
|
||||
t.Close()
|
||||
return FullRecovery, fmt.Errorf("corrupted blobpacked meta: %v")
|
||||
}
|
||||
inMeta[meta.largeRef] = true
|
||||
}
|
||||
if err := t.Close(); err != nil {
|
||||
return NoRecovery, err
|
||||
}
|
||||
inLarge := make(map[blob.Ref]bool)
|
||||
if err := blobserver.EnumerateAllFrom(context.Background(), s.large, "", func(sb blob.SizedRef) error {
|
||||
if _, ok := inMeta[sb.Ref]; !ok {
|
||||
return fmt.Errorf("packed blobRef %v is not in meta for blobpacked storage", sb.Ref)
|
||||
}
|
||||
inLarge[sb.Ref] = true
|
||||
delete(inMeta, sb.Ref)
|
||||
return nil
|
||||
}); err != nil {
|
||||
return FastRecovery, err
|
||||
}
|
||||
// we could simply check whether len(inMeta) > 0, but this below allows us to be
|
||||
// more precise about which blob is problematic.
|
||||
for k, _ := range inMeta {
|
||||
if _, ok := inLarge[k]; !ok {
|
||||
return FullRecovery, fmt.Errorf("packed blobRef %v in blobpacked meta does not actually exist in large storage", k)
|
||||
}
|
||||
}
|
||||
return NoRecovery, nil
|
||||
}
|
||||
|
||||
// wholeMetaPrefixInfo is the info needed to write the wholeMetaPrefix entries
|
||||
// when Reindexing. For a given file, spread over several zips, each zip has a
|
||||
// corresponding wholeMetaPrefixInfo. The wholeMetaPrefix entries pertaining to a
|
||||
|
|
|
@ -109,7 +109,7 @@ var (
|
|||
flagListen = flag.String("listen", "", "host:port to listen on, or :0 to auto-select. If blank, the value in the config will be used instead.")
|
||||
flagOpenBrowser = flag.Bool("openbrowser", true, "Launches the UI on startup")
|
||||
flagReindex = flag.Bool("reindex", false, "Reindex all blobs on startup")
|
||||
flagRecovery = flag.Bool("recovery", false, "Recovery mode: rebuild the blobpacked meta index. The tasks performed by the recovery mode might change in the future.")
|
||||
flagRecovery = flag.Int("recovery", 0, "Recovery mode: it corresponds for now to the recovery modes of the blobpacked package. Which means: 0 does nothing, 1 rebuilds the blobpacked index without erasing it, and 2 wipes the blobpacked index before rebuilding it.")
|
||||
flagSyslog = flag.Bool("syslog", false, "Log everything only to syslog. It is an error to use this flag on windows.")
|
||||
flagPollParent bool
|
||||
)
|
||||
|
@ -676,8 +676,8 @@ func setupLogging() io.Closer {
|
|||
}
|
||||
|
||||
func checkRecovery() {
|
||||
if *flagRecovery {
|
||||
blobpacked.SetRecovery()
|
||||
if blobpacked.RecoveryMode(*flagRecovery) > blobpacked.NoRecovery {
|
||||
blobpacked.SetRecovery(blobpacked.RecoveryMode(*flagRecovery))
|
||||
return
|
||||
}
|
||||
if !env.OnGCE() {
|
||||
|
@ -693,12 +693,12 @@ func checkRecovery() {
|
|||
if recovery == "" {
|
||||
return
|
||||
}
|
||||
doRecovery, err := strconv.ParseBool(recovery)
|
||||
mode, err := strconv.Atoi(recovery)
|
||||
if err != nil {
|
||||
log.Printf("invalid bool value for \"camlistore-recovery\": %v", err)
|
||||
log.Printf("invalid int value for \"camlistore-recovery\": %v", err)
|
||||
}
|
||||
if doRecovery {
|
||||
blobpacked.SetRecovery()
|
||||
if blobpacked.RecoveryMode(mode) > blobpacked.NoRecovery {
|
||||
blobpacked.SetRecovery(blobpacked.RecoveryMode(mode))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue