diff --git a/pkg/index/receive.go b/pkg/index/receive.go index 0eb390dfb..3ecb71a95 100644 --- a/pkg/index/receive.go +++ b/pkg/index/receive.go @@ -93,8 +93,22 @@ func (ix *Index) ReceiveBlob(blobRef blob.Ref, source io.Reader) (retsb blob.Siz sniffer.Parse() - mm, err := ix.populateMutationMap(blobRef, sniffer) + fetcher := &seekFetcherMissTracker{ + fetcher: ix.BlobSource, + seeker: blob.SeekerFromStreamingFetcher(ix.BlobSource), + } + + mm, err := ix.populateMutationMap(fetcher, blobRef, sniffer) if err != nil { + fetcher.mu.Lock() + defer fetcher.mu.Unlock() + if len(fetcher.missing) == 0 { + return + } + // TODO(bradfitz): there was an error indexing this file, and + // we failed to load the blobs in fetcher.missing. Add those as dependencies + // somewhere so when we get one of those missing blobs, we kick off + // a re-index of this file for whenever the indexer is idle. return } @@ -148,7 +162,7 @@ func (ix *Index) commit(mm *mutationMap) error { // // the blobref can be trusted at this point (it's been fully consumed // and verified to match), and the sniffer has been populated. -func (ix *Index) populateMutationMap(br blob.Ref, sniffer *BlobSniffer) (*mutationMap, error) { +func (ix *Index) populateMutationMap(fetcher *seekFetcherMissTracker, br blob.Ref, sniffer *BlobSniffer) (*mutationMap, error) { // TODO(mpl): shouldn't we remove these two from the map (so they don't get committed) when // e.g in populateClaim we detect a bogus claim (which does not yield an error)? 
mm := &mutationMap{ @@ -165,11 +179,11 @@ func (ix *Index) populateMutationMap(br blob.Ref, sniffer *BlobSniffer) (*mutati return nil, err } case "file": - if err := ix.populateFile(blob, mm); err != nil { + if err := ix.populateFile(fetcher, blob, mm); err != nil { return nil, err } case "directory": - if err := ix.populateDir(blob, mm); err != nil { + if err := ix.populateDir(fetcher, blob, mm); err != nil { return nil, err } } @@ -196,13 +210,25 @@ func (w *keepFirstN) Write(p []byte) (n int, err error) { // seekFetcherMissTracker is a blob.SeekFetcher that records which blob(s) it failed // to load from src. type seekFetcherMissTracker struct { - src blob.SeekFetcher + fetcher blob.StreamingFetcher + seeker blob.SeekFetcher // of fetcher. will be deleted when SeekFetcher is globally killed + mu sync.Mutex // guards missing missing []blob.Ref } func (f *seekFetcherMissTracker) Fetch(br blob.Ref) (blob types.ReadSeekCloser, size uint32, err error) { - blob, size, err = f.src.Fetch(br) + blob, size, err = f.seeker.Fetch(br) + if err == os.ErrNotExist { + f.mu.Lock() + defer f.mu.Unlock() + f.missing = append(f.missing, br) + } + return +} + +func (f *seekFetcherMissTracker) FetchStreaming(br blob.Ref) (blob io.ReadCloser, size uint32, err error) { + blob, size, err = f.fetcher.FetchStreaming(br) if err == os.ErrNotExist { f.mu.Lock() defer f.mu.Unlock() @@ -213,30 +239,11 @@ func (f *seekFetcherMissTracker) Fetch(br blob.Ref) (blob types.ReadSeekCloser, // b: the parsed file schema blob // mm: keys to populate -func (ix *Index) populateFile(b *schema.Blob, mm *mutationMap) (err error) { +func (ix *Index) populateFile(fetcher blob.SeekFetcher, b *schema.Blob, mm *mutationMap) (err error) { var times []time.Time // all creation or mod times seen; may be zero times = append(times, b.ModTime()) blobRef := b.BlobRef() - fetcher := &seekFetcherMissTracker{ - // TODO(bradfitz): cache this SeekFetcher on ix so it - // it's have to be re-made each time? Probably small. 
- src: blob.SeekerFromStreamingFetcher(ix.BlobSource), - } - defer func() { - if err == nil { - return - } - fetcher.mu.Lock() - defer fetcher.mu.Unlock() - if len(fetcher.missing) == 0 { - return - } - // TODO(bradfitz): there was an error indexing this file, and - // we failed to load the blobs in f.missing. Add those as dependencies - // somewhere so when we get one of those missing blobs, we kick off - // a re-index of this file for whenever the indexer is idle. - }() fr, err := b.NewFileReader(fetcher) if err != nil { // TODO(bradfitz): propagate up a transient failure @@ -500,12 +507,12 @@ func indexMusic(r types.SizeReaderAt, wholeRef blob.Ref, mm *mutationMap) { // b: the parsed file schema blob // mm: keys to populate -func (ix *Index) populateDir(b *schema.Blob, mm *mutationMap) error { +func (ix *Index) populateDir(fetcher blob.SeekFetcher, b *schema.Blob, mm *mutationMap) error { blobRef := b.BlobRef() // TODO(bradfitz): move the NewDirReader and FileName method off *schema.Blob and onto + // StaticFile/StaticDirectory or something. - seekFetcher := blob.SeekerFromStreamingFetcher(ix.BlobSource) - dr, err := b.NewDirReader(seekFetcher) + dr, err := b.NewDirReader(fetcher) if err != nil { // TODO(bradfitz): propagate up a transient failure // error type, so we can retry indexing files in the