go blobserver: Partition support for enumerate

This commit is contained in:
Brad Fitzpatrick 2011-02-01 20:50:01 -08:00
parent f492b5f396
commit 39f1ebb895
1 changed files with 55 additions and 23 deletions

View File

@ -22,11 +22,14 @@ import (
"http"
"os"
"log"
"regexp"
"sort"
"strconv"
"strings"
)
var validPartitionName = regexp.MustCompile(`^[a-z0-9_]*$`)
const maxEnumerate = 100000
type blobInfo struct {
@ -35,25 +38,35 @@ type blobInfo struct {
os.Error
}
func readBlobs(ch chan *blobInfo, blobPrefix, diskRoot, after string, remain *uint) {
dirFullPath := *flagStorageRoot + "/" + diskRoot
type readBlobRequest struct {
ch chan *blobInfo
after string
remain *uint // limit countdown
dirRoot string
// Not used on initial request, only on recursion
blobPrefix, pathInto string
}
func readBlobs(opts readBlobRequest) {
dirFullPath := opts.dirRoot + "/" + opts.pathInto
dir, err := os.Open(dirFullPath, os.O_RDONLY, 0)
if err != nil {
log.Println("Error opening directory: ", err)
ch <- &blobInfo{Error: err}
opts.ch <- &blobInfo{Error: err}
return
}
defer dir.Close()
names, err := dir.Readdirnames(32768)
if err != nil {
log.Println("Error reading dirnames: ", err)
ch <- &blobInfo{Error: err}
opts.ch <- &blobInfo{Error: err}
return
}
sort.SortStrings(names)
for _, name := range names {
if *remain == 0 {
ch <- &blobInfo{Error: os.ENOSPC}
if *opts.remain == 0 {
opts.ch <- &blobInfo{Error: os.ENOSPC}
return
}
@ -61,64 +74,83 @@ func readBlobs(ch chan *blobInfo, blobPrefix, diskRoot, after string, remain *ui
fi, err := os.Stat(fullPath)
if err != nil {
bi := &blobInfo{Error: err}
ch <- bi
opts.ch <- bi
return
}
if fi.IsDirectory() {
var newBlobPrefix string
if blobPrefix == "" {
if opts.blobPrefix == "" {
newBlobPrefix = name + "-"
} else {
newBlobPrefix = blobPrefix + name
newBlobPrefix = opts.blobPrefix + name
}
if len(after) > 0 {
if len(opts.after) > 0 {
compareLen := len(newBlobPrefix)
if len(after) < compareLen {
compareLen = len(after)
if len(opts.after) < compareLen {
compareLen = len(opts.after)
}
if newBlobPrefix[0:compareLen] < after[0:compareLen] {
if newBlobPrefix[0:compareLen] < opts.after[0:compareLen] {
continue
}
}
readBlobs(ch, newBlobPrefix, diskRoot+"/"+name, after, remain)
ropts := opts
ropts.blobPrefix = newBlobPrefix
ropts.pathInto = opts.pathInto+"/"+name
readBlobs(ropts)
continue
}
if fi.IsRegular() && strings.HasSuffix(name, ".dat") {
blobName := name[0 : len(name)-4]
if blobName <= after {
if blobName <= opts.after {
continue
}
blobRef := blobref.Parse(blobName)
if blobRef != nil {
bi := &blobInfo{BlobRef: blobRef, FileInfo: fi}
ch <- bi
(*remain)--
opts.ch <- bi
(*opts.remain)--
}
continue
}
}
if diskRoot == "" {
ch <- nil
if opts.pathInto == "" {
opts.ch <- nil
}
}
func handleEnumerateBlobs(conn http.ResponseWriter, req *http.Request) {
req.ParseForm()
ch := make(chan *blobInfo, 100)
limit, err := strconv.Atoui(req.FormValue("limit"))
if err != nil || limit > maxEnumerate {
limit = maxEnumerate
}
partition := req.FormValue("partition")
if len(partition) > 50 || !validPartitionName.MatchString(partition) {
conn.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(conn, "Invalid partition.")
return
}
ch := make(chan *blobInfo, 100)
conn.SetHeader("Content-Type", "text/javascript; charset=utf-8")
fmt.Fprintf(conn, "{\n \"blobs\": [\n")
var after string
go readBlobs(ch, "", "", req.FormValue("after"), &limit)
dirRoot := *flagStorageRoot
if dirRoot != "" {
dirRoot += "/partition/" + partition + "/"
}
go readBlobs(readBlobRequest{
ch: ch,
dirRoot: dirRoot,
after: req.FormValue("after"),
remain: &limit,
})
after := ""
needsComma := false
for bi := range ch {
if bi == nil {