Start of indexer indexing file schema's content digests.

This commit is contained in:
Brad Fitzpatrick 2011-06-08 17:49:31 -07:00
parent a4c43c359f
commit fa3715134a
4 changed files with 44 additions and 11 deletions

View File

@ -1 +1 @@
6g version weekly.2011-06-02 8660+
6g version weekly.2011-06-02 8682

View File

@ -121,10 +121,11 @@
"/indexer/": {
"handler": "storage-mysqlindexer",
"handlerArgs": {
"database": "devcamlistore",
"user": "root",
"password": "root",
"host": "127.0.0.1"
"database": "devcamlistore",
"user": "root",
"password": "root",
"host": "127.0.0.1",
"blobSource": "/bs/"
}
},

View File

@ -39,11 +39,16 @@ type Indexer struct {
KeyFetcher blobref.StreamingFetcher // for verifying claims
OwnerBlobRef *blobref.BlobRef
// Used for fetching blobs to find the complete sha1 of schema
// blobs.
BlobSource blobserver.Storage
clientLock sync.Mutex
cachedClients []*mysql.Client
}
func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, os.Error) {
func newFromConfig(ld blobserver.Loader, config jsonconfig.Obj) (blobserver.Storage, os.Error) {
blobPrefix := config.RequiredString("blobSource")
indexer := &Indexer{
SimpleBlobHubPartitionMap: &blobserver.SimpleBlobHubPartitionMap{},
Host: config.OptionalString("host", "localhost"),
@ -55,6 +60,12 @@ func newFromConfig(_ blobserver.Loader, config jsonconfig.Obj) (blobserver.Stora
return nil, err
}
sto, err := ld.GetStorage(blobPrefix)
if err != nil {
return nil, err
}
indexer.BlobSource = sto
//ownerBlobRef = client.SignerPublicKeyBlobref()
//if ownerBlobRef == nil {
// log.Fatalf("Public key not configured.")

View File

@ -17,17 +17,18 @@ limitations under the License.
package mysqlindexer
import (
"camli/blobref"
"camli/blobserver"
"camli/magic"
"camli/schema"
"crypto/sha1"
"io"
"json"
"log"
"os"
mysql "camli/third_party/github.com/Philio/GoMySQL"
"camli/blobref"
"camli/blobserver"
"camli/magic"
"camli/schema"
)
const maxSniffSize = 1024 * 16
@ -140,6 +141,10 @@ func (mi *Indexer) ReceiveBlob(blobRef *blobref.BlobRef, source io.Reader) (rets
if err = populatePermanode(client, blobRef, camli); err != nil {
return
}
case "file":
if err = mi.populateFile(client, blobRef, camli); err != nil {
return
}
}
}
@ -202,3 +207,19 @@ func populatePermanode(client *mysql.Client, blobRef *blobref.BlobRef, camli *sc
blobRef.String(), camli.Signer)
return
}
func (mi *Indexer) populateFile(client *mysql.Client, blobRef *blobref.BlobRef, ss *schema.Superset) (err os.Error) {
seekFetcher, err := blobref.SeekerFromStreamingFetcher(mi.BlobSource)
if err != nil {
return err
}
sha1 := sha1.New()
fr := ss.NewFileReader(seekFetcher)
n, err := io.Copy(sha1, fr)
if err != nil {
return err
}
log.Printf("file %s blobref is %s, size %d", blobRef, blobref.FromHash("sha1", sha1), n)
return nil
}