diff --git a/.last_go_version b/.last_go_version index d5d553e0f..68861bd02 100644 --- a/.last_go_version +++ b/.last_go_version @@ -1 +1 @@ -6g version weekly.2011-09-16 9821+ +6g version weekly.2011-09-21 9853 diff --git a/clients/go/camput/camput.go b/clients/go/camput/camput.go index 9de341914..971966cd8 100644 --- a/clients/go/camput/camput.go +++ b/clients/go/camput/camput.go @@ -33,6 +33,8 @@ import ( "camli/jsonsign" ) +const buffered = 16 // arbitrary + // Things that can be uploaded. (at most one of these) var flagBlob = flag.Bool("blob", false, "upload a file's bytes as a single blob") var flagFile = flag.Bool("file", false, "upload a file's bytes as a blob, as well as its JSON file record") @@ -44,10 +46,15 @@ var flagRemove = flag.Bool("remove", false, "remove the list of blobrefs") var flagName = flag.String("name", "", "Optional name attribute to set on permanode when using -permanode and -file") var flagTag = flag.String("tag", "", "Optional tag attribute to set on permanode when using -permanode and -file. Single value or comma separated ones.") -var flagVerbose = flag.Bool("verbose", false, "be verbose") +var ( + flagVerbose = flag.Bool("verbose", false, "be verbose") + flagCacheLog = flag.Bool("logcache", false, "log caching details") +) -var flagUseStatCache = flag.Bool("statcache", false, "Use the stat cache, assuming unchanged files already uploaded in the past are still there. Fast, but potentially dangerous.") -var flagUseHaveCache = flag.Bool("havecache", false, "Use the 'have cache', a cache keeping track of what blobs the remote server should already have from previous uploads.") +var ( +flagUseStatCache = flag.Bool("statcache", false, "Use the stat cache, assuming unchanged files already uploaded in the past are still there. Fast, but potentially dangerous.") + flagUseHaveCache = flag.Bool("havecache", false, "Use the 'have cache', a cache keeping track of what blobs the remote server should already have from previous uploads.") +) var flagSetAttr = flag.Bool("set-attr", false, "set (replace) an attribute") var flagAddAttr = flag.Bool("add-attr", false, "add an attribute, additional if one already exists") @@ -89,14 +96,7 @@ func blobDetails(contents io.ReadSeeker) (bref *blobref.BlobRef, size int64, err return } -func vprintf(format string, args ...interface{}) { - if *flagVerbose { - log.Printf(format, args...) - } -} - func (up *Uploader) UploadFileBlob(filename string) (*client.PutResult, os.Error) { - vprintf("Uploading filename: %s", filename) fi, err := os.Stat(filename) if err != nil { return nil, err @@ -114,21 +114,22 @@ func (up *Uploader) UploadFileBlob(filename string) (*client.PutResult, os.Error return nil, err } file.Seek(0, 0) - handle := &client.UploadHandle{ref, size, file} + body := io.LimitReader(file, size) + handle := &client.UploadHandle{ref, size, body} return up.Upload(handle) } -func (up *Uploader) beginFileUpload() { +func (up *Uploader) getUploadToken() { up.filecapc <- true } -func (up *Uploader) endFileUpload() { +func (up *Uploader) releaseUploadToken() { <-up.filecapc } func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr os.Error) { - up.beginFileUpload() - defer up.endFileUpload() + up.getUploadToken() + defer up.releaseUploadToken() fi, err := os.Lstat(filename) if err != nil { @@ -138,7 +139,7 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr if up.statCache != nil && fi.IsRegular() { cachedRes, err := up.statCache.CachedPutResult(up.pwd, filename, fi) if err == nil { - vprintf("Cache HIT on %q -> %v", filename, cachedRes) + cachelog.Printf("Cache HIT on %q -> %v", filename, cachedRes) return cachedRes, nil } defer func() { @@ -183,14 +184,61 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr } dir.Close() sort.Strings(dirNames) - // TODO: process dirName entries in parallel - for _, dirEntName := range dirNames { - pr, err := up.UploadFile(filename + "/" + dirEntName) - if err != nil { - return nil, err + + // Temporarily give up our upload token while we + // process all our children. The defer function makes + // sure we re-acquire it (keeping balance in the + // world) before we return. + up.releaseUploadToken() + tokenTookBack := false + defer func() { + if !tokenTookBack { + up.getUploadToken() } - ss.Add(pr.BlobRef) + }() + + rate := make(chan bool, 100) // max outstanding goroutines, further limited by filecapc + type nameResult struct { + name string + putres *client.PutResult + err os.Error } + + resc := make(chan nameResult, buffered) + go func() { + for _, name := range dirNames { + rate <- true + go func(dirEntName string) { + pr, err := up.UploadFile(filename + "/" + dirEntName) + if pr == nil && err == nil { + log.Fatalf("nil/nil from up.UploadFile on %q", filename+"/"+dirEntName) + } + resc <- nameResult{dirEntName, pr, err} + <-rate + }(name) + } + }() + resm := make(map[string]*client.PutResult) + var entUploadErr os.Error + for _ = range dirNames { + r := <-resc + if r.err != nil { + entUploadErr = fmt.Errorf("error uploading %s: %v", r.name, r.err) + continue + } + resm[r.name] = r.putres + } + if entUploadErr != nil { + return nil, entUploadErr + } + for _, name := range dirNames { + ss.Add(resm[name].BlobRef) + } + + // Re-acquire the upload token that we temporarily yielded up above. + up.getUploadToken() + tokenTookBack = true + sspr, err := up.UploadMap(ss.Map()) if err != nil { return nil, err @@ -209,7 +257,11 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr } mappr, err := up.UploadMap(m) - vprintf("Uploaded map: %v => %v, %v", m, mappr, err) + if err == nil { + vlog.Printf("Uploaded %q, %s for %s", m["camliType"], mappr.BlobRef, filename) + } else { + vlog.Printf("Error uploading map %v: %v", m, err) + } return mappr, err } @@ -238,7 +290,6 @@ func (up *Uploader) UploadMap(m map[string]interface{}) (*client.PutResult, os.E if err != nil { return nil, err } - vprintf("json: %s\n", json) return up.uploadString(json) } @@ -253,13 +304,16 @@ func (up *Uploader) UploadAndSignMap(m map[string]interface{}) (*client.PutResul func (up *Uploader) uploadString(s string) (*client.PutResult, os.Error) { uh := client.NewUploadHandleFromString(s) if c := up.haveCache; c != nil && c.BlobExists(uh.BlobRef) { - vprintf("HaveCache HIT for %s / %d", uh.BlobRef, uh.Size) + cachelog.Printf("HaveCache HIT for %s / %d", uh.BlobRef, uh.Size) return &client.PutResult{BlobRef: uh.BlobRef, Size: uh.Size, Skipped: true}, nil } pr, err := up.Upload(uh) if err == nil && up.haveCache != nil { up.haveCache.NoteBlobExists(uh.BlobRef) } + if pr == nil && err == nil { + log.Fatalf("Got nil/nil in uploadString while uploading %s", s) + } return pr, err } @@ -304,11 +358,7 @@ func handleResult(what string, pr *client.PutResult, err os.Error) { wereErrors = true return } - if *flagVerbose { - fmt.Printf("Put %s: %q\n", what, pr) - } else { - fmt.Println(pr.BlobRef.String()) - } + fmt.Println(pr.BlobRef.String()) } func main() { @@ -328,6 +378,7 @@ func main() { } transport := new(tinkerTransport) + transport.transport = &http.Transport{DisableKeepAlives: false} cc.SetHttpClient(&http.Client{Transport: transport}) pwd, err := os.Getwd() diff --git a/clients/go/camput/flatcache.go b/clients/go/camput/flatcache.go index 904278e50..49f694b84 100644 --- a/clients/go/camput/flatcache.go +++ b/clients/go/camput/flatcache.go @@ -79,6 +79,7 @@ func cacheKey(pwd, filename string) string { return filepath.Clean(pwd) + "\x00" + filepath.Clean(filename) } + func (c *FlatStatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) (*client.PutResult, os.Error) { c.mu.Lock() defer c.mu.Unlock() @@ -86,11 +87,11 @@ func (c *FlatStatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) ( key := cacheKey(pwd, filename) val, ok := c.m[key] if !ok { - log.Printf("cache MISS on %q: not in cache", key) + cachelog.Printf("cache MISS on %q: not in cache", key) return nil, ErrCacheMiss } if !reflect.DeepEqual(&val.Fi, fi) { - log.Printf("cache MISS on %q: stats not equal:\n%#v\n%#v", key, val.Fi, fi) + cachelog.Printf("cache MISS on %q: stats not equal:\n%#v\n%#v", key, val.Fi, fi) return nil, ErrCacheMiss } pr := val.Pr @@ -103,7 +104,7 @@ func (c *FlatStatCache) AddCachedPutResult(pwd, filename string, fi *os.FileInfo key := cacheKey(pwd, filename) val := fileInfoPutRes{*fi, *pr} - vprintf("Adding to stat cache %q: %v", key, val) + cachelog.Printf("Adding to stat cache %q: %v", key, val) c.dirty[key] = val c.m[key] = val @@ -113,7 +114,7 @@ func (c *FlatStatCache) Save() { c.mu.Lock() defer c.mu.Unlock() if len(c.dirty) == 0 { - vprintf("FlatStatCache: Save, but nothing dirty") + cachelog.Printf("FlatStatCache: Save, but nothing dirty") return } @@ -133,7 +134,7 @@ func (c *FlatStatCache) Save() { write(v) } c.dirty = make(map[string]fileInfoPutRes) - log.Printf("FlatStatCache: saved") + cachelog.Printf("FlatStatCache: saved") } type FlatHaveCache struct { @@ -182,7 +183,7 @@ func (c *FlatHaveCache) Save() { c.mu.Lock() defer c.mu.Unlock() if len(c.dirty) == 0 { - vprintf("FlatHaveCache: Save, but nothing dirty") + cachelog.Printf("FlatHaveCache: Save, but nothing dirty") return } @@ -201,5 +202,5 @@ func (c *FlatHaveCache) Save() { write(k) } c.dirty = make(map[string]bool) - log.Printf("FlatHaveCache: saved") + cachelog.Printf("FlatHaveCache: saved") } diff --git a/clients/go/camput/logging.go b/clients/go/camput/logging.go new file mode 100644 index 000000000..8f7035d94 --- /dev/null +++ b/clients/go/camput/logging.go @@ -0,0 +1,38 @@ +/* +Copyright 2011 Google Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "log" +) + +type Logger interface { + Printf(format string, args ...interface{}) +} + +type flagLogger struct { + on *bool +} + +var vlog = &flagLogger{flagVerbose} +var cachelog = &flagLogger{flagCacheLog} + +func (fl *flagLogger) Printf(format string, args ...interface{}) { + if *fl.on { + log.Printf(format, args...) + } +}