From f34b4b35d2ed59d36ac8169fa00de78adf5e25bb Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 26 Feb 2011 14:03:10 -0800 Subject: [PATCH] Spec and start of longpoll enumerate. --- doc/protocol/blob-enumerate-protocol.txt | 21 +++++++++++++++++++++ lib/go/blobserver/handlers/enumerate.go | 22 ++++++++++++++++++++-- lib/go/blobserver/handlers/stat.go | 3 +++ lib/go/blobserver/interface.go | 23 +++++++++++++++-------- lib/go/blobserver/localdisk/localdisk.go | 6 +++--- 5 files changed, 62 insertions(+), 13 deletions(-) diff --git a/doc/protocol/blob-enumerate-protocol.txt b/doc/protocol/blob-enumerate-protocol.txt index c2b3ae7a5..7245fe016 100644 --- a/doc/protocol/blob-enumerate-protocol.txt +++ b/doc/protocol/blob-enumerate-protocol.txt @@ -19,6 +19,23 @@ URL GET parameters: so be sure to pay attention to the presence of a "continueAfter" key in the JSON response. + maxwaitsec optional The client may send this, an integer max + number of seconds the client is willing to + wait for the arrival of blobs. If the + server supports long-polling (an optional + feature), then the server will return + immediately if any blobs or available, else + it will wait for this number of seconds. + This option isn't supported with 'after'. + The server's reply must include + "canLongPoll" set to true if the server + supports this feature. Even if the server + supports long polling, the server may cap + 'maxwaitsec' and wait for less time than + requested by the client. + + + Response: HTTP/1.1 200 OK @@ -32,6 +49,7 @@ Content-Type: text/javascript "size": 3}, ], "after": "sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33", + "canLongPoll": true, } Response keys: @@ -44,3 +62,6 @@ Response keys: should be passed to the next request's "after" request parameter. + canLongPoll optional Set to true (type boolean) if the server supports + long polling. If not true, the server ignores + the client's "maxwaitsec" parameter. diff --git a/lib/go/blobserver/handlers/enumerate.go b/lib/go/blobserver/handlers/enumerate.go index 7843a0b9d..032776ee1 100644 --- a/lib/go/blobserver/handlers/enumerate.go +++ b/lib/go/blobserver/handlers/enumerate.go @@ -41,19 +41,33 @@ func CreateEnumerateHandler(storage blobserver.Storage, partition blobserver.Par } } -func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage blobserver.Storage, partition blobserver.Partition) { +func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobEnumerator, partition blobserver.Partition) { limit, err := strconv.Atoui(req.FormValue("limit")) if err != nil || limit > maxEnumerate { limit = maxEnumerate } + waitSeconds := 0 + if waitStr := req.FormValue("maxwaitsec"); waitStr != "" { + waitSeconds, _ = strconv.Atoi(waitStr) + switch { + case waitSeconds < 0: + waitSeconds = 0 + case waitSeconds > 30: + // TODO: don't hard-code 30. push this up into a blobserver interface + // for getting the configuration of the server (ultimately a flag in + // in the binary) + waitSeconds = 30 + } + } + conn.SetHeader("Content-Type", "text/javascript; charset=utf-8") fmt.Fprintf(conn, "{\n \"blobs\": [\n") blobch := make(chan *blobref.SizedBlobRef, 100) resultch := make(chan os.Error, 1) go func() { - resultch <- storage.EnumerateBlobs(blobch, partition, req.FormValue("after"), limit+1) + resultch <- storage.EnumerateBlobs(blobch, partition, req.FormValue("after"), limit+1, waitSeconds) }() after := "" @@ -101,5 +115,9 @@ func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage b if after != "" { fmt.Fprintf(conn, ",\n \"continueAfter\": \"%s\"", after) } + const longPollSupported = true + if longPollSupported { + fmt.Fprintf(conn, ",\n \"canLongPoll\": true") + } fmt.Fprintf(conn, "\n}\n") } diff --git a/lib/go/blobserver/handlers/stat.go b/lib/go/blobserver/handlers/stat.go index d4d94bd1a..174bddfd2 100644 --- a/lib/go/blobserver/handlers/stat.go +++ b/lib/go/blobserver/handlers/stat.go @@ -79,6 +79,9 @@ func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver. case waitSeconds < 0: waitSeconds = 0 case waitSeconds > 30: + // TODO: don't hard-code 30. push this up into a blobserver interface + // for getting the configuration of the server (ultimately a flag in + // in the binary) waitSeconds = 30 } } diff --git a/lib/go/blobserver/interface.go b/lib/go/blobserver/interface.go index 81b879dea..5ca40009a 100644 --- a/lib/go/blobserver/interface.go +++ b/lib/go/blobserver/interface.go @@ -41,15 +41,27 @@ type BlobStatter interface { // waitSeconds is the max time to wait for the blobs to exist // in the given partition, or 0 for no delay. Stat(dest chan *blobref.SizedBlobRef, - partition Partition, - blobs []*blobref.BlobRef, - waitSeconds int) os.Error + partition Partition, + blobs []*blobref.BlobRef, + waitSeconds int) os.Error +} + +type BlobEnumerator interface { + // EnumerateBobs sends at most limit SizedBlobRef into dest, + // sorted, as long as they are lexigraphically greater than + // after (if provided). + EnumerateBlobs(dest chan *blobref.SizedBlobRef, + partition Partition, + after string, + limit uint, + waitSeconds int) os.Error } type Storage interface { blobref.Fetcher BlobReceiver BlobStatter + BlobEnumerator // Remove 0 or more blobs from provided partition, which // should be empty for the default partition. Removal of @@ -57,11 +69,6 @@ type Storage interface { // items existed but failed to be deleted. Remove(partition Partition, blobs []*blobref.BlobRef) os.Error - // EnumerateBobs sends at most limit SizedBlobRef into dest, - // sorted, as long as they are lexigraphically greater than - // after (if provided). - EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition Partition, after string, limit uint) os.Error - // Returns the blob notification bus for a given partition. GetBlobHub(partition Partition) BlobHub } diff --git a/lib/go/blobserver/localdisk/localdisk.go b/lib/go/blobserver/localdisk/localdisk.go index 455bd8f4e..30b0a1fb4 100644 --- a/lib/go/blobserver/localdisk/localdisk.go +++ b/lib/go/blobserver/localdisk/localdisk.go @@ -124,7 +124,7 @@ func readBlobs(opts readBlobRequest) os.Error { defer dir.Close() names, err := dir.Readdirnames(32768) if err != nil { - return &enumerateError{"readdirnames of " + dirFullPath, err} + return &enumerateError{"localdisk: readdirnames of " + dirFullPath, err} } sort.SortStrings(names) for _, name := range names { @@ -135,7 +135,7 @@ func readBlobs(opts readBlobRequest) os.Error { fullPath := dirFullPath + "/" + name fi, err := os.Stat(fullPath) if err != nil { - return &enumerateError{"stat of file " + fullPath, err} + return &enumerateError{"localdisk: stat of file " + fullPath, err} } if fi.IsDirectory() { @@ -181,7 +181,7 @@ func readBlobs(opts readBlobRequest) os.Error { return nil } -func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint) os.Error { +func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error { dirRoot := ds.root if partition != "" { dirRoot += "/partition/" + string(partition) + "/"