From 20864065521fcc8133f9bfdf63f9c15dec09a39d Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Wed, 15 Jan 2014 18:32:02 -0800 Subject: [PATCH] Client fixes and cleanups: * Fix TODO about moving camput file's havecache up a layer into camput itself. So now it's used for all commands (camput permanode, camput attr, etc). Reduces HTTP requests. * Fix camlistore.org/issue/343 -- don't do stats before uploads in cases where it's useless. Adds a field to UploadHandle. * Fix camlistore.org/issue/344: upload the public key whenever uploading a signed blob. Usually this is a no-op because of the have cache. * Use zero (or <=0 rather), not -1, to mean unknown on UploadHandle Size. * More docs on public stuff. --- TODO | 9 ----- cmd/camput/camput.go | 29 +++++++++++++-- cmd/camput/files.go | 13 +------ pkg/client/client.go | 46 ++++++++++++++--------- pkg/client/upload.go | 87 +++++++++++++++++++++++++++++--------------- 5 files changed, 112 insertions(+), 72 deletions(-) diff --git a/TODO b/TODO index 1d92eeb52..de3be1c91 100644 --- a/TODO +++ b/TODO @@ -30,15 +30,6 @@ Offline list: -- bring in the google glog package to third_party and use it in places that want selective logging (e.g. pkg/index/receive.go) --- camput's havecache initialization should move up a layer to - to be global to all commands, not specific to "camput file". - (the statcache can stay in file). otherwise, we have no haveCache - on the pkg/client.*Client and the UploadAndSignBlob call (for - creating permanodes and other claims) can't efficiently upload its - public key if the server doesn't already have it. currently it - has to try to upload it (doing a remote HTTP stat) each time - when using "camput permanode" or "camput attr". - -- use 'uint32' instead of 'int64' for blob sizes everywhere (notably blob.SizedRef). blobs have a max size of 10-32 MB anyway, and the index.Corpus is now using uint32 to save memory. 
diff --git a/cmd/camput/camput.go b/cmd/camput/camput.go index 61b09f425..e06b35f78 100644 --- a/cmd/camput/camput.go +++ b/cmd/camput/camput.go @@ -25,6 +25,7 @@ import ( "os" "strconv" "strings" + "sync" "camlistore.org/pkg/client" "camlistore.org/pkg/cmdmain" @@ -37,19 +38,27 @@ const buffered = 16 // arbitrary var ( flagProxyLocal = false flagHTTP = flag.Bool("verbose_http", false, "show HTTP request summaries") + flagHaveCache = true ) -var cachedUploader *Uploader // initialized by getUploader +var ( + uploaderOnce sync.Once + uploader *Uploader // initialized by getUploader +) func init() { if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); debug { flag.BoolVar(&flagProxyLocal, "proxy_local", false, "If true, the HTTP_PROXY environment is also used for localhost requests. This can be helpful during debugging.") + flag.BoolVar(&flagHaveCache, "havecache", true, "Use the 'have cache', a cache keeping track of what blobs the remote server should already have from previous uploads.") } cmdmain.ExtraFlagRegistration = func() { client.AddFlags() } cmdmain.PreExit = func() { up := getUploader() + if up.haveCache != nil { + up.haveCache.Close() + } stats := up.Stats() log.Printf("Client stats: %s", stats.String()) log.Printf(" #HTTP reqs: %d", up.transport.Requests()) @@ -57,10 +66,22 @@ func init() { } func getUploader() *Uploader { - if cachedUploader == nil { - cachedUploader = newUploader() + uploaderOnce.Do(initUploader) + return uploader +} + +func initUploader() { + up := newUploader() + if flagHaveCache { + gen, err := up.StorageGeneration() + if err != nil { + log.Printf("WARNING: not using local server inventory cache; failed to retrieve server's storage generation: %v", err) + } else { + up.haveCache = NewKvHaveCache(gen) + up.Client.SetHaveCache(up.haveCache) + } } - return cachedUploader + uploader = up } func handleResult(what string, pr *client.PutResult, err error) error { diff --git a/cmd/camput/files.go b/cmd/camput/files.go index 
428b68d04..560969728 100644 --- a/cmd/camput/files.go +++ b/cmd/camput/files.go @@ -58,7 +58,7 @@ type fileCmd struct { argsFromInput bool // Android mode: filenames piped into stdin, one at a time. deleteAfterUpload bool // with fileNodes, deletes the input file once uploaded - havecache, statcache bool + statcache bool // Go into in-memory stats mode only; doesn't actually upload. memstats bool @@ -85,13 +85,11 @@ func init() { if debug, _ := strconv.ParseBool(os.Getenv("CAMLI_DEBUG")); debug { flags.BoolVar(&cmd.statcache, "statcache", true, "Use the stat cache, assuming unchanged files already uploaded in the past are still there. Fast, but potentially dangerous.") - flags.BoolVar(&cmd.havecache, "havecache", true, "Use the 'have cache', a cache keeping track of what blobs the remote server should already have from previous uploads.") flags.BoolVar(&cmd.memstats, "debug-memstats", false, "Enter debug in-memory mode; collecting stats only. Doesn't upload anything.") flags.StringVar(&cmd.histo, "debug-histogram-file", "", "Optional file to create and write the blob size for each file uploaded. For use with GNU R and hist(read.table(\"filename\")$V1). Requires debug-memstats.") flags.BoolVar(&cmd.capCtime, "capctime", false, "For file blobs use file modification time as creation time if it would be bigger (newer) than modification time. 
For stable filenode creation (you can forge mtime, but can't forge ctime).") flags.BoolVar(&flagUseSQLiteChildCache, "sqlitecache", false, "Use sqlite for the statcache and havecache instead of a flat cache.") } else { - cmd.havecache = true cmd.statcache = true } if android.IsChild() { @@ -219,9 +217,6 @@ func (c *fileCmd) RunCommand(args []string) error { if len(args) == 0 { return cmdmain.UsageError("No files or directories given.") } - if up.haveCache != nil { - defer up.haveCache.Close() - } if up.statCache != nil { defer up.statCache.Close() } @@ -282,7 +277,7 @@ func (c *fileCmd) RunCommand(args []string) error { } func (c *fileCmd) initCaches(up *Uploader) { - if !c.statcache && !c.havecache { + if !c.statcache { return } gen, err := up.StorageGeneration() @@ -293,10 +288,6 @@ func (c *fileCmd) initCaches(up *Uploader) { if c.statcache { up.statCache = NewKvStatCache(gen) } - if c.havecache { - up.haveCache = NewKvHaveCache(gen) - up.Client.SetHaveCache(up.haveCache) - } } // DumpStats creates the destFile and writes a line per received blob, diff --git a/pkg/client/client.go b/pkg/client/client.go index af3320653..bfe0b4b58 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -886,36 +886,43 @@ func (c *Client) signBlob(bb schema.Buildable, sigTime time.Time) (string, error return bb.Builder().SignAt(signer, sigTime) } +// uploadPublicKey uploads the public key (if one is defined), so +// subsequent (likely synchronous) indexing of uploaded signed blobs +// will have access to the public key to verify it. In the normal +// case, the have cache prevents this from doing anything anyway. 
+func (c *Client) uploadPublicKey() error { + sigRef := c.SignerPublicKeyBlobref() + if !sigRef.Valid() { + return nil + } + var err error + if _, keyUploaded := c.haveCache.StatBlobCache(sigRef); !keyUploaded { + _, err = c.uploadString(c.publicKeyArmored, false) + } + return err +} + func (c *Client) UploadAndSignBlob(b schema.AnyBlob) (*PutResult, error) { signed, err := c.signBlob(b.Blob(), time.Time{}) if err != nil { return nil, err } - - // sigRef is guaranteed valid at this point, because SignBlob - // succeeded. If we don't know for sure that the server - // already has this public key, upload it. And do it serially - // so by the time we do the second upload of the signed blob, - // any synchronous indexing on the server won't fail due to a - // missing public key. - sigRef := c.SignerPublicKeyBlobref() - if _, keyUploaded := c.haveCache.StatBlobCache(sigRef); !keyUploaded { - if _, err := c.uploadString(c.publicKeyArmored); err != nil { - return nil, err - } + if err := c.uploadPublicKey(); err != nil { + return nil, err } - - return c.uploadString(signed) + return c.uploadString(signed, false) } func (c *Client) UploadBlob(b schema.AnyBlob) (*PutResult, error) { // TODO(bradfitz): ask the blob for its own blobref, rather // than changing the hash function with uploadString? 
- return c.uploadString(b.Blob().JSON()) + return c.uploadString(b.Blob().JSON(), true) } -func (c *Client) uploadString(s string) (*PutResult, error) { - return c.Upload(NewUploadHandleFromString(s)) +func (c *Client) uploadString(s string, stat bool) (*PutResult, error) { + uh := NewUploadHandleFromString(s) + uh.SkipStat = !stat + return c.Upload(uh) } func (c *Client) UploadNewPermanode() (*PutResult, error) { @@ -929,7 +936,10 @@ func (c *Client) UploadPlannedPermanode(key string, sigTime time.Time) (*PutResu if err != nil { return nil, err } - return c.uploadString(signed) + if err := c.uploadPublicKey(); err != nil { + return nil, err + } + return c.uploadString(signed, true) } // IsIgnoredFile returns whether the file at fullpath should be ignored by camput. diff --git a/pkg/client/upload.go b/pkg/client/upload.go index b8c976589..65e1cca6c 100644 --- a/pkg/client/upload.go +++ b/pkg/client/upload.go @@ -41,11 +41,31 @@ var debugUploads = os.Getenv("CAMLI_DEBUG_UPLOADS") != "" // Writer adds around content var multipartOverhead = calculateMultipartOverhead() +// UploadHandle contains the parameters of a request to upload a blob. type UploadHandle struct { - BlobRef blob.Ref - Size int64 // or -1 if size isn't known + // BlobRef is the required blobref of the blob to upload. + BlobRef blob.Ref + + // Contents is the blob data. Contents io.Reader - Vivify bool + + // Size optionally specifies the size of Contents. + // If <= 0, the Contents are slurped into memory to count the size. + Size int64 + + // Vivify optionally instructs the server to create a + // permanode for this blob. If used, the blob should be a + // "file" schema blob. This is typically used by + // lesser-trusted clients (such as mobile phones) which don't + // have rights to do signing directly. + Vivify bool + + // SkipStat indicates whether the stat check (checking whether + // the server already has the blob) will be skipped and the + // blob should be uploaded immediately. 
This is useful for + small blobs that the server is unlikely to already have + (e.g. new claims). + SkipStat bool } type PutResult struct { @@ -102,6 +122,7 @@ func parseStatResponse(r io.Reader) (*statResponse, error) { return s, nil } +// NewUploadHandleFromString returns an upload handle for the data string. func NewUploadHandleFromString(data string) *UploadHandle { bref := blob.SHA1FromString(data) r := strings.NewReader(data) @@ -299,9 +320,8 @@ func (c *Client) doStat(dest chan<- blob.SizedRef, blobs []blob.Ref, wait time.D // Figure out the size of the contents. // If the size was provided, trust it. -// If the size was not provided (-1), slurp. -func readerAndSize(h *UploadHandle) (io.Reader, int64, error) { - if h.Size != -1 { +func (h *UploadHandle) readerAndSize() (io.Reader, int64, error) { + if h.Size > 0 { return h.Contents, h.Size, nil } var b bytes.Buffer @@ -312,6 +332,7 @@ func readerAndSize(h *UploadHandle) (io.Reader, int64, error) { return &b, n, nil } +// Upload uploads a blob, as described by the provided UploadHandle parameters. func (c *Client) Upload(h *UploadHandle) (*PutResult, error) { errorf := func(msg string, arg ...interface{}) (*PutResult, error) { err := fmt.Errorf(msg, arg...) 
@@ -319,7 +340,7 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, error) { return nil, err } - bodyReader, bodySize, err := readerAndSize(h) + bodyReader, bodySize, err := h.readerAndSize() if err != nil { return nil, fmt.Errorf("client: error slurping upload handle to find its length: %v", err) } @@ -345,30 +366,35 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, error) { if err != nil { return nil, err } - url_ := fmt.Sprintf("%s/camli/stat", pfx) - req := c.newRequest("POST", url_, strings.NewReader("camliversion=1&blob1="+blobrefStr)) - req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - resp, err := c.doReqGated(req) - if err != nil { - return errorf("stat http error: %v", err) - } - defer resp.Body.Close() + if !h.SkipStat { + url_ := fmt.Sprintf("%s/camli/stat", pfx) + req := c.newRequest("POST", url_, strings.NewReader("camliversion=1&blob1="+blobrefStr)) + req.Header.Add("Content-Type", "application/x-www-form-urlencoded") - if resp.StatusCode != 200 { - return errorf("stat response had http status %d", resp.StatusCode) - } + resp, err := c.doReqGated(req) + if err != nil { + return errorf("stat http error: %v", err) + } + defer resp.Body.Close() - stat, err := parseStatResponse(resp.Body) - resp.Body.Close() - if err != nil { - return nil, err - } - for _, sbr := range stat.HaveMap { - c.haveCache.NoteBlobExists(sbr.Ref, sbr.Size) - } - if !h.Vivify { - if _, ok := stat.HaveMap[blobrefStr]; ok { + if resp.StatusCode != 200 { + return errorf("stat response had http status %d", resp.StatusCode) + } + + stat, err := parseStatResponse(resp.Body) + resp.Body.Close() + if err != nil { + return nil, err + } + for _, sbr := range stat.HaveMap { + c.haveCache.NoteBlobExists(sbr.Ref, sbr.Size) + } + _, serverHasIt := stat.HaveMap[blobrefStr] + if debugUploads { + log.Printf("HTTP Stat(%s) = %v", blobrefStr, serverHasIt) + } + if !h.Vivify && serverHasIt { pr.Skipped = true if closer, ok := h.Contents.(io.Closer); ok { // 
TODO(bradfitz): I did this @@ -377,6 +403,7 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, error) { // fix the docs. closer.Close() } + c.haveCache.NoteBlobExists(h.BlobRef, bodySize) return pr, nil } } @@ -407,14 +434,14 @@ func (c *Client) Upload(h *UploadHandle) (*PutResult, error) { // c.log.Printf("Uploading %s", br) uploadURL := fmt.Sprintf("%s/camli/upload", pfx) - req = c.newRequest("POST", uploadURL) + req := c.newRequest("POST", uploadURL) req.Header.Set("Content-Type", multipartWriter.FormDataContentType()) if h.Vivify { req.Header.Add("X-Camlistore-Vivify", "1") } req.Body = ioutil.NopCloser(pipeReader) req.ContentLength = multipartOverhead + bodySize + int64(len(blobrefStr))*2 - resp, err = c.doReqGated(req) + resp, err := c.doReqGated(req) if err != nil { return errorf("upload http error: %v", err) }