pkg/blobserver/diskpacked: TODO, Context and dead code cleanup

Tamás Gulácsi 2021-07-26 07:00:05 +02:00 committed by Tamás Gulácsi
parent 942cc31673
commit de8805f4fa
8 changed files with 79 additions and 54 deletions

View File

@@ -17,6 +17,7 @@ limitations under the License.
package main
import (
"context"
"errors"
"flag"
"fmt"
@@ -27,6 +28,7 @@ import (
"perkeep.org/internal/osutil"
"perkeep.org/pkg/blobserver/diskpacked"
"perkeep.org/pkg/cmdmain"
"perkeep.org/pkg/env"
"perkeep.org/pkg/serverinit"
)
@@ -127,5 +129,8 @@ func (c *reindexdpCmd) RunCommand(args []string) error {
}
log.Printf("indexConf: %v", indexConf)
- return diskpacked.Reindex(path, c.overwrite, indexConf)
+ ctx := diskpacked.CtxSetVerbose(context.Background(), env.IsDebug())
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+ return diskpacked.Reindex(ctx, path, c.overwrite, indexConf)
}

View File

@@ -83,14 +83,13 @@ func (s *storage) delete(br blob.Ref) error {
if err == nil {
return nil
}
- if err != errNoPunch {
+ if !errors.Is(err, errNoPunch) {
return err
}
// err == errNoPunch - not implemented
}
// fill with zero
- n, err := f.Seek(meta.offset, os.SEEK_SET)
+ n, err := f.Seek(meta.offset, io.SeekStart)
if err != nil {
return err
}
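Several hunks in this commit make the same two mechanical swaps: sentinel comparisons move from == to errors.Is (paired with %w wrapping elsewhere in the commit, so the checks keep working once errors are wrapped), and the os.SEEK_* constants, deprecated in favor of the io.Seek* ones introduced in Go 1.7, give way to io.SeekStart / io.SeekCurrent. A minimal, self-contained sketch of why errors.Is is needed once %w enters the picture (the message and sentinel here are illustrative, not from this package):

    package main

    import (
    	"errors"
    	"fmt"
    	"os"
    )

    func main() {
    	// Wrap a sentinel with %w, as newStorage's stat error now is.
    	err := fmt.Errorf("failed to stat directory %q: %w", "/x", os.ErrNotExist)

    	fmt.Println(err == os.ErrNotExist)          // false: the wrapper is a distinct value
    	fmt.Println(errors.Is(err, os.ErrNotExist)) // true: Is unwraps the %w chain
    }

The io constants have the same numeric values as the os.SEEK_* ones, so the Seek calls behave identically.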

View File

@@ -125,9 +125,26 @@ func IsDir(dir string) (bool, error) {
// files.
func New(dir string) (blobserver.Storage, error) {
var maxSize int64
- if ok, _ := IsDir(dir); ok {
- // TODO: detect existing max size from size of files, if obvious,
- // and set maxSize to that?
+ var n, atMax int
+ if des, err := os.ReadDir(dir); err == nil {
+ // Detect existing max size from size of files, if obvious, and set maxSize to that
+ for _, de := range des {
+ if nm := de.Name(); strings.HasPrefix(nm, "pack-") && strings.HasSuffix(nm, ".blobs") {
+ n++
+ if fi, err := de.Info(); err == nil {
+ if s := fi.Size(); s > maxSize {
+ maxSize, atMax = fi.Size(), 0
+ } else if s == maxSize {
+ atMax++
+ }
+ }
+ }
+ }
+ }
+ // Trust the deduced size only if at least two files have that maximum size,
+ // and all files except one share it.
+ if !(atMax > 1 && n == atMax+1) {
+ maxSize = 0
+ }
return newStorage(dir, maxSize, nil)
}
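The deduction above only trusts an "obvious" maximum. A worked trace under hypothetical pack sizes (note that atMax counts files matching the current maximum after the first one, so, ignoring files whose Info() fails, n == atMax+1 in effect requires every pack file to be at the maximum):

    // ReadDir returns entries sorted by name, so for three equal packs:
    //
    //   pack-00000.blobs  536870912  -> s > maxSize: maxSize=536870912, atMax=0
    //   pack-00001.blobs  536870912  -> s == maxSize: atMax=1
    //   pack-00002.blobs  536870912  -> s == maxSize: atMax=2
    //
    // n=3, atMax=2: atMax > 1 && n == atMax+1 holds and maxSize is kept.
    // Were the last pack smaller (say, still being filled), atMax would be 1,
    // the condition would fail, and maxSize would reset to 0, letting
    // newStorage fall back to its default.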
@@ -151,7 +168,7 @@ func newStorage(root string, maxFileSize int64, indexConf jsonconfig.Obj) (s *st
return nil, fmt.Errorf("storage root %q doesn't exist", root)
}
if err != nil {
return nil, fmt.Errorf("Failed to stat directory %q: %v", root, err)
return nil, fmt.Errorf("Failed to stat directory %q: %w", root, err)
}
if !fi.IsDir() {
return nil, fmt.Errorf("storage root %q exists but is not a directory", root)
@@ -185,7 +202,7 @@ func newStorage(root string, maxFileSize int64, indexConf jsonconfig.Obj) (s *st
s.mu.Lock()
defer s.mu.Unlock()
if _, _, err := s.StorageGeneration(); err != nil {
return nil, fmt.Errorf("Error initialization generation for %q: %v", root, err)
return nil, fmt.Errorf("Error initialization generation for %q: %w", root, err)
}
return s, nil
}
@@ -342,19 +359,19 @@ func (s *storage) Close() error {
}
func (s *storage) Fetch(ctx context.Context, br blob.Ref) (io.ReadCloser, uint32, error) {
- return s.fetch(ctx, br, 0, -1)
+ return s.fetch(br, 0, -1)
}
func (s *storage) SubFetch(ctx context.Context, br blob.Ref, offset, length int64) (io.ReadCloser, error) {
if offset < 0 || length < 0 {
return nil, blob.ErrNegativeSubFetch
}
- rc, _, err := s.fetch(ctx, br, offset, length)
+ rc, _, err := s.fetch(br, offset, length)
return rc, err
}
// length of -1 means all
- func (s *storage) fetch(ctx context.Context, br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
+ func (s *storage) fetch(br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
meta, err := s.meta(br)
if err != nil {
return nil, 0, err
@@ -412,7 +429,7 @@ func (s *storage) RemoveBlobs(ctx context.Context, blobs []blob.Ref) error {
batch.Delete(br.String())
wg.Go(func() error {
defer removeGate.Done()
- if err := s.delete(br); err != nil && err != os.ErrNotExist {
+ if err := s.delete(br); err != nil && !errors.Is(err, os.ErrNotExist) {
return err
}
return nil
@@ -434,7 +451,7 @@ func (s *storage) StatBlobs(ctx context.Context, blobs []blob.Ref, fn func(blob.
if err == nil {
return m.SizedRef(br), nil
}
- if err == os.ErrNotExist {
+ if errors.Is(err, os.ErrNotExist) {
return sb, nil
}
return sb, err
@@ -508,17 +525,6 @@ func readHeader(br *bufio.Reader) (consumed int, digest []byte, size uint32, err
return len(line), line[1:sp], uint32(size64), nil
}
- // Type readSeekNopCloser is an io.ReadSeeker with a no-op Close method.
- type readSeekNopCloser struct {
- io.ReadSeeker
- }
- func (readSeekNopCloser) Close() error { return nil }
- func newReadSeekNopCloser(rs io.ReadSeeker) readerutil.ReadSeekCloser {
- return readSeekNopCloser{rs}
- }
// The header of deleted blobs has a digest in which the hash type is
// set to all 'x', the hash value is all '0', and has the correct size.
var deletedBlobRef = regexp.MustCompile(`^x+-0+$`)
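Deletion rewrites the header in place so the offsets of all following blobs stay valid; the digest part then matches deletedBlobRef. A small, hypothetical illustration of the pattern (digests shortened for brevity):

    package main

    import (
    	"fmt"
    	"regexp"
    )

    // Same pattern as diskpacked's deletedBlobRef.
    var deletedBlobRef = regexp.MustCompile(`^x+-0+$`)

    func main() {
    	// Live blob: hash name and hex digest intact.
    	fmt.Println(deletedBlobRef.MatchString("sha224-d2b2f8700c1a")) // false
    	// Deleted blob: hash name overwritten with 'x', digest with '0',
    	// keeping the header the same length.
    	fmt.Println(deletedBlobRef.MatchString("xxxxxx-000000000000")) // true
    }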
@@ -554,7 +560,7 @@ func (s *storage) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAn
// TODO: probably be stricter here and don't allow seek past
// the end, since we know the size of closed files and the
// size of the file diskpacked currently still writing.
- _, err = fd.Seek(offset, os.SEEK_SET)
+ _, err = fd.Seek(offset, io.SeekStart)
if err != nil {
return err
}
@@ -567,7 +573,7 @@ func (s *storage) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAn
for {
// Are we at the EOF of this pack?
if _, err := r.Peek(1); err != nil {
- if err != io.EOF {
+ if !errors.Is(err, io.EOF) {
return err
}
// EOF case; continue to the next pack, if any.
@@ -635,7 +641,6 @@ func (s *storage) StreamBlobs(ctx context.Context, dest chan<- blobserver.BlobAn
}
func (s *storage) ReceiveBlob(ctx context.Context, br blob.Ref, source io.Reader) (sbr blob.SizedRef, err error) {
- // TODO: use ctx somehow?
var b bytes.Buffer
n, err := b.ReadFrom(source)
if err != nil {
@@ -678,7 +683,7 @@ func (s *storage) append(br blob.SizedRef, r io.Reader) error {
}
// TODO(adg): remove this seek and the offset check once confident
- offset, err := s.writer.Seek(0, os.SEEK_CUR)
+ offset, err := s.writer.Seek(0, io.SeekCurrent)
if err != nil {
return err
}
@@ -710,7 +715,7 @@ func (s *storage) append(br blob.SizedRef, r io.Reader) error {
}
err = s.index.Set(br.Ref.String(), blobMeta{packIdx, offset, br.Size}.String())
if err != nil {
- if _, seekErr := s.writer.Seek(origOffset, os.SEEK_SET); seekErr != nil {
+ if _, seekErr := s.writer.Seek(origOffset, io.SeekStart); seekErr != nil {
log.Printf("ERROR seeking back to the original offset: %v", seekErr)
} else if truncErr := s.writer.Truncate(origOffset); truncErr != nil {
log.Printf("ERROR truncating file after index error: %v", truncErr)
@@ -725,7 +730,7 @@ func (s *storage) append(br blob.SizedRef, r io.Reader) error {
func (s *storage) meta(br blob.Ref) (m blobMeta, err error) {
ms, err := s.index.Get(br.String())
if err != nil {
- if err == sorted.ErrNotFound {
+ if errors.Is(err, sorted.ErrNotFound) {
err = os.ErrNotExist
}
return

View File

@@ -105,7 +105,9 @@ func TestDoubleReceive(t *testing.T) {
if size(0) < blobSize {
t.Fatalf("size = %d; want at least %d", size(0), blobSize)
}
- sto.(*storage).nextPack()
+ if err = sto.(*storage).nextPack(); err != nil {
+ t.Fatal(err)
+ }
_, err = blobserver.Receive(ctxbg, sto, br, b.Reader())
if err != nil {
@@ -134,7 +136,7 @@ func TestDelete(t *testing.T) {
return func() error {
sb, err := sto.ReceiveBlob(ctxbg, tb.BlobRef(), tb.Reader())
if err != nil {
return fmt.Errorf("ReceiveBlob of %s: %v", sb, err)
return fmt.Errorf("ReceiveBlob of %s: %w", sb, err)
}
if sb != tb.SizedRef() {
return fmt.Errorf("Received %v; want %v", sb, tb.SizedRef())
@@ -159,7 +161,7 @@ func TestDelete(t *testing.T) {
stepDelete := func(tb *test.Blob) step {
return func() error {
if err := sto.RemoveBlobs(ctx, []blob.Ref{tb.BlobRef()}); err != nil {
return fmt.Errorf("RemoveBlob(%s): %v", tb.BlobRef(), err)
return fmt.Errorf("RemoveBlob(%s): %w", tb.BlobRef(), err)
}
return nil
}
@@ -223,7 +225,7 @@ func TestDoubleReceiveFailingIndex(t *testing.T) {
_, err := blobserver.Receive(ctxbg, sto, br, b.Reader())
if err != nil {
- if err != errDummy {
+ if !errors.Is(err, errDummy) {
t.Fatal(err)
}
t.Logf("dummy fail")

View File

@@ -17,6 +17,7 @@ limitations under the License.
package diskpacked
import (
"errors"
"os"
"syscall"
)
@@ -39,7 +40,7 @@ func punchHoleLinux(file *os.File, offset int64, size int64) error {
err := syscall.Fallocate(int(file.Fd()),
falloc_fl_punch_hole|falloc_fl_keep_size,
offset, size)
- if err == syscall.ENOSYS || err == syscall.EOPNOTSUPP {
+ if errors.Is(err, syscall.ENOSYS) || errors.Is(err, syscall.EOPNOTSUPP) {
return errNoPunch
}
return err

View File

@@ -20,6 +20,7 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"io"
"io/ioutil"
@@ -29,7 +30,6 @@ import (
"go4.org/jsonconfig"
"perkeep.org/pkg/blob"
"perkeep.org/pkg/env"
"perkeep.org/pkg/sorted"
// possible index formats
@@ -39,7 +39,7 @@
)
// Reindex rewrites the index files of the diskpacked .pack files
- func Reindex(root string, overwrite bool, indexConf jsonconfig.Obj) (err error) {
+ func Reindex(ctx context.Context, root string, overwrite bool, indexConf jsonconfig.Obj) (err error) {
// there is newStorage, but that may open a file for writing
var s = &storage{root: root}
index, err := newIndex(root, indexConf)
@@ -56,7 +56,6 @@ func Reindex(root string, overwrite bool, indexConf jsonconfig.Obj) (err error)
}
}()
- ctx := context.TODO() // TODO(tgulacsi): get the verbosity from context
for i := 0; i >= 0; i++ {
fh, err := os.Open(s.filename(i))
if err != nil {
@@ -82,8 +81,7 @@ func (s *storage) reindexOne(ctx context.Context, index sorted.KeyValue, overwri
}
allOk := true
- // TODO(tgulacsi): proper verbose from context
- verbose := env.IsDebug()
+ verbose := ctxGetVerbose(ctx)
misses := make(map[blob.Ref]string, 8)
err := s.walkPack(verbose, packID,
func(packID int, ref blob.Ref, offset int64, size uint32) error {
@@ -98,12 +96,10 @@ func (s *storage) reindexOne(ctx context.Context, index sorted.KeyValue, overwri
batch.Set(ref.String(), meta)
return nil
}
- if _, ok := misses[ref]; ok { // maybe this is the last of this blob.
- delete(misses, ref)
- }
+ delete(misses, ref)
if old, err := index.Get(ref.String()); err != nil {
allOk = false
- if err == sorted.ErrNotFound {
+ if errors.Is(err, sorted.ErrNotFound) {
log.Println(ref.String() + ": cannot find in index!")
} else {
log.Println(ref.String()+": error getting from index: ", err.Error())
@@ -143,8 +139,7 @@ func (s *storage) reindexOne(ctx context.Context, index sorted.KeyValue, overwri
func (s *storage) Walk(ctx context.Context,
walker func(packID int, ref blob.Ref, offset int64, size uint32) error) error {
- // TODO(tgulacsi): proper verbose flag from context
- verbose := env.IsDebug()
+ verbose := ctxGetVerbose(ctx)
for i := 0; i >= 0; i++ {
fh, err := os.Open(s.filename(i))
@@ -193,7 +188,7 @@ func (s *storage) walkPack(verbose bool, packID int,
br := bufio.NewReaderSize(fh, 512)
for {
if b, err := br.ReadByte(); err != nil {
- if err == io.EOF {
+ if errors.Is(err, io.EOF) {
break
}
return errAt("error while reading", err.Error())
@@ -202,7 +197,7 @@ func (s *storage) walkPack(verbose bool, packID int,
}
chunk, err := br.ReadSlice(']')
if err != nil {
- if err == io.EOF {
+ if errors.Is(err, io.EOF) {
break
}
return errAt("error reading blob header", err.Error())
@@ -245,7 +240,17 @@ func (s *storage) walkPack(verbose bool, packID int,
return errAt("", "cannot seek +"+strconv.FormatUint(size64, 10)+" bytes")
}
// drain the buffer after the underlying reader Seeks
- io.CopyN(ioutil.Discard, br, int64(br.Buffered()))
+ _, _ = io.CopyN(ioutil.Discard, br, int64(br.Buffered()))
}
return nil
}
+ type verboseCtxKey struct{}
+ func ctxGetVerbose(ctx context.Context) bool {
+ b, _ := ctx.Value(verboseCtxKey{}).(bool)
+ return b
+ }
+ func CtxSetVerbose(ctx context.Context, verbose bool) context.Context {
+ return context.WithValue(ctx, verboseCtxKey{}, verbose)
+ }
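The unexported verboseCtxKey type keeps this context value from colliding with keys set by other packages, and only the setter is exported, so external callers (like the reindexdp command above) can enable verbosity while the package reads it internally. A runnable sketch of the pattern in isolation:

    package main

    import (
    	"context"
    	"fmt"
    )

    type verboseCtxKey struct{}

    func ctxGetVerbose(ctx context.Context) bool {
    	b, _ := ctx.Value(verboseCtxKey{}).(bool)
    	return b // comma-ok: a missing key yields the zero value, false
    }

    func CtxSetVerbose(ctx context.Context, verbose bool) context.Context {
    	return context.WithValue(ctx, verboseCtxKey{}, verbose)
    }

    func main() {
    	ctx := context.Background()
    	fmt.Println(ctxGetVerbose(ctx))                      // false
    	fmt.Println(ctxGetVerbose(CtxSetVerbose(ctx, true))) // true
    }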

View File

@@ -17,6 +17,7 @@ limitations under the License.
package diskpacked
import (
"context"
"testing"
"perkeep.org/pkg/blob"
@@ -52,7 +53,10 @@ func TestWalkPack(t *testing.T) {
return nil
}
- if err := s.Walk(nil, walk); err != nil {
+ ctx, cancel := context.WithCancel(context.Background())
+ err := s.Walk(ctx, walk)
+ cancel()
+ if err != nil {
t.Fatal(err)
}

View File

@@ -116,7 +116,10 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
writePack(t, dir, i, p)
}
- if err := Reindex(dir, true, nil); err != nil {
+ ctx, cancel := context.WithCancel(context.Background())
+ err = Reindex(ctx, dir, true, nil)
+ cancel()
+ if err != nil {
t.Fatalf("Reindexing after writing pack files: %v", err)
}
s, err = newStorage(dir, 0, nil)
@@ -136,11 +139,12 @@ func newTestStorage(t *testing.T, packs ...pack) (s *storage, clean func()) {
// before returning and fails the test if any of the checks fail. It
// also fails the test if StreamBlobs returns a non-nil error.
func streamAll(t *testing.T, s *storage) []*blob.Blob {
- var blobs []*blob.Blob
- ctx := context.TODO()
+ blobs := make([]*blob.Blob, 0, 1024)
ch := make(chan blobserver.BlobAndToken)
errCh := make(chan error, 1)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
go func() { errCh <- s.StreamBlobs(ctx, ch, "") }()
for bt := range ch {