Diskpacked: support deletion

Delete from index and zero out blob's data.
Use FALLOC_FL_PUNCH_HOLE if available (linux >= 2.6.38).

Change-Id: I0c194fd6734a35b520212398d93dfdc5cabadeb5
This commit is contained in:
Tamás Gulácsi 2013-10-11 21:26:18 +02:00
parent 673e82815b
commit bc39d9888c
6 changed files with 420 additions and 55 deletions

View File

@ -0,0 +1,111 @@
/*
Copyright 2013 The Camlistore Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diskpacked
import (
"bytes"
"errors"
"fmt"
"io"
"os"
"strconv"
"camlistore.org/pkg/blob"
)
var errNoPunch = errors.New("punchHole not supported")
// punchHole, if non-nil, punches a hole in f from offset to offset+size.
var punchHole func(file *os.File, offset int64, size int64) error
// delete removes br's data from its pack file and is intended to be paired
// with an index deletion (see RemoveBlobs). Each blob in a pack is preceded
// by a header of the form "[<ref> <size>]" — the '[' and ']' checks and the
// length computation below depend on that exact layout. delete rewrites the
// ref portion of the header to "xxxx-000..." (same byte length, so the
// offsets of all following blobs are unchanged), then reclaims the data
// bytes: via punchHole where the platform provides one, otherwise by
// overwriting them with zeros.
func (s *storage) delete(br blob.Ref) error {
	meta, err := s.meta(br)
	if err != nil {
		return err
	}
	f, err := os.OpenFile(s.filename(meta.file), os.O_RDWR, 0666)
	if err != nil {
		return err
	}
	defer f.Close()

	// walk back, find the header, and overwrite the hash with xxxx-000000...
	// k = '[' + ref + ' ' + decimal size + ']'
	k := 1 + len(br.String()) + 1 + len(strconv.FormatUint(uint64(meta.size), 10)) + 1
	off := meta.offset - int64(k)
	b := make([]byte, k)
	if k, err = f.ReadAt(b, off); err != nil {
		return err
	}
	if b[0] != byte('[') || b[k-1] != byte(']') {
		return fmt.Errorf("delete: cannot find header surroundings, found %q", b)
	}
	b = b[1 : k-1] // "sha1-xxxxxxxxxxxxxxxxxx nnnn" - everything between []
	off += 1       // skip the opening '[' when writing the mangled ref back

	// Replace b with "xxxx-000000000": 'x' over the hash name, '0' over the
	// hex digits, leaving the dash, space and size untouched.
	dash := bytes.IndexByte(b, '-')
	if dash < 0 {
		return fmt.Errorf("delete: cannot find dash in ref %q", b)
	}
	space := bytes.IndexByte(b[dash+1:], ' ')
	if space < 0 {
		return fmt.Errorf("delete: cannot find space in header %q", b)
	}
	for i := 0; i < dash; i++ {
		b[i] = 'x'
	}
	for i := dash + 1; i < dash+1+space; i++ {
		b[i] = '0'
	}

	// write back
	if _, err = f.WriteAt(b, off); err != nil {
		return err
	}

	// punch hole, if possible
	if punchHole != nil {
		err = punchHole(f, meta.offset, int64(meta.size))
		if err == nil {
			return nil
		}
		if err != errNoPunch {
			return err
		}
		// err == errNoPunch - not implemented; fall through to zero-fill.
	}

	// fill with zero
	n, err := f.Seek(meta.offset, os.SEEK_SET)
	if err != nil {
		return err
	}
	if n != meta.offset {
		return fmt.Errorf("error seeking to %d: got %d", meta.offset, n)
	}
	_, err = io.CopyN(f, zeroReader{}, int64(meta.size))
	return err
}
// zeroReader is an endless stream of zero bytes, used to blank out
// deleted blob data via io.CopyN.
type zeroReader struct{}

// Read fills p entirely with zero bytes and never returns an error.
func (zeroReader) Read(p []byte) (int, error) {
	for i := 0; i < len(p); i++ {
		p[i] = 0
	}
	return len(p), nil
}

View File

@ -355,9 +355,30 @@ func (s *storage) filename(file int) string {
return filepath.Join(s.root, fmt.Sprintf("pack-%05d.blobs", file))
}
var removeGate = syncutil.NewGate(20) // arbitrary
// RemoveBlobs removes the given blobs from the index and zeroes out (or
// hole-punches) their data in the pack files. Deletions run concurrently,
// bounded by removeGate. The index batch is committed even if some data
// deletions failed, but the first deletion error wins over a commit error.
func (s *storage) RemoveBlobs(blobs []blob.Ref) error {
	batch := s.index.BeginBatch()
	var wg syncutil.Group
	for _, br := range blobs {
		br := br // capture per-iteration copy for the closure
		removeGate.Start()
		batch.Delete(br.String())
		wg.Go(func() error {
			defer removeGate.Done()
			return s.delete(br)
		})
	}
	err1 := wg.Err()
	err2 := s.index.CommitBatch(batch)
	if err1 != nil {
		return err1
	}
	return err2
}
var statGate = syncutil.NewGate(20) // arbitrary

View File

@ -17,11 +17,13 @@ limitations under the License.
package diskpacked
import (
"fmt"
"io/ioutil"
"os"
"strings"
"testing"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/storagetest"
"camlistore.org/pkg/jsonconfig"
@ -50,7 +52,12 @@ func newTempDiskpackedWithIndex(t *testing.T, indexConf jsonconfig.Obj) (sto blo
}
return s, func() {
s.Close()
os.RemoveAll(dir)
if camliDebug {
t.Logf("CAMLI_DEBUG set, skipping cleanup of dir %q", dir)
} else {
os.RemoveAll(dir)
}
}
}
@ -107,3 +114,85 @@ func TestDoubleReceive(t *testing.T) {
t.Fatalf("after packfile delete + reupload, not big enough. want size of a blob")
}
}
// TestDelete exercises add/delete sequences against a diskpacked store and
// verifies after each step that exactly the expected blobs are enumerable.
func TestDelete(t *testing.T) {
	sto, cleanup := newTempDiskpacked(t)
	defer cleanup()

	var (
		A = &test.Blob{Contents: "some small blob"}
		B = &test.Blob{Contents: strings.Repeat("some middle blob", 100)}
		C = &test.Blob{Contents: strings.Repeat("A 8192 bytes length largish blob", 8192/32)}
	)

	type step func() error

	// add stores the blob and checks the returned SizedRef.
	add := func(blb *test.Blob) step {
		return func() error {
			sb, err := sto.ReceiveBlob(blb.BlobRef(), blb.Reader())
			if err != nil {
				return fmt.Errorf("ReceiveBlob of %s: %v", sb, err)
			}
			if sb != blb.SizedRef() {
				return fmt.Errorf("Received %v; want %v", sb, blb.SizedRef())
			}
			return nil
		}
	}

	// check asserts that exactly the given blobs are enumerable.
	check := func(want ...*test.Blob) step {
		wantRefs := make([]blob.SizedRef, len(want))
		for i, blb := range want {
			wantRefs[i] = blb.SizedRef()
		}
		return func() error {
			return storagetest.CheckEnumerate(sto, wantRefs)
		}
	}

	// del removes the blob from the store.
	del := func(blb *test.Blob) step {
		return func() error {
			if err := sto.RemoveBlobs([]blob.Ref{blb.BlobRef()}); err != nil {
				return fmt.Errorf("RemoveBlob(%s): %v", blb.BlobRef(), err)
			}
			return nil
		}
	}

	tests := [][]step{
		{
			add(A),
			del(A),
			check(),
			add(B),
			check(B),
			del(B),
			check(),
			add(C),
			check(C),
			add(A),
			check(A, C),
			del(A),
			del(C),
			check(),
		},
		{
			add(A),
			add(B),
			add(C),
			check(A, B, C),
			del(C),
			check(A, B),
		},
	}
	for ti, steps := range tests {
		for si, s := range steps {
			if err := s(); err != nil {
				t.Errorf("error at test %d, step %d: %v", ti+1, si+1, err)
			}
		}
	}
}

View File

@ -0,0 +1,46 @@
/*
Copyright 2013 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package diskpacked
import (
"os"
"syscall"
)
const (
	// falloc_fl_keep_size is FALLOC_FL_KEEP_SIZE: keep the file size
	// unchanged (the default fallocate behavior is to extend it).
	falloc_fl_keep_size = 0x01
	// falloc_fl_punch_hole is FALLOC_FL_PUNCH_HOLE: de-allocates the
	// byte range, leaving a hole that reads back as zeros.
	falloc_fl_punch_hole = 0x02
)

// On Linux, install the fallocate-based hole-punching implementation.
func init() {
	punchHole = punchHoleLinux
}
// punchHoleLinux punches a hole into the given file starting at offset,
// measuring "size" bytes, using fallocate(2) with
// FALLOC_FL_PUNCH_HOLE|FALLOC_FL_KEEP_SIZE. It returns errNoPunch when the
// kernel or filesystem does not support hole punching (ENOSYS/EOPNOTSUPP),
// so callers can fall back to overwriting the range with zeros.
func punchHoleLinux(file *os.File, offset int64, size int64) error {
	err := syscall.Fallocate(int(file.Fd()),
		falloc_fl_punch_hole|falloc_fl_keep_size,
		offset, size)
	if err == syscall.ENOSYS || err == syscall.EOPNOTSUPP {
		return errNoPunch
	}
	return err
}

View File

@ -28,11 +28,13 @@ import (
"strconv"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/context"
"camlistore.org/pkg/sorted"
"camlistore.org/pkg/sorted/kvfile"
"camlistore.org/third_party/github.com/camlistore/lock"
)
var camliDebug, _ = strconv.ParseBool(os.Getenv("CAMLI_DEBUG"))
// Reindex rewrites the index files of the diskpacked .pack files
func Reindex(root string, overwrite bool) (err error) {
// there is newStorage, but that may open a file for writing
@ -51,7 +53,7 @@ func Reindex(root string, overwrite bool) (err error) {
}
}()
verbose := false // TODO: use env var?
ctx := context.TODO() // TODO(tgulacsi): get the verbosity from context
for i := 0; i >= 0; i++ {
fh, err := os.Open(s.filename(i))
if err != nil {
@ -60,7 +62,7 @@ func Reindex(root string, overwrite bool) (err error) {
}
return err
}
err = reindexOne(index, overwrite, verbose, fh, fh.Name(), i)
err = s.reindexOne(ctx, index, overwrite, i)
fh.Close()
if err != nil {
return err
@ -69,11 +71,98 @@ func Reindex(root string, overwrite bool) (err error) {
return nil
}
func reindexOne(index sorted.KeyValue, overwrite, verbose bool, r io.ReadSeeker, name string, packId int) error {
l, err := lock.Lock(name + ".lock")
defer l.Close()
// reindexOne rebuilds (overwrite=true) or verifies (overwrite=false) the
// index entries for the pack file identified by packID.
//
// With overwrite, every live blob found while walking the pack is staged in
// a batch that is committed at the end. Without overwrite, each blob's
// recorded pack/offset/size is compared against the index and an error is
// returned if any entry is missing or stale. Deleted blobs (invalid refs)
// are skipped in both modes.
func (s *storage) reindexOne(ctx *context.Context, index sorted.KeyValue, overwrite bool, packID int) error {
	var batch sorted.BatchMutation
	if overwrite {
		batch = index.BeginBatch()
	}
	allOk := true

	// TODO(tgulacsi): proper verbose from context
	verbose := camliDebug
	err := s.walkPack(verbose, packID,
		func(packID int, ref blob.Ref, offset int64, size uint32) error {
			if !ref.Valid() {
				// An invalid ref marks a deleted blob; nothing to index.
				if camliDebug {
					log.Printf("found deleted blob in %d at %d with size %d", packID, offset, size)
				}
				return nil
			}
			meta := blobMeta{packID, offset, size}.String()
			if overwrite && batch != nil {
				batch.Set(ref.String(), meta)
			} else {
				if old, err := index.Get(ref.String()); err != nil {
					allOk = false
					if err == sorted.ErrNotFound {
						log.Println(ref.String() + ": cannot find in index!")
					} else {
						log.Println(ref.String()+": error getting from index: ", err.Error())
					}
				} else if old != meta {
					allOk = false
					log.Printf("%s: index mismatch - index=%s data=%s", ref.String(), old, meta)
				}
			}
			return nil
		})
	if err != nil {
		return err
	}
	if overwrite && batch != nil {
		log.Printf("overwriting %s from %d", index, packID)
		if err = index.CommitBatch(batch); err != nil {
			return err
		}
	} else if !allOk {
		return fmt.Errorf("index does not match data in %d", packID)
	}
	return nil
}
// Walk walks the storage and calls the walker callback with each blobref.
// It stops if walker returns a non-nil error, and returns that.
// Pack files are visited in order, stopping at the first missing one.
func (s *storage) Walk(ctx *context.Context,
	walker func(packID int, ref blob.Ref, offset int64, size uint32) error) error {
	// TODO(tgulacsi): proper verbose flag from context
	verbose := camliDebug
	for i := 0; ; i++ {
		// Probe for the pack file's existence; walkPack opens it itself,
		// so there is no need to open (and immediately close) it here.
		if _, err := os.Stat(s.filename(i)); err != nil {
			if os.IsNotExist(err) {
				break
			}
			return err
		}
		if err := s.walkPack(verbose, i, walker); err != nil {
			return err
		}
	}
	return nil
}
// walkPack walks the given pack and calls the walker callback with each blobref.
// Stops if walker returns non-nil error and returns that.
func (s *storage) walkPack(verbose bool, packID int,
walker func(packID int, ref blob.Ref, offset int64, size uint32) error) error {
fh, err := os.Open(s.filename(packID))
if err != nil {
return err
}
defer fh.Close()
name := fh.Name()
var (
pos int64
size uint32
ref blob.Ref
)
errAt := func(prefix, suffix string) error {
if prefix != "" {
@ -85,13 +174,7 @@ func reindexOne(index sorted.KeyValue, overwrite, verbose bool, r io.ReadSeeker,
return fmt.Errorf(prefix+"at %d (0x%x) in %q:"+suffix, pos, pos, name)
}
var batch sorted.BatchMutation
if overwrite {
batch = index.BeginBatch()
}
allOk := true
br := bufio.NewReaderSize(r, 512)
br := bufio.NewReaderSize(fh, 512)
for {
if b, err := br.ReadByte(); err != nil {
if err == io.EOF {
@ -114,52 +197,61 @@ func reindexOne(index sorted.KeyValue, overwrite, verbose bool, r io.ReadSeeker,
if i <= 0 {
return errAt("", fmt.Sprintf("bad header format (no space in %q)", chunk))
}
size, err := strconv.ParseUint(string(chunk[i+1:]), 10, 32)
size64, err := strconv.ParseUint(string(chunk[i+1:]), 10, 32)
if err != nil {
return errAt(fmt.Sprintf("cannot parse size %q as int", chunk[i+1:]), err.Error())
}
ref, ok := blob.Parse(string(chunk[:i]))
if !ok {
return errAt("", fmt.Sprintf("cannot parse %q as blobref", chunk[:i]))
}
if verbose {
log.Printf("found %s at %d", ref, pos)
}
size = uint32(size64)
meta := blobMeta{packId, pos + 1 + int64(m), uint32(size)}.String()
if overwrite && batch != nil {
batch.Set(ref.String(), meta)
} else {
if old, err := index.Get(ref.String()); err != nil {
allOk = false
if err == sorted.ErrNotFound {
log.Println(ref.String() + ": cannot find in index!")
} else {
log.Println(ref.String()+": error getting from index: ", err.Error())
// maybe deleted?
state, deleted := 0, true
if chunk[0] == 'x' {
Loop:
for _, c := range chunk[:i] {
switch state {
case 0:
if c != 'x' {
if c == '-' {
state++
} else {
deleted = false
break Loop
}
}
case 1:
if c != '0' {
deleted = false
break Loop
}
}
} else if old != meta {
allOk = false
log.Printf("%s: index mismatch - index=%s data=%s", ref.String(), old, meta)
}
}
if deleted {
ref = blob.Ref{}
if verbose {
log.Printf("found deleted at %d", pos)
}
} else {
ref, ok := blob.Parse(string(chunk[:i]))
if !ok {
return errAt("", fmt.Sprintf("cannot parse %q as blobref", chunk[:i]))
}
if verbose {
log.Printf("found %s at %d", ref, pos)
}
}
if err = walker(packID, ref, pos+1+int64(m), size); err != nil {
return err
}
pos += 1 + int64(m)
// TODO(tgulacsi78): not just seek, but check the hashes of the files
// TODO(tgulacsi): not just seek, but check the hashes of the files
// maybe with a different command-line flag, only.
if pos, err = r.Seek(pos+int64(size), 0); err != nil {
return errAt("", "cannot seek +"+strconv.FormatUint(size, 10)+" bytes")
if pos, err = fh.Seek(pos+int64(size), 0); err != nil {
return errAt("", "cannot seek +"+strconv.FormatUint(size64, 10)+" bytes")
}
// drain the buffer after the underlying reader Seeks
io.CopyN(ioutil.Discard, br, int64(br.Buffered()))
}
if overwrite && batch != nil {
log.Printf("overwriting %s from %s", index, name)
if err = index.CommitBatch(batch); err != nil {
return err
}
} else if !allOk {
return fmt.Errorf("index does not match data in %q", name)
}
return nil
}

View File

@ -158,7 +158,7 @@ func testSizedBlob(t *testing.T, r io.Reader, b1 blob.Ref, size int64) {
}
}
func testEnumerate(t *testing.T, sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) {
func CheckEnumerate(sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) error {
var after string
var n = 1000
for _, opt := range opts {
@ -207,16 +207,22 @@ func testEnumerate(t *testing.T, sto blobserver.Storage, wantUnsorted []blob.Siz
})
if err := grp.Err(); err != nil {
t.Fatalf("Enumerate error: %v", err)
return
return fmt.Errorf("Enumerate error: %v", err)
}
if len(got) == 0 && len(want) == 0 {
return
return nil
}
if !reflect.DeepEqual(got, want) {
t.Fatalf("Enumerate mismatch. Got %d; want %d.\n Got: %v\nWant: %v\n",
return fmt.Errorf("Enumerate mismatch. Got %d; want %d.\n Got: %v\nWant: %v\n",
len(got), len(want), got, want)
}
return nil
}
// testEnumerate asserts that sto enumerates exactly wantUnsorted,
// delegating the comparison to CheckEnumerate and failing the test
// on any mismatch or error.
func testEnumerate(t *testing.T, sto blobserver.Storage, wantUnsorted []blob.SizedRef, opts ...interface{}) {
	err := CheckEnumerate(sto, wantUnsorted, opts...)
	if err != nil {
		t.Fatalf("%v", err)
	}
}
func testStat(t *testing.T, enum <-chan blob.SizedRef, want []blob.SizedRef) {