Spec and start of longpoll enumerate.

This commit is contained in:
Brad Fitzpatrick 2011-02-26 14:03:10 -08:00
parent 88c1992692
commit f34b4b35d2
5 changed files with 62 additions and 13 deletions

View File

@ -19,6 +19,23 @@ URL GET parameters:
so be sure to pay attention to the presence
of a "continueAfter" key in the JSON response.
maxwaitsec optional The client may send this, an integer max
number of seconds the client is willing to
wait for the arrival of blobs. If the
server supports long-polling (an optional
feature), then the server will return
immediately if any blobs or available, else
it will wait for this number of seconds.
This option isn't supported with 'after'.
The server's reply must include
"canLongPoll" set to true if the server
supports this feature. Even if the server
supports long polling, the server may cap
'maxwaitsec' and wait for less time than
requested by the client.
Response:
HTTP/1.1 200 OK
@ -32,6 +49,7 @@ Content-Type: text/javascript
"size": 3},
],
"after": "sha1-0beec7b5ea3f0fdbc95d0dd47f3c5bc275da8a33",
"canLongPoll": true,
}
Response keys:
@ -44,3 +62,6 @@ Response keys:
should be passed to the next request's "after"
request parameter.
canLongPoll optional Set to true (type boolean) if the server supports
long polling. If not true, the server ignores
the client's "maxwaitsec" parameter.

View File

@ -41,19 +41,33 @@ func CreateEnumerateHandler(storage blobserver.Storage, partition blobserver.Par
}
}
func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage blobserver.Storage, partition blobserver.Partition) {
func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage blobserver.BlobEnumerator, partition blobserver.Partition) {
limit, err := strconv.Atoui(req.FormValue("limit"))
if err != nil || limit > maxEnumerate {
limit = maxEnumerate
}
waitSeconds := 0
if waitStr := req.FormValue("maxwaitsec"); waitStr != "" {
waitSeconds, _ = strconv.Atoi(waitStr)
switch {
case waitSeconds < 0:
waitSeconds = 0
case waitSeconds > 30:
// TODO: don't hard-code 30. push this up into a blobserver interface
// for getting the configuration of the server (ultimately a flag in
// in the binary)
waitSeconds = 30
}
}
conn.SetHeader("Content-Type", "text/javascript; charset=utf-8")
fmt.Fprintf(conn, "{\n \"blobs\": [\n")
blobch := make(chan *blobref.SizedBlobRef, 100)
resultch := make(chan os.Error, 1)
go func() {
resultch <- storage.EnumerateBlobs(blobch, partition, req.FormValue("after"), limit+1)
resultch <- storage.EnumerateBlobs(blobch, partition, req.FormValue("after"), limit+1, waitSeconds)
}()
after := ""
@ -101,5 +115,9 @@ func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request, storage b
if after != "" {
fmt.Fprintf(conn, ",\n \"continueAfter\": \"%s\"", after)
}
const longPollSupported = true
if longPollSupported {
fmt.Fprintf(conn, ",\n \"canLongPoll\": true")
}
fmt.Fprintf(conn, "\n}\n")
}

View File

@ -79,6 +79,9 @@ func handleStat(conn http.ResponseWriter, req *http.Request, storage blobserver.
case waitSeconds < 0:
waitSeconds = 0
case waitSeconds > 30:
// TODO: don't hard-code 30. push this up into a blobserver interface
// for getting the configuration of the server (ultimately a flag in
// in the binary)
waitSeconds = 30
}
}

View File

@ -41,15 +41,27 @@ type BlobStatter interface {
// waitSeconds is the max time to wait for the blobs to exist
// in the given partition, or 0 for no delay.
Stat(dest chan *blobref.SizedBlobRef,
partition Partition,
blobs []*blobref.BlobRef,
waitSeconds int) os.Error
partition Partition,
blobs []*blobref.BlobRef,
waitSeconds int) os.Error
}
type BlobEnumerator interface {
// EnumerateBobs sends at most limit SizedBlobRef into dest,
// sorted, as long as they are lexigraphically greater than
// after (if provided).
EnumerateBlobs(dest chan *blobref.SizedBlobRef,
partition Partition,
after string,
limit uint,
waitSeconds int) os.Error
}
type Storage interface {
blobref.Fetcher
BlobReceiver
BlobStatter
BlobEnumerator
// Remove 0 or more blobs from provided partition, which
// should be empty for the default partition. Removal of
@ -57,11 +69,6 @@ type Storage interface {
// items existed but failed to be deleted.
Remove(partition Partition, blobs []*blobref.BlobRef) os.Error
// EnumerateBobs sends at most limit SizedBlobRef into dest,
// sorted, as long as they are lexigraphically greater than
// after (if provided).
EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition Partition, after string, limit uint) os.Error
// Returns the blob notification bus for a given partition.
GetBlobHub(partition Partition) BlobHub
}

View File

@ -124,7 +124,7 @@ func readBlobs(opts readBlobRequest) os.Error {
defer dir.Close()
names, err := dir.Readdirnames(32768)
if err != nil {
return &enumerateError{"readdirnames of " + dirFullPath, err}
return &enumerateError{"localdisk: readdirnames of " + dirFullPath, err}
}
sort.SortStrings(names)
for _, name := range names {
@ -135,7 +135,7 @@ func readBlobs(opts readBlobRequest) os.Error {
fullPath := dirFullPath + "/" + name
fi, err := os.Stat(fullPath)
if err != nil {
return &enumerateError{"stat of file " + fullPath, err}
return &enumerateError{"localdisk: stat of file " + fullPath, err}
}
if fi.IsDirectory() {
@ -181,7 +181,7 @@ func readBlobs(opts readBlobRequest) os.Error {
return nil
}
func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint) os.Error {
func (ds *diskStorage) EnumerateBlobs(dest chan *blobref.SizedBlobRef, partition blobserver.Partition, after string, limit uint, waitSeconds int) os.Error {
dirRoot := ds.root
if partition != "" {
dirRoot += "/partition/" + string(partition) + "/"