diff --git a/clients/go/camput/camput.go b/clients/go/camput/camput.go index 6cf2a51c5..1f05aa28b 100644 --- a/clients/go/camput/camput.go +++ b/clients/go/camput/camput.go @@ -28,6 +28,7 @@ import ( "sort" "camli/blobref" + "camli/blobserver" "camli/blobserver/remote" "camli/client" "camli/schema" @@ -43,7 +44,7 @@ var ( flagVerbose = flag.Bool("verbose", false, "extra debug logging") ) -var ErrUsage = os.NewError("invalid command usage") +var ErrUsage = UsageError("invalid command usage") type UsageError string @@ -93,9 +94,15 @@ type HaveCache interface { type Uploader struct { *client.Client + + // for debugging; normally nil, but overrides Client if set + // TODO(bradfitz): clean this up? embed a StatReceiver instead + // of a Client? + altStatReceiver blobserver.StatReceiver + entityFetcher jsonsign.EntityFetcher - transport *tinkerTransport + transport *tinkerTransport // for HTTP statistics pwd string statCache UploadCache haveCache HaveCache @@ -126,7 +133,7 @@ func (up *Uploader) UploadFileBlob(filename string) (*client.PutResult, os.Error if err != nil { return nil, err } - // assuming what I did here is not too lame, maybe I should set a limit on how much we accept from the stdin? + // TODO(bradfitz,mpl): limit this buffer size? file := buf.Bytes() s1 := sha1.New() size, err = io.Copy(s1, buf) @@ -193,18 +200,27 @@ func (up *Uploader) UploadFile(filename string, rollSplits bool) (respr *client. switch { case fi.IsRegular(): + m["camliType"] = "file" + file, err := os.Open(filename) if err != nil { return nil, err } defer file.Close() - storage := remote.NewFromClient(up.Client) - m["camliType"] = "file" + + statReceiver := up.altStatReceiver + if statReceiver == nil { + // TODO(bradfitz): just make Client be a + // StatReceiver? move remote's ReceiveBlob -> + // Upload wrapper into Client itself? + statReceiver = remote.NewFromClient(up.Client) + } + schemaWriteFileMap := schema.WriteFileMap if rollSplits { schemaWriteFileMap = schema.WriteFileMapRolling } - blobref, err := schemaWriteFileMap(storage, m, io.LimitReader(file, fi.Size)) + blobref, err := schemaWriteFileMap(statReceiver, m, io.LimitReader(file, fi.Size)) if err != nil { return nil, err } @@ -258,7 +274,7 @@ func (up *Uploader) UploadFile(filename string, rollSplits bool) (respr *client. for _, name := range dirNames { rate <- true go func(dirEntName string) { - pr, err := up.UploadFile(filename + "/" + dirEntName, rollSplits) + pr, err := up.UploadFile(filename+"/"+dirEntName, rollSplits) if pr == nil && err == nil { log.Fatalf("nil/nil from up.UploadFile on %q", filename+"/"+dirEntName) } @@ -536,7 +552,7 @@ func main() { } else { err = cmd.RunCommand(up, cmdFlags.Args()) } - if ue, isUsage := err.(UsageError); isUsage || err == ErrUsage { + if ue, isUsage := err.(UsageError); isUsage { if isUsage { errf("%s\n", ue) } diff --git a/clients/go/camput/files.go b/clients/go/camput/files.go index 5c800d677..67b11dcad 100644 --- a/clients/go/camput/files.go +++ b/clients/go/camput/files.go @@ -19,9 +19,13 @@ package main import ( "flag" "fmt" + "io" + "io/ioutil" "os" "strings" + "sync" + "camli/blobref" "camli/client" "camli/schema" ) @@ -36,7 +40,7 @@ type fileCmd struct { havecache, statcache bool // Go into in-memory stats mode only; doesn't actually upload. - memstats bool + memstats bool } func init() { @@ -79,10 +83,10 @@ func (c *fileCmd) RunCommand(up *Uploader, args []string) os.Error { return UsageError("Can't set tag without using --permanode") } if c.memstats { - // TODO(bradfitz): implement - return os.NewError("TODO(bradfitz): implement") + sr := new(statsStatReceiver) + up.altStatReceiver = sr + AddSaveHook(func() { sr.DumpStats() }) } - if c.statcache { cache := NewFlatStatCache() AddSaveHook(func() { cache.Save() }) @@ -109,8 +113,8 @@ func (c *fileCmd) RunCommand(up *Uploader, args []string) os.Error { } } - for _, arg := range args { - lastPut, err = up.UploadFile(arg, c.rollSplits) + for _, filename := range args { + lastPut, err = up.UploadFile(filename, c.rollSplits) handleResult("file", lastPut, err) if permaNode != nil { @@ -134,3 +138,50 @@ func (c *fileCmd) RunCommand(up *Uploader, args []string) os.Error { } return nil } + +// statsStatReceiver is a dummy blobserver.StatReceiver that doesn't store anything; +// it just collects statistics. +type statsStatReceiver struct { + mu sync.Mutex + have map[string]int64 +} + +func (sr *statsStatReceiver) lock() { + sr.mu.Lock() + if sr.have == nil { + sr.have = make(map[string]int64) + } +} + +func (sr *statsStatReceiver) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (sb blobref.SizedBlobRef, err os.Error) { + n, err := io.Copy(ioutil.Discard, source) + if err != nil { + return + } + sr.lock() + defer sr.mu.Unlock() + sr.have[blob.String()] = n + return blobref.SizedBlobRef{blob, n}, nil +} + +func (sr *statsStatReceiver) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs []*blobref.BlobRef, _ int) os.Error { + sr.lock() + defer sr.mu.Unlock() + for _, br := range blobs { + if size, ok := sr.have[br.String()]; ok { + dest <- blobref.SizedBlobRef{br, size} + } + } + return nil +} + +func (sr *statsStatReceiver) DumpStats() { + sr.lock() + defer sr.mu.Unlock() + + var sum int64 + for _, size := range sr.have { + sum += size + } + fmt.Printf("In-memory blob stats: %d blobs, %d bytes\n", len(sr.have), sum) +}