make pkg/mysqlindexer and camdbinit compile again, even if not working yet

Change-Id: I3fbdfda6c456c079a3ac58ea988e43301e957f11
This commit is contained in:
Brad Fitzpatrick 2012-03-24 19:45:23 -07:00
parent 590c168ecf
commit ef7e3c2e90
8 changed files with 76 additions and 1129 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package main
import (
"database/sql"
"flag"
"fmt"
"os"
@ -24,16 +25,18 @@ import (
"camlistore.org/pkg/mysqlindexer"
mysql "camli/third_party/github.com/Philio/GoMySQL"
_ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv"
)
var flagUser = flag.String("user", "root", "MySQL admin user")
var flagPassword = flag.String("password", "(prompt)", "MySQL admin password")
var flagHost = flag.String("host", "localhost", "MySQ host[:port]")
var flagDatabase = flag.String("database", "", "MySQL camlistore to wipe/create database")
var (
flagUser = flag.String("user", "root", "MySQL admin user")
flagPassword = flag.String("password", "(prompt)", "MySQL admin password")
flagHost = flag.String("host", "localhost", "MySQ host[:port]")
flagDatabase = flag.String("database", "", "MySQL camlistore to wipe/create database")
var flagWipe = flag.Bool("wipe", false, "Wipe the database and re-create it?")
var flagIgnore = flag.Bool("ignoreexists", false, "Treat existence of the database as okay and exit.")
flagWipe = flag.Bool("wipe", false, "Wipe the database and re-create it?")
flagIgnore = flag.Bool("ignoreexists", false, "Treat existence of the database as okay and exit.")
)
func main() {
flag.Parse()
@ -41,7 +44,7 @@ func main() {
exitf("--database flag required")
}
db, err := mysql.DialTCP(*flagHost, *flagUser, *flagPassword, "")
db, err := sql.Open("mymysql", *flagDatabase + "/" + *flagUser + "/" + *flagPassword)
if err != nil {
exitf("Error connecting to database: %v", err)
}
@ -66,25 +69,22 @@ func main() {
do(db, fmt.Sprintf(`REPLACE INTO meta VALUES ('version', '%d')`, mysqlindexer.SchemaVersion()))
}
func do(db *mysql.Client, sql string) {
err := db.Query(sql)
func do(db *sql.DB, sql string) {
_, err := db.Exec(sql)
if err == nil {
return
}
exitf("Error %v running SQL: %s", err, sql)
}
func dbExists(db *mysql.Client, dbname string) bool {
check(db.Query("SHOW DATABASES"))
result, err := db.UseResult()
func dbExists(db *sql.DB, dbname string) bool {
rows, err := db.Query("SHOW DATABASES")
check(err)
defer result.Free()
for {
row := result.FetchRow()
if row == nil {
break
}
if row[0].(string) == dbname {
defer rows.Close()
for rows.Next() {
var db string
check(rows.Scan(&db))
if db == dbname {
return true
}
}

View File

@ -18,7 +18,7 @@ package mysqlindexer
import ()
const requiredSchemaVersion = 18
const requiredSchemaVersion = 19
func SchemaVersion() int {
return requiredSchemaVersion
@ -26,106 +26,13 @@ func SchemaVersion() int {
func SQLCreateTables() []string {
return []string{
`CREATE TABLE blobs (
blobref VARCHAR(128) NOT NULL PRIMARY KEY,
size INTEGER NOT NULL,
type VARCHAR(100))`,
`CREATE TABLE claims (
blobref VARCHAR(128) NOT NULL PRIMARY KEY,
signer VARCHAR(128) NOT NULL,
verifiedkeyid VARCHAR(128) NULL,
date VARCHAR(40) NOT NULL,
INDEX (signer, date),
INDEX (verifiedkeyid, date),
unverified CHAR(1) NULL,
claim VARCHAR(50) NOT NULL,
permanode VARCHAR(128) NOT NULL,
INDEX (permanode, signer, date),
attr VARCHAR(128) NULL,
value VARCHAR(128) NULL)`,
`CREATE TABLE permanodes (
blobref VARCHAR(128) NOT NULL PRIMARY KEY,
unverified CHAR(1) NULL,
signer VARCHAR(128) NOT NULL DEFAULT '',
lastmod VARCHAR(40) NOT NULL DEFAULT '',
INDEX (signer, lastmod))`,
`CREATE TABLE bytesfiles (
schemaref VARCHAR(128) NOT NULL,
camlitype VARCHAR(32) NOT NULL,
wholedigest VARCHAR(128) NOT NULL,
filename VARCHAR(255),
size BIGINT,
mime VARCHAR(255),
PRIMARY KEY(schemaref, wholedigest),
INDEX (wholedigest))`,
// For index.PermanodeOfSignerAttrValue:
// Rows are one per camliType "claim", for claimType "set-attribute" or "add-attribute",
// for attribute values that are known (needed to be indexed, e.g. "camliNamedRoot")
//
// keyid is verified GPG KeyId (e.g. "2931A67C26F5ABDA")
// attr is e.g. "camliNamedRoot"
// value is the claim's "value" field
// claimdate is the "claimDate" field.
// blobref is the blobref of the claim.
// permanode is the claim's "permaNode" field.
`CREATE TABLE signerattrvalue (
keyid VARCHAR(40) NOT NULL,
attr VARCHAR(128) NOT NULL,
value VARCHAR(255) NOT NULL,
claimdate VARCHAR(40) NOT NULL,
INDEX (keyid, attr, value, claimdate),
blobref VARCHAR(128) NOT NULL,
PRIMARY KEY (blobref),
permanode VARCHAR(128) NOT NULL,
INDEX (permanode))`,
// "Shadow" copy of signerattrvalue for fulltext searches.
// Kept in sync with signerattrvalue directly in the go code for now, not with triggers.
// As of MySQL 5.5, fulltext search is still only available with MyISAM tables
// (see http://dev.mysql.com/doc/refman/5.5/en/fulltext-search.html)
`CREATE TABLE signerattrvalueft (
keyid VARCHAR(40) NOT NULL,
attr VARCHAR(128) NOT NULL,
value VARCHAR(255) NOT NULL,
claimdate VARCHAR(40) NOT NULL,
INDEX (keyid, attr, value, claimdate),
blobref VARCHAR(128) NOT NULL,
PRIMARY KEY (blobref),
permanode VARCHAR(128) NOT NULL,
INDEX (permanode),
FULLTEXT (value)) ENGINE=MyISAM`,
`CREATE TABLE rows (
k VARCHAR(255) NOT NULL PRIMARY KEY,
v VARCHAR(255))`,
`CREATE TABLE meta (
metakey VARCHAR(255) NOT NULL PRIMARY KEY,
value VARCHAR(255) NOT NULL)`,
// Map from blobref (of ASCII armored public key) to keyid
`CREATE TABLE signerkeyid (
blobref VARCHAR(128) NOT NULL,
PRIMARY KEY (blobref),
keyid VARCHAR(128) NOT NULL,
INDEX (keyid)
)`,
// Bi-direction index of camliPath claims
// active is "Y" or "N".
`CREATE TABLE path (
claimref VARCHAR(128) NOT NULL,
PRIMARY KEY (claimref),
claimdate VARCHAR(40) NOT NULL,
keyid VARCHAR(128) NOT NULL,
baseref VARCHAR(128) NOT NULL,
suffix VARCHAR(255) NOT NULL,
targetref VARCHAR(128) NOT NULL,
active CHAR(1) NOT NULL,
INDEX (keyid, baseref, suffix),
INDEX (targetref, keyid),
INDEX (baseref, keyid)
)`,
metakey VARCHAR(255) NOT NULL PRIMARY KEY,
value VARCHAR(255) NOT NULL)`,
}
}

View File

@ -1,51 +0,0 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlindexer
import "camlistore/pkg/blobref"
// EnumerateBlobs sends the stored (blobref, size) pairs whose blobref sorts
// strictly after 'after', in ascending blobref order and at most 'limit' of
// them, on dest. dest is always closed before returning.
// waitSeconds is accepted but unused in this implementation.
func (mi *Indexer) EnumerateBlobs(dest chan<- blobref.SizedBlobRef, after string, limit uint, waitSeconds int) error {
	defer close(dest)
	rs, err := mi.db.Query("SELECT blobref, size FROM blobs WHERE blobref > ? ORDER BY blobref LIMIT ?",
		after, limit)
	if err != nil {
		return err
	}
	defer rs.Close()
	return readBlobRefSizeResults(dest, rs)
}
// readBlobRefSizeResults scans (blobref, size) rows from rs and sends each
// as a SizedBlobRef on dest. Rows whose blobref fails to parse are skipped
// silently. Returns the first Scan error, or nil at end of rows.
// It does not close dest or rs; callers own both.
func readBlobRefSizeResults(dest chan<- blobref.SizedBlobRef, rs ResultSet) error {
	var (
		blobstr string
		size    int64
	)
	for rs.Next() {
		if err := rs.Scan(&blobstr, &size); err != nil {
			return err
		}
		br := blobref.Parse(blobstr)
		if br == nil {
			// Skip malformed blobrefs rather than aborting the enumeration.
			continue
		}
		dest <- blobref.SizedBlobRef{
			BlobRef: br,
			Size:    size,
		}
	}
	return nil
}

View File

@ -17,60 +17,68 @@ limitations under the License.
package mysqlindexer
import (
"database/sql"
"errors"
"fmt"
"io"
"os"
"strconv"
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/index"
"camlistore.org/pkg/jsonconfig"
_ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv"
)
type Indexer struct {
*blobserver.SimpleBlobHubPartitionMap
type myIndexStorage struct {
host, user, password, database string
KeyFetcher blobref.StreamingFetcher // for verifying claims
db *sql.DB
}
// Used for fetching blobs to find the complete sha1s of file & bytes
// schema blobs.
BlobSource blobserver.Storage
var _ index.IndexStorage = (*myIndexStorage)(nil)
db *MySQLWrapper
// BeginBatch starts a batch mutation. Unimplemented: returns nil, which
// callers must not use yet.
func (ms *myIndexStorage) BeginBatch() index.BatchMutation {
	// TODO
	return nil
}

// CommitBatch applies a batch mutation. Unimplemented.
func (ms *myIndexStorage) CommitBatch(b index.BatchMutation) error {
	return errors.New("TODO(bradfitz): implement")
}

// Get returns the value stored for key. Unimplemented: always panics.
func (ms *myIndexStorage) Get(key string) (value string, err error) {
	panic("TODO(bradfitz): implement")
}

// Set stores value under key. Unimplemented.
func (ms *myIndexStorage) Set(key, value string) error {
	return errors.New("TODO(bradfitz): implement")
}

// Delete removes key. Unimplemented.
func (ms *myIndexStorage) Delete(key string) error {
	return errors.New("TODO(bradfitz): implement")
}

// Find returns an iterator for key. Unimplemented: always panics.
func (ms *myIndexStorage) Find(key string) index.Iterator {
	panic("TODO(bradfitz): implement")
}
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) {
blobPrefix := config.RequiredString("blobSource")
db := &MySQLWrapper{
Host: config.OptionalString("host", "localhost"),
User: config.RequiredString("user"),
Password: config.OptionalString("password", ""),
Database: config.RequiredString("database"),
}
indexer := &Indexer{
SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
db: db,
is := &myIndexStorage{
host: config.OptionalString("host", "localhost"),
user: config.RequiredString("user"),
password: config.OptionalString("password", ""),
database: config.RequiredString("database"),
}
if err := config.Validate(); err != nil {
return nil, err
}
sto, err := ld.GetStorage(blobPrefix)
if err != nil {
return nil, err
}
indexer.BlobSource = sto
// TODO: connect; set is.db
// TODO: ping it; see if it's alive; else return err
// Good enough, for now:
indexer.KeyFetcher = indexer.BlobSource
indexer := index.New(is)
ok, err := indexer.IsAlive()
if !ok {
return nil, fmt.Errorf("Failed to connect to MySQL: %v", err)
}
version, err := indexer.SchemaVersion()
version, err := is.SchemaVersion()
if err != nil {
return nil, fmt.Errorf("error getting schema version (need to init database?): %v", err)
}
@ -91,36 +99,13 @@ func init() {
blobserver.RegisterStorageConstructor("mysqlindexer", blobserver.StorageConstructor(newFromConfig))
}
func (mi *Indexer) IsAlive() (ok bool, err error) {
err = mi.db.Ping()
ok = err == nil
// IsAlive reports whether the database is usable, by attempting to read
// the schema version.
func (mi *myIndexStorage) IsAlive() (ok bool, err error) {
	// TODO(bradfitz): something more efficient here?
	_, err = mi.SchemaVersion()
	return err == nil, err
}

// SchemaVersion returns the schema version recorded in the meta table, or
// an error (e.g. when the meta table or its version row doesn't exist).
func (mi *myIndexStorage) SchemaVersion() (version int, err error) {
	err = mi.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version)
	return
}
// SchemaVersion (legacy *Indexer variant) reads the schema version from the
// meta table via the MySQLWrapper. Returns (0, nil) when no version row
// exists; otherwise parses the stored string as an int.
func (mi *Indexer) SchemaVersion() (version int, err error) {
	rs, err := mi.db.Query("SELECT value FROM meta WHERE metakey='version'")
	if err != nil {
		return
	}
	defer rs.Close()
	if !rs.Next() {
		// No version row: treat as version 0 rather than an error.
		return 0, nil
	}
	strVersion := ""
	if err = rs.Scan(&strVersion); err != nil {
		return
	}
	return strconv.Atoi(strVersion)
}
// Fetch is unsupported: the MySQL indexer stores only metadata, not blob
// contents.
func (mi *Indexer) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int64, error) {
	return nil, 0, errors.New("Fetch isn't supported by the MySQL indexer")
}

// FetchStreaming is likewise unsupported; blob contents aren't stored here.
// (The error previously said "Fetch", a copy-paste of the method above.)
func (mi *Indexer) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, error) {
	return nil, 0, errors.New("FetchStreaming isn't supported by the MySQL indexer")
}

// RemoveBlobs is unsupported: index rows are derived data and aren't
// removed through the blobserver interface.
func (mi *Indexer) RemoveBlobs(blobs []*blobref.BlobRef) error {
	return errors.New("RemoveBlobs isn't supported by the MySQL indexer")
}

View File

@ -1,268 +0,0 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlindexer
import (
"errors"
"fmt"
"log"
"strconv"
"strings"
"sync"
mysql "camli/third_party/github.com/camlistore/GoMySQL"
)
var _ = log.Printf
// MySQLWrapper manages a small pool of GoMySQL client connections to one
// database and adapts query results to the ResultSet interface.
type MySQLWrapper struct {
	// Host may optionally end in ":port".
	Host, User, Password, Database string

	clientLock    sync.Mutex      // guards cachedClients
	cachedClients []*mysql.Client // idle connections available for reuse
}
// ResultSet is a cursor over query results. It starts before the first row.
type ResultSet interface {
	// Move cursor to next row, returning true if there's a next
	// row.
	Next() bool
	// Scan copies the current row's columns into the pointed-to values.
	Scan(ptrs ...interface{}) error
	// Close releases the underlying statement, result, and connection.
	Close() error
}

// emptyResultSet is a ResultSet with no rows, returned for statements that
// produce no result set (e.g. INSERT).
type emptyResultSet struct{}

func (emptyResultSet) Next() bool                     { return false }
func (emptyResultSet) Scan(ptrs ...interface{}) error { return errors.New("bogus") }
func (emptyResultSet) Close() error                   { return nil }
// myRes is the ResultSet implementation backed by a live GoMySQL
// statement/result pair. Closing it returns the connection to the pool.
type myRes struct {
	mw     *MySQLWrapper
	c      *mysql.Client
	sql    string // original query text, kept for error logging
	s      *mysql.Statement
	res    *mysql.Result
	row    mysql.Row // type Row []interface{} (or nil on EOF)
	closed bool
}

// Next advances the cursor, reporting whether another row exists.
func (r *myRes) Next() bool {
	r.row = r.res.FetchRow()
	return r.row != nil
}
// scanAssign stores the decoded column value fromi into the destination
// pointer destPtr. Only string -> *string and int64 -> *int64 assignments
// are supported; anything else yields a conversion error naming the column
// index idx. field is currently unused.
func scanAssign(idx int, field *mysql.Field, fromi, destPtr interface{}) error {
	switch dst := destPtr.(type) {
	case *string:
		if s, ok := fromi.(string); ok {
			*dst = s
			return nil
		}
	case *int64:
		if n, ok := fromi.(int64); ok {
			*dst = n
			return nil
		}
	}
	return fmt.Errorf("Scan index %d: invalid conversion from %T -> %T", idx, fromi, destPtr)
}
// decodeColumn converts one raw column value from GoMySQL into a Go value
// suitable for scanAssign. val arrives either already as an int64 or as
// the []byte text form, which is decoded according to the declared field
// type (integer, float, or string families).
func decodeColumn(idx int, field *mysql.Field, val interface{}) (interface{}, error) {
	var dec interface{}
	var err error
	switch v := val.(type) {
	case int64:
		return v, nil
	case []byte:
		switch field.Type {
		case mysql.FIELD_TYPE_TINY, mysql.FIELD_TYPE_SHORT, mysql.FIELD_TYPE_YEAR, mysql.FIELD_TYPE_INT24, mysql.FIELD_TYPE_LONG, mysql.FIELD_TYPE_LONGLONG:
			if field.Flags&mysql.FLAG_UNSIGNED != 0 {
				dec, err = strconv.ParseUint(string(v), 10, 64)
			} else {
				dec, err = strconv.ParseInt(string(v), 10, 64)
			}
			if err != nil {
				// Error text previously named the pre-Go1 strconv.Atoi64.
				return nil, fmt.Errorf("mysql: strconv.ParseInt/ParseUint error on field %d: %v", idx, err)
			}
		case mysql.FIELD_TYPE_FLOAT, mysql.FIELD_TYPE_DOUBLE:
			dec, err = strconv.ParseFloat(string(v), 64)
			if err != nil {
				// Error text previously named the pre-Go1 strconv.Atof64.
				return nil, fmt.Errorf("mysql: strconv.ParseFloat error on field %d: %v", idx, err)
			}
		case mysql.FIELD_TYPE_DECIMAL, mysql.FIELD_TYPE_NEWDECIMAL, mysql.FIELD_TYPE_VARCHAR, mysql.FIELD_TYPE_VAR_STRING, mysql.FIELD_TYPE_STRING:
			dec = string(v)
		default:
			return nil, fmt.Errorf("row[%d] was a []byte but unexpected field type %d", idx, field.Type)
		}
		return dec, nil
	}
	return nil, fmt.Errorf("expected row[%d] contents to be a []byte, got %T for field type %d", idx, val, field.Type)
}
// Scan copies the current row into the given pointers. Next must have
// returned true before calling Scan, and the number of pointers must equal
// the result's field count. Each raw column is decoded by decodeColumn and
// assigned by scanAssign; any error is logged with the originating SQL.
func (r *myRes) Scan(ptrs ...interface{}) (outerr error) {
	defer func() {
		if outerr != nil {
			log.Printf("Scan error on %q: %v", r.sql, outerr)
		}
	}()
	if r.row == nil {
		return errors.New("mysql: Scan called but cursor isn't on a valid row. (Next must return true before calling Scan)")
	}
	if uint64(len(ptrs)) != r.res.FieldCount() {
		return fmt.Errorf("mysql: result set has %d fields doesn't match %d arguments to Scan",
			r.res.FieldCount(), len(ptrs))
	}
	if len(r.row) != len(ptrs) {
		// Row length should always match the field count checked above;
		// a mismatch indicates a library bug, hence the panic.
		panic(fmt.Sprintf("GoMySQL library is confused. row size is %d, expect %d", len(r.row), len(ptrs)))
	}
	fields := r.res.FetchFields() // just an accessor, doesn't fetch anything
	for i, ptr := range ptrs {
		field := fields[i]
		dec, err := decodeColumn(i, field, r.row[i])
		if err != nil {
			return err
		}
		if err := scanAssign(i, field, dec, ptr); err != nil {
			return err
		}
	}
	return nil
}
// Close releases the prepared statement and result, then returns the
// connection to the wrapper's pool. Calling Close twice is an error.
// NOTE(review): if s.Close() fails, the connection is neither released nor
// closed here — looks like a possible connection leak; confirm intended.
func (r *myRes) Close() error {
	if r.closed {
		return errors.New("mysqlwrapper: ResultSet already closed")
	}
	r.closed = true
	if err := r.s.Close(); err != nil {
		return err
	}
	if r.res != nil {
		r.res.Free()
	}
	r.mw.releaseConnection(r.c)
	r.c = nil
	r.s = nil
	r.res = nil
	return nil
}
// Execute runs sql with params, discarding any result rows, and returns
// the query's error (if any).
func (mw *MySQLWrapper) Execute(sql string, params ...interface{}) error {
	rs, err := mw.Query(sql, params...)
	if rs == nil {
		return err
	}
	rs.Close()
	return err
}
// Query prepares and executes sql with the given params on a pooled
// connection and returns a ResultSet streaming its rows. Statements that
// produce no result set yield an emptyResultSet. On most error paths the
// connection is closed defensively rather than returned to the pool.
func (mw *MySQLWrapper) Query(sql string, params ...interface{}) (ResultSet, error) {
	c, err := mw.getConnection()
	if err != nil {
		return nil, err
	}
	s, err := c.Prepare(sql)
	if err != nil {
		c.Close() // defensive. TODO: figure out when safe not to.
		return nil, err
	}
	if len(params) > 0 {
		for i, pv := range params {
			// TODO: check that they're all supported.
			// fallback: if a Stringer, use that.
			_ = i
			_ = pv
		}
		if err := s.BindParams(params...); err != nil {
			if strings.Contains(err.Error(), "Invalid parameter number") {
				println("Invalid parameters for query: ", sql)
			}
			c.Close()
			return nil, err
		}
	}
	if err := s.Execute(); err != nil {
		c.Close() // defensive. TODO: figure out when safe not to.
		return nil, err
	}
	res, err := s.UseResult()
	if err != nil {
		// CR_NO_RESULT_SET means this statement (e.g. INSERT) simply has
		// no rows; the connection is healthy and can be pooled again.
		if ce, ok := err.(*mysql.ClientError); ok && ce.Errno == mysql.CR_NO_RESULT_SET {
			mw.releaseConnection(c)
			return emptyResultSet{}, nil
		}
		c.Close() // defensive. TODO: figure out when safe not to.
		return nil, err
	}
	return &myRes{mw: mw, c: c, s: s, sql: sql, res: res}, nil
}
// testClient checks that a connection is still healthy by running a
// trivial query and draining its result.
func testClient(client *mysql.Client) error {
	err := client.Query("SELECT 1 + 1")
	if err != nil {
		return err
	}
	_, err = client.UseResult()
	if err != nil {
		return err
	}
	client.FreeResult()
	return nil
}
// Ping verifies that a connection to the database can be obtained and is
// healthy, returning the health-check error if not. The connection is
// returned to the pool afterwards.
func (mw *MySQLWrapper) Ping() error {
	c, err := mw.getConnection()
	if err != nil {
		return err
	}
	err = testClient(c)
	mw.releaseConnection(c)
	return err
}
// Get a free cached connection or allocate a new one.
func (mw *MySQLWrapper) getConnection() (client *mysql.Client, err error) {
	mw.clientLock.Lock()
	if len(mw.cachedClients) > 0 {
		defer mw.clientLock.Unlock()
		// Pop the most recently returned connection from the pool.
		client = mw.cachedClients[len(mw.cachedClients)-1]
		mw.cachedClients = mw.cachedClients[:len(mw.cachedClients)-1]
		// TODO: Outside the mutex, double check that the client is still good.
		return
	}
	mw.clientLock.Unlock()
	// Pool empty: dial a new connection, outside the lock.
	client, err = mysql.DialTCP(mw.Host, mw.User, mw.Password, mw.Database)
	return
}
// Release a client to the cached client pool.
// A connection that fails the health check is dropped: it is neither pooled
// nor explicitly closed here.
func (mw *MySQLWrapper) releaseConnection(client *mysql.Client) {
	// Test the client before returning it.
	// TODO: this is overkill probably.
	if err := testClient(client); err != nil {
		return
	}
	mw.clientLock.Lock()
	defer mw.clientLock.Unlock()
	mw.cachedClients = append(mw.cachedClients, client)
}

View File

@ -1,209 +0,0 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlindexer
import (
"crypto/sha1"
"io"
"log"
"strings"
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/blobserver"
"camlistore.org/pkg/index"
"camlistore.org/pkg/jsonsign"
"camlistore.org/pkg/magic"
"camlistore.org/pkg/schema"
"camlistore.org/pkg/search"
)
// ReceiveBlob indexes an incoming blob: it hashes and sniffs the stream in
// one pass, verifies the digest matches blobRef, populates the
// claim/permanode/file tables for recognized camli schema types, and
// records the blob's size and MIME type in the blobs table.
func (mi *Indexer) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader) (retsb blobref.SizedBlobRef, err error) {
	sniffer := new(index.BlobSniffer)
	hash := blobRef.Hash()
	var written int64
	// Hash and sniff in a single pass over the stream.
	written, err = io.Copy(io.MultiWriter(hash, sniffer), source)
	log.Printf("mysqlindexer: hashed+sniffed %d bytes; err %v", written, err)
	if err != nil {
		return
	}
	if !blobRef.HashMatches(hash) {
		err = blobserver.ErrCorruptBlob
		return
	}
	sniffer.Parse()
	mimeType := sniffer.MimeType()
	log.Printf("mysqlindexer: type=%v; truncated=%v", mimeType, sniffer.IsTruncated())
	if camli, ok := sniffer.Superset(); ok {
		switch camli.Type {
		case "claim":
			if err = mi.populateClaim(blobRef, camli, sniffer); err != nil {
				return
			}
		case "permanode":
			if err = mi.populatePermanode(blobRef, camli); err != nil {
				return
			}
		case "file":
			if err = mi.populateFile(blobRef, camli); err != nil {
				return
			}
		}
	}
	// INSERT IGNORE: re-receiving an already-indexed blob is a no-op.
	if err = mi.db.Execute("INSERT IGNORE INTO blobs (blobref, size, type) VALUES (?, ?, ?)",
		blobRef.String(), written, mimeType); err != nil {
		log.Printf("mysqlindexer: insert into blobs: %v", err)
		return
	}
	retsb = blobref.SizedBlobRef{BlobRef: blobRef, Size: written}
	return
}
// populateClaim indexes a "claim" schema blob: it records the claim row,
// the signer's verified key mapping, signer/attr/value rows (plus the
// fulltext shadow table), camliPath rows, and updates the target
// permanode's lastmod. Claims with a malformed permanode ref are skipped
// silently.
func (mi *Indexer) populateClaim(blobRef *blobref.BlobRef, camli *schema.Superset, sniffer *index.BlobSniffer) (err error) {
	pnBlobref := blobref.Parse(camli.Permanode)
	if pnBlobref == nil {
		// Skip bogus claim with malformed permanode.
		return
	}
	verifiedKeyId := ""
	// Use a distinct name (berr) for the Body() error: an inner "err :="
	// here shadowed the named result, which both dropped the signerkeyid
	// insert error and made the naked return inside the block illegal.
	if rawJson, berr := sniffer.Body(); berr == nil {
		vr := jsonsign.NewVerificationRequest(string(rawJson), mi.KeyFetcher)
		if vr.Verify() {
			verifiedKeyId = vr.SignerKeyId
			log.Printf("mysqlindex: verified claim %s from %s", blobRef, verifiedKeyId)
			if err = mi.db.Execute("INSERT IGNORE INTO signerkeyid (blobref, keyid) "+
				"VALUES (?, ?)", vr.CamliSigner.String(), verifiedKeyId); err != nil {
				return
			}
		} else {
			log.Printf("mysqlindex: verification failure on claim %s: %v", blobRef, vr.Err)
		}
	}
	if err = mi.db.Execute(
		"INSERT IGNORE INTO claims (blobref, signer, verifiedkeyid, date, unverified, claim, permanode, attr, value) "+
			"VALUES (?, ?, ?, ?, 'Y', ?, ?, ?, ?)",
		blobRef.String(), camli.Signer, verifiedKeyId, camli.ClaimDate,
		camli.ClaimType, camli.Permanode,
		camli.Attribute, camli.Value); err != nil {
		return
	}
	if verifiedKeyId != "" {
		if search.IsIndexedAttribute(camli.Attribute) {
			if err = mi.db.Execute("INSERT IGNORE INTO signerattrvalue (keyid, attr, value, claimdate, blobref, permanode) "+
				"VALUES (?, ?, ?, ?, ?, ?)",
				verifiedKeyId, camli.Attribute, camli.Value,
				camli.ClaimDate, blobRef.String(), camli.Permanode); err != nil {
				return
			}
		}
		if search.IsFulltextAttribute(camli.Attribute) {
			// TODO(mpl): do the DELETEs as well
			if err = mi.db.Execute("INSERT IGNORE INTO signerattrvalueft (keyid, attr, value, claimdate, blobref, permanode) "+
				"VALUES (?, ?, ?, ?, ?, ?)",
				verifiedKeyId, camli.Attribute, camli.Value,
				camli.ClaimDate, blobRef.String(), camli.Permanode); err != nil {
				return
			}
		}
		if strings.HasPrefix(camli.Attribute, "camliPath:") {
			// TODO: deal with set-attribute vs. del-attribute
			// properly? I think we get it for free when
			// del-attribute has no Value, but we need to deal
			// with the case where they explicitly delete the
			// current value.
			suffix := camli.Attribute[len("camliPath:"):]
			active := "Y"
			if camli.ClaimType == "del-attribute" {
				active = "N"
			}
			if err = mi.db.Execute("INSERT IGNORE INTO path (claimref, claimdate, keyid, baseref, suffix, targetref, active) "+
				"VALUES (?, ?, ?, ?, ?, ?, ?)",
				blobRef.String(), camli.ClaimDate, verifiedKeyId, camli.Permanode, suffix, camli.Value, active); err != nil {
				return
			}
		}
	}
	// And update the lastmod on the permanode row.
	if err = mi.db.Execute(
		"INSERT IGNORE INTO permanodes (blobref) VALUES (?)",
		pnBlobref.String()); err != nil {
		return
	}
	if err = mi.db.Execute(
		"UPDATE permanodes SET lastmod=? WHERE blobref=? AND ? > lastmod",
		camli.ClaimDate, pnBlobref.String(), camli.ClaimDate); err != nil {
		return
	}
	return nil
}
// populatePermanode indexes a "permanode" schema blob, upserting its row
// with unverified='Y' and the claimed signer. lastmod starts empty on
// insert; claim indexing updates it later.
func (mi *Indexer) populatePermanode(blobRef *blobref.BlobRef, camli *schema.Superset) (err error) {
	err = mi.db.Execute(
		"INSERT IGNORE INTO permanodes (blobref, unverified, signer, lastmod) "+
			"VALUES (?, 'Y', ?, '') "+
			"ON DUPLICATE KEY UPDATE unverified = 'Y', signer = ?",
		blobRef.String(), camli.Signer, camli.Signer)
	return
}
// populateFile indexes a "file" schema blob: it streams the file's bytes
// via the BlobSource, computes the whole-file SHA-1 while sniffing the
// MIME type, then records (schemaref, wholedigest, size, filename, mime)
// in bytesfiles. Read errors are logged and swallowed (returning nil) so a
// transient failure doesn't wedge indexing; see the TODO below.
func (mi *Indexer) populateFile(blobRef *blobref.BlobRef, ss *schema.Superset) (err error) {
	seekFetcher, err := blobref.SeekerFromStreamingFetcher(mi.BlobSource)
	if err != nil {
		return err
	}
	// Named h (not sha1, as before) so the crypto/sha1 package isn't shadowed.
	h := sha1.New()
	fr, err := ss.NewFileReader(seekFetcher)
	if err != nil {
		log.Printf("mysqlindex: error indexing file %s: %v", blobRef, err)
		return nil
	}
	mime, reader := magic.MimeTypeFromReader(fr)
	n, err := io.Copy(h, reader)
	if err != nil {
		// TODO: job scheduling system to retry this spaced
		// out max n times. Right now our options are
		// ignoring this error (forever) or returning the
		// error and making the indexing try again (likely
		// forever failing). Both options suck. For now just
		// log and act like all's okay.
		log.Printf("mysqlindex: error indexing file %s: %v", blobRef, err)
		return nil
	}
	// Compute the whole-file blobref once; it was previously derived twice.
	wholeRef := blobref.FromHash("sha1", h)
	log.Printf("file %s blobref is %s, size %d", blobRef, wholeRef, n)
	err = mi.db.Execute(
		"INSERT IGNORE INTO bytesfiles (schemaref, camlitype, wholedigest, size, filename, mime) VALUES (?, ?, ?, ?, ?, ?)",
		blobRef.String(),
		"file",
		wholeRef.String(),
		n,
		ss.FileNameString(),
		mime,
	)
	return
}

View File

@ -1,377 +0,0 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlindexer
import (
"errors"
"fmt"
"log"
"os"
"strings"
"time"
"camlistore.org/pkg/blobref"
"camlistore.org/pkg/search"
)
// Statically verify that Indexer implements the search.Index interface.
var _ search.Index = (*Indexer)(nil)
// permaNodeRow mirrors one row of the permanodes table.
// NOTE(review): not referenced by the code visible here — possibly dead.
type permaNodeRow struct {
	blobref string
	signer  string
	lastmod string // "2011-03-13T23:30:19.03946Z"
}

// GetRecentPermanodes sends owner's most recently modified permanodes
// (up to limit, newest first) on dest, which is always closed before
// returning. Rows with unparseable blobrefs, signers, or timestamps are
// skipped.
func (mi *Indexer) GetRecentPermanodes(dest chan *search.Result, owner *blobref.BlobRef, limit int) error {
	defer close(dest)
	if owner == nil {
		panic("nil owner")
	}
	rs, err := mi.db.Query("SELECT blobref, signer, lastmod FROM permanodes WHERE signer = ? AND lastmod <> '' "+
		"ORDER BY lastmod DESC LIMIT ?",
		owner.String(), limit)
	if err != nil {
		return err
	}
	defer rs.Close()
	var blobstr, signerstr, modstr string
	for rs.Next() {
		if err := rs.Scan(&blobstr, &signerstr, &modstr); err != nil {
			return err
		}
		br := blobref.Parse(blobstr)
		if br == nil {
			continue
		}
		signer := blobref.Parse(signerstr)
		if signer == nil {
			continue
		}
		// Strip sub-second precision before parsing as plain RFC 3339.
		modstr = trimRFC3339Subseconds(modstr)
		t, err := time.Parse(time.RFC3339, modstr)
		if err != nil {
			log.Printf("Skipping; error parsing time %q: %v", modstr, err)
			continue
		}
		dest <- &search.Result{
			BlobRef:     br,
			Signer:      signer,
			LastModTime: t.Unix(),
		}
	}
	return nil
}
// trimRFC3339Subseconds strips the fractional-seconds part from an
// RFC 3339 UTC timestamp such as "2011-03-13T23:30:19.03946Z", yielding
// "2011-03-13T23:30:19Z". Strings not matching that shape are returned
// unchanged.
func trimRFC3339Subseconds(s string) string {
	const secondsLen = len("2006-01-02T15:04:05") // 19 bytes up to the '.'
	if len(s) > secondsLen && s[secondsLen] == '.' && strings.HasSuffix(s, "Z") {
		return s[:secondsLen] + "Z"
	}
	return s
}
// claimsRow mirrors one row of the claims table.
type claimsRow struct {
	blobref, signer, date, claim, unverified, permanode, attr, value string
}

// GetOwnerClaims returns all claims on permanode that were signed by
// owner. Rows whose date fails to parse are skipped with a log message.
func (mi *Indexer) GetOwnerClaims(permanode, owner *blobref.BlobRef) (claims search.ClaimList, err error) {
	claims = make(search.ClaimList, 0)
	// TODO: ignore rows where unverified = 'N'
	rs, err := mi.db.Query("SELECT blobref, date, claim, attr, value FROM claims WHERE permanode = ? AND signer = ?",
		permanode.String(), owner.String())
	if err != nil {
		return
	}
	defer rs.Close()
	var row claimsRow
	for rs.Next() {
		err = rs.Scan(&row.blobref, &row.date, &row.claim, &row.attr, &row.value)
		if err != nil {
			return
		}
		// The := here deliberately shadows the named return: a bad date
		// only skips the row, it doesn't fail the whole call.
		t, err := time.Parse(time.RFC3339, trimRFC3339Subseconds(row.date))
		if err != nil {
			log.Printf("Skipping; error parsing time %q: %v", row.date, err)
			continue
		}
		claims = append(claims, &search.Claim{
			BlobRef:   blobref.Parse(row.blobref),
			Signer:    owner,
			Permanode: permanode,
			Type:      row.claim,
			Date:      t,
			Attr:      row.attr,
			Value:     row.value,
		})
	}
	return
}
// GetBlobMimeType returns the sniffed MIME type and size recorded for
// blob, or os.ENOENT when the blob isn't indexed.
// NOTE(review): os.ENOENT is a pre-Go1 API — confirm it exists in this
// tree's toolchain.
func (mi *Indexer) GetBlobMimeType(blob *blobref.BlobRef) (mime string, size int64, err error) {
	rs, err := mi.db.Query("SELECT type, size FROM blobs WHERE blobref=?", blob.String())
	if err != nil {
		return
	}
	defer rs.Close()
	if !rs.Next() {
		err = os.ENOENT
		return
	}
	err = rs.Scan(&mime, &size)
	return
}
// SearchPermanodesWithAttr streams permanodes owned by request.Signer
// whose attribute values match request.Query on dest, which is closed
// before returning. Three query modes: fulltext over all attributes
// (FuzzyMatch with empty Attribute), fulltext over one attribute
// (FuzzyMatch with Attribute set), or exact attribute/value match.
// Attribute is required unless FuzzyMatch is on.
func (mi *Indexer) SearchPermanodesWithAttr(dest chan<- *blobref.BlobRef, request *search.PermanodeByAttrRequest) error {
	defer close(dest)
	keyId, err := mi.keyIdOfSigner(request.Signer)
	if err != nil {
		return err
	}
	query := ""
	var rs ResultSet
	if request.Attribute == "" && !request.FuzzyMatch {
		return errors.New("mysqlindexer: Attribute is required if FuzzyMatch is off.")
	}
	if request.Attribute == "" {
		// Fulltext search across all attributes.
		query = "SELECT permanode FROM signerattrvalueft WHERE keyid = ? AND MATCH(value) AGAINST (?) AND claimdate <> '' LIMIT ?"
		rs, err = mi.db.Query(query, keyId, request.Query, request.MaxResults)
		if err != nil {
			return err
		}
	} else {
		if request.FuzzyMatch {
			// Fulltext search restricted to one attribute.
			query = "SELECT permanode FROM signerattrvalueft WHERE keyid = ? AND attr = ? AND MATCH(value) AGAINST (?) AND claimdate <> '' LIMIT ?"
			rs, err = mi.db.Query(query, keyId, request.Attribute,
				request.Query, request.MaxResults)
			if err != nil {
				return err
			}
		} else {
			// Exact attribute/value match, newest claims first.
			query = "SELECT permanode FROM signerattrvalue WHERE keyid = ? AND attr = ? AND value = ? AND claimdate <> '' ORDER BY claimdate DESC LIMIT ?"
			rs, err = mi.db.Query(query, keyId, request.Attribute,
				request.Query, request.MaxResults)
			if err != nil {
				return err
			}
		}
	}
	defer rs.Close()
	pn := ""
	for rs.Next() {
		if err := rs.Scan(&pn); err != nil {
			return err
		}
		br := blobref.Parse(pn)
		if br == nil {
			// Skip unparseable refs rather than failing the search.
			continue
		}
		dest <- br
	}
	return nil
}
// ExistingFileSchemas returns the schema blobrefs of already-indexed files
// whose whole-file digest equals wholeDigest.
func (mi *Indexer) ExistingFileSchemas(wholeDigest *blobref.BlobRef) (files []*blobref.BlobRef, err error) {
	rs, err := mi.db.Query("SELECT schemaref FROM bytesfiles WHERE wholedigest=?", wholeDigest.String())
	if err != nil {
		return
	}
	defer rs.Close()
	ref := ""
	for rs.Next() {
		if err := rs.Scan(&ref); err != nil {
			return nil, err
		}
		files = append(files, blobref.Parse(ref))
	}
	return
}
// GetFileInfo returns the size, filename, and MIME type recorded in
// bytesfiles for the given file schema blobref, or os.ENOENT when there is
// no such row.
func (mi *Indexer) GetFileInfo(fileRef *blobref.BlobRef) (*search.FileInfo, error) {
	rs, err := mi.db.Query("SELECT size, filename, mime FROM bytesfiles WHERE schemaref=?",
		fileRef.String())
	if err != nil {
		return nil, err
	}
	defer rs.Close()
	if !rs.Next() {
		return nil, os.ENOENT
	}
	var fi search.FileInfo
	err = rs.Scan(&fi.Size, &fi.FileName, &fi.MimeType)
	return &fi, err
}
// keyIdOfSigner looks up the verified GPG key id recorded for the signer's
// public-key blobref, or errors when no mapping exists.
func (mi *Indexer) keyIdOfSigner(signer *blobref.BlobRef) (keyid string, err error) {
	rs, err := mi.db.Query("SELECT keyid FROM signerkeyid WHERE blobref=?", signer.String())
	if err != nil {
		return
	}
	defer rs.Close()
	if !rs.Next() {
		return "", fmt.Errorf("mysqlindexer: failed to find keyid of signer %q", signer.String())
	}
	err = rs.Scan(&keyid)
	return
}
// PermanodeOfSignerAttrValue returns the permanode most recently claimed
// (by claimdate) by signer to have attribute attr set to val, or an error
// when no such claim exists.
func (mi *Indexer) PermanodeOfSignerAttrValue(signer *blobref.BlobRef, attr, val string) (permanode *blobref.BlobRef, err error) {
	keyId, err := mi.keyIdOfSigner(signer)
	if err != nil {
		return nil, err
	}
	rs, err := mi.db.Query("SELECT permanode FROM signerattrvalue WHERE keyid=? AND attr=? AND value=? ORDER BY claimdate DESC LIMIT 1",
		keyId, attr, val)
	if err != nil {
		return
	}
	defer rs.Close()
	if !rs.Next() {
		return nil, errors.New("mysqlindexer: no signerattrvalue match")
	}
	var blobstr string
	if err = rs.Scan(&blobstr); err != nil {
		return
	}
	return blobref.Parse(blobstr), nil
}
// PathsOfSignerTarget returns, for each (base, suffix) pair, the most
// recent still-active camliPath claim by signer that points at target.
// When the newest claim for a pair is inactive ("N"), the pair is dropped
// from the result.
func (mi *Indexer) PathsOfSignerTarget(signer, target *blobref.BlobRef) (paths []*search.Path, err error) {
	keyId, err := mi.keyIdOfSigner(signer)
	if err != nil {
		return
	}
	rs, err := mi.db.Query("SELECT claimref, claimdate, baseref, suffix, active FROM path WHERE keyid=? AND targetref=?",
		keyId, target.String())
	if err != nil {
		return
	}
	defer rs.Close()
	// Keep only the newest claim per (baseref, suffix); track the winning
	// claimdate per key so older rows can't overwrite newer ones.
	mostRecent := make(map[string]*search.Path)
	maxClaimDates := make(map[string]string)
	var claimRef, claimDate, baseRef, suffix, active string
	for rs.Next() {
		if err = rs.Scan(&claimRef, &claimDate, &baseRef, &suffix, &active); err != nil {
			return
		}
		key := baseRef + "/" + suffix
		if claimDate > maxClaimDates[key] {
			maxClaimDates[key] = claimDate
			if active == "Y" {
				mostRecent[key] = &search.Path{
					Claim:     blobref.MustParse(claimRef),
					ClaimDate: claimDate,
					Base:      blobref.MustParse(baseRef),
					Suffix:    suffix,
				}
			} else {
				// Newest claim deactivates this path.
				delete(mostRecent, key)
			}
		}
	}
	paths = make([]*search.Path, 0)
	for _, v := range mostRecent {
		paths = append(paths, v)
	}
	return paths, nil
}
// PathLookup returns the best matching path claim for (signer, base,
// suffix) effective at time at. A zero at means "no upper bound": the
// newest claim wins. Returns os.ENOENT when nothing matches.
func (mi *Indexer) PathLookup(signer, base *blobref.BlobRef, suffix string, at time.Time) (*search.Path, error) {
	// TODO: pass along the at time to a new helper function to
	// filter? maybe not worth it, since this list should be
	// small.
	paths, err := mi.PathsLookup(signer, base, suffix)
	if err != nil {
		return nil, err
	}
	var (
		newest    = int64(0)
		atSeconds = int64(0)
		best      *search.Path
	)
	// time.Time is a value type, so the previous "at != nil" comparison
	// did not compile; the zero time now expresses "no cutoff".
	if !at.IsZero() {
		atSeconds = at.Unix()
	}
	for _, path := range paths {
		t, err := time.Parse(time.RFC3339, trimRFC3339Subseconds(path.ClaimDate))
		if err != nil {
			// Skip rows with unparseable claim dates.
			continue
		}
		secs := t.Unix()
		if atSeconds != 0 && secs > atSeconds {
			// Too new
			continue
		}
		if newest > secs {
			// Too old
			continue
		}
		// Just right
		newest, best = secs, path
	}
	if best == nil {
		return nil, os.ENOENT
	}
	return best, nil
}
// PathsLookup returns every path claim by signer rooted at base with the
// given suffix, regardless of active state or time. Rows with unparseable
// claim dates are skipped with a log message.
func (mi *Indexer) PathsLookup(signer, base *blobref.BlobRef, suffix string) (paths []*search.Path, err error) {
	keyId, err := mi.keyIdOfSigner(signer)
	if err != nil {
		return
	}
	rs, err := mi.db.Query("SELECT claimref, claimdate, targetref FROM path "+
		"WHERE keyid=? AND baseref=? AND suffix=?",
		keyId, base.String(), suffix)
	if err != nil {
		return
	}
	defer rs.Close()
	var claimref, claimdate, targetref string
	for rs.Next() {
		if err = rs.Scan(&claimref, &claimdate, &targetref); err != nil {
			return
		}
		// Parse only to validate the stored date; the raw string is kept.
		t, err := time.Parse(time.RFC3339, trimRFC3339Subseconds(claimdate))
		if err != nil {
			log.Printf("Skipping bogus path row with bad time: %q", claimref)
			continue
		}
		_ = t // TODO: use this?
		paths = append(paths, &search.Path{
			Claim:     blobref.Parse(claimref),
			ClaimDate: claimdate,
			Base:      base,
			Target:    blobref.Parse(targetref),
			Suffix:    suffix,
		})
	}
	return
}

View File

@ -1,40 +0,0 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package mysqlindexer
import (
"camlistore.org/pkg/blobref"
"fmt"
"strings"
)
// StatBlobs sends a SizedBlobRef on dest for each of the given blobs found
// in the blobs table. dest is not closed here. waitSeconds is unused.
// NOTE(review): the IN-list is built by Go %q formatting (double quotes)
// instead of placeholders — this relies on blobrefs being constrained and
// on MySQL accepting double-quoted strings; verify against the server's
// quoting mode.
func (mi *Indexer) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) error {
	quotedBlobRefs := []string{}
	for _, br := range blobs {
		quotedBlobRefs = append(quotedBlobRefs, fmt.Sprintf("%q", br.String()))
	}
	sql := "SELECT blobref, size FROM blobs WHERE blobref IN (" +
		strings.Join(quotedBlobRefs, ", ") + ")"
	rs, err := mi.db.Query(sql)
	if err != nil {
		return err
	}
	defer rs.Close()
	return readBlobRefSizeResults(dest, rs)
}