From cb6f423eeb3521d0b947a31e1d9102f4c12449fd Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Tam=C3=A1s=20Gul=C3=A1csi?=
Date: Thu, 3 Oct 2013 22:10:26 +0200
Subject: [PATCH] Add reindex-diskpacked subcommand to camtool

For checking and/or rebuilding index.kv of diskpacked packs.

Change-Id: I1ad87974b2daf58b1e767bb1df6f7b64e8b6359a
---
 cmd/camtool/dp_idx_rebuild.go           | 115 ++++++++++++++++
 pkg/blobserver/diskpacked/diskpacked.go |   8 +-
 pkg/blobserver/diskpacked/reindex.go    | 194 ++++++++++++++++++++++++++++
 3 files changed, 315 insertions(+), 2 deletions(-)
 create mode 100644 cmd/camtool/dp_idx_rebuild.go
 create mode 100644 pkg/blobserver/diskpacked/reindex.go

diff --git a/cmd/camtool/dp_idx_rebuild.go b/cmd/camtool/dp_idx_rebuild.go
new file mode 100644
index 000000000..bfe3b17e2
--- /dev/null
+++ b/cmd/camtool/dp_idx_rebuild.go
@@ -0,0 +1,115 @@
+/*
+Copyright 2013 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"errors"
+	"flag"
+	"fmt"
+	"log"
+	"os"
+
+	"camlistore.org/pkg/blobserver/diskpacked"
+	"camlistore.org/pkg/cmdmain"
+	"camlistore.org/pkg/jsonconfig"
+	"camlistore.org/pkg/osutil"
+	"camlistore.org/pkg/serverconfig"
+)
+
+// reindexdpCmd implements the "reindex-diskpacked" camtool subcommand.
+type reindexdpCmd struct {
+	// overwrite is set from the -overwrite flag; verbose is not bound to
+	// any flag yet and is not read anywhere.
+	overwrite, verbose bool
+}
+
+func init() {
+	cmdmain.RegisterCommand("reindex-diskpacked",
+		func(flags *flag.FlagSet) cmdmain.CommandRunner {
+			cmd := new(reindexdpCmd)
+			flags.BoolVar(&cmd.overwrite, "overwrite", false,
+				"Overwrite the existing index.kv? If not, then only checking is made.")
+			return cmd
+		})
+}
+
+// Describe returns the one-line description of the command.
+func (c *reindexdpCmd) Describe() string {
+	return "Rebuild the index of the diskpacked blob store"
+}
+
+// Usage prints the usage of the command to standard error.
+func (c *reindexdpCmd) Usage() {
+	fmt.Fprintln(os.Stderr, "Usage: camtool [globalopts] reindex-diskpacked [reindex-opts]")
+	fmt.Fprintln(os.Stderr, " camtool reindex-diskpacked")
+	fmt.Fprintln(os.Stderr, " camtool reindex-diskpacked --overwrite")
+}
+
+// RunCommand checks, or with -overwrite rebuilds, the index of a diskpacked
+// blob store.
+//
+// The directory of the store is the single optional argument. Without
+// arguments the user's server config is loaded and the "path" handler
+// argument of the first "storage-diskpacked" prefix encountered is used.
+func (c *reindexdpCmd) RunCommand(args []string) error {
+	var path string
+	switch {
+	case len(args) == 0:
+		cfg, err := serverconfig.Load(osutil.UserServerConfigPath())
+		if err != nil {
+			return err
+		}
+		prefixes := cfg.RequiredObject("prefixes")
+		if err := cfg.Validate(); err != nil {
+			return fmt.Errorf("configuration error in root object's keys: %v", err)
+		}
+		for prefix, vei := range prefixes {
+			pmap, ok := vei.(map[string]interface{})
+			if !ok {
+				log.Printf("prefix %q value is a %T, not an object", prefix, vei)
+				continue
+			}
+			pconf := jsonconfig.Obj(pmap)
+			handlerType := pconf.RequiredString("handler")
+			handlerArgs := pconf.OptionalObject("handlerArgs")
+			// no pconf.Validate, as this is a recover tool
+			if handlerType != "storage-diskpacked" {
+				continue
+			}
+			if handlerArgs == nil {
+				log.Printf("no handlerArgs for %q", prefix)
+				continue
+			}
+			aconf := jsonconfig.Obj(handlerArgs)
+			path = aconf.RequiredString("path")
+			// no aconf.Validate, as this is a recovery tool
+			if path != "" {
+				break
+			}
+		}
+
+	case len(args) == 1:
+		path = args[0]
+	default:
+		return errors.New("More than 1 argument not allowed")
+	}
+	if path == "" {
+		return errors.New("no path is given/found")
+	}
+
+	return diskpacked.Reindex(path, c.overwrite)
+}
diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go
index 6d3e04fe6..de91bb141 100644
--- a/pkg/blobserver/diskpacked/diskpacked.go
+++ b/pkg/blobserver/diskpacked/diskpacked.go
@@ -88,8 +88,12 @@ func newStorage(root string, maxFileSize int64) (s *storage, err error) {
 		return nil, err
 	}
 	defer func() {
-		if err != nil {
-			index.Close()
+		closeErr := index.Close()
+		// just returning the first error - if the index or disk is corrupt
+		// and can't close, it's very likely these two errors are related and
+		// have the same root cause.
+		if err == nil {
+			err = closeErr
 		}
 	}()
 	if maxFileSize <= 0 {
diff --git a/pkg/blobserver/diskpacked/reindex.go b/pkg/blobserver/diskpacked/reindex.go
new file mode 100644
index 000000000..e7f9fd95a
--- /dev/null
+++ b/pkg/blobserver/diskpacked/reindex.go
@@ -0,0 +1,194 @@
+/*
+Copyright 2013 Google Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package diskpacked
+
+import (
+	"bufio"
+	"bytes"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"log"
+	"os"
+	"path/filepath"
+	"strconv"
+
+	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/index/kvfile"
+	"camlistore.org/pkg/sorted"
+	"camlistore.org/third_party/github.com/camlistore/lock"
+)
+
+// Reindex checks or rebuilds index.kv under root from the diskpacked pack
+// files found there.
+//
+// The pack files are scanned in increasing pack number order, starting at 0
+// and stopping at the first one that does not exist. With overwrite set, the
+// index entry of every blob found is (re)written; otherwise the index is only
+// compared against the pack files: differences are logged and reported as an
+// error.
+func Reindex(root string, overwrite bool) (err error) {
+	// there is newStorage, but that may open a file for writing
+	var s = &storage{root: root}
+	index, err := kvfile.NewStorage(filepath.Join(root, "index.kv"))
+	if err != nil {
+		return err
+	}
+	defer func() {
+		closeErr := index.Close()
+		// just returning the first error - if the index or disk is corrupt
+		// and can't close, it's very likely these two errors are related and
+		// have the same root cause.
+		if err == nil {
+			err = closeErr
+		}
+	}()
+
+	verbose := false // TODO: use env var?
+	for i := int64(0); i >= 0; i++ {
+		fh, err := os.Open(s.filename(i))
+		if err != nil {
+			if os.IsNotExist(err) {
+				break
+			}
+			return err
+		}
+		err = reindexOne(index, overwrite, verbose, fh, fh.Name(), i)
+		fh.Close()
+		if err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// reindexOne scans the pack file name, read through r, and either records
+// every blob it finds in index (overwrite) or compares the blob's position
+// and size with the entry index already holds for it, logging differences.
+//
+// r has to be positioned at the start of the pack file: offsets are counted
+// from 0 and r is repositioned with absolute seeks. packId is the pack
+// number stored in the index entries. The file name+".lock" is kept locked
+// for the duration of the scan.
+func reindexOne(index sorted.KeyValue, overwrite, verbose bool, r io.ReadSeeker, name string, packId int64) error {
+	l, err := lock.Lock(name + ".lock")
+	if err != nil {
+		// l is not usable here; closing it would panic
+		return fmt.Errorf("cannot lock %q: %v", name+".lock", err)
+	}
+	defer l.Close()
+
+	var pos, size int64
+
+	errAt := func(prefix, suffix string) error {
+		if prefix != "" {
+			prefix = prefix + " "
+		}
+		if suffix != "" {
+			suffix = " " + suffix
+		}
+		// prefix and suffix can carry bytes read from the pack file, so they
+		// are passed as arguments and never used as the format string
+		return fmt.Errorf("%sat %d (0x%x) in %q:%s", prefix, pos, pos, name, suffix)
+	}
+
+	var batch sorted.BatchMutation
+	if overwrite {
+		batch = index.BeginBatch()
+	}
+
+	allOk := true
+	br := bufio.NewReaderSize(r, 512)
+	for {
+		// each blob is stored as "[<blobref> <size>]" followed by size bytes
+		if b, err := br.ReadByte(); err != nil {
+			if err == io.EOF {
+				break
+			}
+			return errAt("error while reading", err.Error())
+		} else if b != '[' {
+			return errAt(fmt.Sprintf("found byte 0x%x", b), "but '[' should be here!")
+		}
+		chunk, err := br.ReadSlice(']')
+		if err != nil {
+			if err == io.EOF {
+				break
+			}
+			return errAt("error reading blob header", err.Error())
+		}
+		// m counts the closing ']' too, chunk is the text between the brackets
+		m := len(chunk)
+		chunk = chunk[:m-1]
+		i := bytes.IndexByte(chunk, byte(' '))
+		if i <= 0 {
+			return errAt("", fmt.Sprintf("bad header format (no space in %q)", chunk))
+		}
+		if size, err = strconv.ParseInt(string(chunk[i+1:]), 10, 64); err != nil {
+			return errAt(fmt.Sprintf("cannot parse size %q as int", chunk[i+1:]), err.Error())
+		}
+		if size < 0 {
+			// a negative size would seek backwards and could loop forever
+			return errAt("", fmt.Sprintf("negative size %d in blob header", size))
+		}
+		ref, ok := blob.Parse(string(chunk[:i]))
+		if !ok {
+			return errAt("", fmt.Sprintf("cannot parse %q as blobref", chunk[:i]))
+		}
+		if verbose {
+			log.Printf("found %s at %d", ref, pos)
+		}
+
+		meta := blobMeta{packId, pos + 1 + int64(m), size}.String()
+		if overwrite && batch != nil {
+			batch.Set(ref.String(), meta)
+		} else {
+			if old, err := index.Get(ref.String()); err != nil {
+				allOk = false
+				if err == sorted.ErrNotFound {
+					log.Println(ref.String() + ": cannot find in index!")
+				} else {
+					log.Println(ref.String()+": error getting from index: ", err.Error())
+				}
+			} else if old != meta {
+				allOk = false
+				log.Printf("%s: index mismatch - index=%s data=%s", ref.String(), old, meta)
+			}
+		}
+
+		// skip over the header, pos is now the offset of the blob's data
+		pos += 1 + int64(m)
+		// TODO(tgulacsi78): not just seek, but check the hashes of the files
+		// maybe with a different command-line flag, only.
+		next := pos + size
+		if _, err = r.Seek(next, 0); err != nil {
+			return errAt("", "cannot seek +"+strconv.FormatInt(size, 10)+" bytes: "+err.Error())
+		}
+		pos = next
+		// drain the buffer after the underlying reader Seeks
+		io.CopyN(ioutil.Discard, br, int64(br.Buffered()))
+	}
+
+	if overwrite && batch != nil {
+		log.Printf("overwriting %s from %s", index, name)
+		if err = index.CommitBatch(batch); err != nil {
+			return err
+		}
+	} else if !allOk {
+		return fmt.Errorf("index does not match data in %q", name)
+	}
+	return nil
+}