Some Issue 18 statistics work on camput

Change-Id: I409b0616aac5c8bdfd234cafed831b7339391ffa
Brad Fitzpatrick 2011-09-29 10:37:24 -07:00
parent 8c72348a19
commit dc69d53aae
2 changed files with 81 additions and 14 deletions

View File

@@ -28,6 +28,7 @@ import (
"sort"
"camli/blobref"
"camli/blobserver"
"camli/blobserver/remote"
"camli/client"
"camli/schema"
@@ -43,7 +44,7 @@ var (
flagVerbose = flag.Bool("verbose", false, "extra debug logging")
)
var ErrUsage = os.NewError("invalid command usage")
var ErrUsage = UsageError("invalid command usage")
type UsageError string
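
For the err.(UsageError) type assertion later in main to work under the pre-Go 1 error model, UsageError presumably satisfies os.Error, whose only method was String() string. A minimal sketch of that method (an assumption; it isn't shown in this hunk):

// Sketch, not part of this hunk: UsageError satisfying the old
// os.Error interface, which required only String() string.
func (ue UsageError) String() string {
	return string(ue)
}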
@@ -93,9 +94,15 @@ type HaveCache interface {
type Uploader struct {
*client.Client
// for debugging; normally nil, but overrides Client if set
// TODO(bradfitz): clean this up? embed a StatReceiver instead
// of a Client?
altStatReceiver blobserver.StatReceiver
entityFetcher jsonsign.EntityFetcher
transport *tinkerTransport
transport *tinkerTransport // for HTTP statistics
pwd string
statCache UploadCache
haveCache HaveCache
@@ -126,7 +133,7 @@ func (up *Uploader) UploadFileBlob(filename string) (*client.PutResult, os.Error
if err != nil {
return nil, err
}
// assuming what I did here is not too lame, maybe I should set a limit on how much we accept from the stdin?
// TODO(bradfitz,mpl): limit this buffer size?
file := buf.Bytes()
s1 := sha1.New()
size, err = io.Copy(s1, buf)
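
One way to answer the buffer-size TODO above, assuming the buffer is filled from os.Stdin, is to cap the read with io.LimitReader. A hypothetical sketch in the same pre-Go 1 style as the surrounding code; readStdinCapped is an invented helper, not part of this commit, and it assumes the bytes, io, and os packages are imported:

// readStdinCapped is a hypothetical helper sketching the TODO above:
// buffer at most max bytes of stdin before hashing and uploading.
func readStdinCapped(max int64) ([]byte, os.Error) {
	var buf bytes.Buffer
	if _, err := io.Copy(&buf, io.LimitReader(os.Stdin, max)); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}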
@@ -193,18 +200,27 @@ func (up *Uploader) UploadFile(filename string, rollSplits bool) (respr *client.
switch {
case fi.IsRegular():
m["camliType"] = "file"
file, err := os.Open(filename)
if err != nil {
return nil, err
}
defer file.Close()
storage := remote.NewFromClient(up.Client)
m["camliType"] = "file"
statReceiver := up.altStatReceiver
if statReceiver == nil {
// TODO(bradfitz): just make Client be a
// StatReceiver? move remote's ReceiveBlob ->
// Upload wrapper into Client itself?
statReceiver = remote.NewFromClient(up.Client)
}
schemaWriteFileMap := schema.WriteFileMap
if rollSplits {
schemaWriteFileMap = schema.WriteFileMapRolling
}
blobref, err := schemaWriteFileMap(storage, m, io.LimitReader(file, fi.Size))
blobref, err := schemaWriteFileMap(statReceiver, m, io.LimitReader(file, fi.Size))
if err != nil {
return nil, err
}
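
The fallback above works because both the in-memory stats collector and remote.NewFromClient(up.Client) satisfy blobserver.StatReceiver. A rough sketch of that interface, inferred from the method signatures implemented later in this commit (the authoritative definition lives in camli/blobserver and may differ in detail):

// Inferred sketch of the interface the uploader writes through.
type StatReceiver interface {
	// ReceiveBlob consumes the blob's bytes and reports its size.
	ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (blobref.SizedBlobRef, os.Error)
	// StatBlobs reports which of the given blobs are already present.
	StatBlobs(dest chan<- blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) os.Error
}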
@@ -258,7 +274,7 @@ func (up *Uploader) UploadFile(filename string, rollSplits bool) (respr *client.
for _, name := range dirNames {
rate <- true
go func(dirEntName string) {
pr, err := up.UploadFile(filename + "/" + dirEntName, rollSplits)
pr, err := up.UploadFile(filename+"/"+dirEntName, rollSplits)
if pr == nil && err == nil {
log.Fatalf("nil/nil from up.UploadFile on %q", filename+"/"+dirEntName)
}
@@ -536,7 +552,7 @@ func main() {
} else {
err = cmd.RunCommand(up, cmdFlags.Args())
}
if ue, isUsage := err.(UsageError); isUsage || err == ErrUsage {
if ue, isUsage := err.(UsageError); isUsage {
if isUsage {
errf("%s\n", ue)
}

View File

@@ -19,9 +19,13 @@ package main
import (
"flag"
"fmt"
"io"
"io/ioutil"
"os"
"strings"
"sync"
"camli/blobref"
"camli/client"
"camli/schema"
)
@@ -36,7 +40,7 @@ type fileCmd struct {
havecache, statcache bool
// Go into in-memory stats mode only; doesn't actually upload.
memstats bool
memstats bool
}
func init() {
@@ -79,10 +83,10 @@ func (c *fileCmd) RunCommand(up *Uploader, args []string) os.Error {
return UsageError("Can't set tag without using --permanode")
}
if c.memstats {
// TODO(bradfitz): implement
return os.NewError("TODO(bradfitz): implement")
sr := new(statsStatReceiver)
up.altStatReceiver = sr
AddSaveHook(func() { sr.DumpStats() })
}
if c.statcache {
cache := NewFlatStatCache()
AddSaveHook(func() { cache.Save() })
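
AddSaveHook itself isn't part of this diff; presumably it registers callbacks that camput runs once before exiting, so sr.DumpStats() and cache.Save() fire after all uploads finish. A hypothetical sketch of such a registry (names and details are assumptions, not the real camput code):

// Hypothetical sketch of a save-hook registry like the one used above.
var saveHooks []func()

// AddSaveHook queues fn to run once before camput exits.
func AddSaveHook(fn func()) {
	saveHooks = append(saveHooks, fn)
}

// runSaveHooks would be called from main after the command finishes.
func runSaveHooks() {
	for _, fn := range saveHooks {
		fn()
	}
}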
@@ -109,8 +113,8 @@ func (c *fileCmd) RunCommand(up *Uploader, args []string) os.Error {
}
}
for _, arg := range args {
lastPut, err = up.UploadFile(arg, c.rollSplits)
for _, filename := range args {
lastPut, err = up.UploadFile(filename, c.rollSplits)
handleResult("file", lastPut, err)
if permaNode != nil {
@@ -134,3 +138,50 @@ func (c *fileCmd) RunCommand(up *Uploader, args []string) os.Error {
}
return nil
}
// statsStatReceiver is a dummy blobserver.StatReceiver that doesn't store anything;
// it just collects statistics.
type statsStatReceiver struct {
	mu   sync.Mutex
	have map[string]int64
}

// lock acquires mu and lazily initializes the have map.
// Callers are responsible for unlocking mu.
func (sr *statsStatReceiver) lock() {
	sr.mu.Lock()
	if sr.have == nil {
		sr.have = make(map[string]int64)
	}
}

func (sr *statsStatReceiver) ReceiveBlob(blob *blobref.BlobRef, source io.Reader) (sb blobref.SizedBlobRef, err os.Error) {
	n, err := io.Copy(ioutil.Discard, source)
	if err != nil {
		return
	}
	sr.lock()
	defer sr.mu.Unlock()
	sr.have[blob.String()] = n
	return blobref.SizedBlobRef{blob, n}, nil
}

func (sr *statsStatReceiver) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs []*blobref.BlobRef, _ int) os.Error {
	sr.lock()
	defer sr.mu.Unlock()
	for _, br := range blobs {
		if size, ok := sr.have[br.String()]; ok {
			dest <- blobref.SizedBlobRef{br, size}
		}
	}
	return nil
}

func (sr *statsStatReceiver) DumpStats() {
	sr.lock()
	defer sr.mu.Unlock()
	var sum int64
	for _, size := range sr.have {
		sum += size
	}
	fmt.Printf("In-memory blob stats: %d blobs, %d bytes\n", len(sr.have), sum)
}