diff --git a/pkg/mysqlindexer/mysqlindexer.go b/pkg/mysqlindexer/mysqlindexer.go index 7a3a093e9..58f48d8e0 100644 --- a/pkg/mysqlindexer/mysqlindexer.go +++ b/pkg/mysqlindexer/mysqlindexer.go @@ -18,8 +18,11 @@ package mysqlindexer import ( "database/sql" + "errors" "fmt" + "log" "os" + "strconv" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/index" @@ -90,7 +93,75 @@ func (ms *myIndexStorage) Delete(key string) error { } func (ms *myIndexStorage) Find(key string) index.Iterator { - panic("TODO(bradfitz): implement") + return &iter{ + ms: ms, + low: key, + op: ">=", + } +} + +// iter is a iterator over sorted key/value pairs in rows. +type iter struct { + ms *myIndexStorage + low string + op string // ">=" initially, then ">" + err error // accumulated error, returned at Close + + rows *sql.Rows // if non-nil, the rows we're reading from + + batchSize int // how big our LIMIT query was + seen int // how many rows we've seen this query + + key string + value string +} + +var errClosed = errors.New("mysqlindexer: Iterator already closed") + +func (t *iter) Key() string { return t.key } +func (t *iter) Value() string { return t.value } + +func (t *iter) Close() error { + if t.rows != nil { + t.rows.Close() + } + err := t.err + t.err = errClosed + return err +} + +func (t *iter) Next() bool { + if t.err != nil { + return false + } + if t.rows == nil { + const batchSize = 50 + t.batchSize = batchSize + t.rows, t.err = t.ms.db.Query( + "SELECT k, v FROM rows WHERE k "+t.op+" ? ORDER BY k LIMIT "+strconv.Itoa(batchSize), + t.low) + if t.err != nil { + log.Printf("unexpected query error: %v", t.err) + return false + } + t.seen = 0 + t.op = ">" + } + if !t.rows.Next() { + if t.seen == t.batchSize { + t.rows = nil + return t.Next() + } + return false + } + t.err = t.rows.Scan(&t.key, &t.value) + if t.err != nil { + log.Printf("unexpected Scan error: %v", t.err) + return false + } + t.low = t.key + t.seen++ + return true } func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {