From 9eed2f6082bff9705ad441e558e98d36009b6fd4 Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 2 Feb 2013 22:35:08 -0800 Subject: [PATCH] schema: remove FileWriter's serial byte/file schema uploads after chunks are uploaded Change-Id: I1c3a9c496ce168cc8b6d087ef1a0de99cac22127 --- pkg/schema/filewriter.go | 74 ++++++++++++++++++++++++++++------------ 1 file changed, 53 insertions(+), 21 deletions(-) diff --git a/pkg/schema/filewriter.go b/pkg/schema/filewriter.go index f50c5c7f8..e492906ca 100644 --- a/pkg/schema/filewriter.go +++ b/pkg/schema/filewriter.go @@ -181,10 +181,9 @@ func (r *noteEOFReader) Read(p []byte) (n int, err error) { return } -// br may be nil, to mean calculate it. func uploadString(bs blobserver.StatReceiver, br *blobref.BlobRef, s string) (*blobref.BlobRef, error) { if br == nil { - br = blobref.SHA1FromString(s) + panic("nil blobref") } hasIt, err := serverHasBlob(bs, br) if err != nil { @@ -204,21 +203,56 @@ func uploadString(bs blobserver.StatReceiver, br *blobref.BlobRef, s string) (*b // "file", which is a superset of "bytes"), sets it to the provided // size, and populates with provided spans. The bytes or file schema // blob is uploaded and its blobref is returned. -func uploadBytes(bs blobserver.StatReceiver, bb *Builder, size int64, s []span) (*blobref.BlobRef, error) { +func uploadBytes(bs blobserver.StatReceiver, bb *Builder, size int64, s []span) *uploadBytesFuture { + future := newUploadBytesFuture() parts := []BytesPart{} - err := addBytesParts(bs, &parts, s) - if err != nil { - return nil, err + addBytesParts(bs, &parts, s, future) + + if err := bb.PopulateParts(size, parts); err != nil { + future.errc <- err + } else { + json := bb.Blob().JSON() + br := blobref.SHA1FromString(json) + future.br = br + go func() { + _, err := uploadString(bs, br, json) + future.errc <- err + }() } - err = bb.PopulateParts(size, parts) - if err != nil { - return nil, err + return future +} + +func newUploadBytesFuture() *uploadBytesFuture { + return &uploadBytesFuture{ + errc: make(chan error, 1), } - return uploadString(bs, nil, bb.Blob().JSON()) +} + +// An uploadBytesFuture is an eager result of a still-in-progress uploadBytes call. +// Call Get to wait and get its final result. +type uploadBytesFuture struct { + br *blobref.BlobRef + errc chan error + children []*uploadBytesFuture +} + +// BlobRef returns the optimistic blobref of this uploadBytes call without blocking. +func (f *uploadBytesFuture) BlobRef() *blobref.BlobRef { + return f.br +} + +// Get blocks for all children and returns any final error. +func (f *uploadBytesFuture) Get() (*blobref.BlobRef, error) { + for _, f := range f.children { + if _, err := f.Get(); err != nil { + return nil, err + } + } + return f.br, <-f.errc } // addBytesParts uploads the provided spans to bs, appending elements to *dst. -func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span) error { +func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span, parent *uploadBytesFuture) { for _, sp := range spans { if len(sp.children) == 1 && sp.children[0].isSingleBlob() { // Remove an occasional useless indirection of @@ -237,12 +271,10 @@ func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span) e for _, cs := range sp.children { childrenSize += cs.size() } - br, err := uploadBytes(bs, newBytes(), childrenSize, sp.children) - if err != nil { - return err - } + future := uploadBytes(bs, newBytes(), childrenSize, sp.children) + parent.children = append(parent.children, future) *dst = append(*dst, BytesPart{ - BytesRef: br, + BytesRef: future.BlobRef(), Size: uint64(childrenSize), }) } @@ -254,19 +286,18 @@ func addBytesParts(bs blobserver.StatReceiver, dst *[]BytesPart, spans []span) e Size: uint64(sp.to - sp.from), }) } - return nil } // writeFileMap uploads chunks of r to bs while populating fileMap and // finally uploading fileMap. The returned blobref is of fileMap's // JSON blob. It uses rolling checksum for the chunks sizes. -func writeFileMapRolling(bs blobserver.StatReceiver, file *Builder, r io.Reader) (outbr *blobref.BlobRef, outerr error) { +func writeFileMapRolling(bs blobserver.StatReceiver, file *Builder, r io.Reader) (*blobref.BlobRef, error) { n, spans, err := writeFileChunks(bs, file, r) if err != nil { return nil, err } // The top-level content parts - return uploadBytes(bs, file, n, spans) + return uploadBytes(bs, file, n, spans).Get() } // WriteFileChunks uploads chunks of r to bs while populating file. @@ -277,8 +308,9 @@ func WriteFileChunks(bs blobserver.StatReceiver, file *Builder, r io.Reader) err return err } parts := []BytesPart{} - err = addBytesParts(bs, &parts, spans) - if err != nil { + future := newUploadBytesFuture() + addBytesParts(bs, &parts, spans, future) + if _, err := future.Get(); err != nil { return err } return file.PopulateParts(size, parts)