From ef7e3c2e90c6ee8300019659172ce57a9bb1688f Mon Sep 17 00:00:00 2001 From: Brad Fitzpatrick Date: Sat, 24 Mar 2012 19:45:23 -0700 Subject: [PATCH] make pkg/mysqlindexer and camdbinit compile again, even if not working yet Change-Id: I3fbdfda6c456c079a3ac58ea988e43301e957f11 --- clients/go/camdbinit/camdbinit.go | 40 ++-- pkg/mysqlindexer/dbschema.go | 107 +-------- pkg/mysqlindexer/enumerate.go | 51 ---- pkg/mysqlindexer/mysqlindexer.go | 113 ++++----- pkg/mysqlindexer/mywrap.go | 268 --------------------- pkg/mysqlindexer/receive.go | 209 ----------------- pkg/mysqlindexer/search.go | 377 ------------------------------ pkg/mysqlindexer/stat.go | 40 ---- 8 files changed, 76 insertions(+), 1129 deletions(-) delete mode 100644 pkg/mysqlindexer/enumerate.go delete mode 100644 pkg/mysqlindexer/mywrap.go delete mode 100644 pkg/mysqlindexer/receive.go delete mode 100644 pkg/mysqlindexer/search.go delete mode 100644 pkg/mysqlindexer/stat.go diff --git a/clients/go/camdbinit/camdbinit.go b/clients/go/camdbinit/camdbinit.go index 198ba8274..e42b041b0 100644 --- a/clients/go/camdbinit/camdbinit.go +++ b/clients/go/camdbinit/camdbinit.go @@ -17,6 +17,7 @@ limitations under the License. package main import ( + "database/sql" "flag" "fmt" "os" @@ -24,16 +25,18 @@ import ( "camlistore.org/pkg/mysqlindexer" - mysql "camli/third_party/github.com/Philio/GoMySQL" + _ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv" ) -var flagUser = flag.String("user", "root", "MySQL admin user") -var flagPassword = flag.String("password", "(prompt)", "MySQL admin password") -var flagHost = flag.String("host", "localhost", "MySQ host[:port]") -var flagDatabase = flag.String("database", "", "MySQL camlistore to wipe/create database") +var ( + flagUser = flag.String("user", "root", "MySQL admin user") + flagPassword = flag.String("password", "(prompt)", "MySQL admin password") + flagHost = flag.String("host", "localhost", "MySQ host[:port]") + flagDatabase = flag.String("database", "", "MySQL camlistore to wipe/create database") -var flagWipe = flag.Bool("wipe", false, "Wipe the database and re-create it?") -var flagIgnore = flag.Bool("ignoreexists", false, "Treat existence of the database as okay and exit.") + flagWipe = flag.Bool("wipe", false, "Wipe the database and re-create it?") + flagIgnore = flag.Bool("ignoreexists", false, "Treat existence of the database as okay and exit.") +) func main() { flag.Parse() @@ -41,7 +44,7 @@ func main() { exitf("--database flag required") } - db, err := mysql.DialTCP(*flagHost, *flagUser, *flagPassword, "") + db, err := sql.Open("mymysql", *flagDatabase + "/" + *flagUser + "/" + *flagPassword) if err != nil { exitf("Error connecting to database: %v", err) } @@ -66,25 +69,22 @@ func main() { do(db, fmt.Sprintf(`REPLACE INTO meta VALUES ('version', '%d')`, mysqlindexer.SchemaVersion())) } -func do(db *mysql.Client, sql string) { - err := db.Query(sql) +func do(db *sql.DB, sql string) { + _, err := db.Exec(sql) if err == nil { return } exitf("Error %v running SQL: %s", err, sql) } -func dbExists(db *mysql.Client, dbname string) bool { - check(db.Query("SHOW DATABASES")) - result, err := db.UseResult() +func dbExists(db *sql.DB, dbname string) bool { + rows, err := db.Query("SHOW DATABASES") check(err) - defer result.Free() - for { - row := result.FetchRow() - if row == nil { - break - } - if row[0].(string) == dbname { + defer rows.Close() + for rows.Next() { + var db string + check(rows.Scan(&db)) + if db == dbname { return true } } diff --git a/pkg/mysqlindexer/dbschema.go b/pkg/mysqlindexer/dbschema.go index 130837a7f..0929b9f8a 100644 --- a/pkg/mysqlindexer/dbschema.go +++ b/pkg/mysqlindexer/dbschema.go @@ -18,7 +18,7 @@ package mysqlindexer import () -const requiredSchemaVersion = 18 +const requiredSchemaVersion = 19 func SchemaVersion() int { return requiredSchemaVersion @@ -26,106 +26,13 @@ func SchemaVersion() int { func SQLCreateTables() []string { return []string{ - - `CREATE TABLE blobs ( -blobref VARCHAR(128) NOT NULL PRIMARY KEY, -size INTEGER NOT NULL, -type VARCHAR(100))`, - - `CREATE TABLE claims ( -blobref VARCHAR(128) NOT NULL PRIMARY KEY, -signer VARCHAR(128) NOT NULL, -verifiedkeyid VARCHAR(128) NULL, -date VARCHAR(40) NOT NULL, - INDEX (signer, date), - INDEX (verifiedkeyid, date), -unverified CHAR(1) NULL, -claim VARCHAR(50) NOT NULL, -permanode VARCHAR(128) NOT NULL, - INDEX (permanode, signer, date), -attr VARCHAR(128) NULL, -value VARCHAR(128) NULL)`, - - `CREATE TABLE permanodes ( -blobref VARCHAR(128) NOT NULL PRIMARY KEY, -unverified CHAR(1) NULL, -signer VARCHAR(128) NOT NULL DEFAULT '', -lastmod VARCHAR(40) NOT NULL DEFAULT '', -INDEX (signer, lastmod))`, - - `CREATE TABLE bytesfiles ( -schemaref VARCHAR(128) NOT NULL, -camlitype VARCHAR(32) NOT NULL, -wholedigest VARCHAR(128) NOT NULL, -filename VARCHAR(255), -size BIGINT, -mime VARCHAR(255), -PRIMARY KEY(schemaref, wholedigest), -INDEX (wholedigest))`, - - // For index.PermanodeOfSignerAttrValue: - // Rows are one per camliType "claim", for claimType "set-attribute" or "add-attribute", - // for attribute values that are known (needed to be indexed, e.g. "camliNamedRoot") - // - // keyid is verified GPG KeyId (e.g. "2931A67C26F5ABDA") - // attr is e.g. "camliNamedRoot" - // value is the claim's "value" field - // claimdate is the "claimDate" field. - // blobref is the blobref of the claim. - // permanode is the claim's "permaNode" field. - `CREATE TABLE signerattrvalue ( -keyid VARCHAR(40) NOT NULL, -attr VARCHAR(128) NOT NULL, -value VARCHAR(255) NOT NULL, -claimdate VARCHAR(40) NOT NULL, -INDEX (keyid, attr, value, claimdate), -blobref VARCHAR(128) NOT NULL, -PRIMARY KEY (blobref), -permanode VARCHAR(128) NOT NULL, -INDEX (permanode))`, - - // "Shadow" copy of signerattrvalue for fulltext searches. - // Kept in sync witch signerattrvalue directly in the go code for now, not with triggers. - // As of MySQL 5.5, fulltext search is still only available with MyISAM tables - // (see http://dev.mysql.com/doc/refman/5.5/en/fulltext-search.html) - `CREATE TABLE signerattrvalueft ( -keyid VARCHAR(40) NOT NULL, -attr VARCHAR(128) NOT NULL, -value VARCHAR(255) NOT NULL, -claimdate VARCHAR(40) NOT NULL, -INDEX (keyid, attr, value, claimdate), -blobref VARCHAR(128) NOT NULL, -PRIMARY KEY (blobref), -permanode VARCHAR(128) NOT NULL, -INDEX (permanode), -FULLTEXT (value)) ENGINE=MyISAM`, + `CREATE TABLE rows ( + k VARCHAR(255) NOT NULL PRIMARY KEY, + v VARCHAR(255))`, `CREATE TABLE meta ( -metakey VARCHAR(255) NOT NULL PRIMARY KEY, -value VARCHAR(255) NOT NULL)`, - - // Map from blobref (of ASCII armored public key) to keyid - `CREATE TABLE signerkeyid ( -blobref VARCHAR(128) NOT NULL, -PRIMARY KEY (blobref), -keyid VARCHAR(128) NOT NULL, -INDEX (keyid) -)`, - - // Bi-direction index of camliPath claims - // active is "Y" or "N". - `CREATE TABLE path ( -claimref VARCHAR(128) NOT NULL, -PRIMARY KEY (claimref), -claimdate VARCHAR(40) NOT NULL, -keyid VARCHAR(128) NOT NULL, -baseref VARCHAR(128) NOT NULL, -suffix VARCHAR(255) NOT NULL, -targetref VARCHAR(128) NOT NULL, -active CHAR(1) NOT NULL, -INDEX (keyid, baseref, suffix), -INDEX (targetref, keyid), -INDEX (baseref, keyid) -)`, + metakey VARCHAR(255) NOT NULL PRIMARY KEY, + value VARCHAR(255) NOT NULL)`, } } + diff --git a/pkg/mysqlindexer/enumerate.go b/pkg/mysqlindexer/enumerate.go deleted file mode 100644 index 397874bb0..000000000 --- a/pkg/mysqlindexer/enumerate.go +++ /dev/null @@ -1,51 +0,0 @@ -/* -Copyright 2011 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -nYou may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package mysqlindexer - -import "camlistore/pkg/blobref" - -func (mi *Indexer) EnumerateBlobs(dest chan<- blobref.SizedBlobRef, after string, limit uint, waitSeconds int) error { - defer close(dest) - rs, err := mi.db.Query("SELECT blobref, size FROM blobs WHERE blobref > ? ORDER BY blobref LIMIT ?", - after, limit) - if err != nil { - return err - } - defer rs.Close() - return readBlobRefSizeResults(dest, rs) -} - -func readBlobRefSizeResults(dest chan<- blobref.SizedBlobRef, rs ResultSet) error { - var ( - blobstr string - size int64 - ) - for rs.Next() { - if err := rs.Scan(&blobstr, &size); err != nil { - return err - } - br := blobref.Parse(blobstr) - if br == nil { - continue - } - dest <- blobref.SizedBlobRef{ - BlobRef: br, - Size: size, - } - } - return nil -} diff --git a/pkg/mysqlindexer/mysqlindexer.go b/pkg/mysqlindexer/mysqlindexer.go index 67864f089..c0bdeb6e6 100644 --- a/pkg/mysqlindexer/mysqlindexer.go +++ b/pkg/mysqlindexer/mysqlindexer.go @@ -17,60 +17,68 @@ limitations under the License. package mysqlindexer import ( + "database/sql" "errors" "fmt" - "io" "os" - "strconv" - "camlistore.org/pkg/blobref" "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/index" "camlistore.org/pkg/jsonconfig" + + _ "camlistore.org/third_party/github.com/ziutek/mymysql/godrv" ) -type Indexer struct { - *blobserver.SimpleBlobHubPartitionMap +type myIndexStorage struct { + host, user, password, database string - KeyFetcher blobref.StreamingFetcher // for verifying claims + db *sql.DB +} - // Used for fetching blobs to find the complete sha1s of file & bytes - // schema blobs. - BlobSource blobserver.Storage +var _ index.IndexStorage = (*myIndexStorage)(nil) - db *MySQLWrapper +func (ms *myIndexStorage) BeginBatch() index.BatchMutation { + // TODO + return nil +} + +func (ms *myIndexStorage) CommitBatch(b index.BatchMutation) error { + return errors.New("TODO(bradfitz): implement") +} + +func (ms *myIndexStorage) Get(key string) (value string, err error) { + panic("TODO(bradfitz): implement") +} + +func (ms *myIndexStorage) Set(key, value string) error { + return errors.New("TODO(bradfitz): implement") +} + +func (ms *myIndexStorage) Delete(key string) error { + return errors.New("TODO(bradfitz): implement") +} + +func (ms *myIndexStorage) Find(key string) index.Iterator { + panic("TODO(bradfitz): implement") } func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, error) { - blobPrefix := config.RequiredString("blobSource") - db := &MySQLWrapper{ - Host: config.OptionalString("host", "localhost"), - User: config.RequiredString("user"), - Password: config.OptionalString("password", ""), - Database: config.RequiredString("database"), - } - indexer := &Indexer{ - SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{}, - db: db, + is := &myIndexStorage{ + host: config.OptionalString("host", "localhost"), + user: config.RequiredString("user"), + password: config.OptionalString("password", ""), + database: config.RequiredString("database"), } if err := config.Validate(); err != nil { return nil, err } - sto, err := ld.GetStorage(blobPrefix) - if err != nil { - return nil, err - } - indexer.BlobSource = sto + // TODO: connect; set is.db + // TODO: ping it; see if it's alive; else return err - // Good enough, for now: - indexer.KeyFetcher = indexer.BlobSource + indexer := index.New(is) - ok, err := indexer.IsAlive() - if !ok { - return nil, fmt.Errorf("Failed to connect to MySQL: %v", err) - } - - version, err := indexer.SchemaVersion() + version, err := is.SchemaVersion() if err != nil { return nil, fmt.Errorf("error getting schema version (need to init database?): %v", err) } @@ -91,36 +99,13 @@ func init() { blobserver.RegisterStorageConstructor("mysqlindexer", blobserver.StorageConstructor(newFromConfig)) } -func (mi *Indexer) IsAlive() (ok bool, err error) { - err = mi.db.Ping() - ok = err == nil +func (mi *myIndexStorage) IsAlive() (ok bool, err error) { + // TODO(bradfitz): something more efficient here? + _, err = mi.SchemaVersion() + return err == nil, err +} + +func (mi *myIndexStorage) SchemaVersion() (version int, err error) { + err = mi.db.QueryRow("SELECT value FROM meta WHERE metakey='version'").Scan(&version) return } - -func (mi *Indexer) SchemaVersion() (version int, err error) { - rs, err := mi.db.Query("SELECT value FROM meta WHERE metakey='version'") - if err != nil { - return - } - defer rs.Close() - if !rs.Next() { - return 0, nil - } - strVersion := "" - if err = rs.Scan(&strVersion); err != nil { - return - } - return strconv.Atoi(strVersion) -} - -func (mi *Indexer) Fetch(blob *blobref.BlobRef) (blobref.ReadSeekCloser, int64, error) { - return nil, 0, errors.New("Fetch isn't supported by the MySQL indexer") -} - -func (mi *Indexer) FetchStreaming(blob *blobref.BlobRef) (io.ReadCloser, int64, error) { - return nil, 0, errors.New("Fetch isn't supported by the MySQL indexer") -} - -func (mi *Indexer) RemoveBlobs(blobs []*blobref.BlobRef) error { - return errors.New("RemoveBlobs isn't supported by the MySQL indexer") -} diff --git a/pkg/mysqlindexer/mywrap.go b/pkg/mysqlindexer/mywrap.go deleted file mode 100644 index 5b98dc6d3..000000000 --- a/pkg/mysqlindexer/mywrap.go +++ /dev/null @@ -1,268 +0,0 @@ -/* -Copyright 2011 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package mysqlindexer - -import ( - "errors" - "fmt" - "log" - "strconv" - "strings" - "sync" - - mysql "camli/third_party/github.com/camlistore/GoMySQL" -) - -var _ = log.Printf - -type MySQLWrapper struct { - // Host may optionally end in ":port". - Host, User, Password, Database string - - clientLock sync.Mutex - cachedClients []*mysql.Client -} - -// ResultSet is a cursor. It starts before the first row. -type ResultSet interface { - // Move cursor to next row, returning true if there's a next - // row. - Next() bool - - Scan(ptrs ...interface{}) error - Close() error -} - -type emptyResultSet struct{} - -func (emptyResultSet) Next() bool { return false } -func (emptyResultSet) Scan(ptrs ...interface{}) error { return errors.New("bogus") } -func (emptyResultSet) Close() error { return nil } - -type myRes struct { - mw *MySQLWrapper - c *mysql.Client - sql string - s *mysql.Statement - res *mysql.Result - - row mysql.Row // type Row []interface{} (or nil on EOF) - closed bool -} - -func (r *myRes) Next() bool { - r.row = r.res.FetchRow() - return r.row != nil -} - -func scanAssign(idx int, field *mysql.Field, fromi, destPtr interface{}) error { - switch v := fromi.(type) { - case string: - if strPtr, ok := destPtr.(*string); ok { - *strPtr = v - return nil - } - case int64: - if p, ok := destPtr.(*int64); ok { - *p = v - return nil - } - } - return fmt.Errorf("Scan index %d: invalid conversion from %T -> %T", idx, fromi, destPtr) -} - -func decodeColumn(idx int, field *mysql.Field, val interface{}) (interface{}, error) { - var dec interface{} - var err error - switch v := val.(type) { - case int64: - return v, nil - case []byte: - switch field.Type { - case mysql.FIELD_TYPE_TINY, mysql.FIELD_TYPE_SHORT, mysql.FIELD_TYPE_YEAR, mysql.FIELD_TYPE_INT24, mysql.FIELD_TYPE_LONG, mysql.FIELD_TYPE_LONGLONG: - if field.Flags&mysql.FLAG_UNSIGNED != 0 { - dec, err = strconv.ParseUint(string(v), 10, 64) - } else { - dec, err = strconv.ParseInt(string(v), 10, 64) - } - if err != nil { - return nil, fmt.Errorf("mysql: strconv.Atoi64 error on field %d: %v", idx, err) - } - case mysql.FIELD_TYPE_FLOAT, mysql.FIELD_TYPE_DOUBLE: - dec, err = strconv.ParseFloat(string(v), 64) - if err != nil { - return nil, fmt.Errorf("mysql: strconv.Atof64 error on field %d: %v", idx, err) - } - case mysql.FIELD_TYPE_DECIMAL, mysql.FIELD_TYPE_NEWDECIMAL, mysql.FIELD_TYPE_VARCHAR, mysql.FIELD_TYPE_VAR_STRING, mysql.FIELD_TYPE_STRING: - dec = string(v) - default: - return nil, fmt.Errorf("row[%d] was a []byte but unexpected field type %d", idx, field.Type) - } - return dec, nil - } - return nil, fmt.Errorf("expected row[%d] contents to be a []byte, got %T for field type %d", idx, val, field.Type) -} - -func (r *myRes) Scan(ptrs ...interface{}) (outerr error) { - defer func() { - if outerr != nil { - log.Printf("Scan error on %q: %v", r.sql, outerr) - } - }() - if r.row == nil { - return errors.New("mysql: Scan called but cursor isn't on a valid row. (Next must return true before calling Scan)") - } - if uint64(len(ptrs)) != r.res.FieldCount() { - return fmt.Errorf("mysql: result set has %d fields doesn't match %d arguments to Scan", - r.res.FieldCount(), len(ptrs)) - } - if len(r.row) != len(ptrs) { - panic(fmt.Sprintf("GoMySQL library is confused. row size is %d, expect %d", len(r.row), len(ptrs))) - } - fields := r.res.FetchFields() // just an accessor, doesn't fetch anything - - for i, ptr := range ptrs { - field := fields[i] - dec, err := decodeColumn(i, field, r.row[i]) - if err != nil { - return err - } - - if err := scanAssign(i, field, dec, ptr); err != nil { - return err - } - - } - return nil -} - -func (r *myRes) Close() error { - if r.closed { - return errors.New("mysqlwrapper: ResultSet already closed") - } - r.closed = true - if err := r.s.Close(); err != nil { - return err - } - if r.res != nil { - r.res.Free() - } - r.mw.releaseConnection(r.c) - r.c = nil - r.s = nil - r.res = nil - return nil -} - -func (mw *MySQLWrapper) Execute(sql string, params ...interface{}) error { - rs, err := mw.Query(sql, params...) - if rs != nil { - rs.Close() - } - return err -} - -func (mw *MySQLWrapper) Query(sql string, params ...interface{}) (ResultSet, error) { - c, err := mw.getConnection() - if err != nil { - return nil, err - } - s, err := c.Prepare(sql) - if err != nil { - c.Close() // defensive. TODO: figure out when safe not to. - return nil, err - } - if len(params) > 0 { - for i, pv := range params { - // TODO: check that they're all supported. - // fallback: if a Stringer, use that. - _ = i - _ = pv - } - if err := s.BindParams(params...); err != nil { - if strings.Contains(err.Error(), "Invalid parameter number") { - println("Invalid parameters for query: ", sql) - } - c.Close() - return nil, err - } - } - if err := s.Execute(); err != nil { - c.Close() // defensive. TODO: figure out when safe not to. - return nil, err - } - res, err := s.UseResult() - if err != nil { - if ce, ok := err.(*mysql.ClientError); ok && ce.Errno == mysql.CR_NO_RESULT_SET { - mw.releaseConnection(c) - return emptyResultSet{}, nil - } - c.Close() // defensive. TODO: figure out when safe not to. - return nil, err - } - return &myRes{mw: mw, c: c, s: s, sql: sql, res: res}, nil -} - -func testClient(client *mysql.Client) error { - err := client.Query("SELECT 1 + 1") - if err != nil { - return err - } - _, err = client.UseResult() - if err != nil { - return err - } - client.FreeResult() - return nil -} - -func (mw *MySQLWrapper) Ping() error { - client, err := mw.getConnection() - if err != nil { - return err - } - defer mw.releaseConnection(client) - return testClient(client) -} - -// Get a free cached connection or allocate a new one. -func (mw *MySQLWrapper) getConnection() (client *mysql.Client, err error) { - mw.clientLock.Lock() - if len(mw.cachedClients) > 0 { - defer mw.clientLock.Unlock() - client = mw.cachedClients[len(mw.cachedClients)-1] - mw.cachedClients = mw.cachedClients[:len(mw.cachedClients)-1] - // TODO: Outside the mutex, double check that the client is still good. - return - } - mw.clientLock.Unlock() - - client, err = mysql.DialTCP(mw.Host, mw.User, mw.Password, mw.Database) - return -} - -// Release a client to the cached client pool. -func (mw *MySQLWrapper) releaseConnection(client *mysql.Client) { - // Test the client before returning it. - // TODO: this is overkill probably. - if err := testClient(client); err != nil { - return - } - mw.clientLock.Lock() - defer mw.clientLock.Unlock() - mw.cachedClients = append(mw.cachedClients, client) -} diff --git a/pkg/mysqlindexer/receive.go b/pkg/mysqlindexer/receive.go deleted file mode 100644 index 1e1d7ab31..000000000 --- a/pkg/mysqlindexer/receive.go +++ /dev/null @@ -1,209 +0,0 @@ -/* -Copyright 2011 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -nYou may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package mysqlindexer - -import ( - "crypto/sha1" - "io" - "log" - "strings" - - "camlistore.org/pkg/blobref" - "camlistore.org/pkg/blobserver" - "camlistore.org/pkg/index" - "camlistore.org/pkg/jsonsign" - "camlistore.org/pkg/magic" - "camlistore.org/pkg/schema" - "camlistore.org/pkg/search" -) - -func (mi *Indexer) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader) (retsb blobref.SizedBlobRef, err error) { - sniffer := new(index.BlobSniffer) - hash := blobRef.Hash() - var written int64 - written, err = io.Copy(io.MultiWriter(hash, sniffer), source) - log.Printf("mysqlindexer: hashed+sniffed %d bytes; err %v", written, err) - if err != nil { - return - } - - if !blobRef.HashMatches(hash) { - err = blobserver.ErrCorruptBlob - return - } - - sniffer.Parse() - mimeType := sniffer.MimeType() - log.Printf("mysqlindexer: type=%v; truncated=%v", mimeType, sniffer.IsTruncated()) - - if camli, ok := sniffer.Superset(); ok { - switch camli.Type { - case "claim": - if err = mi.populateClaim(blobRef, camli, sniffer); err != nil { - return - } - case "permanode": - if err = mi.populatePermanode(blobRef, camli); err != nil { - return - } - case "file": - if err = mi.populateFile(blobRef, camli); err != nil { - return - } - } - } - - if err = mi.db.Execute("INSERT IGNORE INTO blobs (blobref, size, type) VALUES (?, ?, ?)", - blobRef.String(), written, mimeType); err != nil { - log.Printf("mysqlindexer: insert into blobs: %v", err) - return - } - - retsb = blobref.SizedBlobRef{BlobRef: blobRef, Size: written} - return -} - -func (mi *Indexer) populateClaim(blobRef *blobref.BlobRef, camli *schema.Superset, sniffer *index.BlobSniffer) (err error) { - pnBlobref := blobref.Parse(camli.Permanode) - if pnBlobref == nil { - // Skip bogus claim with malformed permanode. - return - } - - verifiedKeyId := "" - if rawJson, err := sniffer.Body(); err == nil { - vr := jsonsign.NewVerificationRequest(string(rawJson), mi.KeyFetcher) - if vr.Verify() { - verifiedKeyId = vr.SignerKeyId - log.Printf("mysqlindex: verified claim %s from %s", blobRef, verifiedKeyId) - - if err = mi.db.Execute("INSERT IGNORE INTO signerkeyid (blobref, keyid) "+ - "VALUES (?, ?)", vr.CamliSigner.String(), verifiedKeyId); err != nil { - return - } - } else { - log.Printf("mysqlindex: verification failure on claim %s: %v", blobRef, vr.Err) - } - } - - if err = mi.db.Execute( - "INSERT IGNORE INTO claims (blobref, signer, verifiedkeyid, date, unverified, claim, permanode, attr, value) "+ - "VALUES (?, ?, ?, ?, 'Y', ?, ?, ?, ?)", - blobRef.String(), camli.Signer, verifiedKeyId, camli.ClaimDate, - camli.ClaimType, camli.Permanode, - camli.Attribute, camli.Value); err != nil { - return - } - - if verifiedKeyId != "" { - if search.IsIndexedAttribute(camli.Attribute) { - if err = mi.db.Execute("INSERT IGNORE INTO signerattrvalue (keyid, attr, value, claimdate, blobref, permanode) "+ - "VALUES (?, ?, ?, ?, ?, ?)", - verifiedKeyId, camli.Attribute, camli.Value, - camli.ClaimDate, blobRef.String(), camli.Permanode); err != nil { - return - } - } - if search.IsFulltextAttribute(camli.Attribute) { - // TODO(mpl): do the DELETEs as well - if err = mi.db.Execute("INSERT IGNORE INTO signerattrvalueft (keyid, attr, value, claimdate, blobref, permanode) "+ - "VALUES (?, ?, ?, ?, ?, ?)", - verifiedKeyId, camli.Attribute, camli.Value, - camli.ClaimDate, blobRef.String(), camli.Permanode); err != nil { - return - } - } - if strings.HasPrefix(camli.Attribute, "camliPath:") { - // TODO: deal with set-attribute vs. del-attribute - // properly? I think we get it for free when - // del-attribute has no Value, but we need to deal - // with the case where they explicitly delete the - // current value. - suffix := camli.Attribute[len("camliPath:"):] - active := "Y" - if camli.ClaimType == "del-attribute" { - active = "N" - } - if err = mi.db.Execute("INSERT IGNORE INTO path (claimref, claimdate, keyid, baseref, suffix, targetref, active) "+ - "VALUES (?, ?, ?, ?, ?, ?, ?)", - blobRef.String(), camli.ClaimDate, verifiedKeyId, camli.Permanode, suffix, camli.Value, active); err != nil { - return - } - } - } - - // And update the lastmod on the permanode row. - if err = mi.db.Execute( - "INSERT IGNORE INTO permanodes (blobref) VALUES (?)", - pnBlobref.String()); err != nil { - return - } - if err = mi.db.Execute( - "UPDATE permanodes SET lastmod=? WHERE blobref=? AND ? > lastmod", - camli.ClaimDate, pnBlobref.String(), camli.ClaimDate); err != nil { - return - } - - return nil -} - -func (mi *Indexer) populatePermanode(blobRef *blobref.BlobRef, camli *schema.Superset) (err error) { - err = mi.db.Execute( - "INSERT IGNORE INTO permanodes (blobref, unverified, signer, lastmod) "+ - "VALUES (?, 'Y', ?, '') "+ - "ON DUPLICATE KEY UPDATE unverified = 'Y', signer = ?", - blobRef.String(), camli.Signer, camli.Signer) - return -} - -func (mi *Indexer) populateFile(blobRef *blobref.BlobRef, ss *schema.Superset) (err error) { - seekFetcher, err := blobref.SeekerFromStreamingFetcher(mi.BlobSource) - if err != nil { - return err - } - - sha1 := sha1.New() - fr, err := ss.NewFileReader(seekFetcher) - if err != nil { - log.Printf("mysqlindex: error indexing file %s: %v", blobRef, err) - return nil - } - mime, reader := magic.MimeTypeFromReader(fr) - n, err := io.Copy(sha1, reader) - if err != nil { - // TODO: job scheduling system to retry this spaced - // out max n times. Right now our options are - // ignoring this error (forever) or returning the - // error and making the indexing try again (likely - // forever failing). Both options suck. For now just - // log and act like all's okay. - log.Printf("mysqlindex: error indexing file %s: %v", blobRef, err) - return nil - } - - log.Printf("file %s blobref is %s, size %d", blobRef, blobref.FromHash("sha1", sha1), n) - err = mi.db.Execute( - "INSERT IGNORE INTO bytesfiles (schemaref, camlitype, wholedigest, size, filename, mime) VALUES (?, ?, ?, ?, ?, ?)", - blobRef.String(), - "file", - blobref.FromHash("sha1", sha1).String(), - n, - ss.FileNameString(), - mime, - ) - return -} diff --git a/pkg/mysqlindexer/search.go b/pkg/mysqlindexer/search.go deleted file mode 100644 index c4d6a364f..000000000 --- a/pkg/mysqlindexer/search.go +++ /dev/null @@ -1,377 +0,0 @@ -/* -Copyright 2011 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -nYou may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package mysqlindexer - -import ( - "errors" - "fmt" - "log" - "os" - "strings" - "time" - - "camlistore.org/pkg/blobref" - "camlistore.org/pkg/search" -) - -// Statically verify that Indexer implements the search.Index interface. -var _ search.Index = (*Indexer)(nil) - -type permaNodeRow struct { - blobref string - signer string - lastmod string // "2011-03-13T23:30:19.03946Z" -} - -func (mi *Indexer) GetRecentPermanodes(dest chan *search.Result, owner *blobref.BlobRef, limit int) error { - defer close(dest) - if owner == nil { - panic("nil owner") - } - - rs, err := mi.db.Query("SELECT blobref, signer, lastmod FROM permanodes WHERE signer = ? AND lastmod <> '' "+ - "ORDER BY lastmod DESC LIMIT ?", - owner.String(), limit) - if err != nil { - return err - } - defer rs.Close() - - var blobstr, signerstr, modstr string - for rs.Next() { - if err := rs.Scan(&blobstr, &signerstr, &modstr); err != nil { - return err - } - br := blobref.Parse(blobstr) - if br == nil { - continue - } - signer := blobref.Parse(signerstr) - if signer == nil { - continue - } - modstr = trimRFC3339Subseconds(modstr) - t, err := time.Parse(time.RFC3339, modstr) - if err != nil { - log.Printf("Skipping; error parsing time %q: %v", modstr, err) - continue - } - dest <- &search.Result{ - BlobRef: br, - Signer: signer, - LastModTime: t.Unix(), - } - } - return nil -} - -func trimRFC3339Subseconds(s string) string { - if !strings.HasSuffix(s, "Z") || len(s) < 20 || s[19] != '.' { - return s - } - return s[:19] + "Z" -} - -type claimsRow struct { - blobref, signer, date, claim, unverified, permanode, attr, value string -} - -func (mi *Indexer) GetOwnerClaims(permanode, owner *blobref.BlobRef) (claims search.ClaimList, err error) { - claims = make(search.ClaimList, 0) - - // TODO: ignore rows where unverified = 'N' - rs, err := mi.db.Query("SELECT blobref, date, claim, attr, value FROM claims WHERE permanode = ? AND signer = ?", - permanode.String(), owner.String()) - if err != nil { - return - } - defer rs.Close() - - var row claimsRow - for rs.Next() { - err = rs.Scan(&row.blobref, &row.date, &row.claim, &row.attr, &row.value) - if err != nil { - return - } - t, err := time.Parse(time.RFC3339, trimRFC3339Subseconds(row.date)) - if err != nil { - log.Printf("Skipping; error parsing time %q: %v", row.date, err) - continue - } - claims = append(claims, &search.Claim{ - BlobRef: blobref.Parse(row.blobref), - Signer: owner, - Permanode: permanode, - Type: row.claim, - Date: t, - Attr: row.attr, - Value: row.value, - }) - } - return -} - -func (mi *Indexer) GetBlobMimeType(blob *blobref.BlobRef) (mime string, size int64, err error) { - rs, err := mi.db.Query("SELECT type, size FROM blobs WHERE blobref=?", blob.String()) - if err != nil { - return - } - defer rs.Close() - if !rs.Next() { - err = os.ENOENT - return - } - err = rs.Scan(&mime, &size) - return -} - -func (mi *Indexer) SearchPermanodesWithAttr(dest chan<- *blobref.BlobRef, request *search.PermanodeByAttrRequest) error { - defer close(dest) - keyId, err := mi.keyIdOfSigner(request.Signer) - if err != nil { - return err - } - query := "" - var rs ResultSet - if request.Attribute == "" && !request.FuzzyMatch { - return errors.New("mysqlindexer: Attribute is required if FuzzyMatch is off.") - } - if request.Attribute == "" { - query = "SELECT permanode FROM signerattrvalueft WHERE keyid = ? AND MATCH(value) AGAINST (?) AND claimdate <> '' LIMIT ?" - rs, err = mi.db.Query(query, keyId, request.Query, request.MaxResults) - if err != nil { - return err - } - } else { - if request.FuzzyMatch { - query = "SELECT permanode FROM signerattrvalueft WHERE keyid = ? AND attr = ? AND MATCH(value) AGAINST (?) AND claimdate <> '' LIMIT ?" - rs, err = mi.db.Query(query, keyId, request.Attribute, - request.Query, request.MaxResults) - if err != nil { - return err - } - } else { - query = "SELECT permanode FROM signerattrvalue WHERE keyid = ? AND attr = ? AND value = ? AND claimdate <> '' ORDER BY claimdate DESC LIMIT ?" - rs, err = mi.db.Query(query, keyId, request.Attribute, - request.Query, request.MaxResults) - if err != nil { - return err - } - } - } - defer rs.Close() - - pn := "" - for rs.Next() { - if err := rs.Scan(&pn); err != nil { - return err - } - br := blobref.Parse(pn) - if br == nil { - continue - } - dest <- br - } - return nil -} - -func (mi *Indexer) ExistingFileSchemas(wholeDigest *blobref.BlobRef) (files []*blobref.BlobRef, err error) { - rs, err := mi.db.Query("SELECT schemaref FROM bytesfiles WHERE wholedigest=?", wholeDigest.String()) - if err != nil { - return - } - defer rs.Close() - - ref := "" - for rs.Next() { - if err := rs.Scan(&ref); err != nil { - return nil, err - } - files = append(files, blobref.Parse(ref)) - } - return -} - -func (mi *Indexer) GetFileInfo(fileRef *blobref.BlobRef) (*search.FileInfo, error) { - rs, err := mi.db.Query("SELECT size, filename, mime FROM bytesfiles WHERE schemaref=?", - fileRef.String()) - if err != nil { - return nil, err - } - defer rs.Close() - if !rs.Next() { - return nil, os.ENOENT - } - var fi search.FileInfo - err = rs.Scan(&fi.Size, &fi.FileName, &fi.MimeType) - return &fi, err -} - -func (mi *Indexer) keyIdOfSigner(signer *blobref.BlobRef) (keyid string, err error) { - rs, err := mi.db.Query("SELECT keyid FROM signerkeyid WHERE blobref=?", signer.String()) - if err != nil { - return - } - defer rs.Close() - - if !rs.Next() { - return "", fmt.Errorf("mysqlindexer: failed to find keyid of signer %q", signer.String()) - } - err = rs.Scan(&keyid) - return -} - -func (mi *Indexer) PermanodeOfSignerAttrValue(signer *blobref.BlobRef, attr, val string) (permanode *blobref.BlobRef, err error) { - keyId, err := mi.keyIdOfSigner(signer) - if err != nil { - return nil, err - } - - rs, err := mi.db.Query("SELECT permanode FROM signerattrvalue WHERE keyid=? AND attr=? AND value=? ORDER BY claimdate DESC LIMIT 1", - keyId, attr, val) - if err != nil { - return - } - defer rs.Close() - - if !rs.Next() { - return nil, errors.New("mysqlindexer: no signerattrvalue match") - } - var blobstr string - if err = rs.Scan(&blobstr); err != nil { - return - } - return blobref.Parse(blobstr), nil -} - -func (mi *Indexer) PathsOfSignerTarget(signer, target *blobref.BlobRef) (paths []*search.Path, err error) { - keyId, err := mi.keyIdOfSigner(signer) - if err != nil { - return - } - - rs, err := mi.db.Query("SELECT claimref, claimdate, baseref, suffix, active FROM path WHERE keyid=? AND targetref=?", - keyId, target.String()) - if err != nil { - return - } - defer rs.Close() - - mostRecent := make(map[string]*search.Path) - maxClaimDates := make(map[string]string) - var claimRef, claimDate, baseRef, suffix, active string - for rs.Next() { - if err = rs.Scan(&claimRef, &claimDate, &baseRef, &suffix, &active); err != nil { - return - } - - key := baseRef + "/" + suffix - - if claimDate > maxClaimDates[key] { - maxClaimDates[key] = claimDate - if active == "Y" { - mostRecent[key] = &search.Path{ - Claim: blobref.MustParse(claimRef), - ClaimDate: claimDate, - Base: blobref.MustParse(baseRef), - Suffix: suffix, - } - } else { - delete(mostRecent, key) - } - } - } - paths = make([]*search.Path, 0) - for _, v := range mostRecent { - paths = append(paths, v) - } - return paths, nil -} - -func (mi *Indexer) PathLookup(signer, base *blobref.BlobRef, suffix string, at time.Time) (*search.Path, error) { - // TODO: pass along the at time to a new helper function to - // filter? maybe not worth it, since this list should be - // small. - paths, err := mi.PathsLookup(signer, base, suffix) - if err != nil { - return nil, err - } - var ( - newest = int64(0) - atSeconds = int64(0) - best *search.Path - ) - if at != nil { - atSeconds = at.Unix() - } - for _, path := range paths { - t, err := time.Parse(time.RFC3339, trimRFC3339Subseconds(path.ClaimDate)) - if err != nil { - continue - } - secs := t.Unix() - if atSeconds != 0 && secs > atSeconds { - // Too new - continue - } - if newest > secs { - // Too old - continue - } - // Just right - newest, best = secs, path - } - if best == nil { - return nil, os.ENOENT - } - return best, nil -} - -func (mi *Indexer) PathsLookup(signer, base *blobref.BlobRef, suffix string) (paths []*search.Path, err error) { - keyId, err := mi.keyIdOfSigner(signer) - if err != nil { - return - } - rs, err := mi.db.Query("SELECT claimref, claimdate, targetref FROM path "+ - "WHERE keyid=? AND baseref=? AND suffix=?", - keyId, base.String(), suffix) - if err != nil { - return - } - defer rs.Close() - - var claimref, claimdate, targetref string - for rs.Next() { - if err = rs.Scan(&claimref, &claimdate, &targetref); err != nil { - return - } - t, err := time.Parse(time.RFC3339, trimRFC3339Subseconds(claimdate)) - if err != nil { - log.Printf("Skipping bogus path row with bad time: %q", claimref) - continue - } - _ = t // TODO: use this? - paths = append(paths, &search.Path{ - Claim: blobref.Parse(claimref), - ClaimDate: claimdate, - Base: base, - Target: blobref.Parse(targetref), - Suffix: suffix, - }) - } - return -} diff --git a/pkg/mysqlindexer/stat.go b/pkg/mysqlindexer/stat.go deleted file mode 100644 index 4db3192e1..000000000 --- a/pkg/mysqlindexer/stat.go +++ /dev/null @@ -1,40 +0,0 @@ -/* -Copyright 2011 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -nYou may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package mysqlindexer - -import ( - "camlistore.org/pkg/blobref" - - "fmt" - "strings" -) - -func (mi *Indexer) StatBlobs(dest chan<- blobref.SizedBlobRef, blobs []*blobref.BlobRef, waitSeconds int) error { - quotedBlobRefs := []string{} - for _, br := range blobs { - quotedBlobRefs = append(quotedBlobRefs, fmt.Sprintf("%q", br.String())) - } - sql := "SELECT blobref, size FROM blobs WHERE blobref IN (" + - strings.Join(quotedBlobRefs, ", ") + ")" - - rs, err := mi.db.Query(sql) - if err != nil { - return err - } - defer rs.Close() - return readBlobRefSizeResults(dest, rs) -}