camput: do directories in parallel, don't deadlock, cleanup, sanitize logging, ...

start of much more work.

Change-Id: I667260895a3f6f1425937189ff9752b18bd6de0d
This commit is contained in:
Brad Fitzpatrick 2011-09-24 17:20:47 -07:00
parent 35231e69f6
commit c3d6526fd1
4 changed files with 128 additions and 38 deletions

View File

@ -1 +1 @@
6g version weekly.2011-09-16 9821+
6g version weekly.2011-09-21 9853

View File

@ -33,6 +33,8 @@ import (
"camli/jsonsign"
)
const buffered = 16 // arbitrary
// Things that can be uploaded. (at most one of these)
var flagBlob = flag.Bool("blob", false, "upload a file's bytes as a single blob")
var flagFile = flag.Bool("file", false, "upload a file's bytes as a blob, as well as its JSON file record")
@ -44,10 +46,15 @@ var flagRemove = flag.Bool("remove", false, "remove the list of blobrefs")
var flagName = flag.String("name", "", "Optional name attribute to set on permanode when using -permanode and -file")
var flagTag = flag.String("tag", "", "Optional tag attribute to set on permanode when using -permanode and -file. Single value or comma separated ones.")
var flagVerbose = flag.Bool("verbose", false, "be verbose")
var (
flagVerbose = flag.Bool("verbose", false, "be verbose")
flagCacheLog = flag.Bool("logcache", false, "log caching details")
)
var flagUseStatCache = flag.Bool("statcache", false, "Use the stat cache, assuming unchanged files already uploaded in the past are still there. Fast, but potentially dangerous.")
var flagUseHaveCache = flag.Bool("havecache", false, "Use the 'have cache', a cache keeping track of what blobs the remote server should already have from previous uploads.")
var (
flagUseStatCache = flag.Bool("statcache", false, "Use the stat cache, assuming unchanged files already uploaded in the past are still there. Fast, but potentially dangerous.")
flagUseHaveCache = flag.Bool("havecache", false, "Use the 'have cache', a cache keeping track of what blobs the remote server should already have from previous uploads.")
)
var flagSetAttr = flag.Bool("set-attr", false, "set (replace) an attribute")
var flagAddAttr = flag.Bool("add-attr", false, "add an attribute, additional if one already exists")
@ -89,14 +96,7 @@ func blobDetails(contents io.ReadSeeker) (bref *blobref.BlobRef, size int64, err
return
}
// vprintf forwards its arguments to log.Printf, but only when the
// -verbose flag (flagVerbose) is set; otherwise it does nothing.
func vprintf(format string, args ...interface{}) {
	if !*flagVerbose {
		return
	}
	log.Printf(format, args...)
}
func (up *Uploader) UploadFileBlob(filename string) (*client.PutResult, os.Error) {
vprintf("Uploading filename: %s", filename)
fi, err := os.Stat(filename)
if err != nil {
return nil, err
@ -114,21 +114,22 @@ func (up *Uploader) UploadFileBlob(filename string) (*client.PutResult, os.Error
return nil, err
}
file.Seek(0, 0)
handle := &client.UploadHandle{ref, size, file}
body := io.LimitReader(file, size)
handle := &client.UploadHandle{ref, size, body}
return up.Upload(handle)
}
func (up *Uploader) beginFileUpload() {
func (up *Uploader) getUploadToken() {
up.filecapc <- true
}
func (up *Uploader) endFileUpload() {
func (up *Uploader) releaseUploadToken() {
<-up.filecapc
}
func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr os.Error) {
up.beginFileUpload()
defer up.endFileUpload()
up.getUploadToken()
defer up.releaseUploadToken()
fi, err := os.Lstat(filename)
if err != nil {
@ -138,7 +139,7 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr
if up.statCache != nil && fi.IsRegular() {
cachedRes, err := up.statCache.CachedPutResult(up.pwd, filename, fi)
if err == nil {
vprintf("Cache HIT on %q -> %v", filename, cachedRes)
cachelog.Printf("Cache HIT on %q -> %v", filename, cachedRes)
return cachedRes, nil
}
defer func() {
@ -183,14 +184,61 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr
}
dir.Close()
sort.Strings(dirNames)
// TODO: process dirName entries in parallel
for _, dirEntName := range dirNames {
pr, err := up.UploadFile(filename + "/" + dirEntName)
if err != nil {
return nil, err
// Temporarily give up our upload token while we
// process all our children. The defer function makes
// sure we re-acquire it (keeping balance in the
// world) before we return.
up.releaseUploadToken()
tokenTookBack := false
defer func() {
if !tokenTookBack {
up.getUploadToken()
}
ss.Add(pr.BlobRef)
}()
rate := make(chan bool, 100) // max outstanding goroutines, further limited by filecapc
type nameResult struct {
name string
putres *client.PutResult
err os.Error
}
resc := make(chan nameResult, buffered)
go func() {
for _, name := range dirNames {
rate <- true
go func(dirEntName string) {
pr, err := up.UploadFile(filename + "/" + dirEntName)
if pr == nil && err == nil {
log.Fatalf("nil/nil from up.UploadFile on %q", filename+"/"+dirEntName)
}
resc <- nameResult{dirEntName, pr, err}
<-rate
}(name)
}
}()
resm := make(map[string]*client.PutResult)
var entUploadErr os.Error
for _ = range dirNames {
r := <-resc
if r.err != nil {
entUploadErr = fmt.Errorf("error uploading %s: %v", r.name, r.err)
continue
}
resm[r.name] = r.putres
}
if entUploadErr != nil {
return nil, entUploadErr
}
for _, name := range dirNames {
ss.Add(resm[name].BlobRef)
}
// Re-acquire the upload token that we temporarily yielded up above.
up.getUploadToken()
tokenTookBack = true
sspr, err := up.UploadMap(ss.Map())
if err != nil {
return nil, err
@ -209,7 +257,11 @@ func (up *Uploader) UploadFile(filename string) (respr *client.PutResult, outerr
}
mappr, err := up.UploadMap(m)
vprintf("Uploaded map: %v => %v, %v", m, mappr, err)
if err == nil {
vlog.Printf("Uploaded %q, %s for %s", m["camliType"], mappr.BlobRef, filename)
} else {
vlog.Printf("Error uploading map %v: %v", m, err)
}
return mappr, err
}
@ -238,7 +290,6 @@ func (up *Uploader) UploadMap(m map[string]interface{}) (*client.PutResult, os.E
if err != nil {
return nil, err
}
vprintf("json: %s\n", json)
return up.uploadString(json)
}
@ -253,13 +304,16 @@ func (up *Uploader) UploadAndSignMap(m map[string]interface{}) (*client.PutResul
// uploadString uploads the string s as a blob and returns the resulting
// PutResult.
//
// If a have cache is configured and it already lists the blob's ref, no
// upload is performed and a PutResult with Skipped set to true is returned.
// After a successful upload the blob ref is recorded in the have cache,
// if there is one.
func (up *Uploader) uploadString(s string) (*client.PutResult, os.Error) {
uh := client.NewUploadHandleFromString(s)
if c := up.haveCache; c != nil && c.BlobExists(uh.BlobRef) {
// NOTE(review): both the vprintf and the cachelog.Printf call are present
// in this view; they look like the before/after lines of one change —
// confirm which of the two is meant to remain.
vprintf("HaveCache HIT for %s / %d", uh.BlobRef, uh.Size)
cachelog.Printf("HaveCache HIT for %s / %d", uh.BlobRef, uh.Size)
return &client.PutResult{BlobRef: uh.BlobRef, Size: uh.Size, Skipped: true}, nil
}
pr, err := up.Upload(uh)
if err == nil && up.haveCache != nil {
up.haveCache.NoteBlobExists(uh.BlobRef)
}
// A nil result together with a nil error leaves the caller with nothing to
// act on, so it is treated as fatal here rather than passed along.
if pr == nil && err == nil {
log.Fatalf("Got nil/nil in uploadString while uploading %s", s)
}
return pr, err
}
@ -304,11 +358,7 @@ func handleResult(what string, pr *client.PutResult, err os.Error) {
wereErrors = true
return
}
if *flagVerbose {
fmt.Printf("Put %s: %q\n", what, pr)
} else {
fmt.Println(pr.BlobRef.String())
}
fmt.Println(pr.BlobRef.String())
}
func main() {
@ -328,6 +378,7 @@ func main() {
}
transport := new(tinkerTransport)
transport.transport = &http.Transport{DisableKeepAlives: false}
cc.SetHttpClient(&http.Client{Transport: transport})
pwd, err := os.Getwd()

View File

@ -79,6 +79,7 @@ func cacheKey(pwd, filename string) string {
return filepath.Clean(pwd) + "\x00" + filepath.Clean(filename)
}
func (c *FlatStatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) (*client.PutResult, os.Error) {
c.mu.Lock()
defer c.mu.Unlock()
@ -86,11 +87,11 @@ func (c *FlatStatCache) CachedPutResult(pwd, filename string, fi *os.FileInfo) (
key := cacheKey(pwd, filename)
val, ok := c.m[key]
if !ok {
log.Printf("cache MISS on %q: not in cache", key)
cachelog.Printf("cache MISS on %q: not in cache", key)
return nil, ErrCacheMiss
}
if !reflect.DeepEqual(&val.Fi, fi) {
log.Printf("cache MISS on %q: stats not equal:\n%#v\n%#v", key, val.Fi, fi)
cachelog.Printf("cache MISS on %q: stats not equal:\n%#v\n%#v", key, val.Fi, fi)
return nil, ErrCacheMiss
}
pr := val.Pr
@ -103,7 +104,7 @@ func (c *FlatStatCache) AddCachedPutResult(pwd, filename string, fi *os.FileInfo
key := cacheKey(pwd, filename)
val := fileInfoPutRes{*fi, *pr}
vprintf("Adding to stat cache %q: %v", key, val)
cachelog.Printf("Adding to stat cache %q: %v", key, val)
c.dirty[key] = val
c.m[key] = val
@ -113,7 +114,7 @@ func (c *FlatStatCache) Save() {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.dirty) == 0 {
vprintf("FlatStatCache: Save, but nothing dirty")
cachelog.Printf("FlatStatCache: Save, but nothing dirty")
return
}
@ -133,7 +134,7 @@ func (c *FlatStatCache) Save() {
write(v)
}
c.dirty = make(map[string]fileInfoPutRes)
log.Printf("FlatStatCache: saved")
cachelog.Printf("FlatStatCache: saved")
}
type FlatHaveCache struct {
@ -182,7 +183,7 @@ func (c *FlatHaveCache) Save() {
c.mu.Lock()
defer c.mu.Unlock()
if len(c.dirty) == 0 {
vprintf("FlatHaveCache: Save, but nothing dirty")
cachelog.Printf("FlatHaveCache: Save, but nothing dirty")
return
}
@ -201,5 +202,5 @@ func (c *FlatHaveCache) Save() {
write(k)
}
c.dirty = make(map[string]bool)
log.Printf("FlatHaveCache: saved")
cachelog.Printf("FlatHaveCache: saved")
}

View File

@ -0,0 +1,38 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"log"
)
// Logger is anything that can emit a printf-style formatted log message.
type Logger interface {
// Printf logs a message built from format and args.
Printf(format string, args ...interface{})
}
// flagLogger is a logger gated by a boolean: its Printf method only
// produces output while the bool that on points to is true.
type flagLogger struct {
// on points at the controlling bool. It is dereferenced on every Printf
// call rather than copied, so later changes to the pointed-to value are
// honored.
on *bool
}
// Package-level loggers, each gated by its own command-line flag.
var (
	// vlog logs only when flagVerbose is set.
	vlog = &flagLogger{on: flagVerbose}

	// cachelog logs only when flagCacheLog is set.
	cachelog = &flagLogger{on: flagCacheLog}
)
// Printf passes format and args on to log.Printf when the logger's
// controlling flag is currently true, and is a no-op otherwise.
func (fl *flagLogger) Printf(format string, args ...interface{}) {
	if enabled := *fl.on; !enabled {
		return
	}
	log.Printf(format, args...)
}