camput: Preparation for vivify.

Refactor writeFileMapRolling so we can write the chunks,
then the top-level contents, separately.

Change-Id: Ib56cd4b2aa295516fafbe7072ad5b352d1aaaa89
mpl 2012-12-20 18:34:14 +01:00
parent f59fe1d535
commit 6cb7ecd644
5 changed files with 155 additions and 88 deletions
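
In short: the old writeFileMapRolling did everything in one call (cut r on rolling-checksum boundaries, upload the chunks, then upload the top-level file schema blob). After this change the chunk work is also exposed as schema.WriteFileChunks, which populates the file map but deliberately does not upload it; the top-level blob is left to the caller, and the upcoming vivify path in camput wants to send that blob itself with a special header. A rough sketch of the resulting two-step pattern, pieced together from the diff below (the helper name, import paths, and surrounding setup are assumptions, not part of this commit):

package example

import (
	"io"
	"strings"

	"camlistore.org/pkg/blobref"
	"camlistore.org/pkg/blobserver"
	"camlistore.org/pkg/schema"
)

// writeChunksThenMap is a hypothetical caller of the refactored API.
// Step 1 uploads the data chunks and fills in fileMap's size and parts;
// step 2 serializes and uploads the top-level "file" schema blob itself.
func writeChunksThenMap(bs blobserver.StatReceiver, fileMap schema.Map, r io.Reader) (*blobref.BlobRef, error) {
	// Step 1: chunks only. fileMap is populated but NOT uploaded here.
	if err := schema.WriteFileChunks(bs, fileMap, r); err != nil {
		return nil, err
	}
	// Step 2: the caller now owns the top-level blob and can upload it
	// any way it likes; camput's vivify branch sends it via client.Upload
	// with Vivify set rather than through bs as done here.
	json, err := fileMap.JSON()
	if err != nil {
		return nil, err
	}
	br := blobref.SHA1FromString(json)
	if _, err := bs.ReceiveBlob(br, strings.NewReader(json)); err != nil {
		return nil, err
	}
	return br, nil
}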


@@ -44,7 +44,7 @@ type fileCmd struct {
makePermanode bool // make new, unique permanode of the root (dir or file)
filePermanodes bool // make planned permanodes for each file (based on their digest)
rollSplits bool
vivify bool
diskUsage bool // show "du" disk usage only (dry run mode), don't actually upload
havecache, statcache bool
@@ -59,6 +59,7 @@ func init() {
cmd := new(fileCmd)
flags.BoolVar(&cmd.makePermanode, "permanode", false, "Create and associate a new permanode for the uploaded file or directory.")
flags.BoolVar(&cmd.filePermanodes, "filenodes", false, "Create (if necessary) content-based permanodes for each uploaded file.")
flags.BoolVar(&cmd.vivify, "vivify", false, "Ask the server to vivify that file for us.")
flags.StringVar(&cmd.name, "name", "", "Optional name attribute to set on permanode when using -permanode.")
flags.StringVar(&cmd.tag, "tag", "", "Optional tag(s) to set on permanode when using -permanode or -filenodes. Single value or comma separated.")
@@ -112,7 +113,7 @@ func (c *fileCmd) RunCommand(up *Uploader, args []string) error {
cache := NewFlatHaveCache()
up.haveCache = cache
}
up.fileOpts = &fileOptions{permanode: c.filePermanodes, tag: c.tag}
up.fileOpts = &fileOptions{permanode: c.filePermanodes, tag: c.tag, vivify: c.vivify}
var (
permaNode *client.PutResult
@@ -367,6 +368,26 @@ func (up *Uploader) uploadNodeRegularFile(n *node) (*client.PutResult, error) {
if up.fileOpts.wantFilePermanode() {
fileContents = &trackDigestReader{r: fileContents}
}
if up.fileOpts.wantVivify() {
err := schema.WriteFileChunks(up.statReceiver(), m, fileContents)
if err != nil {
return nil, err
}
json, err := m.JSON()
if err != nil {
return nil, err
}
blobref := blobref.SHA1FromString(json)
h := &client.UploadHandle{
BlobRef: blobref,
Size: int64(len(json)),
Contents: strings.NewReader(json),
Vivify: true,
}
return up.Upload(h)
}
blobref, err := schema.WriteFileMap(up.statReceiver(), m, fileContents)
if err != nil {
return nil, err


@@ -63,6 +63,7 @@ type fileOptions struct {
// tag is an optional tag or comma-delimited tags to apply to
// the above permanode.
tag string
vivify bool
}
func (o *fileOptions) tags() []string {
@@ -76,6 +77,10 @@ func (o *fileOptions) wantFilePermanode() bool {
return o != nil && o.permanode
}
func (o *fileOptions) wantVivify() bool {
return o != nil && o.vivify
}
// sigTime optionally specifies the signature time.
// If zero, the current time is used.
func (up *Uploader) SignMap(m schema.Map, sigTime time.Time) (string, error) {
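
Both wantFilePermanode and wantVivify above guard with o != nil, so they are safe to call even when no *fileOptions was ever set on the Uploader; a nil receiver simply answers false. A standalone illustration of that nil-receiver pattern (the struct is trimmed to one field here, it is not the real type):

package main

import "fmt"

// fileOptions is reduced to the single field the example needs.
type fileOptions struct {
	vivify bool
}

// wantVivify tolerates a nil receiver, as in the real code.
func (o *fileOptions) wantVivify() bool {
	return o != nil && o.vivify
}

func main() {
	var opts *fileOptions          // nil: no file options configured
	fmt.Println(opts.wantVivify()) // false, and no nil-pointer panic
	opts = &fileOptions{vivify: true}
	fmt.Println(opts.wantVivify()) // true
}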


@@ -148,6 +148,10 @@ func handleMultiPartUpload(conn http.ResponseWriter, req *http.Request, blobRece
receivedBlobs = append(receivedBlobs, blobGot)
}
if req.Header.Get("X-Camlistore-Vivify") == "1" {
// TODO(mpl)
}
ret, err := commonUploadResponse(blobReceiver, req)
if err != nil {
httputil.ServerError(conn, req, err)


@@ -44,6 +44,7 @@ type UploadHandle struct {
BlobRef *blobref.BlobRef
Size int64 // or -1 if size isn't known
Contents io.Reader
Vivify bool
}
type PutResult struct {
@@ -325,6 +326,9 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, error) {
req = c.newRequest("POST", stat.uploadUrl)
req.Header.Set("Content-Type", multipartWriter.FormDataContentType())
if h.Vivify {
req.Header.Add("X-Camlistore-Vivify", "1")
}
req.Body = ioutil.NopCloser(pipeReader)
req.ContentLength = multipartOverhead + bodySize + int64(len(blobrefStr))*2
req.TransferEncoding = nil
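
On the wire, vivify is just this request header: when h.Vivify is set, the multipart upload carries X-Camlistore-Vivify: 1, which the server-side hook above (still a TODO in this commit) can react to. A minimal client-side sketch of a vivified upload, matching the handle camput builds; the helper name, import paths, and the already-configured *client.Client are assumptions:

package example

import (
	"strings"

	"camlistore.org/pkg/blobref"
	"camlistore.org/pkg/client"
)

// uploadVivified uploads a file schema blob (its JSON string) and asks
// the server to vivify it, much like camput's new -vivify branch does.
func uploadVivified(cl *client.Client, fileMapJSON string) (*client.PutResult, error) {
	h := &client.UploadHandle{
		BlobRef:  blobref.SHA1FromString(fileMapJSON),
		Size:     int64(len(fileMapJSON)),
		Contents: strings.NewReader(fileMapJSON),
		Vivify:   true, // sent as the X-Camlistore-Vivify: 1 header
	}
	return cl.Upload(h)
}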


@@ -183,38 +183,134 @@ func (r *noteEOFReader) Read(p []byte) (n int, err error) {
return
}
func uploadString(bs blobserver.StatReceiver, s string) (*blobref.BlobRef, error) {
br := blobref.SHA1FromString(s)
hasIt, err := serverHasBlob(bs, br)
if err != nil {
return nil, err
}
if hasIt {
return br, nil
}
_, err = bs.ReceiveBlob(br, strings.NewReader(s))
if err != nil {
return nil, err
}
return br, nil
}
// uploadBytes gets a map from mapSource (of type either "bytes" or
// "file", which is a superset of "bytes"), sets it to the provided
// size, and populates with provided spans. The bytes or file schema
// blob is uploaded and its blobref is returned.
func uploadBytes(bs blobserver.StatReceiver, mapSource func() Map, size int64, s []span) (*blobref.BlobRef, error) {
parts := []BytesPart{}
err := addBytesParts(bs, &parts, s)
if err != nil {
return nil, err
}
m := mapSource()
err = PopulateParts(m, size, parts)
if err != nil {
return nil, err
}
json, err := m.JSON()
if err != nil {
return nil, err
}
return uploadString(bs, json)
}
// addBytesParts uploads the provided spans to bs, appending elements to *dst.
func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span) error {
for _, sp := range spans {
if len(sp.children) == 1 && sp.children[0].isSingleBlob() {
// Remove an occasional useless indirection of
// what would become a bytes schema blob
// pointing to a single blobref. Just promote
// the blobref child instead.
child := sp.children[0]
*dst = append(*dst, BytesPart{
BlobRef: child.br,
Size: uint64(child.size()),
})
sp.children = nil
}
if len(sp.children) > 0 {
childrenSize := int64(0)
for _, cs := range sp.children {
childrenSize += cs.size()
}
br, err := uploadBytes(bs, newBytes, childrenSize, sp.children)
if err != nil {
return err
}
*dst = append(*dst, BytesPart{
BytesRef: br,
Size: uint64(childrenSize),
})
}
if sp.from == sp.to {
panic("Shouldn't happen. " + fmt.Sprintf("weird span with same from & to: %#v", sp))
}
*dst = append(*dst, BytesPart{
BlobRef: sp.br,
Size: uint64(sp.to - sp.from),
})
}
return nil
}
// writeFileMapRolling uploads chunks of r to bs while populating fileMap,
// then uploads fileMap itself. The returned blobref is of fileMap's
// JSON blob. It uses a rolling checksum to decide the chunk sizes.
func writeFileMapRolling(bs blobserver.StatReceiver, fileMap Map, r io.Reader) (outbr *blobref.BlobRef, outerr error) {
rootFile := func() Map { return fileMap }
n, spans, err := writeFileChunks(bs, fileMap, r)
if err != nil {
return nil, err
}
// The top-level content parts
return uploadBytes(bs, rootFile, n, spans)
}
// WriteFileChunks uploads chunks of r to bs while populating fileMap.
// It does not upload fileMap.
func WriteFileChunks(bs blobserver.StatReceiver, fileMap Map, r io.Reader) error {
rootFile := func() Map { return fileMap }
n, spans, err := writeFileChunks(bs, fileMap, r)
if err != nil {
return err
}
topLevel := func(mapSource func() Map, size int64, s []span) error {
parts := []BytesPart{}
err := addBytesParts(bs, &parts, s)
if err != nil {
return err
}
m := mapSource()
err = PopulateParts(m, size, parts)
return err
}
// The top-level content parts
return topLevel(rootFile, n, spans)
}
func writeFileChunks(bs blobserver.StatReceiver, fileMap Map, r io.Reader) (n int64, spans []span, outerr error) {
src := &noteEOFReader{r: r}
blobSize := 0 // of the next blob being built, should be same as buf.Len()
bufr := bufio.NewReaderSize(src, bufioReaderSize)
spans := []span{} // the tree of spans, cut on interesting rollsum boundaries
spans = []span{} // the tree of spans, cut on interesting rollsum boundaries
rs := rollsum.New()
n := int64(0)
last := n
buf := new(bytes.Buffer)
rootFile := func() Map { return fileMap }
uploadString := func(s string) (*blobref.BlobRef, error) {
br := blobref.SHA1FromString(s)
hasIt, err := serverHasBlob(bs, br)
if err != nil {
return nil, err
}
if hasIt {
return br, nil
}
_, err = bs.ReceiveBlob(br, strings.NewReader(s))
if err != nil {
return nil, err
}
return br, nil
}
// TODO: keep multiple of these in-flight at a time.
uploadLastSpan := func() bool {
defer buf.Reset()
br, err := uploadString(buf.String())
br, err := uploadString(bs, buf.String())
if err != nil {
outerr = err
return false
@@ -235,7 +331,7 @@ func writeFileMapRolling(bs blobserver.StatReceiver, fileMap Map, r io.Reader) (
break
}
if err != nil {
return nil, err
return 0, nil, err
}
buf.WriteByte(c)
@@ -282,69 +378,6 @@ func writeFileMapRolling(bs blobserver.StatReceiver, fileMap Map, r io.Reader) (
}
}
var addBytesParts func(dst *[]BytesPart, s []span) error
return n, spans, nil
// uploadBytes gets a map from mapSource (of type either "bytes" or
// "file", which is a superset of "bytes"), sets it to the provided
// size, and populates with provided spans. The bytes or file schema
// blob is uploaded and its blobref is returned.
uploadBytes := func(mapSource func() Map, size int64, s []span) (*blobref.BlobRef, error) {
parts := []BytesPart{}
err := addBytesParts(&parts, s)
if err != nil {
return nil, err
}
m := mapSource()
err = PopulateParts(m, size, parts)
if err != nil {
return nil, err
}
json, err := m.JSON()
if err != nil {
return nil, err
}
return uploadString(json)
}
addBytesParts = func(dst *[]BytesPart, spansl []span) error {
for _, sp := range spansl {
if len(sp.children) == 1 && sp.children[0].isSingleBlob() {
// Remove an occasional useless indirection of
// what would become a bytes schema blob
// pointing to a single blobref. Just promote
// the blobref child instead.
child := sp.children[0]
*dst = append(*dst, BytesPart{
BlobRef: child.br,
Size: uint64(child.size()),
})
sp.children = nil
}
if len(sp.children) > 0 {
childrenSize := int64(0)
for _, cs := range sp.children {
childrenSize += cs.size()
}
br, err := uploadBytes(newBytes, childrenSize, sp.children)
if err != nil {
return err
}
*dst = append(*dst, BytesPart{
BytesRef: br,
Size: uint64(childrenSize),
})
}
if sp.from == sp.to {
panic("Shouldn't happen. " + fmt.Sprintf("weird span with same from & to: %#v", sp))
}
*dst = append(*dst, BytesPart{
BlobRef: sp.br,
Size: uint64(sp.to - sp.from),
})
}
return nil
}
// The top-level content parts
return uploadBytes(rootFile, n, spans)
}