diff --git a/cmd/camput/camput.go b/cmd/camput/camput.go index e6ce47a72..1d4fdbe66 100644 --- a/cmd/camput/camput.go +++ b/cmd/camput/camput.go @@ -59,9 +59,7 @@ func init() { if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); debug { debugFlagOnce.Do(registerDebugFlags) } - cmdmain.ExtraFlagRegistration = func() { - client.AddFlags() - } + cmdmain.ExtraFlagRegistration = client.AddFlags cmdmain.PreExit = func() { if up := uploader; up != nil { up.Close() diff --git a/cmd/camput/camput_test.go b/cmd/camput/camput_test.go index d7f8f567d..833e4a374 100644 --- a/cmd/camput/camput_test.go +++ b/cmd/camput/camput_test.go @@ -169,6 +169,8 @@ func TestUploadDirectories(t *testing.T) { } defer setAndRestore(&uploadWorkers, 1)() + defer setAndRestore(&dirUploadWorkers, 1)() + defer setAndRestore(&statCacheWorkers, 1)() e := &env{ Timeout: 5 * time.Second, diff --git a/cmd/camput/files.go b/cmd/camput/files.go index 82b3c3d83..ecb04444d 100644 --- a/cmd/camput/files.go +++ b/cmd/camput/files.go @@ -65,10 +65,12 @@ type fileCmd struct { histo string // optional histogram output filename } -var flagUseSQLiteChildCache bool // Use sqlite for the statcache and havecache. - var ( - uploadWorkers = -1 // concurrent upload workers (negative means unbounded: memory hog) + flagUseSQLiteChildCache bool // Use sqlite for the statcache and havecache. + + uploadWorkers = 10 // concurrent upload workers (negative means unbounded: memory hog) + dirUploadWorkers = 5 // concurrent directory uploading workers + statCacheWorkers = 10 // concurrent statcache workers ) func init() { @@ -98,6 +100,10 @@ func init() { } if android.IsChild() { flags.BoolVar(&cmd.argsFromInput, "stdinargs", false, "If true, filenames to upload are sent one-per-line on stdin. EOF means to quit the process with exit status 0.") + // limit number of goroutines to limit memory + uploadWorkers = 3 + dirUploadWorkers = 2 + statCacheWorkers = 3 } flagCacheLog = flags.Bool("logcache", false, "log caching details") @@ -936,7 +942,7 @@ func (t *TreeUpload) statPath(fullPath string, fi os.FileInfo) (nod *node, err e if err != nil { return nil, err } - sort.Sort(byFileName(fis)) + sort.Sort(byTypeAndName(fis)) for _, fi := range fis { depn, err := t.statPath(filepath.Join(fullPath, filepath.Base(fi.Name())), fi) if err != nil { @@ -1007,9 +1013,9 @@ func (t *TreeUpload) run() { } }) } else { - upload = NewNodeWorker(uploadWorkers, func(n *node, ok bool) { + dirUpload := NewNodeWorker(dirUploadWorkers, func(n *node, ok bool) { if !ok { - log.Printf("done with all uploads.") + log.Printf("done uploading directories - done with all uploads.") uploadsdonec <- true return } @@ -1018,7 +1024,25 @@ func (t *TreeUpload) run() { log.Fatalf("Error uploading %s: %v", n.fullPath, err) } n.SetPutResult(put, nil) - if c := t.up.statCache; c != nil && !n.fi.IsDir() { + uploadedc <- n + }) + + upload = NewNodeWorker(uploadWorkers, func(n *node, ok bool) { + if !ok { + log.Printf("done with all uploads.") + close(dirUpload) + return + } + if n.fi.IsDir() { + dirUpload <- n + return + } + put, err := t.up.uploadNode(n) + if err != nil { + log.Fatalf("Error uploading %s: %v", n.fullPath, err) + } + n.SetPutResult(put, nil) + if c := t.up.statCache; c != nil { c.AddCachedPutResult( t.up.pwd, n.fullPath, n.fi, put, withPermanode) } @@ -1026,7 +1050,7 @@ func (t *TreeUpload) run() { }) } - checkStatCache := NewNodeWorker(10, func(n *node, ok bool) { + checkStatCache := NewNodeWorker(statCacheWorkers, func(n *node, ok bool) { if hook := testHookStatCache; hook != nil { hook(n, ok) } @@ -1073,7 +1097,7 @@ Loop: t.skipped.incr(n) case n, ok := <-stattedc: if !ok { - log.Printf("done stattting:") + log.Printf("done statting:") dumpStats() close(checkStatCache) stattedc = nil @@ -1117,11 +1141,21 @@ func (t *TreeUpload) Wait() (*client.PutResult, error) { return t.finalPutRes, t.err } -type byFileName []os.FileInfo +type byTypeAndName []os.FileInfo -func (s byFileName) Len() int { return len(s) } -func (s byFileName) Less(i, j int) bool { return s[i].Name() < s[j].Name() } -func (s byFileName) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s byTypeAndName) Len() int { return len(s) } +func (s byTypeAndName) Less(i, j int) bool { + // files go before directories + if s[i].IsDir() { + if !s[j].IsDir() { + return false + } + } else if s[j].IsDir() { + return true + } + return s[i].Name() < s[j].Name() +} +func (s byTypeAndName) Swap(i, j int) { s[i], s[j] = s[j], s[i] } // trackDigestReader is an io.Reader wrapper which records the digest of what it reads. type trackDigestReader struct {