diff --git a/lib/go/camli/schema/filewriter.go b/lib/go/camli/schema/filewriter.go
index f96af3752..4126513f2 100644
--- a/lib/go/camli/schema/filewriter.go
+++ b/lib/go/camli/schema/filewriter.go
@@ -17,6 +17,7 @@ limitations under the License.
 package schema
 
 import (
+	"bufio"
 	"bytes"
 	"crypto"
 	"fmt"
@@ -27,6 +28,7 @@ import (
 
 	"camli/blobref"
 	"camli/blobserver"
+	"camli/rollsum"
 )
 
 var _ = log.Printf
@@ -110,5 +112,154 @@ func serverHasBlob(bs blobserver.Storage, br *blobref.BlobRef) (have bool, err o
 		have = true
 	}
 	return
-
+}
+
+type span struct {
+	from, to int64
+	bits     int
+	br       *blobref.BlobRef
+	children []span
+}
+
+func (s *span) size() int64 {
+	size := s.to - s.from
+	for _, cs := range s.children {
+		size += cs.size()
+	}
+	return size
+}
+
+// WriteFileFromReaderRolling creates and uploads a "file" JSON schema
+// composed of chunks of r, also uploading the chunks. The returned
+// BlobRef is of the JSON file schema blob.
+func WriteFileFromReaderRolling(bs blobserver.Storage, filename string, r io.Reader) (outbr *blobref.BlobRef, outerr os.Error) {
+	bufr := bufio.NewReader(r)
+	spans := []span{} // the tree of spans, cut on interesting rollsum boundaries
+	rs := rollsum.New()
+	n := int64(0)
+	last := n
+	buf := new(bytes.Buffer)
+
+	uploadString := func(s string) (*blobref.BlobRef, os.Error) {
+		br := blobref.Sha1FromString(s)
+		hasIt, err := serverHasBlob(bs, br)
+		if err != nil {
+			return nil, err
+		}
+		if hasIt {
+			return br, nil
+		}
+		_, err = bs.ReceiveBlob(br, strings.NewReader(s))
+		if err != nil {
+			return nil, err
+		}
+		return br, nil
+	}
+
+	// TODO: keep multiple of these in-flight at a time.
+	uploadLastSpan := func() bool {
+		defer buf.Reset()
+		br, err := uploadString(buf.String())
+		if err != nil {
+			outerr = err
+			return false
+		}
+		spans[len(spans)-1].br = br
+		return true
+	}
+
+	for {
+		c, err := bufr.ReadByte()
+		if err == os.EOF {
+			if n != last {
+				spans = append(spans, span{from: last, to: n})
+				if !uploadLastSpan() {
+					return
+				}
+			}
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+		buf.WriteByte(c)
+
+		n++
+		rs.Roll(c)
+		if !rs.OnSplit() {
+			continue
+		}
+		bits := rs.Bits()
+
+		// Take any spans from the end of the spans slice that
+		// have a smaller 'bits' score and make them children
+		// of this node.
+		var children []span
+		childrenFrom := len(spans)
+		for childrenFrom > 0 && spans[childrenFrom-1].bits < bits {
+			childrenFrom--
+		}
+		if nCopy := len(spans) - childrenFrom; nCopy > 0 {
+			children = make([]span, nCopy)
+			copy(children, spans[childrenFrom:])
+			spans = spans[:childrenFrom]
+		}
+
+		spans = append(spans, span{from: last, to: n, bits: bits, children: children})
+		last = n
+		if !uploadLastSpan() {
+			return
+		}
+	}
+
+	var addContentParts func(dst *[]ContentPart, s []span) os.Error
+
+	uploadFile := func(filename string, fileSize int64, s []span) (*blobref.BlobRef, os.Error) {
+		parts := []ContentPart{}
+		err := addContentParts(&parts, s)
+		if err != nil {
+			return nil, err
+		}
+		m := NewCommonFilenameMap(filename)
+		err = PopulateRegularFileMap(m, fileSize, parts)
+		if err != nil {
+			return nil, err
+		}
+		json, err := MapToCamliJson(m)
+		if err != nil {
+			return nil, err
+		}
+		return uploadString(json)
+	}
+
+	addContentParts = func(dst *[]ContentPart, spansl []span) os.Error {
+		for _, sp := range spansl {
+			if len(sp.children) > 0 {
+				childrenSize := int64(0)
+				for _, cs := range sp.children {
+					childrenSize += cs.size()
+				}
+				br, err := uploadFile("", childrenSize, sp.children)
+				if err != nil {
+					return err
+				}
+				*dst = append(*dst, ContentPart{
+					SubBlobRefString: br.String(),
+					SubBlobRef:       br,
+					Size:             uint64(childrenSize),
+				})
+			}
+			if sp.from != sp.to {
+				*dst = append(*dst, ContentPart{
+					BlobRefString: sp.br.String(),
+					BlobRef:       sp.br,
+					Size:          uint64(sp.to - sp.from),
+				})
+			}
+		}
+		return nil
+	}
+
+	// The top-level content parts
+	return uploadFile(filename, n, spans)
 }
diff --git a/lib/go/camli/schema/schema.go b/lib/go/camli/schema/schema.go
index 1098b723f..a2cb1217a 100644
--- a/lib/go/camli/schema/schema.go
+++ b/lib/go/camli/schema/schema.go
@@ -90,10 +90,12 @@ type Superset struct {
 }
 
 type ContentPart struct {
-	BlobRefString string "blobRef"
-	BlobRef       *blobref.BlobRef // TODO: ditch BlobRefString? use json.Unmarshaler?
-	Size          uint64 "size"
-	Offset        uint64 "offset"
+	BlobRefString    string "blobRef"
+	BlobRef          *blobref.BlobRef // TODO: ditch BlobRefString? use json.Unmarshaler?
+	SubBlobRefString string "subFileBlobRef"
+	SubBlobRef       *blobref.BlobRef
+	Size             uint64 "size"
+	Offset           uint64 "offset"
 }
 
 func (cp *ContentPart) blobref() *blobref.BlobRef {
@@ -242,12 +244,14 @@ func MapToCamliJson(m map[string]interface{}) (string, os.Error) {
 
 func NewCommonFilenameMap(fileName string) map[string]interface{} {
 	m := newCamliMap(1, "" /* no type yet */ )
-	lastSlash := strings.LastIndex(fileName, "/")
-	baseName := fileName[lastSlash+1:]
-	if isValidUtf8(baseName) {
-		m["fileName"] = baseName
-	} else {
-		m["fileNameBytes"] = []uint8(baseName)
+	if fileName != "" {
+		lastSlash := strings.LastIndex(fileName, "/")
+		baseName := fileName[lastSlash+1:]
+		if isValidUtf8(baseName) {
+			m["fileName"] = baseName
+		} else {
+			m["fileNameBytes"] = []uint8(baseName)
+		}
 	}
 	return m
 }
@@ -299,7 +303,11 @@ func PopulateRegularFileMap(m map[string]interface{}, size int64, parts []Conten
 	for idx, part := range parts {
 		mpart := make(map[string]interface{})
 		mparts[idx] = mpart
-		mpart["blobRef"] = part.BlobRef.String()
+		if part.BlobRef != nil {
+			mpart["blobRef"] = part.BlobRef.String()
+		} else if part.SubBlobRef != nil {
+			mpart["subFileBlobRef"] = part.SubBlobRef.String()
+		}
 		mpart["size"] = part.Size
 		sumSize += part.Size
 		if part.Offset != 0 {