From c0ea8dbcb1633bd0434911abb8fca6e99e9fed08 Mon Sep 17 00:00:00 2001
From: Brad Fitzpatrick
Date: Wed, 7 Sep 2011 17:51:29 -0700
Subject: [PATCH] Issue 17: file vs. bytes schema change

Details: http://code.google.com/p/camlistore/issues/detail?id=17

Change-Id: Ia1237952a2a66f1dcacd00c5c28eb360d9f1d1d7
---
 clients/go/camput/camput.go          |  5 +-
 doc/schema/bytes.txt                 | 38 +++++++++++++++
 doc/schema/files/file.txt            | 42 ++---------------
 lib/go/camli/fs/fs.go                | 21 ++++++---
 lib/go/camli/mysqlindexer/receive.go |  3 --
 lib/go/camli/schema/fileread_test.go | 31 ++++++-------
 lib/go/camli/schema/filereader.go    | 34 +++++++-------
 lib/go/camli/schema/filewriter.go    | 34 +++++++-------
 lib/go/camli/schema/schema.go        | 69 ++++++++++++++++------------
 server/go/camlistored/download.go    |  6 +--
 10 files changed, 152 insertions(+), 131 deletions(-)
 create mode 100644 doc/schema/bytes.txt

diff --git a/clients/go/camput/camput.go b/clients/go/camput/camput.go
index 260f9a61c..ab838cb26 100644
--- a/clients/go/camput/camput.go
+++ b/clients/go/camput/camput.go
@@ -100,12 +100,13 @@ func (up *Uploader) UploadFile(filename string) (*client.PutResult, os.Error) {
 		if err != nil {
 			return nil, err
 		}
-		parts := []schema.ContentPart{{BlobRef: blobpr.BlobRef, Size: uint64(blobpr.Size)}}
+		parts := []schema.BytesPart{{BlobRef: blobpr.BlobRef, Size: uint64(blobpr.Size)}}
 		if blobpr.Size != fi.Size {
 			// TODO: handle races of file changing while reading it
 			// after the stat.
 		}
-		if err = schema.PopulateRegularFileMap(m, fi.Size, parts); err != nil {
+		m["camliType"] = "file"
+		if err = schema.PopulateParts(m, fi.Size, parts); err != nil {
 			return nil, err
 		}
 	case fi.IsSymlink():

diff --git a/doc/schema/bytes.txt b/doc/schema/bytes.txt
new file mode 100644
index 000000000..41afbc9c0
--- /dev/null
+++ b/doc/schema/bytes.txt
@@ -0,0 +1,38 @@
+Description of a series of bytes.
+
+A "bytes" blob is a metadata (JSON) blob describing a series of bytes.
+The definition is recursive, so it can describe a hash tree, and thus
+arbitrarily large blobs (or "files").
+
+A "bytes" blob can be used on its own, but is also used by things like
+a "file" schema blob.
+
+
+{"camliVersion": 1,
+ "camliType": "bytes",
+
+  // Required. Array of contiguous regions of bytes. Zero or more elements.
+  //
+  // Each element must have:
+  //    "size": the number of bytes this element contributes to the array
+  //            of bytes. Required, and must be greater than zero.
+  //
+  // At most one of:
+  //    "blobRef": where to get the raw bytes from. If this and "bytesRef"
+  //               are both missing, the bytes are all zero (e.g. a sparse
+  //               file hole).
+  //    "bytesRef": alternative to blobRef; points at another "bytes" schema
+  //                blob describing the range. Large files are made of these
+  //                in a hash tree. It is an error if both "bytesRef" and
+  //                "blobRef" are specified.
+  //
+  // Optional:
+  //    "offset": the number of bytes into blobRef or bytesRef to skip to
+  //              get the necessary bytes for the range. Usually zero
+  //              (unspecified).
+ "parts": [
+   {"blobRef": "digalg-blobref", "size": 1024},
+   {"bytesRef": "digalg-blobref", "size": 5000000, "offset": 492},
+   {"size": 1000000},
+   {"blobRef": "digalg-blobref", "size": 10}
+  ]
+}
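For a sense of how the pieces fit together, here is a minimal sketch (not part of the patch) that builds such a "bytes" blob using the helpers this patch adds in schema.go below. The sha1 digest is a placeholder, and blobref.Parse is assumed from lib/go/camli/blobref:

    // Sketch only: build the JSON for a small "bytes" blob.
    // The digest is a placeholder, not a real blob ref.
    func exampleBytesBlob() (string, os.Error) {
    	m := schema.NewBytes()
    	parts := []schema.BytesPart{
    		{BlobRef: blobref.Parse("sha1-0000000000000000000000000000000000000000"), Size: 1024},
    		{Size: 1000000}, // no blobRef/bytesRef: a run of zeros (sparse hole)
    	}
    	if err := schema.PopulateParts(m, 1024+1000000, parts); err != nil {
    		return "", err
    	}
    	return schema.MapToCamliJson(m)
    }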
diff --git a/doc/schema/files/file.txt b/doc/schema/files/file.txt
index 8ccfc36e5..c1389ba5a 100644
--- a/doc/schema/files/file.txt
+++ b/doc/schema/files/file.txt
@@ -3,47 +3,11 @@ File schema
 
 {"camliVersion": 1,
  "camliType": "file",
-  //
-  // INCLUDE ALL REQUIRED & ANY OPTIONAL FIELDS FROM file-common.txt
-  //
-
-  // Required:
-  // (redundant with sum of contentParts sizes, but useful. if different,
-  // this value is canonical and clients should either truncate the file
-  // at this point or pad the rest with zeroes, as if there was a missing
-  // sparse file segment missing from contentParts)
-  "size": 6001034,  // i.e. 1024 + 5000000 + 1000000 + 10 from below
+  // #include "file-common.txt"   # metadata about the file
+  // #include "../bytes.txt"      # describes the bytes of the file
 
   // Optional, if linkcount > 1, for representing hardlinks properly.
-  "inodeRef": "digalg-blobref",  // to "inode" blobref, when linkcount > 1
-
-  // Optional, if the file is a fragment of a larger file (referenced
-  // from a "subFileBlobRef"). If true, this is just a hint to the indexer
-  // not to index its bytes.
-  "fragment": false,
-
-  // Required. Array of contiguous regions of bytes. Zero or more elements.
-  // Typically will be just one.
-  //
-  // Each element must have:
-  //    "size": the number of bytes that this element contributes to this file.
-  //            required, and must be greater than zero.
-  //
-  // Optional:
-  //    "blobRef": where to get the raw bytes from. if this and "subFileBlobRef"
-  //               are missing, the bytes are all zero (e.g. a sparse file hole)
-  //    "subFileBlobRef": alternative to blobRef, where to get the range's bytes
-  //                      from, but pointing at a file schema blob describing the
-  //                      range, recursively. large files are made of these in
-  //                      a hash tree.
-  //    "offset": the number of bytes into blobRef or subFileBlobRef to skip to
-  //              get the necessary bytes for the range. usually zero.
-  "contentParts": [
-    {"blobRef": "digalg-blobref", "size": 1024},
-    {"blobRef": "digalg-blobref", "size": 5000000, "offset": 492 },
-    {"size": 1000000},
-    {"blobRef": "digalg-blobref", "size": 10},
-  ]
+  "inodeRef": "digalg-blobref",  // to "inode" blobref, when the link count > 1
 }
 
 // TODO: Mac/NTFS-style resource forks?  perhaps just a "streams"
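With the split in place, a regular file is just file-common metadata plus a "parts" array. A sketch (not part of the patch) of building a single-chunk "file" blob with the same helpers, assuming the chunk was already uploaded:

    // Sketch only: a one-part "file" schema blob for an
    // already-uploaded 1024-byte chunk.
    func exampleFileBlob(chunk *blobref.BlobRef) (string, os.Error) {
    	m := schema.NewFileMap("hello.txt")
    	parts := []schema.BytesPart{{BlobRef: chunk, Size: 1024}}
    	if err := schema.PopulateParts(m, 1024, parts); err != nil {
    		return "", err
    	}
    	return schema.MapToCamliJson(m)
    }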
diff --git a/lib/go/camli/fs/fs.go b/lib/go/camli/fs/fs.go
index 8560eee5c..f8835996b 100644
--- a/lib/go/camli/fs/fs.go
+++ b/lib/go/camli/fs/fs.go
@@ -234,7 +234,7 @@ func (fs *CamliFileSystem) GetAttr(name string) (*fuse.Attr, fuse.Status) {
 
 	// TODO: other types
 	if ss.Type == "file" {
-		fi.Size = int64(ss.Size)
+		fi.Size = int64(ss.SumPartsSize())
 	}
 
 	fi.Mtime_ns = schema.NanosFromRFC3339(ss.UnixMtime)
@@ -280,7 +280,7 @@ func (fs *CamliFileSystem) Open(name string, flags uint32) (file fuse.RawFuseFil
 		return nil, fuse.EINVAL
 	}
 
-	return &CamliFile{nil, fs, fileblob, ss}, fuse.OK
+	return &CamliFile{fs: fs, blob: fileblob, ss: ss}, fuse.OK
 }
 
 // returns fuse.OK on success; anything else on error
@@ -390,18 +390,27 @@ type CamliFile struct {
 	fs   *CamliFileSystem
 	blob *blobref.BlobRef
 	ss   *schema.Superset
+
+	size uint64 // memoized
+}
+
+func (f *CamliFile) Size() uint64 {
+	if f.size == 0 {
+		f.size = f.ss.SumPartsSize()
+	}
+	return f.size
 }
 
 func (file *CamliFile) Read(ri *fuse.ReadIn, bp *fuse.BufferPool) (retbuf []byte, retst fuse.Status) {
 	offset := ri.Offset
-	if offset >= file.ss.Size {
+	if offset >= file.Size() {
 		return []byte(""), fuse.OK // TODO: correct status?
 	}
 	size := ri.Size // size of read to do (uint32)
 	endOffset := offset + uint64(size)
-	if endOffset > file.ss.Size {
-		size -= uint32(endOffset - file.ss.Size)
-		endOffset = file.ss.Size
+	if endOffset > file.Size() {
+		size -= uint32(endOffset - file.Size())
+		endOffset = file.Size()
 	}
 	buf := bytes.NewBuffer(make([]byte, 0, int(size)))

diff --git a/lib/go/camli/mysqlindexer/receive.go b/lib/go/camli/mysqlindexer/receive.go
index 71db62ddd..d1c0932f6 100644
--- a/lib/go/camli/mysqlindexer/receive.go
+++ b/lib/go/camli/mysqlindexer/receive.go
@@ -248,9 +248,6 @@ func (mi *Indexer) populatePermanode(blobRef *blobref.BlobRef, camli *schema.Sup
 }
 
 func (mi *Indexer) populateFile(blobRef *blobref.BlobRef, ss *schema.Superset) (err os.Error) {
-	if ss.Fragment {
-		return nil
-	}
 	seekFetcher, err := blobref.SeekerFromStreamingFetcher(mi.BlobSource)
 	if err != nil {
 		return err
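One caveat with the memoized Size above: f.size == 0 doubles as the "not yet computed" sentinel, so a genuinely empty file recomputes SumPartsSize on every call. Harmless here, but a sync.Once variant (a sketch, not part of this patch) avoids the ambiguity:

    type CamliFile struct {
    	fs   *CamliFileSystem
    	blob *blobref.BlobRef
    	ss   *schema.Superset
    
    	sizeOnce sync.Once
    	size     uint64 // valid once sizeOnce has fired
    }
    
    func (f *CamliFile) Size() uint64 {
    	// Compute the sum of part sizes exactly once, even for
    	// zero-length files.
    	f.sizeOnce.Do(func() { f.size = f.ss.SumPartsSize() })
    	return f.size
    }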
diff --git a/lib/go/camli/schema/fileread_test.go b/lib/go/camli/schema/fileread_test.go
index d859a01b2..5701696f2 100644
--- a/lib/go/camli/schema/fileread_test.go
+++ b/lib/go/camli/schema/fileread_test.go
@@ -38,26 +38,26 @@ func init() {
 }
 
 type readTest struct {
-	parts    []*ContentPart
+	parts    []*BytesPart
 	skip     uint64
 	expected string
 }
 
-func part(blob *test.Blob, offset, size uint64) *ContentPart {
-	return &ContentPart{BlobRef: blob.BlobRef(), Size: size, Offset: offset}
+func part(blob *test.Blob, offset, size uint64) *BytesPart {
+	return &BytesPart{BlobRef: blob.BlobRef(), Size: size, Offset: offset}
 }
 
-// filePart returns a ContentPart that references a file JSON schema
+// filePart returns a BytesPart that references a "bytes" JSON schema
 // blob made of the provided content parts.
-func filePart(cps []*ContentPart, skip uint64) *ContentPart {
-	m := NewCommonFilenameMap("")
+func filePart(cps []*BytesPart, skip uint64) *BytesPart {
+	m := NewBytes()
 	fileSize := int64(0)
-	cpl := []ContentPart{}
+	cpl := []BytesPart{}
 	for _, cp := range cps {
 		fileSize += int64(cp.Size)
 		cpl = append(cpl, *cp)
 	}
-	err := PopulateRegularFileMap(m, fileSize, cpl)
+	err := PopulateParts(m, fileSize, cpl)
 	if err != nil {
 		panic(err.String())
 	}
@@ -67,22 +67,22 @@
 	tb := &test.Blob{json}
 	testFetcher.AddBlob(tb)
-	return &ContentPart{SubBlobRef: tb.BlobRef(), Size: uint64(fileSize) - skip, Offset: skip}
+	return &BytesPart{BytesRef: tb.BlobRef(), Size: uint64(fileSize) - skip, Offset: skip}
 }
 
-func all(blob *test.Blob) *ContentPart {
+func all(blob *test.Blob) *BytesPart {
 	return part(blob, 0, uint64(blob.Size()))
 }
 
-func zero(size uint64) *ContentPart {
-	return &ContentPart{Size: size}
+func zero(size uint64) *BytesPart {
+	return &BytesPart{Size: size}
 }
 
-func parts(parts ...*ContentPart) []*ContentPart {
+func parts(parts ...*BytesPart) []*BytesPart {
 	return parts
 }
 
-func sizeSum(parts []*ContentPart) (s uint64) {
+func sizeSum(parts []*BytesPart) (s uint64) {
 	for _, p := range parts {
 		s += uint64(p.Size)
 	}
@@ -126,8 +126,7 @@ func TestReader(t *testing.T) {
 		ss := new(Superset)
 		ss.Type = "file"
 		ss.Version = 1
-		ss.Size = sizeSum(rt.parts)
-		ss.ContentParts = rt.parts
+		ss.Parts = rt.parts
 		fr, err := ss.NewFileReader(testFetcher)
 		if err != nil {
 			t.Errorf("read error on test %d: %v", idx, err)
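The zero helper makes sparse-hole cases easy to add to the table. A hypothetical extra entry (the blobA/blobB test blobs and their contents are invented for illustration, not from this test file):

    // Hypothetical readTest entry: a hole between two stored chunks
    // reads back as NUL bytes, assuming blobA holds "foo" and
    // blobB holds "bar".
    {
    	parts:    parts(all(blobA), zero(3), all(blobB)),
    	expected: "foo\x00\x00\x00bar",
    },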
diff --git a/lib/go/camli/schema/filereader.go b/lib/go/camli/schema/filereader.go
index 967630592..62316b2ce 100644
--- a/lib/go/camli/schema/filereader.go
+++ b/lib/go/camli/schema/filereader.go
@@ -150,11 +150,13 @@ type FileReader struct {
 	cr   blobref.ReadSeekCloser // cached reader (for blobref chunks)
 	crbr *blobref.BlobRef       // the blobref that cr is for
 
-	csubfr *FileReader  // cached sub blobref reader (for subBlobRef chunks)
-	ccp    *ContentPart // the content part that csubfr is cached for
+	csubfr *FileReader // cached sub blobref reader (for bytesRef chunks)
+	ccp    *BytesPart  // the content part that csubfr is cached for
 }
 
-// TODO: make this take a blobref.FetcherAt instead?
+// TODO(bradfitz): make this take a blobref.FetcherAt instead?
+// TODO(bradfitz): rename this to a bytes reader? For now it's still
+// named FileReader, but it can also read a "bytes" schema.
 func NewFileReader(fetcher blobref.SeekFetcher, fileBlobRef *blobref.BlobRef) (*FileReader, os.Error) {
 	if fileBlobRef == nil {
 		return nil, os.NewError("schema/filereader: NewFileReader blobref was nil")
 	}
@@ -167,8 +169,8 @@ func NewFileReader(fetcher blobref.SeekFetcher, fileBlobRef *blobref.BlobRef) (*
 	if err = json.NewDecoder(rsc).Decode(ss); err != nil {
 		return nil, fmt.Errorf("schema/filereader: decoding file schema blob: %v", err)
 	}
-	if ss.Type != "file" {
-		return nil, fmt.Errorf("schema/filereader: expected \"file\" schema blob, got %q", ss.Type)
+	if ss.Type != "file" && ss.Type != "bytes" {
+		return nil, fmt.Errorf("schema/filereader: expected \"file\" or \"bytes\" schema blob, got %q", ss.Type)
 	}
 	fr, err := ss.NewFileReader(fetcher)
 	if err != nil {
@@ -178,10 +180,10 @@ func NewFileReader(fetcher blobref.SeekFetcher, fileBlobRef *blobref.BlobRef) (*
 }
 
 func (ss *Superset) NewFileReader(fetcher blobref.SeekFetcher) (*FileReader, os.Error) {
-	if ss.Type != "file" {
-		return nil, fmt.Errorf("schema/filereader: Superset not of type \"file\"")
+	if ss.Type != "file" && ss.Type != "bytes" {
+		return nil, fmt.Errorf("schema/filereader: Superset not of type \"file\" or \"bytes\"")
 	}
-	return &FileReader{fetcher: fetcher, ss: ss, remain: int64(ss.Size)}, nil
+	return &FileReader{fetcher: fetcher, ss: ss, remain: int64(ss.SumPartsSize())}, nil
 }
 
 // FileSchema returns the reader's schema superset. Don't mutate it.
@@ -205,8 +207,8 @@ func (fr *FileReader) Skip(skipBytes uint64) uint64 {
 	wantedSkipped := skipBytes
 
-	for skipBytes != 0 && fr.ci < len(fr.ss.ContentParts) {
-		cp := fr.ss.ContentParts[fr.ci]
+	for skipBytes != 0 && fr.ci < len(fr.ss.Parts) {
+		cp := fr.ss.Parts[fr.ci]
 		thisChunkSkippable := cp.Size - fr.ccon
 		toSkip := minu64(skipBytes, thisChunkSkippable)
 		fr.ccon += toSkip
@@ -254,11 +256,11 @@ func (fr *FileReader) readerFor(br *blobref.BlobRef, seekTo int64) (r io.Reader,
 	return rsc, nil
 }
 
-func (fr *FileReader) subBlobRefReader(cp *ContentPart) (io.Reader, os.Error) {
+func (fr *FileReader) subBlobRefReader(cp *BytesPart) (io.Reader, os.Error) {
 	if fr.ccp == cp {
 		return fr.csubfr, nil
 	}
-	subfr, err := NewFileReader(fr.fetcher, cp.SubBlobRef)
+	subfr, err := NewFileReader(fr.fetcher, cp.BytesRef)
 	if err == nil {
 		subfr.Skip(cp.Offset)
 		fr.csubfr = subfr
@@ -267,16 +269,16 @@ func (fr *FileReader) subBlobRefReader(cp *ContentPart) (io.Reader, os.Error) {
 	return subfr, err
 }
 
-func (fr *FileReader) currentPart() (*ContentPart, os.Error) {
+func (fr *FileReader) currentPart() (*BytesPart, os.Error) {
 	for {
-		if fr.ci >= len(fr.ss.ContentParts) {
+		if fr.ci >= len(fr.ss.Parts) {
 			fr.closeOpenBlobs()
 			if fr.remain > 0 {
 				return nil, fmt.Errorf("schema: declared file schema size was larger than sum of content parts")
 			}
 			return nil, os.EOF
 		}
-		cp := fr.ss.ContentParts[fr.ci]
+		cp := fr.ss.Parts[fr.ci]
 		thisChunkReadable := cp.Size - fr.ccon
 		if thisChunkReadable == 0 {
 			fr.ci++
@@ -303,7 +305,7 @@ func (fr *FileReader) Read(p []byte) (n int, err os.Error) {
 	}
 
 	br := cp.BlobRef
-	sbr := cp.SubBlobRef
+	sbr := cp.BytesRef
 	if br != nil && sbr != nil {
-		return 0, fmt.Errorf("content part index %d has both blobRef and subFileBlobRef", fr.ci)
+		return 0, fmt.Errorf("content part index %d has both blobRef and bytesRef", fr.ci)
 	}
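Since NewFileReader now accepts both "file" and "bytes" schema blobs, one code path streams either. A usage sketch (streamBlob is hypothetical; Close is the method download.go defers below):

    // Sketch only: stream the contents named by br (a "file" or
    // "bytes" schema blob) to w.
    func streamBlob(w io.Writer, fetcher blobref.SeekFetcher, br *blobref.BlobRef) os.Error {
    	fr, err := schema.NewFileReader(fetcher, br)
    	if err != nil {
    		return err
    	}
    	defer fr.Close()
    	_, err = io.Copy(w, fr) // FileReader implements io.Reader
    	return err
    }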
diff --git a/lib/go/camli/schema/filewriter.go b/lib/go/camli/schema/filewriter.go
index b5ef006a9..26ecd9cf3 100644
--- a/lib/go/camli/schema/filewriter.go
+++ b/lib/go/camli/schema/filewriter.go
@@ -40,7 +40,7 @@ func WriteFileFromReader(bs blobserver.Storage, filename string, r io.Reader) (*
 	// Naive for now. Just in 1MB chunks.
 	// TODO: rolling hash and hash trees.
 
-	parts, size := []ContentPart{}, int64(0)
+	parts, size := []BytesPart{}, int64(0)
 
 	buf := new(bytes.Buffer)
 	for {
@@ -72,15 +72,15 @@
 		}
 		size += n
 
-		parts = append(parts, ContentPart{
+		parts = append(parts, BytesPart{
 			BlobRef: br,
 			Size:    uint64(n),
 			Offset:  0, // into BlobRef to read from (not of dest)
 		})
 	}
 
-	m := NewCommonFilenameMap(filename)
-	err := PopulateRegularFileMap(m, size, parts)
+	m := NewFileMap(filename)
+	err := PopulateParts(m, size, parts)
 	if err != nil {
 		return nil, err
 	}
@@ -211,21 +211,21 @@ func WriteFileFromReaderRolling(bs blobserver.Storage, filename string, r io.Rea
 		}
 	}
 
-	var addContentParts func(dst *[]ContentPart, s []span) os.Error
+	var addBytesParts func(dst *[]BytesPart, s []span) os.Error
 
 	uploadFile := func(filename string, isFragment bool, fileSize int64, s []span) (*blobref.BlobRef, os.Error) {
-		parts := []ContentPart{}
-		err := addContentParts(&parts, s)
-		if err != nil {
-			return nil, err
-		}
-		m := NewCommonFilenameMap(filename)
-		err = PopulateRegularFileMap(m, fileSize, parts)
+		parts := []BytesPart{}
+		err := addBytesParts(&parts, s)
 		if err != nil {
 			return nil, err
 		}
+		m := NewFileMap(filename)
 		if isFragment {
-			m["fragment"] = true
+			m = NewBytes()
+		}
+		err = PopulateParts(m, fileSize, parts)
+		if err != nil {
+			return nil, err
 		}
 		json, err := MapToCamliJson(m)
 		if err != nil {
@@ -234,7 +234,7 @@ func WriteFileFromReaderRolling(bs blobserver.Storage, filename string, r io.Rea
 		return uploadString(json)
 	}
 
-	addContentParts = func(dst *[]ContentPart, spansl []span) os.Error {
+	addBytesParts = func(dst *[]BytesPart, spansl []span) os.Error {
 		for _, sp := range spansl {
 			if len(sp.children) > 0 {
 				childrenSize := int64(0)
@@ -245,13 +245,13 @@ func WriteFileFromReaderRolling(bs blobserver.Storage, filename string, r io.Rea
 				if err != nil {
 					return err
 				}
-				*dst = append(*dst, ContentPart{
-					SubBlobRef: br,
+				*dst = append(*dst, BytesPart{
+					BytesRef: br,
 					Size: uint64(childrenSize),
 				})
 			}
 			if sp.from != sp.to {
-				*dst = append(*dst, ContentPart{
+				*dst = append(*dst, BytesPart{
 					BlobRef: sp.br,
 					Size:    uint64(sp.to - sp.from),
 				})
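With isFragment now emitting a plain "bytes" blob instead of a "file" marked "fragment": true, interior hash-tree nodes carry no filename and indexers can skip them by camliType alone (hence the deleted ss.Fragment check in mysqlindexer above). A sketch (not from the patch) of wiring an interior node into a parent by hand; interiorRef/interiorSize are assumed to come from a prior upload of that node:

    // Sketch only: a parent "file" blob whose single part points at
    // an interior "bytes" node covering interiorSize bytes.
    func exampleParent(interiorRef *blobref.BlobRef, interiorSize uint64) (string, os.Error) {
    	m := schema.NewFileMap("big.iso")
    	parts := []schema.BytesPart{{BytesRef: interiorRef, Size: interiorSize}}
    	if err := schema.PopulateParts(m, int64(interiorSize), parts); err != nil {
    		return "", err
    	}
    	return schema.MapToCamliJson(m)
    }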
diff --git a/lib/go/camli/schema/schema.go b/lib/go/camli/schema/schema.go
index ad5019ed5..b4d416190 100644
--- a/lib/go/camli/schema/schema.go
+++ b/lib/go/camli/schema/schema.go
@@ -213,20 +213,23 @@ type Superset struct {
 	UnixCtime string `json:"unixCtime"`
 	UnixAtime string `json:"unixAtime"`
 
-	Size         uint64         `json:"size"` // for files
-	ContentParts []*ContentPart `json:"contentParts"`
-	Fragment     bool           `json:"fragment"`
+	Parts []*BytesPart `json:"parts"`
 
 	Entries string   `json:"entries"` // for directories, a blobref to a static-set
 	Members []string `json:"members"` // for static sets (for directory static-sets:
 	// blobrefs to child dirs/files)
 }
 
-type ContentPart struct {
-	BlobRef    *blobref.BlobRef `json:"blobRef"`
-	SubBlobRef *blobref.BlobRef `json:"subFileBlobRef"`
-	Size       uint64           `json:"size"`
-	Offset     uint64           `json:"offset"`
+type BytesPart struct {
+	// Required.
+	Size uint64 `json:"size"`
+
+	// At most one of:
+	BlobRef  *blobref.BlobRef `json:"blobRef,omitempty"`
+	BytesRef *blobref.BlobRef `json:"bytesRef,omitempty"`
+
+	// Optional (defaults to zero if unset):
+	Offset uint64 `json:"offset,omitempty"`
 }
 
 func stringFromMixedArray(parts []interface{}) string {
@@ -244,6 +247,13 @@ func stringFromMixedArray(parts []interface{}) string {
 	return buf.String()
 }
 
+func (ss *Superset) SumPartsSize() (size uint64) {
+	for _, part := range ss.Parts {
+		size += uint64(part.Size)
+	}
+	return size
+}
+
 func (ss *Superset) SymlinkTargetString() string {
 	if ss.SymlinkTarget != "" {
 		return ss.SymlinkTarget
@@ -366,6 +376,12 @@ func MapToCamliJson(m map[string]interface{}) (string, os.Error) {
 	return string(buf.Bytes()), nil
 }
 
+func NewFileMap(fileName string) map[string]interface{} {
+	m := NewCommonFilenameMap(fileName)
+	m["camliType"] = "file"
+	return m
+}
+
 func NewCommonFilenameMap(fileName string) map[string]interface{} {
 	m := newCamliMap(1, "" /* no type yet */)
 	if fileName != "" {
@@ -409,39 +425,30 @@ func NewCommonFileMap(fileName string, fi *os.FileInfo) map[string]interface{} {
 	return m
 }
 
-type InvalidContentPartsError struct {
-	StatSize   int64
-	SumOfParts int64
-}
-
-func (e *InvalidContentPartsError) String() string {
-	return fmt.Sprintf("Invalid ContentPart slice in PopulateRegularFileMap; file stat size is %d but sum of parts was %d", e.StatSize, e.SumOfParts)
-}
-
-func PopulateRegularFileMap(m map[string]interface{}, size int64, parts []ContentPart) os.Error {
-	m["camliType"] = "file"
-	m["size"] = size
-
-	sumSize := uint64(0)
+func PopulateParts(m map[string]interface{}, size int64, parts []BytesPart) os.Error {
+	sumSize := int64(0)
 	mparts := make([]map[string]interface{}, len(parts))
 	for idx, part := range parts {
 		mpart := make(map[string]interface{})
 		mparts[idx] = mpart
-		if part.BlobRef != nil {
+		switch {
+		case part.BlobRef != nil && part.BytesRef != nil:
+			return os.NewError("schema: part contains both blobRef and bytesRef")
+		case part.BlobRef != nil:
 			mpart["blobRef"] = part.BlobRef.String()
-		} else if part.SubBlobRef != nil {
-			mpart["subFileBlobRef"] = part.SubBlobRef.String()
+		case part.BytesRef != nil:
+			mpart["bytesRef"] = part.BytesRef.String()
 		}
 		mpart["size"] = part.Size
-		sumSize += part.Size
+		sumSize += int64(part.Size)
 		if part.Offset != 0 {
 			mpart["offset"] = part.Offset
 		}
 	}
-	if sumSize != uint64(size) {
-		return &InvalidContentPartsError{size, int64(sumSize)}
+	if sumSize != size {
+		return fmt.Errorf("schema: declared size %d doesn't match sum of parts size %d", size, sumSize)
 	}
-	m["contentParts"] = mparts
+	m["parts"] = mparts
 	return nil
 }
 
@@ -459,6 +466,10 @@ func PopulateSymlinkMap(m map[string]interface{}, fileName string) os.Error {
 	return nil
 }
 
+func NewBytes() map[string]interface{} {
+	return newCamliMap(1, "bytes")
+}
+
 func PopulateDirectoryMap(m map[string]interface{}, staticSetRef *blobref.BlobRef) {
 	m["camliType"] = "directory"
 	m["entries"] = staticSetRef.String()
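PopulateParts rejects a part carrying both refs at write time; a reader-side check of the same bytes.txt invariants (size > 0, at most one ref) could look like this sketch (isValid is hypothetical, not part of the patch):

    // Sketch only: the bytes.txt invariants for one part.
    func isValid(p *schema.BytesPart) bool {
    	if p.Size == 0 {
    		return false // "size" is required and must be > 0
    	}
    	if p.BlobRef != nil && p.BytesRef != nil {
    		return false // at most one of blobRef/bytesRef
    	}
    	return true // neither ref set is fine: a zero-filled hole
    }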
diff --git a/server/go/camlistored/download.go b/server/go/camlistored/download.go
index 61c285d69..89a10b811 100644
--- a/server/go/camlistored/download.go
+++ b/server/go/camlistored/download.go
@@ -58,7 +58,7 @@ func (dh *DownloadHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request,
 	defer fr.Close()
 
 	schema := fr.FileSchema()
-	rw.Header().Set("Content-Length", fmt.Sprintf("%d", schema.Size))
+	rw.Header().Set("Content-Length", fmt.Sprintf("%d", schema.SumPartsSize()))
 
 	// TODO: fr.FileSchema() and guess a mime type?  For now:
 	mimeType := "application/octet-stream"
@@ -89,9 +89,9 @@ func (dh *DownloadHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request,
 		log.Printf("error serving download of file schema %s: %v", file, err)
 		return
 	}
-	if n != int64(schema.Size) {
+	if size := schema.SumPartsSize(); n != int64(size) {
 		log.Printf("error serving download of file schema %s: sent %d, expected size of %d",
-			file, n, schema.Size)
+			file, n, size)
 		return
 	}
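With the explicit "size" field gone, the Content-Length served above is derived entirely from the parts. A quick check of the new accessor, as a hypothetical unit test using the sizes from the bytes.txt example:

    func TestSumPartsSize(t *testing.T) {
    	// 1024 + 5000000 + 1000000 + 10, per the doc/schema/bytes.txt example.
    	ss := &schema.Superset{
    		Parts: []*schema.BytesPart{{Size: 1024}, {Size: 5000000}, {Size: 1000000}, {Size: 10}},
    	}
    	if got, want := ss.SumPartsSize(), uint64(6001034); got != want {
    		t.Errorf("SumPartsSize = %d; want %d", got, want)
    	}
    }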