Merge "camput: limit upload concurrency (number of goroutines)"

This commit is contained in:
Brad Fitzpatrick 2014-02-21 23:35:19 +00:00 committed by Gerrit Code Review
commit 735aca5825
3 changed files with 50 additions and 16 deletions

View File

@ -59,9 +59,7 @@ func init() {
if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); debug {
debugFlagOnce.Do(registerDebugFlags)
}
cmdmain.ExtraFlagRegistration = func() {
client.AddFlags()
}
cmdmain.ExtraFlagRegistration = client.AddFlags
cmdmain.PreExit = func() {
if up := uploader; up != nil {
up.Close()

View File

@ -169,6 +169,8 @@ func TestUploadDirectories(t *testing.T) {
}
defer setAndRestore(&uploadWorkers, 1)()
defer setAndRestore(&dirUploadWorkers, 1)()
defer setAndRestore(&statCacheWorkers, 1)()
e := &env{
Timeout: 5 * time.Second,

View File

@ -65,10 +65,12 @@ type fileCmd struct {
histo string // optional histogram output filename
}
var flagUseSQLiteChildCache bool // Use sqlite for the statcache and havecache.
var (
uploadWorkers = -1 // concurrent upload workers (negative means unbounded: memory hog)
flagUseSQLiteChildCache bool // Use sqlite for the statcache and havecache.
uploadWorkers = 10 // concurrent upload workers (negative means unbounded: memory hog)
dirUploadWorkers = 5 // concurrent directory uploading workers
statCacheWorkers = 10 // concurrent statcache workers
)
func init() {
@ -98,6 +100,10 @@ func init() {
}
if android.IsChild() {
flags.BoolVar(&cmd.argsFromInput, "stdinargs", false, "If true, filenames to upload are sent one-per-line on stdin. EOF means to quit the process with exit status 0.")
// limit number of goroutines to limit memory
uploadWorkers = 3
dirUploadWorkers = 2
statCacheWorkers = 3
}
flagCacheLog = flags.Bool("logcache", false, "log caching details")
@ -936,7 +942,7 @@ func (t *TreeUpload) statPath(fullPath string, fi os.FileInfo) (nod *node, err e
if err != nil {
return nil, err
}
sort.Sort(byFileName(fis))
sort.Sort(byTypeAndName(fis))
for _, fi := range fis {
depn, err := t.statPath(filepath.Join(fullPath, filepath.Base(fi.Name())), fi)
if err != nil {
@ -1007,9 +1013,9 @@ func (t *TreeUpload) run() {
}
})
} else {
upload = NewNodeWorker(uploadWorkers, func(n *node, ok bool) {
dirUpload := NewNodeWorker(dirUploadWorkers, func(n *node, ok bool) {
if !ok {
log.Printf("done with all uploads.")
log.Printf("done uploading directories - done with all uploads.")
uploadsdonec <- true
return
}
@ -1018,7 +1024,25 @@ func (t *TreeUpload) run() {
log.Fatalf("Error uploading %s: %v", n.fullPath, err)
}
n.SetPutResult(put, nil)
if c := t.up.statCache; c != nil && !n.fi.IsDir() {
uploadedc <- n
})
upload = NewNodeWorker(uploadWorkers, func(n *node, ok bool) {
if !ok {
log.Printf("done with all uploads.")
close(dirUpload)
return
}
if n.fi.IsDir() {
dirUpload <- n
return
}
put, err := t.up.uploadNode(n)
if err != nil {
log.Fatalf("Error uploading %s: %v", n.fullPath, err)
}
n.SetPutResult(put, nil)
if c := t.up.statCache; c != nil {
c.AddCachedPutResult(
t.up.pwd, n.fullPath, n.fi, put, withPermanode)
}
@ -1026,7 +1050,7 @@ func (t *TreeUpload) run() {
})
}
checkStatCache := NewNodeWorker(10, func(n *node, ok bool) {
checkStatCache := NewNodeWorker(statCacheWorkers, func(n *node, ok bool) {
if hook := testHookStatCache; hook != nil {
hook(n, ok)
}
@ -1073,7 +1097,7 @@ Loop:
t.skipped.incr(n)
case n, ok := <-stattedc:
if !ok {
log.Printf("done stattting:")
log.Printf("done statting:")
dumpStats()
close(checkStatCache)
stattedc = nil
@ -1117,11 +1141,21 @@ func (t *TreeUpload) Wait() (*client.PutResult, error) {
return t.finalPutRes, t.err
}
type byFileName []os.FileInfo
type byTypeAndName []os.FileInfo
func (s byFileName) Len() int { return len(s) }
func (s byFileName) Less(i, j int) bool { return s[i].Name() < s[j].Name() }
func (s byFileName) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTypeAndName) Len() int { return len(s) }
func (s byTypeAndName) Less(i, j int) bool {
// files go before directories
if s[i].IsDir() {
if !s[j].IsDir() {
return false
}
} else if s[j].IsDir() {
return true
}
return s[i].Name() < s[j].Name()
}
func (s byTypeAndName) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// trackDigestReader is an io.Reader wrapper which records the digest of what it reads.
type trackDigestReader struct {