Issue 17: file vs. bytes schema change

Details:
http://code.google.com/p/camlistore/issues/detail?id=17

Change-Id: Ia1237952a2a66f1dcacd00c5c28eb360d9f1d1d7
Author: Brad Fitzpatrick
Date: 2011-09-07 17:51:29 -07:00
Commit: c0ea8dbcb1 (parent 62179648f9)
10 changed files with 152 additions and 131 deletions


@ -100,12 +100,13 @@ func (up *Uploader) UploadFile(filename string) (*client.PutResult, os.Error) {
if err != nil {
return nil, err
}
- parts := []schema.ContentPart{{BlobRef: blobpr.BlobRef, Size: uint64(blobpr.Size)}}
+ parts := []schema.BytesPart{{BlobRef: blobpr.BlobRef, Size: uint64(blobpr.Size)}}
if blobpr.Size != fi.Size {
// TODO: handle races of file changing while reading it
// after the stat.
}
- if err = schema.PopulateRegularFileMap(m, fi.Size, parts); err != nil {
+ m["camliType"] = "file"
+ if err = schema.PopulateParts(m, fi.Size, parts); err != nil {
return nil, err
}
case fi.IsSymlink():

doc/schema/bytes.txt (new file)

@ -0,0 +1,38 @@
Description of a series of bytes.
A "bytes" is a metadata (JSON) blob to describe blobs. It's a recursive
definition that's able to describe a hash tree, describing very large
blobs (or "files").
A "bytes" blob can be used on its own, but is also used by things like
a "file" schema blob.
{"camliVersion": 1,
"camliType": "bytes",
// Required. Array of contiguous regions of bytes. Zero or more elements.
//
// Each element must have:
// "size": the number of bytes that this element contributes to array of bytes.
// Required, and must be greater than zero.
//
// At most one of:
// "blobRef": where to get the raw bytes from. if this and "bytesRef"
// are missing, the bytes are all zero (e.g. a sparse file hole)
// "bytesRef": alternative to blobRef, where to get the range's bytes
// from, but pointing recursively at a "bytes" schema blob
// describing the range, recursively. large files are made of
// these in a hash tree. it is an error if both "bytesRef"
// and "blobRef" are specified.
//
// Optional:
// "offset": the number of bytes into blobRef or bytesRef to skip to
// get the necessary bytes for the range. usually zero (unspecified)
"parts": [
{"blobRef": "digalg-blobref", "size": 1024},
{"bytesRef": "digalg-blobref", "size": 5000000, "offset": 492 },
{"size": 1000000},
{"blobRef": "digalg-blobref", "size": 10},
]
}
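For concreteness, here is a minimal sketch (pre-Go1 Go, matching this tree) of how such a "bytes" blob can be assembled with the helpers this commit adds. The chunk1 and subTree blobrefs are hypothetical placeholders, and imports of the camli blobref and schema packages are assumed.

// buildBytesBlob is a hypothetical helper: it wires two placeholder blobrefs
// into a "bytes" schema map and serializes it with the new API
// (NewBytes + PopulateParts + MapToCamliJson).
func buildBytesBlob(chunk1, subTree *blobref.BlobRef) (string, os.Error) {
    m := schema.NewBytes()
    parts := []schema.BytesPart{
        {BlobRef: chunk1, Size: 1024},      // raw bytes come straight from chunk1
        {Size: 1000000},                    // no blobRef/bytesRef: a hole of zeroes
        {BytesRef: subTree, Size: 5000000}, // recurse into another "bytes" blob
    }
    if err := schema.PopulateParts(m, 1024+1000000+5000000, parts); err != nil {
        return "", err
    }
    return schema.MapToCamliJson(m) // the serialized blob, ready to upload
}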


@ -3,47 +3,11 @@ File schema
{"camliVersion": 1,
"camliType": "file",
- //
- // INCLUDE ALL REQUIRED & ANY OPTIONAL FIELDS FROM file-common.txt
- //
- // Required:
- // (redundant with sum of contentParts sizes, but useful. if different,
- // this value is canonical and clients should either truncate the file
- // at this point or pad the rest with zeroes, as if there was a
- // sparse file segment missing from contentParts)
- "size": 6001034, // i.e. 1024 + 5000000 + 1000000 + 10 from below
+ // #include "file-common.txt" # metadata about the file
+ // #include "../bytes.txt" # describes the bytes of the file
// Optional, if linkcount > 1, for representing hardlinks properly.
"inodeRef": "digalg-blobref", // to "inode" blobref, when linkcount > 1
// Optional, if the file is a fragment of a larger file (referenced
// from a "subFileBlobRef"). If true, this is just a hint to the indexer
// not to index its bytes.
"fragment": false,
// Required. Array of contiguous regions of bytes. Zero or more elements.
// Typically will be just one.
//
// Each element must have:
// "size": the number of bytes that this element contributes to this file.
// required, and must be greater than zero.
//
// Optional:
// "blobRef": where to get the raw bytes from. if this and "subFileBlobRef"
// are missing, the bytes are all zero (e.g. a sparse file hole)
// "subFileBlobRef": alternative to blobRef, where to get the range's bytes
// from, but pointing at a file schema blob describing the
// range, recursively. large files are made of these in
// a hash tree.
// "offset": the number of bytes into blobRef or subFileBlobRef to skip to
// get the necessary bytes for the range. usually zero.
"contentParts": [
{"blobRef": "digalg-blobref", "size": 1024},
{"blobRef": "digalg-blobref", "size": 5000000, "offset": 492 },
{"size": 1000000},
{"blobRef": "digalg-blobref", "size": 10},
]
"inodeRef": "digalg-blobref", // to "inode" blobref, when the link count > 1
}
// TODO: Mac/NTFS-style resource forks? perhaps just a "streams"
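To make the new split concrete, below is a short sketch (same assumptions as the "bytes" sketch above) of how a "file" schema map is now built: NewFileMap supplies the camliType and the file-common metadata, while the byte layout is exactly the "parts" array described in bytes.txt.

// buildFileBlob is a hypothetical helper mirroring what filewriter.go does
// in this commit: wrap an already-built parts slice in a "file" schema map.
func buildFileBlob(fileName string, size int64, parts []schema.BytesPart) (string, os.Error) {
    m := schema.NewFileMap(fileName) // sets camliType "file" plus the common filename fields
    if err := schema.PopulateParts(m, size, parts); err != nil {
        return "", err
    }
    return schema.MapToCamliJson(m)
}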


@ -234,7 +234,7 @@ func (fs *CamliFileSystem) GetAttr(name string) (*fuse.Attr, fuse.Status) {
// TODO: other types
if ss.Type == "file" {
- fi.Size = int64(ss.Size)
+ fi.Size = int64(ss.SumPartsSize())
}
fi.Mtime_ns = schema.NanosFromRFC3339(ss.UnixMtime)
@ -280,7 +280,7 @@ func (fs *CamliFileSystem) Open(name string, flags uint32) (file fuse.RawFuseFil
return nil, fuse.EINVAL
}
- return &CamliFile{nil, fs, fileblob, ss}, fuse.OK
+ return &CamliFile{fs: fs, blob: fileblob, ss: ss}, fuse.OK
}
// returns fuse.OK on success; anything else on error
@ -390,18 +390,27 @@ type CamliFile struct {
fs *CamliFileSystem
blob *blobref.BlobRef
ss *schema.Superset
+ size uint64 // memoized
}
+ func (f *CamliFile) Size() uint64 {
+ if f.size == 0 {
+ f.size = f.ss.SumPartsSize()
+ }
+ return f.size
+ }
func (file *CamliFile) Read(ri *fuse.ReadIn, bp *fuse.BufferPool) (retbuf []byte, retst fuse.Status) {
offset := ri.Offset
- if offset >= file.ss.Size {
+ if offset >= file.Size() {
return []byte(""), fuse.OK // TODO: correct status?
}
size := ri.Size // size of read to do (uint32)
endOffset := offset + uint64(size)
- if endOffset > file.ss.Size {
- size -= uint32(endOffset - file.ss.Size)
- endOffset = file.ss.Size
+ if endOffset > file.Size() {
+ size -= uint32(endOffset - file.Size())
+ endOffset = file.Size()
}
buf := bytes.NewBuffer(make([]byte, 0, int(size)))


@ -248,9 +248,6 @@ func (mi *Indexer) populatePermanode(blobRef *blobref.BlobRef, camli *schema.Sup
}
func (mi *Indexer) populateFile(blobRef *blobref.BlobRef, ss *schema.Superset) (err os.Error) {
- if ss.Fragment {
- return nil
- }
seekFetcher, err := blobref.SeekerFromStreamingFetcher(mi.BlobSource)
if err != nil {
return err


@ -38,26 +38,26 @@ func init() {
}
type readTest struct {
- parts []*ContentPart
+ parts []*BytesPart
skip uint64
expected string
}
- func part(blob *test.Blob, offset, size uint64) *ContentPart {
- return &ContentPart{BlobRef: blob.BlobRef(), Size: size, Offset: offset}
+ func part(blob *test.Blob, offset, size uint64) *BytesPart {
+ return &BytesPart{BlobRef: blob.BlobRef(), Size: size, Offset: offset}
}
- // filePart returns a ContentPart that references a file JSON schema
+ // filePart returns a BytesPart that references a file JSON schema
// blob made of the provided content parts.
- func filePart(cps []*ContentPart, skip uint64) *ContentPart {
- m := NewCommonFilenameMap("")
+ func filePart(cps []*BytesPart, skip uint64) *BytesPart {
+ m := NewBytes()
fileSize := int64(0)
- cpl := []ContentPart{}
+ cpl := []BytesPart{}
for _, cp := range cps {
fileSize += int64(cp.Size)
cpl = append(cpl, *cp)
}
- err := PopulateRegularFileMap(m, fileSize, cpl)
+ err := PopulateParts(m, fileSize, cpl)
if err != nil {
panic(err.String())
}
@ -67,22 +67,22 @@ func filePart(cps []*ContentPart, skip uint64) *ContentPart {
}
tb := &test.Blob{json}
testFetcher.AddBlob(tb)
- return &ContentPart{SubBlobRef: tb.BlobRef(), Size: uint64(fileSize) - skip, Offset: skip}
+ return &BytesPart{BytesRef: tb.BlobRef(), Size: uint64(fileSize) - skip, Offset: skip}
}
- func all(blob *test.Blob) *ContentPart {
+ func all(blob *test.Blob) *BytesPart {
return part(blob, 0, uint64(blob.Size()))
}
- func zero(size uint64) *ContentPart {
- return &ContentPart{Size: size}
+ func zero(size uint64) *BytesPart {
+ return &BytesPart{Size: size}
}
- func parts(parts ...*ContentPart) []*ContentPart {
+ func parts(parts ...*BytesPart) []*BytesPart {
return parts
}
- func sizeSum(parts []*ContentPart) (s uint64) {
+ func sizeSum(parts []*BytesPart) (s uint64) {
for _, p := range parts {
s += uint64(p.Size)
}
@ -126,8 +126,7 @@ func TestReader(t *testing.T) {
ss := new(Superset)
ss.Type = "file"
ss.Version = 1
- ss.Size = sizeSum(rt.parts)
- ss.ContentParts = rt.parts
+ ss.Parts = rt.parts
fr, err := ss.NewFileReader(testFetcher)
if err != nil {
t.Errorf("read error on test %d: %v", idx, err)


@ -150,11 +150,13 @@ type FileReader struct {
cr blobref.ReadSeekCloser // cached reader (for blobref chunks)
crbr *blobref.BlobRef // the blobref that cr is for
- csubfr *FileReader // cached sub blobref reader (for subBlobRef chunks)
- ccp *ContentPart // the content part that csubfr is cached for
+ csubfr *FileReader // cached sub blobref reader (for subBlobRef chunks)
+ ccp *BytesPart // the content part that csubfr is cached for
}
- // TODO: make this take a blobref.FetcherAt instead?
+ // TODO(bradfitz): make this take a blobref.FetcherAt instead?
+ // TODO(bradfitz): rename this into bytes reader? but for now it's still
+ // named FileReader, but can also read a "bytes" schema.
func NewFileReader(fetcher blobref.SeekFetcher, fileBlobRef *blobref.BlobRef) (*FileReader, os.Error) {
if fileBlobRef == nil {
return nil, os.NewError("schema/filereader: NewFileReader blobref was nil")
@ -167,8 +169,8 @@ func NewFileReader(fetcher blobref.SeekFetcher, fileBlobRef *blobref.BlobRef) (*
if err = json.NewDecoder(rsc).Decode(ss); err != nil {
return nil, fmt.Errorf("schema/filereader: decoding file schema blob: %v", err)
}
if ss.Type != "file" {
return nil, fmt.Errorf("schema/filereader: expected \"file\" schema blob, got %q", ss.Type)
if ss.Type != "file" && ss.Type != "bytes" {
return nil, fmt.Errorf("schema/filereader: expected \"file\" or \"bytes\" schema blob, got %q", ss.Type)
}
fr, err := ss.NewFileReader(fetcher)
if err != nil {
@ -178,10 +180,10 @@ func NewFileReader(fetcher blobref.SeekFetcher, fileBlobRef *blobref.BlobRef) (*
}
func (ss *Superset) NewFileReader(fetcher blobref.SeekFetcher) (*FileReader, os.Error) {
if ss.Type != "file" {
return nil, fmt.Errorf("schema/filereader: Superset not of type \"file\"")
if ss.Type != "file" && ss.Type != "bytes" {
return nil, fmt.Errorf("schema/filereader: Superset not of type \"file\" or \"bytes\"")
}
- return &FileReader{fetcher: fetcher, ss: ss, remain: int64(ss.Size)}, nil
+ return &FileReader{fetcher: fetcher, ss: ss, remain: int64(ss.SumPartsSize())}, nil
}
// FileSchema returns the reader's schema superset. Don't mutate it.
@ -205,8 +207,8 @@ func (fr *FileReader) Skip(skipBytes uint64) uint64 {
wantedSkipped := skipBytes
- for skipBytes != 0 && fr.ci < len(fr.ss.ContentParts) {
- cp := fr.ss.ContentParts[fr.ci]
+ for skipBytes != 0 && fr.ci < len(fr.ss.Parts) {
+ cp := fr.ss.Parts[fr.ci]
thisChunkSkippable := cp.Size - fr.ccon
toSkip := minu64(skipBytes, thisChunkSkippable)
fr.ccon += toSkip
@ -254,11 +256,11 @@ func (fr *FileReader) readerFor(br *blobref.BlobRef, seekTo int64) (r io.Reader,
return rsc, nil
}
- func (fr *FileReader) subBlobRefReader(cp *ContentPart) (io.Reader, os.Error) {
+ func (fr *FileReader) subBlobRefReader(cp *BytesPart) (io.Reader, os.Error) {
if fr.ccp == cp {
return fr.csubfr, nil
}
- subfr, err := NewFileReader(fr.fetcher, cp.SubBlobRef)
+ subfr, err := NewFileReader(fr.fetcher, cp.BytesRef)
if err == nil {
subfr.Skip(cp.Offset)
fr.csubfr = subfr
@ -267,16 +269,16 @@ func (fr *FileReader) subBlobRefReader(cp *ContentPart) (io.Reader, os.Error) {
return subfr, err
}
- func (fr *FileReader) currentPart() (*ContentPart, os.Error) {
+ func (fr *FileReader) currentPart() (*BytesPart, os.Error) {
for {
- if fr.ci >= len(fr.ss.ContentParts) {
+ if fr.ci >= len(fr.ss.Parts) {
fr.closeOpenBlobs()
if fr.remain > 0 {
return nil, fmt.Errorf("schema: declared file schema size was larger than sum of content parts")
}
return nil, os.EOF
}
- cp := fr.ss.ContentParts[fr.ci]
+ cp := fr.ss.Parts[fr.ci]
thisChunkReadable := cp.Size - fr.ccon
if thisChunkReadable == 0 {
fr.ci++
@ -303,7 +305,7 @@ func (fr *FileReader) Read(p []byte) (n int, err os.Error) {
}
br := cp.BlobRef
- sbr := cp.SubBlobRef
+ sbr := cp.BytesRef
if br != nil && sbr != nil {
return 0, fmt.Errorf("content part index %d has both blobRef and subFileBlobRef", fr.ci)
}
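As a usage note, a minimal read-back sketch follows (a hypothetical helper; the fetcher and blob reference are assumed to exist already): after this hunk the same FileReader path serves both "file" and "bytes" schema blobs.

// readAll streams the bytes described by a "file" or "bytes" schema blob.
func readAll(fetcher blobref.SeekFetcher, ref *blobref.BlobRef) ([]byte, os.Error) {
    fr, err := schema.NewFileReader(fetcher, ref)
    if err != nil {
        return nil, err
    }
    defer fr.Close()
    var buf bytes.Buffer
    if _, err := io.Copy(&buf, fr); err != nil {
        return nil, err
    }
    return buf.Bytes(), nil
}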


@ -40,7 +40,7 @@ func WriteFileFromReader(bs blobserver.Storage, filename string, r io.Reader) (*
// Naive for now. Just in 1MB chunks.
// TODO: rolling hash and hash trees.
- parts, size := []ContentPart{}, int64(0)
+ parts, size := []BytesPart{}, int64(0)
buf := new(bytes.Buffer)
for {
@ -72,15 +72,15 @@ func WriteFileFromReader(bs blobserver.Storage, filename string, r io.Reader) (*
}
size += n
- parts = append(parts, ContentPart{
+ parts = append(parts, BytesPart{
BlobRef: br,
Size: uint64(n),
Offset: 0, // into BlobRef to read from (not of dest)
})
}
- m := NewCommonFilenameMap(filename)
- err := PopulateRegularFileMap(m, size, parts)
+ m := NewFileMap(filename)
+ err := PopulateParts(m, size, parts)
if err != nil {
return nil, err
}
@ -211,21 +211,21 @@ func WriteFileFromReaderRolling(bs blobserver.Storage, filename string, r io.Rea
}
}
- var addContentParts func(dst *[]ContentPart, s []span) os.Error
+ var addBytesParts func(dst *[]BytesPart, s []span) os.Error
uploadFile := func(filename string, isFragment bool, fileSize int64, s []span) (*blobref.BlobRef, os.Error) {
- parts := []ContentPart{}
- err := addContentParts(&parts, s)
- if err != nil {
- return nil, err
- }
- m := NewCommonFilenameMap(filename)
- err = PopulateRegularFileMap(m, fileSize, parts)
+ parts := []BytesPart{}
+ err := addBytesParts(&parts, s)
+ if err != nil {
+ return nil, err
+ }
+ m := NewFileMap(filename)
if isFragment {
m["fragment"] = true
m = NewBytes()
}
+ err = PopulateParts(m, fileSize, parts)
if err != nil {
return nil, err
}
json, err := MapToCamliJson(m)
if err != nil {
@ -234,7 +234,7 @@ func WriteFileFromReaderRolling(bs blobserver.Storage, filename string, r io.Rea
return uploadString(json)
}
- addContentParts = func(dst *[]ContentPart, spansl []span) os.Error {
+ addBytesParts = func(dst *[]BytesPart, spansl []span) os.Error {
for _, sp := range spansl {
if len(sp.children) > 0 {
childrenSize := int64(0)
@ -245,13 +245,13 @@ func WriteFileFromReaderRolling(bs blobserver.Storage, filename string, r io.Rea
if err != nil {
return err
}
- *dst = append(*dst, ContentPart{
- SubBlobRef: br,
+ *dst = append(*dst, BytesPart{
+ BytesRef: br,
Size: uint64(childrenSize),
})
}
if sp.from != sp.to {
- *dst = append(*dst, ContentPart{
+ *dst = append(*dst, BytesPart{
BlobRef: sp.br,
Size: uint64(sp.to - sp.from),
})
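For orientation, here is a hedged caller-side sketch of this writer path, assuming (as the truncated signature above suggests) that WriteFileFromReader returns the resulting "file" schema blob's blobref and an os.Error; the helper and its parameters are hypothetical.

// storeFile is a hypothetical caller; bs is an existing blobserver.Storage.
func storeFile(bs blobserver.Storage, path string) (*blobref.BlobRef, os.Error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()
    // The writer chunks the stream into BytesPart entries and uploads a
    // camliType "file" schema blob that references them.
    return schema.WriteFileFromReader(bs, path, f)
}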


@ -213,20 +213,23 @@ type Superset struct {
UnixCtime string `json:"unixCtime"`
UnixAtime string `json:"unixAtime"`
Size uint64 `json:"size"` // for files
- ContentParts []*ContentPart `json:"contentParts"`
- Fragment bool `json:"fragment"`
+ Parts []*BytesPart `json:"parts"`
Entries string `json:"entries"` // for directories, a blobref to a static-set
Members []string `json:"members"` // for static sets (for directory static-sets:
// blobrefs to child dirs/files)
}
- type ContentPart struct {
- BlobRef *blobref.BlobRef `json:"blobRef"`
- SubBlobRef *blobref.BlobRef `json:"subFileBlobRef"`
- Size uint64 `json:"size"`
- Offset uint64 `json:"offset"`
+ type BytesPart struct {
+ // Required.
+ Size uint64 `json:"size"`
+ // At most one of:
+ BlobRef *blobref.BlobRef `json:"blobRef,omitempty"`
+ BytesRef *blobref.BlobRef `json:"bytesRef,omitempty"`
+ // Optional (default value is zero if unset anyway):
+ Offset uint64 `json:"offset,omitempty"`
}
func stringFromMixedArray(parts []interface{}) string {
@ -244,6 +247,13 @@ func stringFromMixedArray(parts []interface{}) string {
return buf.String()
}
+ func (ss *Superset) SumPartsSize() (size uint64) {
+ for _, part := range ss.Parts {
+ size += uint64(part.Size)
+ }
+ return size
+ }
func (ss *Superset) SymlinkTargetString() string {
if ss.SymlinkTarget != "" {
return ss.SymlinkTarget
@ -366,6 +376,12 @@ func MapToCamliJson(m map[string]interface{}) (string, os.Error) {
return string(buf.Bytes()), nil
}
+ func NewFileMap(fileName string) map[string]interface{} {
+ m := NewCommonFilenameMap(fileName)
+ m["camliType"] = "file"
+ return m
+ }
func NewCommonFilenameMap(fileName string) map[string]interface{} {
m := newCamliMap(1, "" /* no type yet */ )
if fileName != "" {
@ -409,39 +425,30 @@ func NewCommonFileMap(fileName string, fi *os.FileInfo) map[string]interface{} {
return m
}
- type InvalidContentPartsError struct {
- StatSize int64
- SumOfParts int64
- }
- func (e *InvalidContentPartsError) String() string {
- return fmt.Sprintf("Invalid ContentPart slice in PopulateRegularFileMap; file stat size is %d but sum of parts was %d", e.StatSize, e.SumOfParts)
- }
- func PopulateRegularFileMap(m map[string]interface{}, size int64, parts []ContentPart) os.Error {
- m["camliType"] = "file"
- m["size"] = size
- sumSize := uint64(0)
+ func PopulateParts(m map[string]interface{}, size int64, parts []BytesPart) os.Error {
+ sumSize := int64(0)
mparts := make([]map[string]interface{}, len(parts))
for idx, part := range parts {
mpart := make(map[string]interface{})
mparts[idx] = mpart
- if part.BlobRef != nil {
+ switch {
+ case part.BlobRef != nil && part.BytesRef != nil:
+ return os.NewError("schema: part contains both blobRef and bytesRef")
+ case part.BlobRef != nil:
mpart["blobRef"] = part.BlobRef.String()
- } else if part.SubBlobRef != nil {
- mpart["subFileBlobRef"] = part.SubBlobRef.String()
+ case part.BytesRef != nil:
+ mpart["bytesRef"] = part.BytesRef.String()
}
mpart["size"] = part.Size
- sumSize += part.Size
+ sumSize += int64(part.Size)
if part.Offset != 0 {
mpart["offset"] = part.Offset
}
}
- if sumSize != uint64(size) {
- return &InvalidContentPartsError{size, int64(sumSize)}
+ if sumSize != size {
+ return fmt.Errorf("schema: declared size %d doesn't match sum of parts size %d", size, sumSize)
}
m["contentParts"] = mparts
m["parts"] = mparts
return nil
}
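A small sketch of the new validation behavior follows (chunkRef is a placeholder): PopulateParts now reports a declared-size mismatch via fmt.Errorf instead of the removed InvalidContentPartsError, and rejects a part that sets both blobRef and bytesRef.

// demoSizeCheck is a hypothetical example of the mismatch path.
func demoSizeCheck(chunkRef *blobref.BlobRef) os.Error {
    m := schema.NewBytes()
    parts := []schema.BytesPart{
        {BlobRef: chunkRef, Size: 1024},
        {Size: 4096}, // sparse hole
    }
    // Declared size 1024 != 1024+4096, so this returns
    // "schema: declared size 1024 doesn't match sum of parts size 5120".
    return schema.PopulateParts(m, 1024, parts)
}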
@ -459,6 +466,10 @@ func PopulateSymlinkMap(m map[string]interface{}, fileName string) os.Error {
return nil
}
+ func NewBytes() map[string]interface{} {
+ return newCamliMap(1, "bytes")
+ }
func PopulateDirectoryMap(m map[string]interface{}, staticSetRef *blobref.BlobRef) {
m["camliType"] = "directory"
m["entries"] = staticSetRef.String()


@ -58,7 +58,7 @@ func (dh *DownloadHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request,
defer fr.Close()
schema := fr.FileSchema()
rw.Header().Set("Content-Length", fmt.Sprintf("%d", schema.Size))
rw.Header().Set("Content-Length", fmt.Sprintf("%d", schema.SumPartsSize()))
// TODO: fr.FileSchema() and guess a mime type? For now:
mimeType := "application/octet-stream"
@ -89,9 +89,9 @@ func (dh *DownloadHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request,
log.Printf("error serving download of file schema %s: %v", file, err)
return
}
- if n != int64(schema.Size) {
+ if size := schema.SumPartsSize(); n != int64(size) {
log.Printf("error serving download of file schema %s: sent %d, expected size of %d",
- file, n, schema.Size)
+ file, n, size)
return
}