index: move seekFetcherMissTracker up a layer

In prep for missing blob dependency rescheduling in indexer.

Change-Id: I1d492e6aa64cfb658daec17e4621d1453c6d3607
This commit is contained in:
Brad Fitzpatrick 2014-03-14 09:14:44 -07:00
parent bf6031a397
commit bf01b14961
1 changed files with 36 additions and 29 deletions

View File

@ -93,8 +93,22 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz
sniffer.Parse()
mm, err := ix.populateMutationMap(blobRef, sniffer)
fetcher := &seekFetcherMissTracker{
fetcher: ix.BlobSource,
seeker: blob.SeekerFromStreamingFetcher(ix.BlobSource),
}
mm, err := ix.populateMutationMap(fetcher, blobRef, sniffer)
if err != nil {
fetcher.mu.Lock()
defer fetcher.mu.Unlock()
if len(fetcher.missing) == 0 {
return
}
// TODO(bradfitz): there was an error indexing this blob, and
// we failed to load the blobs in fetcher.missing. Add those as dependencies
// somewhere so that when we get one of those missing blobs, we kick off
// a re-index of this blob for whenever the indexer is idle.
return
}
@ -148,7 +162,7 @@ func (ix *Index) commit(mm *mutationMap) error {
//
// the blobref can be trusted at this point (it's been fully consumed
// and verified to match), and the sniffer has been populated.
func (ix *Index) populateMutationMap(br blob.Ref, sniffer *BlobSniffer) (*mutationMap, error) {
func (ix *Index) populateMutationMap(fetcher *seekFetcherMissTracker, br blob.Ref, sniffer *BlobSniffer) (*mutationMap, error) {
// TODO(mpl): shouldn't we remove these two from the map (so they don't get committed) when,
// e.g., in populateClaim we detect a bogus claim (which does not yield an error)?
mm := &mutationMap{
@ -165,11 +179,11 @@ func (ix *Index) populateMutationMap(br blob.Ref, sniffer *BlobSniffer) (*mutati
return nil, err
}
case "file":
if err := ix.populateFile(blob, mm); err != nil {
if err := ix.populateFile(fetcher, blob, mm); err != nil {
return nil, err
}
case "directory":
if err := ix.populateDir(blob, mm); err != nil {
if err := ix.populateDir(fetcher, blob, mm); err != nil {
return nil, err
}
}
@ -196,13 +210,25 @@ func (w *keepFirstN) Write(p []byte) (n int, err error) {
// seekFetcherMissTracker is a blob.SeekFetcher that records which blob(s) it failed
// to load from its underlying fetcher.
type seekFetcherMissTracker struct {
src blob.SeekFetcher
fetcher blob.StreamingFetcher
seeker blob.SeekFetcher // of fetcher. will be deleted when SeekFetcher is globally killed
mu sync.Mutex // guards missing
missing []blob.Ref
}
func (f *seekFetcherMissTracker) Fetch(br blob.Ref) (blob types.ReadSeekCloser, size uint32, err error) {
blob, size, err = f.src.Fetch(br)
blob, size, err = f.seeker.Fetch(br)
if err == os.ErrNotExist {
f.mu.Lock()
defer f.mu.Unlock()
f.missing = append(f.missing, br)
}
return
}
func (f *seekFetcherMissTracker) FetchStreaming(br blob.Ref) (blob io.ReadCloser, size uint32, err error) {
blob, size, err = f.fetcher.FetchStreaming(br)
if err == os.ErrNotExist {
f.mu.Lock()
defer f.mu.Unlock()
@ -213,30 +239,11 @@ func (f *seekFetcherMissTracker) Fetch(br blob.Ref) (blob types.ReadSeekCloser,
// b: the parsed file schema blob
// mm: keys to populate
func (ix *Index) populateFile(b *schema.Blob, mm *mutationMap) (err error) {
func (ix *Index) populateFile(fetcher blob.SeekFetcher, b *schema.Blob, mm *mutationMap) (err error) {
var times []time.Time // all creation or mod times seen; may be zero
times = append(times, b.ModTime())
blobRef := b.BlobRef()
fetcher := &seekFetcherMissTracker{
// TODO(bradfitz): cache this SeekFetcher on ix so it
// doesn't have to be re-made each time? Probably small.
src: blob.SeekerFromStreamingFetcher(ix.BlobSource),
}
defer func() {
if err == nil {
return
}
fetcher.mu.Lock()
defer fetcher.mu.Unlock()
if len(fetcher.missing) == 0 {
return
}
// TODO(bradfitz): there was an error indexing this file, and
// we failed to load the blobs in fetcher.missing. Add those as dependencies
// somewhere so that when we get one of those missing blobs, we kick off
// a re-index of this file for whenever the indexer is idle.
}()
fr, err := b.NewFileReader(fetcher)
if err != nil {
// TODO(bradfitz): propagate up a transient failure
@ -500,12 +507,12 @@ func indexMusic(r types.SizeReaderAt, wholeRef blob.Ref, mm *mutationMap) {
// b: the parsed file schema blob
// mm: keys to populate
func (ix *Index) populateDir(b *schema.Blob, mm *mutationMap) error {
func (ix *Index) populateDir(fetcher blob.SeekFetcher, b *schema.Blob, mm *mutationMap) error {
blobRef := b.BlobRef()
// TODO(bradfitz): move the NewDirReader and FileName method off *schema.Blob and onto
// StaticFile/StaticDirectory or something.
seekFetcher := blob.SeekerFromStreamingFetcher(ix.BlobSource)
dr, err := b.NewDirReader(seekFetcher)
dr, err := b.NewDirReader(fetcher)
if err != nil {
// TODO(bradfitz): propagate up a transient failure
// error type, so we can retry indexing files in the