localdisk, diskpacked, test.Fetcher: implement SubFetch

Change-Id: If9d09ab59a5bb73650c2668a8e0236d461287ef7
This commit is contained in:
Brad Fitzpatrick 2014-08-25 16:47:05 -07:00
parent aa9bc4785c
commit 31d6f81d12
3 changed files with 79 additions and 5 deletions

View File

@ -324,6 +324,19 @@ func (s *storage) Close() error {
}
func (s *storage) Fetch(br blob.Ref) (io.ReadCloser, uint32, error) {
return s.fetch(br, 0, -1)
}
func (s *storage) SubFetch(br blob.Ref, offset, length int64) (io.ReadCloser, error) {
if offset < 0 || length < 0 {
return nil, errors.New("invalid offset or length")
}
rc, _, err := s.fetch(br, offset, length)
return rc, err
}
// length of -1 means all
func (s *storage) fetch(br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
meta, err := s.meta(br)
if err != nil {
return nil, 0, err
@ -333,7 +346,19 @@ func (s *storage) Fetch(br blob.Ref) (io.ReadCloser, uint32, error) {
return nil, 0, fmt.Errorf("diskpacked: attempt to fetch blob from out of range pack file %d > %d", meta.file, len(s.fds))
}
rac := s.fds[meta.file]
var rs io.ReadSeeker = io.NewSectionReader(rac, meta.offset, int64(meta.size))
var rs io.ReadSeeker
if length == -1 {
// normal Fetch mode
rs = io.NewSectionReader(rac, meta.offset, int64(meta.size))
} else {
if offset > int64(meta.size) {
offset = int64(meta.size)
length = 0
} else if offset+length > int64(meta.size) {
length = int64(meta.size) - offset
}
rs = io.NewSectionReader(rac, meta.offset+offset, length)
}
fn := rac.Name()
// Ensure entry is in map.
readVar.Add(fn, 0)

View File

@ -31,6 +31,7 @@ Example low-level config:
package localdisk
import (
"errors"
"fmt"
"io"
"os"
@ -116,13 +117,26 @@ func (ds *DiskStorage) tryRemoveDir(dir string) {
os.Remove(dir) // ignore error
}
func (ds *DiskStorage) Fetch(blob blob.Ref) (io.ReadCloser, uint32, error) {
fileName := ds.blobPath(blob)
func (ds *DiskStorage) Fetch(br blob.Ref) (io.ReadCloser, uint32, error) {
return ds.fetch(br, 0, -1)
}
func (ds *DiskStorage) SubFetch(br blob.Ref, offset, length int64) (io.ReadCloser, error) {
if offset < 0 || length < 0 {
return nil, errors.New("invalid offset or length")
}
rc, _, err := ds.fetch(br, offset, length)
return rc, err
}
// length -1 means entire file
func (ds *DiskStorage) fetch(br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) {
fileName := ds.blobPath(br)
stat, err := os.Stat(fileName)
if os.IsNotExist(err) {
return nil, 0, os.ErrNotExist
}
size := types.U32(stat.Size())
size = types.U32(stat.Size())
file, err := os.Open(fileName)
if err != nil {
if os.IsNotExist(err) {
@ -130,7 +144,18 @@ func (ds *DiskStorage) Fetch(blob blob.Ref) (io.ReadCloser, uint32, error) {
}
return nil, 0, err
}
return file, size, nil
// normal Fetch:
if length < 0 {
return file, size, nil
}
// SubFetch:
return struct {
io.Reader
io.Closer
}{
io.NewSectionReader(file, offset, length),
file,
}, 0 /* unused */, err
}
func (ds *DiskStorage) RemoveBlobs(blobs []blob.Ref) error {

View File

@ -98,6 +98,30 @@ func (tf *Fetcher) Fetch(ref blob.Ref) (file io.ReadCloser, size uint32, err err
}, size, nil
}
func (tf *Fetcher) SubFetch(ref blob.Ref, offset, length int64) (io.ReadCloser, error) {
if tf.FetchErr != nil {
if err := tf.FetchErr(); err != nil {
return nil, err
}
}
tf.mu.RLock()
defer tf.mu.RUnlock()
tb, ok := tf.m[ref.String()]
if !ok {
return nil, os.ErrNotExist
}
atomic.AddInt64(&tf.blobsFetched, 1)
atomic.AddInt64(&tf.bytesFetched, length)
return struct {
*io.SectionReader
io.Closer
}{
io.NewSectionReader(strings.NewReader(tb.Contents), offset, int64(length)),
types.NopCloser,
}, nil
}
func (tf *Fetcher) BlobContents(br blob.Ref) (contents string, ok bool) {
tf.mu.RLock()
defer tf.mu.RUnlock()