diff --git a/pkg/blobserver/diskpacked/dele.go b/pkg/blobserver/diskpacked/dele.go new file mode 100644 index 000000000..682dc8565 --- /dev/null +++ b/pkg/blobserver/diskpacked/dele.go @@ -0,0 +1,111 @@ +/* +Copyright 2013 The Camlistore Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package diskpacked + +import ( + "bytes" + "errors" + "fmt" + "io" + "os" + "strconv" + + "camlistore.org/pkg/blob" +) + +var errNoPunch = errors.New("punchHole not supported") + +// punchHole, if non-nil, punches a hole in f from offset to offset+size. +var punchHole func(file *os.File, offset int64, size int64) error + +func (s *storage) delete(br blob.Ref) error { + meta, err := s.meta(br) + if err != nil { + return err + } + f, err := os.OpenFile(s.filename(meta.file), os.O_RDWR, 0666) + if err != nil { + return err + } + defer f.Close() + + // walk back, find the header, and overwrite the hash with xxxx-000000... + k := 1 + len(br.String()) + 1 + len(strconv.FormatUint(uint64(meta.size), 10)) + 1 + off := meta.offset - int64(k) + b := make([]byte, k) + if k, err = f.ReadAt(b, off); err != nil { + return err + } + if b[0] != byte('[') || b[k-1] != byte(']') { + return fmt.Errorf("delete: cannot find header surroundings, found %q", b) + } + b = b[1 : k-1] // "sha1-xxxxxxxxxxxxxxxxxx nnnn" - everything between [] + off += 1 + + // Replace b with "xxxx-000000000" + dash := bytes.IndexByte(b, '-') + if dash < 0 { + return fmt.Errorf("delete: cannot find dash in ref %q", b) + } + space := bytes.IndexByte(b[dash+1:], ' ') + if space < 0 { + return fmt.Errorf("delete: cannot find space in header %q", b) + } + for i := 0; i < dash; i++ { + b[i] = 'x' + } + for i := dash + 1; i < dash+1+space; i++ { + b[i] = '0' + } + + // write back + if _, err = f.WriteAt(b, off); err != nil { + return err + } + + // punch hole, if possible + if punchHole != nil { + err = punchHole(f, meta.offset, int64(meta.size)) + if err == nil { + return nil + } + if err != errNoPunch { + return err + } + // err == errNoPunch - not implemented + } + + // fill with zero + n, err := f.Seek(meta.offset, os.SEEK_SET) + if err != nil { + return err + } + if n != meta.offset { + return fmt.Errorf("error seeking to %d: got %d", meta.offset, n) + } + _, err = io.CopyN(f, zeroReader{}, int64(meta.size)) + return err +} + +type zeroReader struct{} + +func (z zeroReader) Read(p []byte) (n int, err error) { + for i := range p { + p[i] = 0 + } + return len(p), nil +} diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go index ff10b1bf1..e2c8bca64 100644 --- a/pkg/blobserver/diskpacked/diskpacked.go +++ b/pkg/blobserver/diskpacked/diskpacked.go @@ -355,9 +355,30 @@ func (s *storage) filename(file int) string { return filepath.Join(s.root, fmt.Sprintf("pack-%05d.blobs", file)) } +var removeGate = syncutil.NewGate(20) // arbitrary + +// RemoveBlobs removes the blobs from index and pads data with zero bytes func (s *storage) RemoveBlobs(blobs []blob.Ref) error { - // TODO(adg): remove blob from index and pad data with spaces - return blobserver.ErrNotImplemented + batch := s.index.BeginBatch() + var wg syncutil.Group + for _, br := range blobs { + br := br + removeGate.Start() + batch.Delete(br.String()) + wg.Go(func() error { + defer removeGate.Done() + if err := s.delete(br); err != nil { + return err + } + return nil + }) + } + err1 := wg.Err() + err2 := s.index.CommitBatch(batch) + if err1 != nil { + return err1 + } + return err2 } var statGate = syncutil.NewGate(20) // arbitrary diff --git a/pkg/blobserver/diskpacked/diskpacked_test.go b/pkg/blobserver/diskpacked/diskpacked_test.go index 7edb6dab9..8326c27ae 100644 --- a/pkg/blobserver/diskpacked/diskpacked_test.go +++ b/pkg/blobserver/diskpacked/diskpacked_test.go @@ -17,11 +17,13 @@ limitations under the License. package diskpacked import ( + "fmt" "io/ioutil" "os" "strings" "testing" + "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/blobserver/storagetest" "camlistore.org/pkg/jsonconfig" @@ -50,7 +52,12 @@ func newTempDiskpackedWithIndex(t *testing.T, indexConf jsonconfig.Obj) (sto blo } return s, func() { s.Close() - os.RemoveAll(dir) + + if camliDebug { + t.Logf("CAMLI_DEBUG set, skipping cleanup of dir %q", dir) + } else { + os.RemoveAll(dir) + } } } @@ -107,3 +114,85 @@ func TestDoubleReceive(t *testing.T) { t.Fatalf("after packfile delete + reupload, not big enough. want size of a blob") } } + +func TestDelete(t *testing.T) { + sto, cleanup := newTempDiskpacked(t) + defer cleanup() + + var ( + A = &test.Blob{Contents: "some small blob"} + B = &test.Blob{Contents: strings.Repeat("some middle blob", 100)} + C = &test.Blob{Contents: strings.Repeat("A 8192 bytes length largish blob", 8192/32)} + ) + + type step func() error + + stepAdd := func(tb *test.Blob) step { // add the blob + return func() error { + sb, err := sto.ReceiveBlob(tb.BlobRef(), tb.Reader()) + if err != nil { + return fmt.Errorf("ReceiveBlob of %s: %v", sb, err) + } + if sb != tb.SizedRef() { + return fmt.Errorf("Received %v; want %v", sb, tb.SizedRef()) + } + return nil + } + } + + stepCheck := func(want ...*test.Blob) step { // check the blob + wantRefs := make([]blob.SizedRef, len(want)) + for i, tb := range want { + wantRefs[i] = tb.SizedRef() + } + return func() error { + if err := storagetest.CheckEnumerate(sto, wantRefs); err != nil { + return err + } + return nil + } + } + + stepDelete := func(tb *test.Blob) step { + return func() error { + if err := sto.RemoveBlobs([]blob.Ref{tb.BlobRef()}); err != nil { + return fmt.Errorf("RemoveBlob(%s): %v", tb.BlobRef(), err) + } + return nil + } + } + + var deleteTests = [][]step{ + []step{ + stepAdd(A), + stepDelete(A), + stepCheck(), + stepAdd(B), + stepCheck(B), + stepDelete(B), + stepCheck(), + stepAdd(C), + stepCheck(C), + stepAdd(A), + stepCheck(A, C), + stepDelete(A), + stepDelete(C), + stepCheck(), + }, + []step{ + stepAdd(A), + stepAdd(B), + stepAdd(C), + stepCheck(A, B, C), + stepDelete(C), + stepCheck(A, B), + }, + } + for i, steps := range deleteTests { + for j, s := range steps { + if err := s(); err != nil { + t.Errorf("error at test %d, step %d: %v", i+1, j+1, err) + } + } + } +} diff --git a/pkg/blobserver/diskpacked/punch_linux.go b/pkg/blobserver/diskpacked/punch_linux.go new file mode 100644 index 000000000..d5e1a85d7 --- /dev/null +++ b/pkg/blobserver/diskpacked/punch_linux.go @@ -0,0 +1,46 @@ +/* +Copyright 2013 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package diskpacked + +import ( + "os" + "syscall" +) + +const ( + // FALLOC_FL_KEEP_SIZE: default is extend size + falloc_fl_keep_size = 0x01 + + // FALLOC_FL_PUNCH_HOLE: de-allocates range + falloc_fl_punch_hole = 0x02 +) + +func init() { + punchHole = punchHoleLinux +} + +// puncHoleLinux punches a hole into the given file starting at offset, +// measuring "size" bytes +func punchHoleLinux(file *os.File, offset int64, size int64) error { + err := syscall.Fallocate(int(file.Fd()), + falloc_fl_punch_hole|falloc_fl_keep_size, + offset, size) + if err == syscall.ENOSYS || err == syscall.EOPNOTSUPP { + return errNoPunch + } + return err +} diff --git a/pkg/blobserver/diskpacked/reindex.go b/pkg/blobserver/diskpacked/reindex.go index a058b27c4..e9f746f01 100644 --- a/pkg/blobserver/diskpacked/reindex.go +++ b/pkg/blobserver/diskpacked/reindex.go @@ -28,11 +28,13 @@ import ( "strconv" "camlistore.org/pkg/blob" + "camlistore.org/pkg/context" "camlistore.org/pkg/sorted" "camlistore.org/pkg/sorted/kvfile" - "camlistore.org/third_party/github.com/camlistore/lock" ) +var camliDebug, _ = strconv.ParseBool(os.Getenv("CAMLI_DEBUG")) + // Reindex rewrites the index files of the diskpacked .pack files func Reindex(root string, overwrite bool) (err error) { // there is newStorage, but that may open a file for writing @@ -51,7 +53,7 @@ func Reindex(root string, overwrite bool) (err error) { } }() - verbose := false // TODO: use env var? + ctx := context.TODO() // TODO(tgulacsi): get the verbosity from context for i := 0; i >= 0; i++ { fh, err := os.Open(s.filename(i)) if err != nil { @@ -60,7 +62,7 @@ func Reindex(root string, overwrite bool) (err error) { } return err } - err = reindexOne(index, overwrite, verbose, fh, fh.Name(), i) + err = s.reindexOne(ctx, index, overwrite, i) fh.Close() if err != nil { return err @@ -69,11 +71,98 @@ func Reindex(root string, overwrite bool) (err error) { return nil } -func reindexOne(index sorted.KeyValue, overwrite, verbose bool, r io.ReadSeeker, name string, packId int) error { - l, err := lock.Lock(name + ".lock") - defer l.Close() +func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwrite bool, packID int) error { - var pos int64 + var batch sorted.BatchMutation + if overwrite { + batch = index.BeginBatch() + } + allOk := true + + // TODO(tgulacsi): proper verbose from context + verbose := camliDebug + err := s.walkPack(verbose, packID, + func(packID int, ref blob.Ref, offset int64, size uint32) error { + if !ref.Valid() { + if camliDebug { + log.Printf("found deleted blob in %d at %d with size %d", packID, offset, size) + } + return nil + } + meta := blobMeta{packID, offset, size}.String() + if overwrite && batch != nil { + batch.Set(ref.String(), meta) + } else { + if old, err := index.Get(ref.String()); err != nil { + allOk = false + if err == sorted.ErrNotFound { + log.Println(ref.String() + ": cannot find in index!") + } else { + log.Println(ref.String()+": error getting from index: ", err.Error()) + } + } else if old != meta { + allOk = false + log.Printf("%s: index mismatch - index=%s data=%s", ref.String(), old, meta) + } + } + return nil + }) + if err != nil { + return err + } + + if overwrite && batch != nil { + log.Printf("overwriting %s from %d", index, packID) + if err = index.CommitBatch(batch); err != nil { + return err + } + } else if !allOk { + return fmt.Errorf("index does not match data in %d", packID) + } + return nil +} + +// Walk walks the storage and calls the walker callback with each blobref +// stops if walker returns non-nil error, and returns that +func (s *storage) Walk(ctx *context.Context, + walker func(packID int, ref blob.Ref, offset int64, size uint32) error) error { + + // TODO(tgulacsi): proper verbose flag from context + verbose := camliDebug + + for i := 0; i >= 0; i++ { + fh, err := os.Open(s.filename(i)) + if err != nil { + if os.IsNotExist(err) { + break + } + return err + } + fh.Close() + if err = s.walkPack(verbose, i, walker); err != nil { + return err + } + } + return nil +} + +// walkPack walks the given pack and calls the walker callback with each blobref. +// Stops if walker returns non-nil error and returns that. +func (s *storage) walkPack(verbose bool, packID int, + walker func(packID int, ref blob.Ref, offset int64, size uint32) error) error { + + fh, err := os.Open(s.filename(packID)) + if err != nil { + return err + } + defer fh.Close() + name := fh.Name() + + var ( + pos int64 + size uint32 + ref blob.Ref + ) errAt := func(prefix, suffix string) error { if prefix != "" { @@ -85,13 +174,7 @@ func reindexOne(index sorted.KeyValue, overwrite, verbose bool, r io.ReadSeeker, return fmt.Errorf(prefix+"at %d (0x%x) in %q:"+suffix, pos, pos, name) } - var batch sorted.BatchMutation - if overwrite { - batch = index.BeginBatch() - } - - allOk := true - br := bufio.NewReaderSize(r, 512) + br := bufio.NewReaderSize(fh, 512) for { if b, err := br.ReadByte(); err != nil { if err == io.EOF { @@ -114,52 +197,61 @@ func reindexOne(index sorted.KeyValue, overwrite, verbose bool, r io.ReadSeeker, if i <= 0 { return errAt("", fmt.Sprintf("bad header format (no space in %q)", chunk)) } - size, err := strconv.ParseUint(string(chunk[i+1:]), 10, 32) + size64, err := strconv.ParseUint(string(chunk[i+1:]), 10, 32) if err != nil { return errAt(fmt.Sprintf("cannot parse size %q as int", chunk[i+1:]), err.Error()) } - ref, ok := blob.Parse(string(chunk[:i])) - if !ok { - return errAt("", fmt.Sprintf("cannot parse %q as blobref", chunk[:i])) - } - if verbose { - log.Printf("found %s at %d", ref, pos) - } + size = uint32(size64) - meta := blobMeta{packId, pos + 1 + int64(m), uint32(size)}.String() - if overwrite && batch != nil { - batch.Set(ref.String(), meta) - } else { - if old, err := index.Get(ref.String()); err != nil { - allOk = false - if err == sorted.ErrNotFound { - log.Println(ref.String() + ": cannot find in index!") - } else { - log.Println(ref.String()+": error getting from index: ", err.Error()) + // maybe deleted? + state, deleted := 0, true + if chunk[0] == 'x' { + Loop: + for _, c := range chunk[:i] { + switch state { + case 0: + if c != 'x' { + if c == '-' { + state++ + } else { + deleted = false + break Loop + } + } + case 1: + if c != '0' { + deleted = false + break Loop + } } - } else if old != meta { - allOk = false - log.Printf("%s: index mismatch - index=%s data=%s", ref.String(), old, meta) } } + if deleted { + ref = blob.Ref{} + if verbose { + log.Printf("found deleted at %d", pos) + } + } else { + ref, ok := blob.Parse(string(chunk[:i])) + if !ok { + return errAt("", fmt.Sprintf("cannot parse %q as blobref", chunk[:i])) + } + if verbose { + log.Printf("found %s at %d", ref, pos) + } + } + if err = walker(packID, ref, pos+1+int64(m), size); err != nil { + return err + } pos += 1 + int64(m) - // TODO(tgulacsi78): not just seek, but check the hashes of the files + // TODO(tgulacsi): not just seek, but check the hashes of the files // maybe with a different command-line flag, only. - if pos, err = r.Seek(pos+int64(size), 0); err != nil { - return errAt("", "cannot seek +"+strconv.FormatUint(size, 10)+" bytes") + if pos, err = fh.Seek(pos+int64(size), 0); err != nil { + return errAt("", "cannot seek +"+strconv.FormatUint(size64, 10)+" bytes") } // drain the buffer after the underlying reader Seeks io.CopyN(ioutil.Discard, br, int64(br.Buffered())) } - - if overwrite && batch != nil { - log.Printf("overwriting %s from %s", index, name) - if err = index.CommitBatch(batch); err != nil { - return err - } - } else if !allOk { - return fmt.Errorf("index does not match data in %q", name) - } return nil } diff --git a/pkg/blobserver/storagetest/storagetest.go b/pkg/blobserver/storagetest/storagetest.go index 3e682f470..f9bb104e2 100644 --- a/pkg/blobserver/storagetest/storagetest.go +++ b/pkg/blobserver/storagetest/storagetest.go @@ -158,7 +158,7 @@ func testSizedBlob(t *testing.T, r io.Reader, b1 blob.Ref, size int64) { } } -func testEnumerate(t *testing.T, sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) { +func CheckEnumerate(sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) error { var after string var n = 1000 for _, opt := range opts { @@ -207,16 +207,22 @@ func testEnumerate(t *testing.T, sto blobserver.Storage, wantUnsorted []blob.Siz }) if err := grp.Err(); err != nil { - t.Fatalf("Enumerate error: %v", err) - return + return fmt.Errorf("Enumerate error: %v", err) } if len(got) == 0 && len(want) == 0 { - return + return nil } if !reflect.DeepEqual(got, want) { - t.Fatalf("Enumerate mismatch. Got %d; want %d.\n Got: %v\nWant: %v\n", + return fmt.Errorf("Enumerate mismatch. Got %d; want %d.\n Got: %v\nWant: %v\n", len(got), len(want), got, want) } + return nil +} + +func testEnumerate(t *testing.T, sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) { + if err := CheckEnumerate(sto, wantUnsorted, opts...); err != nil { + t.Fatalf("%v", err) + } } func testStat(t *testing.T, enum <-chan blob.SizedRef, want []blob.SizedRef) {