From a534e02c2d6c09a31b3eda997a39496c00202a62 Mon Sep 17 00:00:00 2001 From: mpl Date: Tue, 9 Oct 2012 16:47:43 +0200 Subject: [PATCH] camput: client becomes StatReceiver. also added ReaderSize function to guess before reading the size of a source. Change-Id: I659e174821696b3c58759b132b82710a49583018 --- cmd/camput/files.go | 8 ++-- cmd/camsync/camsync.go | 4 +- pkg/client/config.go | 2 +- pkg/client/enumerate.go | 8 ++-- pkg/client/get.go | 18 ++++++++ pkg/readerutil/readersize.go | 51 +++++++++++++++++++++++ pkg/readerutil/readersize_test.go | 68 +++++++++++++++++++++++++++++++ 7 files changed, 147 insertions(+), 12 deletions(-) create mode 100644 pkg/readerutil/readersize.go create mode 100644 pkg/readerutil/readersize_test.go diff --git a/cmd/camput/files.go b/cmd/camput/files.go index 08059b8e3..56ba2be05 100644 --- a/cmd/camput/files.go +++ b/cmd/camput/files.go @@ -34,7 +34,6 @@ import ( "camlistore.org/pkg/blobref" "camlistore.org/pkg/blobserver" - "camlistore.org/pkg/blobserver/remote" "camlistore.org/pkg/client" "camlistore.org/pkg/schema" @@ -372,10 +371,9 @@ func (up *Uploader) uploadNode(n *node) (*client.PutResult, error) { func (up *Uploader) statReceiver() blobserver.StatReceiver { statReceiver := up.altStatReceiver if statReceiver == nil { - // TODO(bradfitz): just make Client be a - // StatReceiver? move remote's ReceiveBlob -> - // Upload wrapper into Client itself? - statReceiver = remote.NewFromClient(up.Client) + // TODO(mpl): simplify the altStatReceiver situation as well, + // see TODO in cmd/camput/uploader.go + statReceiver = up.Client } return statReceiver } diff --git a/cmd/camsync/camsync.go b/cmd/camsync/camsync.go index d5991c615..388fba1ef 100644 --- a/cmd/camsync/camsync.go +++ b/cmd/camsync/camsync.go @@ -101,7 +101,7 @@ func doPass(sc, dc *client.Client, passNum int) (stats SyncStats, retErr error) destErr := make(chan error) go func() { - srcErr <- sc.EnumerateBlobs(srcBlobs) + srcErr <- sc.SimpleEnumerateBlobs(srcBlobs) }() checkSourceError := func() { if err := <-srcErr; err != nil { @@ -118,7 +118,7 @@ func doPass(sc, dc *client.Client, passNum int) (stats SyncStats, retErr error) } go func() { - destErr <- dc.EnumerateBlobs(destBlobs) + destErr <- dc.SimpleEnumerateBlobs(destBlobs) }() checkDestError := func() { if err := <-destErr; err != nil { diff --git a/pkg/client/config.go b/pkg/client/config.go index 80a6e5998..b91e401c1 100644 --- a/pkg/client/config.go +++ b/pkg/client/config.go @@ -129,7 +129,7 @@ func fileExists(name string) bool { var ( signerPublicKeyRefOnce sync.Once - signerPublicKeyRef *blobref.BlobRef + signerPublicKeyRef *blobref.BlobRef ) // TODO: move to config package? diff --git a/pkg/client/enumerate.go b/pkg/client/enumerate.go index 02f8ad86f..2cc41eaa6 100644 --- a/pkg/client/enumerate.go +++ b/pkg/client/enumerate.go @@ -26,13 +26,13 @@ import ( ) type EnumerateOpts struct { - After string - MaxWait time.Duration // how long to poll for (second granularity), waiting for any blob, or 0 for no limit - Limit int // if non-zero, the max blobs to return + After string + MaxWait time.Duration // how long to poll for (second granularity), waiting for any blob, or 0 for no limit + Limit int // if non-zero, the max blobs to return } // Note: closes ch. -func (c *Client) EnumerateBlobs(ch chan<- blobref.SizedBlobRef) error { +func (c *Client) SimpleEnumerateBlobs(ch chan<- blobref.SizedBlobRef) error { return c.EnumerateBlobsOpts(ch, EnumerateOpts{}) } diff --git a/pkg/client/get.go b/pkg/client/get.go index f888b812c..44c456025 100644 --- a/pkg/client/get.go +++ b/pkg/client/get.go @@ -24,6 +24,7 @@ import ( "log" "camlistore.org/pkg/blobref" + "camlistore.org/pkg/readerutil" ) var _ = log.Printf @@ -64,3 +65,20 @@ func (c *Client) FetchVia(b *blobref.BlobRef, v []*blobref.BlobRef) (io.ReadClos return resp.Body, size, nil } + +func (c *Client) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (blobref.SizedBlobRef, error) { + size, ok := readerutil.ReaderSize(source) + if !ok { + size = -1 + } + h := &UploadHandle{ + BlobRef: blob, + Size: size, // -1 if we don't know + Contents: source, + } + pr, err := c.Upload(h) + if err != nil { + return blobref.SizedBlobRef{}, err + } + return pr.SizedBlobRef(), nil +} diff --git a/pkg/readerutil/readersize.go b/pkg/readerutil/readersize.go new file mode 100644 index 000000000..b0e1d4da2 --- /dev/null +++ b/pkg/readerutil/readersize.go @@ -0,0 +1,51 @@ +/* +Copyright 2012 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package readerutil + +import ( + "bytes" + "io" + "os" +) + +// ReaderSize tries to determine the length of r. +func ReaderSize(r io.Reader) (size int64, ok bool) { + switch rt := r.(type) { + case io.Seeker: + pos, err := rt.Seek(0, os.SEEK_CUR) + if err != nil { + return + } + end, err := rt.Seek(0, os.SEEK_END) + if err != nil { + return + } + size = end - pos + pos1, err := rt.Seek(pos, os.SEEK_SET) + if err != nil || pos1 != pos { + msg := "failed to restore seek position" + if err != nil { + msg += ": " + err.Error() + } + panic(msg) + } + return size, true + case *bytes.Buffer: + return int64(rt.Len()), true + } + return +} diff --git a/pkg/readerutil/readersize_test.go b/pkg/readerutil/readersize_test.go new file mode 100644 index 000000000..2738e3567 --- /dev/null +++ b/pkg/readerutil/readersize_test.go @@ -0,0 +1,68 @@ +/* +Copyright 2012 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package readerutil + +import ( + "bytes" + "io" + "io/ioutil" + "os" + "testing" +) + +const text = "HelloWorld" + +type testSrc struct { + name string + src io.Reader + want int64 +} + +func (tsrc *testSrc) run(t *testing.T) { + n, ok := ReaderSize(tsrc.src) + if !ok { + t.Fatalf("failed to read size for %q", tsrc.name) + } + if n != tsrc.want { + t.Fatalf("wanted %v, got %v", tsrc.want, n) + } +} + +func TestBytesBuffer(t *testing.T) { + buf := bytes.NewBuffer([]byte(text)) + tsrc := &testSrc{"buffer", buf, int64(len(text))} + tsrc.run(t) +} + +func TestSeeker(t *testing.T) { + f, err := ioutil.TempFile("", "camliTestReaderSize") + if err != nil { + t.Fatal(err) + } + defer os.Remove(f.Name()) + defer f.Close() + size, err := f.Write([]byte(text)) + if err != nil { + t.Fatal(err) + } + pos, err := f.Seek(5, 0) + if err != nil { + t.Fatal(err) + } + tsrc := &testSrc{"seeker", f, int64(size) - pos} + tsrc.run(t) +}