mirror of https://github.com/perkeep/perkeep.git
make CachingFetcher also a StreamingFetcher
This commit is contained in:
parent
59e577c023
commit
b652dc62e8
|
@ -17,10 +17,11 @@ limitations under the License.
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
"camli/blobref"
|
"camli/blobref"
|
||||||
"camli/blobserver"
|
"camli/blobserver"
|
||||||
|
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func NewCachingFetcher(cacheTarget blobserver.Cache, sfetcher blobref.StreamingFetcher) blobref.SeekFetcher {
|
func NewCachingFetcher(cacheTarget blobserver.Cache, sfetcher blobref.StreamingFetcher) blobref.SeekFetcher {
|
||||||
|
@ -32,25 +33,33 @@ type CachingFetcher struct {
|
||||||
sf blobref.StreamingFetcher
|
sf blobref.StreamingFetcher
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ blobref.StreamingFetcher = (*CachingFetcher)(nil)
|
||||||
|
var _ blobref.SeekFetcher = (*CachingFetcher)(nil)
|
||||||
|
|
||||||
|
func (cf *CachingFetcher) FetchStreaming(br *blobref.BlobRef) (file io.ReadCloser, size int64, err os.Error) {
|
||||||
|
file, size, err = cf.c.Fetch(br)
|
||||||
|
if err == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
cf.faultIn(br)
|
||||||
|
return cf.c.Fetch(br)
|
||||||
|
}
|
||||||
|
|
||||||
func (cf *CachingFetcher) Fetch(br *blobref.BlobRef) (file blobref.ReadSeekCloser, size int64, err os.Error) {
|
func (cf *CachingFetcher) Fetch(br *blobref.BlobRef) (file blobref.ReadSeekCloser, size int64, err os.Error) {
|
||||||
file, size, err = cf.c.Fetch(br)
|
file, size, err = cf.c.Fetch(br)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
cf.faultIn(br)
|
||||||
|
return cf.c.Fetch(br)
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: let fetches some in real-time with stream in the
|
func (cf *CachingFetcher) faultIn(br *blobref.BlobRef) os.Error {
|
||||||
// common case, only blocking if a Seek-forward is encountered
|
sblob, _, err := cf.sf.FetchStreaming(br)
|
||||||
// mid-download. But for now we're lazy and first copy the
|
|
||||||
// whole thing to cache.
|
|
||||||
sblob, size, err := cf.sf.FetchStreaming(br)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, 0, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = cf.c.ReceiveBlob(br, sblob)
|
_, err = cf.c.ReceiveBlob(br, sblob)
|
||||||
if err != nil {
|
return err
|
||||||
return nil, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return cf.c.Fetch(br)
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,7 +75,7 @@ type BlobEnumerator interface {
|
||||||
|
|
||||||
// Cache is the minimal interface expected of a blob cache.
|
// Cache is the minimal interface expected of a blob cache.
|
||||||
type Cache interface {
|
type Cache interface {
|
||||||
blobref.SeekFetcher // TODO: change this to be just a normal StreamingFetcher
|
blobref.SeekFetcher
|
||||||
BlobReceiver
|
BlobReceiver
|
||||||
BlobStatter
|
BlobStatter
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue