camget: finish --shared fetching support

Change-Id: I32edf63e01068a0e96f3255ba4d1313682cf03c4
This commit is contained in:
Brad Fitzpatrick 2013-01-02 14:50:52 -08:00
parent 827feaa3ac
commit 8e44c62047
3 changed files with 66 additions and 7 deletions

View File

@ -26,8 +26,9 @@ import (
// Pattern is the regular expression which matches a blobref.
// It does not contain ^ or $.
const Pattern = `\b([a-z0-9]+)-([a-f0-9]+)\b`
const Pattern = `\b([a-z][a-z0-9]*)-([a-f0-9]+)\b`
// whole blobref pattern
var kBlobRefPattern = regexp.MustCompile("^" + Pattern + "$")
var supportedDigests = map[string]func() hash.Hash{

View File

@ -22,6 +22,7 @@ import (
"fmt"
"io"
"log"
"regexp"
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/readerutil"
@ -40,9 +41,32 @@ func (c *Client) FetchMap(b *blobref.BlobRef) (schema.Map, error) {
}
func (c *Client) FetchStreaming(b *blobref.BlobRef) (io.ReadCloser, int64, error) {
return c.FetchVia(b, nil)
return c.FetchVia(b, c.viaPathTo(b))
}
func (c *Client) viaPathTo(b *blobref.BlobRef) (path []*blobref.BlobRef) {
if c.via == nil {
return nil
}
it := b.String()
// Append path backwards first,
for {
v := c.via[it]
if v == "" {
break
}
path = append(path, blobref.MustParse(v))
it = v
}
// Then reverse it
for i := 0; i < len(path)/2; i++ {
path[i], path[len(path)-i-1] = path[len(path)-i-1], path[i]
}
return
}
var blobsRx = regexp.MustCompile(blobref.Pattern)
func (c *Client) FetchVia(b *blobref.BlobRef, v []*blobref.BlobRef) (io.ReadCloser, int64, error) {
pfx, err := c.prefix()
if err != nil {
@ -77,7 +101,30 @@ func (c *Client) FetchVia(b *blobref.BlobRef, v []*blobref.BlobRef) (io.ReadClos
return nil, 0, errors.New("blobserver didn't return a Content-Length for blob")
}
return resp.Body, size, nil
if c.via == nil {
// Not in sharing mode, so return immediately.
return resp.Body, size, nil
}
// Slurp 1 MB to find references to other blobrefs for the via path.
const maxSlurp = 1 << 20
var buf bytes.Buffer
_, err = io.Copy(&buf, io.LimitReader(resp.Body, maxSlurp))
if err != nil {
return nil, 0, err
}
// If it looks like a JSON schema blob (starts with '{')
if schema.LikelySchemaBlob(buf.Bytes()) {
for _, blobstr := range blobsRx.FindAllString(buf.String(), -1) {
c.via[blobstr] = b.String()
}
}
// Read from the multireader, but close the HTTP response body.
type rc struct {
io.Reader
io.Closer
}
return rc{io.MultiReader(&buf, resp.Body), resp.Body}, size, nil
}
func (c *Client) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (blobref.SizedBlobRef, error) {

View File

@ -265,18 +265,18 @@ type Superset struct {
Members []string `json:"members"` // for static sets (for directory static-sets: blobrefs to child dirs/files)
// Target is a "share" blob's target (the thing being shared)
Target *blobref.BlobRef
Target *blobref.BlobRef `json:"target"`
// Transitive is a property of a "share" blob.
Transitive bool
Transitive bool `json:"transitive"`
// AuthType is a "share" blob's authentication type that is required.
// Currently (2013-01-02) just "haveref" (if you know the share's blobref,
// you get access: the secret URL model)
AuthType string
AuthType string `json:"authType"`
}
func ParseSuperset(r io.Reader) (*Superset, error) {
var ss Superset
return &ss, json.NewDecoder(io.LimitReader(r, 1 << 20)).Decode(&ss)
return &ss, json.NewDecoder(io.LimitReader(r, 1<<20)).Decode(&ss)
}
// BytesPart is the type representing one of the "parts" in a "file"
@ -667,3 +667,14 @@ func RFC3339FromTime(t time.Time) string {
}
return t.UTC().Format(time.RFC3339Nano)
}
var bytesCamliVersion = []byte("camliVersion")
// LikelySchemaBlob returns quickly whether buf likely contains (or is
// the prefix of) a schema blob.
func LikelySchemaBlob(buf []byte) bool {
if len(buf) == 0 || buf[0] != '{' {
return false
}
return bytes.Contains(buf, bytesCamliVersion)
}