schema: remove FileWriter's serial byte/file schema uploads after chunks are uploaded

Change-Id: I1c3a9c496ce168cc8b6d087ef1a0de99cac22127
This commit is contained in:
Brad Fitzpatrick 2013-02-02 22:35:08 -08:00
parent bdfc5b0c6c
commit 9eed2f6082
1 changed files with 53 additions and 21 deletions

View File

@ -181,10 +181,9 @@ func (r *noteEOFReader) Read(p []byte) (n int, err error) {
return
}
// br may be nil, to mean calculate it.
func uploadString(bs blobserver.StatReceiver, br *blobref.BlobRef, s string) (*blobref.BlobRef, error) {
if br == nil {
br = blobref.SHA1FromString(s)
panic("nil blobref")
}
hasIt, err := serverHasBlob(bs, br)
if err != nil {
@ -204,21 +203,56 @@ func uploadString(bs blobserver.StatReceiver, br *blobref.BlobRef, s string) (*b
// "file", which is a superset of "bytes"), sets it to the provided
// size, and populates with provided spans. The bytes or file schema
// blob is uploaded and its blobref is returned.
func uploadBytes(bs blobserver.StatReceiver, bb *Builder, size int64, s []span) (*blobref.BlobRef, error) {
func uploadBytes(bs blobserver.StatReceiver, bb *Builder, size int64, s []span) *uploadBytesFuture {
future := newUploadBytesFuture()
parts := []BytesPart{}
err := addBytesParts(bs, &parts, s)
if err != nil {
return nil, err
addBytesParts(bs, &parts, s, future)
if err := bb.PopulateParts(size, parts); err != nil {
future.errc <- err
} else {
json := bb.Blob().JSON()
br := blobref.SHA1FromString(json)
future.br = br
go func() {
_, err := uploadString(bs, br, json)
future.errc <- err
}()
}
err = bb.PopulateParts(size, parts)
if err != nil {
return nil, err
return future
}
func newUploadBytesFuture() *uploadBytesFuture {
return &uploadBytesFuture{
errc: make(chan error, 1),
}
return uploadString(bs, nil, bb.Blob().JSON())
}
// An uploadBytesFuture is an eager result of a still-in-progress uploadBytes call.
// Call Get to wait and get its final result.
type uploadBytesFuture struct {
br *blobref.BlobRef
errc chan error
children []*uploadBytesFuture
}
// BlobRef returns the optimistic blobref of this uploadBytes call without blocking.
func (f *uploadBytesFuture) BlobRef() *blobref.BlobRef {
return f.br
}
// Get blocks for all children and returns any final error.
func (f *uploadBytesFuture) Get() (*blobref.BlobRef, error) {
for _, f := range f.children {
if _, err := f.Get(); err != nil {
return nil, err
}
}
return f.br, <-f.errc
}
// addBytesParts uploads the provided spans to bs, appending elements to *dst.
func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span) error {
func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span, parent *uploadBytesFuture) {
for _, sp := range spans {
if len(sp.children) == 1 && sp.children[0].isSingleBlob() {
// Remove an occasional useless indirection of
@ -237,12 +271,10 @@ func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span) e
for _, cs := range sp.children {
childrenSize += cs.size()
}
br, err := uploadBytes(bs, newBytes(), childrenSize, sp.children)
if err != nil {
return err
}
future := uploadBytes(bs, newBytes(), childrenSize, sp.children)
parent.children = append(parent.children, future)
*dst = append(*dst, BytesPart{
BytesRef: br,
BytesRef: future.BlobRef(),
Size: uint64(childrenSize),
})
}
@ -254,19 +286,18 @@ func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span) e
Size: uint64(sp.to - sp.from),
})
}
return nil
}
// writeFileMap uploads chunks of r to bs while populating fileMap and
// finally uploading fileMap. The returned blobref is of fileMap's
// JSON blob. It uses rolling checksum for the chunks sizes.
func writeFileMapRolling(bs blobserver.StatReceiver, file *Builder, r io.Reader) (outbr *blobref.BlobRef, outerr error) {
func writeFileMapRolling(bs blobserver.StatReceiver, file *Builder, r io.Reader) (*blobref.BlobRef, error) {
n, spans, err := writeFileChunks(bs, file, r)
if err != nil {
return nil, err
}
// The top-level content parts
return uploadBytes(bs, file, n, spans)
return uploadBytes(bs, file, n, spans).Get()
}
// WriteFileChunks uploads chunks of r to bs while populating file.
@ -277,8 +308,9 @@ func WriteFileChunks(bs blobserver.StatReceiver, file *Builder, r io.Reader) err
return err
}
parts := []BytesPart{}
err = addBytesParts(bs, &parts, spans)
if err != nil {
future := newUploadBytesFuture()
addBytesParts(bs, &parts, spans, future)
if _, err := future.Get(); err != nil {
return err
}
return file.PopulateParts(size, parts)