diff --git a/pkg/blobserver/diskpacked/diskpacked.go b/pkg/blobserver/diskpacked/diskpacked.go index d6d204914..01ea73feb 100644 --- a/pkg/blobserver/diskpacked/diskpacked.go +++ b/pkg/blobserver/diskpacked/diskpacked.go @@ -324,6 +324,19 @@ func (s *storage) Close() error { } func (s *storage) Fetch(br blob.Ref) (io.ReadCloser, uint32, error) { + return s.fetch(br, 0, -1) +} + +func (s *storage) SubFetch(br blob.Ref, offset, length int64) (io.ReadCloser, error) { + if offset < 0 || length < 0 { + return nil, errors.New("invalid offset or length") + } + rc, _, err := s.fetch(br, offset, length) + return rc, err +} + +// length of -1 means all +func (s *storage) fetch(br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) { meta, err := s.meta(br) if err != nil { return nil, 0, err @@ -333,7 +346,19 @@ func (s *storage) Fetch(br blob.Ref) (io.ReadCloser, uint32, error) { return nil, 0, fmt.Errorf("diskpacked: attempt to fetch blob from out of range pack file %d > %d", meta.file, len(s.fds)) } rac := s.fds[meta.file] - var rs io.ReadSeeker = io.NewSectionReader(rac, meta.offset, int64(meta.size)) + var rs io.ReadSeeker + if length == -1 { + // normal Fetch mode + rs = io.NewSectionReader(rac, meta.offset, int64(meta.size)) + } else { + if offset > int64(meta.size) { + offset = int64(meta.size) + length = 0 + } else if offset+length > int64(meta.size) { + length = int64(meta.size) - offset + } + rs = io.NewSectionReader(rac, meta.offset+offset, length) + } fn := rac.Name() // Ensure entry is in map. readVar.Add(fn, 0) diff --git a/pkg/blobserver/localdisk/localdisk.go b/pkg/blobserver/localdisk/localdisk.go index c68172a3f..d11b469f1 100644 --- a/pkg/blobserver/localdisk/localdisk.go +++ b/pkg/blobserver/localdisk/localdisk.go @@ -31,6 +31,7 @@ Example low-level config: package localdisk import ( + "errors" "fmt" "io" "os" @@ -116,13 +117,26 @@ func (ds *DiskStorage) tryRemoveDir(dir string) { os.Remove(dir) // ignore error } -func (ds *DiskStorage) Fetch(blob blob.Ref) (io.ReadCloser, uint32, error) { - fileName := ds.blobPath(blob) +func (ds *DiskStorage) Fetch(br blob.Ref) (io.ReadCloser, uint32, error) { + return ds.fetch(br, 0, -1) +} + +func (ds *DiskStorage) SubFetch(br blob.Ref, offset, length int64) (io.ReadCloser, error) { + if offset < 0 || length < 0 { + return nil, errors.New("invalid offset or length") + } + rc, _, err := ds.fetch(br, offset, length) + return rc, err +} + +// length -1 means entire file +func (ds *DiskStorage) fetch(br blob.Ref, offset, length int64) (rc io.ReadCloser, size uint32, err error) { + fileName := ds.blobPath(br) stat, err := os.Stat(fileName) if os.IsNotExist(err) { return nil, 0, os.ErrNotExist } - size := types.U32(stat.Size()) + size = types.U32(stat.Size()) file, err := os.Open(fileName) if err != nil { if os.IsNotExist(err) { @@ -130,7 +144,18 @@ func (ds *DiskStorage) Fetch(blob blob.Ref) (io.ReadCloser, uint32, error) { } return nil, 0, err } - return file, size, nil + // normal Fetch: + if length < 0 { + return file, size, nil + } + // SubFetch: + return struct { + io.Reader + io.Closer + }{ + io.NewSectionReader(file, offset, length), + file, + }, 0 /* unused */, err } func (ds *DiskStorage) RemoveBlobs(blobs []blob.Ref) error { diff --git a/pkg/test/fetcher.go b/pkg/test/fetcher.go index d49fed1de..97a4c9ec4 100644 --- a/pkg/test/fetcher.go +++ b/pkg/test/fetcher.go @@ -98,6 +98,30 @@ func (tf *Fetcher) Fetch(ref blob.Ref) (file io.ReadCloser, size uint32, err err }, size, nil } +func (tf *Fetcher) SubFetch(ref blob.Ref, offset, length int64) (io.ReadCloser, error) { + if tf.FetchErr != nil { + if err := tf.FetchErr(); err != nil { + return nil, err + } + } + tf.mu.RLock() + defer tf.mu.RUnlock() + tb, ok := tf.m[ref.String()] + if !ok { + return nil, os.ErrNotExist + } + atomic.AddInt64(&tf.blobsFetched, 1) + atomic.AddInt64(&tf.bytesFetched, length) + + return struct { + *io.SectionReader + io.Closer + }{ + io.NewSectionReader(strings.NewReader(tb.Contents), offset, int64(length)), + types.NopCloser, + }, nil +} + func (tf *Fetcher) BlobContents(br blob.Ref) (contents string, ok bool) { tf.mu.RLock() defer tf.mu.RUnlock()