camput: client becomes StatReceiver.

also added ReaderSize function to
guess before reading the size of
a source.

Change-Id: I659e174821696b3c58759b132b82710a49583018
This commit is contained in:
mpl 2012-10-09 16:47:43 +02:00
parent 6baedb3dc7
commit a534e02c2d
7 changed files with 147 additions and 12 deletions

View File

@ -34,7 +34,6 @@ import (
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/blobserver/remote"
"camlistore.org/pkg/client"
"camlistore.org/pkg/schema"
@ -372,10 +371,9 @@ func (up *Uploader) uploadNode(n *node) (*client.PutResult, error) {
func (up *Uploader) statReceiver() blobserver.StatReceiver {
statReceiver := up.altStatReceiver
if statReceiver == nil {
// TODO(bradfitz): just make Client be a
// StatReceiver? move remote's ReceiveBlob ->
// Upload wrapper into Client itself?
statReceiver = remote.NewFromClient(up.Client)
// TODO(mpl): simplify the altStatReceiver situation as well,
// see TODO in cmd/camput/uploader.go
statReceiver = up.Client
}
return statReceiver
}

View File

@ -101,7 +101,7 @@ func doPass(sc, dc *client.Client, passNum int) (stats SyncStats, retErr error)
destErr := make(chan error)
go func() {
srcErr <- sc.EnumerateBlobs(srcBlobs)
srcErr <- sc.SimpleEnumerateBlobs(srcBlobs)
}()
checkSourceError := func() {
if err := <-srcErr; err != nil {
@ -118,7 +118,7 @@ func doPass(sc, dc *client.Client, passNum int) (stats SyncStats, retErr error)
}
go func() {
destErr <- dc.EnumerateBlobs(destBlobs)
destErr <- dc.SimpleEnumerateBlobs(destBlobs)
}()
checkDestError := func() {
if err := <-destErr; err != nil {

View File

@ -129,7 +129,7 @@ func fileExists(name string) bool {
var (
signerPublicKeyRefOnce sync.Once
signerPublicKeyRef *blobref.BlobRef
signerPublicKeyRef *blobref.BlobRef
)
// TODO: move to config package?

View File

@ -26,13 +26,13 @@ import (
)
type EnumerateOpts struct {
After string
MaxWait time.Duration // how long to poll for (second granularity), waiting for any blob, or 0 for no limit
Limit int // if non-zero, the max blobs to return
After string
MaxWait time.Duration // how long to poll for (second granularity), waiting for any blob, or 0 for no limit
Limit int // if non-zero, the max blobs to return
}
// Note: closes ch.
func (c *Client) EnumerateBlobs(ch chan<- blobref.SizedBlobRef) error {
func (c *Client) SimpleEnumerateBlobs(ch chan<- blobref.SizedBlobRef) error {
return c.EnumerateBlobsOpts(ch, EnumerateOpts{})
}

View File

@ -24,6 +24,7 @@ import (
"log"
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/readerutil"
)
var _ = log.Printf
@ -64,3 +65,20 @@ func (c *Client) FetchVia(b *blobref.BlobRef, v []*blobref.BlobRef) (io.ReadClos
return resp.Body, size, nil
}
func (c *Client) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (blobref.SizedBlobRef, error) {
size, ok := readerutil.ReaderSize(source)
if !ok {
size = -1
}
h := &UploadHandle{
BlobRef: blob,
Size: size, // -1 if we don't know
Contents: source,
}
pr, err := c.Upload(h)
if err != nil {
return blobref.SizedBlobRef{}, err
}
return pr.SizedBlobRef(), nil
}

View File

@ -0,0 +1,51 @@
/*
Copyright 2012 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package readerutil
import (
"bytes"
"io"
"os"
)
// ReaderSize tries to determine the length of r.
func ReaderSize(r io.Reader) (size int64, ok bool) {
switch rt := r.(type) {
case io.Seeker:
pos, err := rt.Seek(0, os.SEEK_CUR)
if err != nil {
return
}
end, err := rt.Seek(0, os.SEEK_END)
if err != nil {
return
}
size = end - pos
pos1, err := rt.Seek(pos, os.SEEK_SET)
if err != nil || pos1 != pos {
msg := "failed to restore seek position"
if err != nil {
msg += ": " + err.Error()
}
panic(msg)
}
return size, true
case *bytes.Buffer:
return int64(rt.Len()), true
}
return
}

View File

@ -0,0 +1,68 @@
/*
Copyright 2012 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package readerutil
import (
"bytes"
"io"
"io/ioutil"
"os"
"testing"
)
const text = "HelloWorld"
type testSrc struct {
name string
src io.Reader
want int64
}
func (tsrc *testSrc) run(t *testing.T) {
n, ok := ReaderSize(tsrc.src)
if !ok {
t.Fatalf("failed to read size for %q", tsrc.name)
}
if n != tsrc.want {
t.Fatalf("wanted %v, got %v", tsrc.want, n)
}
}
func TestBytesBuffer(t *testing.T) {
buf := bytes.NewBuffer([]byte(text))
tsrc := &testSrc{"buffer", buf, int64(len(text))}
tsrc.run(t)
}
func TestSeeker(t *testing.T) {
f, err := ioutil.TempFile("", "camliTestReaderSize")
if err != nil {
t.Fatal(err)
}
defer os.Remove(f.Name())
defer f.Close()
size, err := f.Write([]byte(text))
if err != nil {
t.Fatal(err)
}
pos, err := f.Seek(5, 0)
if err != nil {
t.Fatal(err)
}
tsrc := &testSrc{"seeker", f, int64(size) - pos}
tsrc.run(t)
}