schema filewriter: rollsum chunking version

Brad Fitzpatrick 2011-06-06 08:54:31 -07:00
parent e0b1723538
commit fac6b7f020
2 changed files with 171 additions and 12 deletions


@@ -17,6 +17,7 @@ limitations under the License.
package schema
import (
	"bufio"
	"bytes"
	"crypto"
	"fmt"
@@ -27,6 +28,7 @@ import (
	"camli/blobref"
	"camli/blobserver"
	"camli/rollsum"
)
var _ = log.Printf
@@ -110,5 +112,154 @@ func serverHasBlob(bs blobserver.Storage, br *blobref.BlobRef) (have bool, err os.Error) {
		have = true
	}
	return
}
type span struct {
	from, to int64
	bits     int
	br       *blobref.BlobRef
	children []span
}
func (s *span) size() int64 {
	size := s.to - s.from
	for _, cs := range s.children {
		size += cs.size()
	}
	return size
}
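// A span covers the byte range [from, to) of the input file. bits
// records the strength of the rollsum boundary that ended the span,
// and spans ended by rarer (higher-bits) boundaries absorb earlier
// lower-bits spans as children, so the top-level slice of spans plus
// their children fields form a tree over the whole file.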
// WriteFileFromReaderRolling creates and uploads a "file" JSON schema
// composed of chunks of r, also uploading the chunks. The returned
// BlobRef is of the JSON file schema blob.
func WriteFileFromReaderRolling(bs blobserver.Storage, filename string, r io.Reader) (outbr *blobref.BlobRef, outerr os.Error) {
	bufr := bufio.NewReader(r)
	spans := []span{} // the tree of spans, cut on interesting rollsum boundaries
	rs := rollsum.New()
	n := int64(0)
	last := n
	buf := new(bytes.Buffer)
	uploadString := func(s string) (*blobref.BlobRef, os.Error) {
		br := blobref.Sha1FromString(s)
		hasIt, err := serverHasBlob(bs, br)
		if err != nil {
			return nil, err
		}
		if hasIt {
			return br, nil
		}
		_, err = bs.ReceiveBlob(br, strings.NewReader(s))
		if err != nil {
			return nil, err
		}
		return br, nil
	}
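	// Note that uploadString deduplicates by content: the blobref is
	// the SHA-1 of the string, so when the server already has that
	// blob (per serverHasBlob) the upload is skipped entirely.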
	// TODO: keep multiple of these in-flight at a time.
	uploadLastSpan := func() bool {
		defer buf.Reset()
		br, err := uploadString(buf.String())
		if err != nil {
			outerr = err
			return false
		}
		spans[len(spans)-1].br = br
		return true
	}
	for {
		c, err := bufr.ReadByte()
		if err == os.EOF {
			if n != last {
				spans = append(spans, span{from: last, to: n})
				if !uploadLastSpan() {
					return
				}
			}
			break
		}
		if err != nil {
			return nil, err
		}
		buf.WriteByte(c)
		n++
		rs.Roll(c)
		if !rs.OnSplit() {
			continue
		}
		bits := rs.Bits()
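		// rs.Bits() reports how strong this rollsum boundary is;
		// higher scores are (roughly exponentially) rarer. Treating
		// the score as a level, skip-list style, is what turns the
		// flat run of chunks into a tree below.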
		// Take any spans from the end of the spans slice that
		// have a smaller 'bits' score and make them children
		// of this node.
		var children []span
		childrenFrom := len(spans)
		for childrenFrom > 0 && spans[childrenFrom-1].bits < bits {
			childrenFrom--
		}
		if nCopy := len(spans) - childrenFrom; nCopy > 0 {
			children = make([]span, nCopy)
			copy(children, spans[childrenFrom:])
			spans = spans[:childrenFrom]
		}
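		// For example (illustrative values): if the pending spans
		// have bits scores [9, 4, 5] and this boundary scores 7, the
		// 4- and 5-bit spans become children of the new span, while
		// the 9-bit span stays a top-level sibling.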
		spans = append(spans, span{from: last, to: n, bits: bits, children: children})
		last = n
		if !uploadLastSpan() {
			return
		}
	}
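	// uploadFile and addContentParts below call each other
	// recursively, so addContentParts is declared before either
	// is defined.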
	var addContentParts func(dst *[]ContentPart, s []span) os.Error
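	// uploadFile builds a "file" schema map for the given spans,
	// serializes it to Camli JSON, and uploads it via uploadString,
	// returning the schema blob's ref.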
	uploadFile := func(filename string, fileSize int64, s []span) (*blobref.BlobRef, os.Error) {
		parts := []ContentPart{}
		err := addContentParts(&parts, s)
		if err != nil {
			return nil, err
		}
		m := NewCommonFilenameMap(filename)
		err = PopulateRegularFileMap(m, fileSize, parts)
		if err != nil {
			return nil, err
		}
		json, err := MapToCamliJson(m)
		if err != nil {
			return nil, err
		}
		return uploadString(json)
	}
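	// addContentParts flattens one level of the span tree: a span's
	// children are first written out as their own nested "file"
	// schema blob (referenced via subFileBlobRef), then the span's
	// own byte range is referenced via blobRef.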
	addContentParts = func(dst *[]ContentPart, spansl []span) os.Error {
		for _, sp := range spansl {
			if len(sp.children) > 0 {
				childrenSize := int64(0)
				for _, cs := range sp.children {
					childrenSize += cs.size()
				}
				br, err := uploadFile("", childrenSize, sp.children)
				if err != nil {
					return err
				}
				*dst = append(*dst, ContentPart{
					SubBlobRefString: br.String(),
					SubBlobRef:       br,
					Size:             uint64(childrenSize),
				})
			}
			if sp.from != sp.to {
				*dst = append(*dst, ContentPart{
					BlobRefString: sp.br.String(),
					BlobRef:       sp.br,
					Size:          uint64(sp.to - sp.from),
				})
			}
		}
		return nil
	}
	// The remaining top-level spans become the file's content parts.
	return uploadFile(filename, n, spans)
}
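// Usage sketch (hypothetical caller; "store" stands in for any
// blobserver.Storage implementation, and the contents are made up):
//
//	br, err := WriteFileFromReaderRolling(store, "greeting.txt",
//		strings.NewReader("hello, world\n"))
//	if err != nil {
//		log.Fatalf("upload failed: %v", err)
//	}
//	log.Printf("file schema blob: %s", br)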


@@ -90,10 +90,12 @@ type Superset struct {
}
type ContentPart struct {
-	BlobRefString string "blobRef"
-	BlobRef *blobref.BlobRef // TODO: ditch BlobRefString? use json.Unmarshaler?
-	Size uint64 "size"
-	Offset uint64 "offset"
+	BlobRefString    string "blobRef"
+	BlobRef          *blobref.BlobRef // TODO: ditch BlobRefString? use json.Unmarshaler?
+	SubBlobRefString string "subFileBlobRef"
+	SubBlobRef       *blobref.BlobRef
+	Size             uint64 "size"
+	Offset           uint64 "offset"
}
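// A ContentPart now references either raw data (BlobRef, serialized
// as "blobRef") or a nested "file" schema blob (SubBlobRef,
// serialized as "subFileBlobRef"); parts produced by
// WriteFileFromReaderRolling set exactly one of the two.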
func (cp *ContentPart) blobref() *blobref.BlobRef {
@@ -242,12 +244,14 @@ func MapToCamliJson(m map[string]interface{}) (string, os.Error) {
func NewCommonFilenameMap(fileName string) map[string]interface{} {
	m := newCamliMap(1, "" /* no type yet */ )
-	lastSlash := strings.LastIndex(fileName, "/")
-	baseName := fileName[lastSlash+1:]
-	if isValidUtf8(baseName) {
-		m["fileName"] = baseName
-	} else {
-		m["fileNameBytes"] = []uint8(baseName)
+	if fileName != "" {
+		lastSlash := strings.LastIndex(fileName, "/")
+		baseName := fileName[lastSlash+1:]
+		if isValidUtf8(baseName) {
+			m["fileName"] = baseName
+		} else {
+			m["fileNameBytes"] = []uint8(baseName)
+		}
	}
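	// An empty fileName (as used for the nested span files written
	// by WriteFileFromReaderRolling) yields a map with no name fields.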
	return m
}
@@ -299,7 +303,11 @@ func PopulateRegularFileMap(m map[string]interface{}, size int64, parts []ContentPart) os.Error {
	for idx, part := range parts {
		mpart := make(map[string]interface{})
		mparts[idx] = mpart
-		mpart["blobRef"] = part.BlobRef.String()
+		if part.BlobRef != nil {
+			mpart["blobRef"] = part.BlobRef.String()
+		} else if part.SubBlobRef != nil {
+			mpart["subFileBlobRef"] = part.SubBlobRef.String()
+		}
		mpart["size"] = part.Size
		sumSize += part.Size
		if part.Offset != 0 {