From 893b5b2f22c3a1c1c461e72782c4c4134801430f Mon Sep 17 00:00:00 2001 From: mpl Date: Wed, 1 May 2013 18:42:26 +0200 Subject: [PATCH] camput: sqlite backend for have cache and stat cache http://camlistore.org/issue/73 Change-Id: Id6fb95f1980139f2b525321a7e1b553cb11fe727 --- cmd/camput/files.go | 17 +- cmd/camput/sqlitecache.go | 357 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 372 insertions(+), 2 deletions(-) create mode 100644 cmd/camput/sqlitecache.go diff --git a/cmd/camput/files.go b/cmd/camput/files.go index 69d3c2fd2..8c6352bc5 100644 --- a/cmd/camput/files.go +++ b/cmd/camput/files.go @@ -61,6 +61,8 @@ type fileCmd struct { histo string // optional histogram output filename } +var flagUseSQLiteChildCache bool // Use sqlite for the statcache and havecache. + func init() { cmdmain.RegisterCommand("file", func(flags *flag.FlagSet) cmdmain.CommandRunner { cmd := new(fileCmd) @@ -81,6 +83,7 @@ func init() { flags.BoolVar(&cmd.havecache, "havecache", true, "Use the 'have cache', a cache keeping track of what blobs the remote server should already have from previous uploads.") flags.BoolVar(&cmd.memstats, "debug-memstats", false, "Enter debug in-memory mode; collecting stats only. Doesn't upload anything.") flags.StringVar(&cmd.histo, "debug-histogram-file", "", "Optional file to create and write the blob size for each file uploaded. For use with GNU R and hist(read.table(\"filename\")$V1). Requires debug-memstats.") + flags.BoolVar(&flagUseSQLiteChildCache, "sqlitecache", false, "Use sqlite for the statcache and havecache instead of a flat cache.") } else { cmd.havecache = true cmd.statcache = true @@ -260,11 +263,21 @@ func (c *fileCmd) initCaches(up *Uploader) { return } if c.statcache { - cache := NewFlatStatCache(gen) + var cache UploadCache + if flagUseSQLiteChildCache { + cache = NewSQLiteStatCache(gen) + } else { + cache = NewFlatStatCache(gen) + } up.statCache = cache } if c.havecache { - cache := NewFlatHaveCache(gen) + var cache HaveCache + if flagUseSQLiteChildCache { + cache = NewSQLiteHaveCache(gen) + } else { + cache = NewFlatHaveCache(gen) + } up.haveCache = cache up.Client.SetHaveCache(cache) } diff --git a/cmd/camput/sqlitecache.go b/cmd/camput/sqlitecache.go new file mode 100644 index 000000000..d889d7429 --- /dev/null +++ b/cmd/camput/sqlitecache.go @@ -0,0 +1,357 @@ +/* +Copyright 2013 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bufio" + "fmt" + "io" + "log" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "sync" + + "camlistore.org/pkg/blobref" + "camlistore.org/pkg/client" + "camlistore.org/pkg/osutil" +) + +const ( + cmdName = "sqlite3" + noResult = "no row" + haveTableName = "have" + statTableName = "stat" + testTable = `.tables` + createHaveTable = `CREATE TABLE ` + haveTableName + + ` (blobref VARCHAR(255) NOT NULL PRIMARY KEY,size INT)` + createStatTable = `CREATE TABLE ` + statTableName + + ` (key TEXT NOT NULL PRIMARY KEY, val TEXT)` + // Because of blocking reads on the output, we want to print + // something even when a query returns no result, + // hence the ugly joins. + // TODO(mpl): there's probably a way to do non blocking reads + // on the stdout pipe of the sqlite process, so we would not + // have to use these ugly requests. Suggestion? + blobSizeQuery = `SELECT COALESCE(size, fake.filler) as size + FROM (SELECT '` + noResult + `' AS [filler]) fake + LEFT JOIN ` + haveTableName + + ` ON blobref = ` + statKeyQuery = `SELECT COALESCE(val, fake.filler) as val + FROM (SELECT '` + noResult + `' AS [filler]) fake + LEFT JOIN ` + statTableName + + ` ON key = ` + noteHaveStmt = `INSERT INTO ` + haveTableName + + ` VALUES ('?1', ?2)` + ";\n" + noteStatStmt = `INSERT INTO ` + statTableName + + ` VALUES ('?1', '?2')` + ";\n" + keyNotUnique = "column key is not unique\n" + brefNotUnique = "column blobref is not unique\n" +) + +func checkCmdInstalled() { + _, err := exec.LookPath(cmdName) + if err != nil { + hint := `The binary is not in your $PATH or most likely not installed.` + + ` On debian based distributions, it is usually provided by the sqlite3 package.` + log.Fatalf("%v command could not be found: %v\n"+hint, cmdName, err) + } +} + +type childInfo struct { + r *bufio.Reader // to read the child's stdout + w io.WriteCloser // to write to the child's stdin + proc *os.Process + er *bufio.Reader // to read the child's stderr +} + +func startChild(filename string) (*childInfo, error) { + cmdPath, err := exec.LookPath(cmdName) + if err != nil { + return nil, err + } + pr1, pw1, err := os.Pipe() + if err != nil { + return nil, err + } + pr2, pw2, err := os.Pipe() + if err != nil { + return nil, err + } + pr3, pw3, err := os.Pipe() + if err != nil { + return nil, err + } + args := []string{cmdPath, filename} + fds := []*os.File{pr1, pw2, pw3} + p, err := os.StartProcess(cmdPath, args, &os.ProcAttr{Dir: "/", Files: fds}) + if err != nil { + return nil, err + } + return &childInfo{ + r: bufio.NewReader(pr2), + w: pw1, + proc: p, + er: bufio.NewReader(pr3), + }, nil +} + +// SQLiteStatCache is an UploadCache based on sqlite. +// sqlite3 is called as a child process so we can still +// cross-compile static ARM binaries for Android, and +// use the android system sqlite, rather than having to +// include a big copy of the sqlite libs. +// It stores rows with (key, value) pairs, where +// key = filepath|statFingerprint and +// value = PutResult.BlobRef.String()|PutResult.Size +type SQLiteStatCache struct { + filename string + proc *os.Process + mu sync.Mutex // Guards reads and writes to sqlite. + r *bufio.Reader // where to read the output from the sqlite process + w io.WriteCloser // where to write queries/statements to the sqlite process +} + +func NewSQLiteStatCache(gen string) *SQLiteStatCache { + checkCmdInstalled() + filename := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)+".db") + out, err := exec.Command(cmdName, filename, testTable).Output() + if err != nil { + log.Fatalf("Failed to test for %v table existence: %v", statTableName, err) + } + if len(out) == 0 { + // file or table does not exist + err = exec.Command(cmdName, filename, createStatTable).Run() + if err != nil { + log.Fatalf("Failed to create %v table for stat cache: %v", statTableName, err) + } + } else { + if string(out) != statTableName+"\n" { + log.Fatalf("Wrong table name for stat cache; was expecting %v, got %q", + haveTableName, out) + } + } + return &SQLiteStatCache{ + filename: filename, + } +} + +func (c *SQLiteStatCache) startSQLiteChild() error { + if c.proc != nil { + return nil + } + ci, err := startChild(c.filename) + if err != nil { + return err + } + go func() { + for { + errStr, err := ci.er.ReadString('\n') + if err != nil { + log.Fatal(err) + } + if !strings.HasSuffix(errStr, keyNotUnique) { + log.Fatalf("Error on stat cache: %v", errStr) + } + } + }() + c.r = ci.r + c.w = ci.w + c.proc = ci.proc + return nil +} + +func (c *SQLiteStatCache) CachedPutResult(pwd, filename string, fi os.FileInfo) (*client.PutResult, error) { + key := cacheKey(pwd, filename) + "|" + string(fileInfoToFingerprint(fi)) + query := fmt.Sprintf("%v'%v';\n", statKeyQuery, key) + c.mu.Lock() + err := c.startSQLiteChild() + if err != nil { + c.mu.Unlock() + return nil, fmt.Errorf("Could not start sqlite child process: %v", err) + } + _, err = c.w.Write([]byte(query)) + if err != nil { + c.mu.Unlock() + return nil, fmt.Errorf("failed to query stat cache: %v", err) + } + out, err := c.r.ReadString('\n') + if err != nil { + c.mu.Unlock() + return nil, fmt.Errorf("failed to read stat cache query result: %v", err) + } + out = strings.TrimRight(out, "\n") + c.mu.Unlock() + + if out == noResult { + return nil, errCacheMiss + } + fields := strings.Split(out, "|") + if len(fields) > 2 { + return nil, fmt.Errorf("Invalid stat cache value; was expecting \"bref|size\", got %q", out) + } + br := blobref.Parse(fields[0]) + if br == nil { + return nil, fmt.Errorf("Invalid blobref in stat cache: %q", fields[0]) + } + blobSize, err := strconv.ParseInt(fields[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("Invalid blob size %q in stat cache: %v", fields[1], err) + } + return &client.PutResult{ + BlobRef: br, + Size: blobSize, + Skipped: true, + }, nil +} + +func (c *SQLiteStatCache) AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult) { + key := cacheKey(pwd, filename) + "|" + string(fileInfoToFingerprint(fi)) + val := pr.BlobRef.String() + "|" + strconv.FormatInt(pr.Size, 10) + repl := strings.NewReplacer("?1", key, "?2", val) + query := repl.Replace(noteStatStmt) + c.mu.Lock() + defer c.mu.Unlock() + err := c.startSQLiteChild() + if err != nil { + log.Fatalf("Could not start sqlite child process: %v", err) + } + cachelog.Printf("Adding to stat cache %v: %v", key, val) + _, err = c.w.Write([]byte(query)) + if err != nil { + log.Fatalf("failed to write to stat cache: %v", err) + } +} + +// SQLiteHacheCache is a HaveCache based on sqlite. +// sqlite3 is called as a child process so we can still +// cross-compile static ARM binaries for Android, and +// use the android system sqlite, rather than having to +// include a big copy of the sqlite libs. +// It stores rows with (key,value) pairs, where +// key = blobref and +// value = blobsize +type SQLiteHaveCache struct { + filename string + proc *os.Process + mu sync.Mutex // Guards reads and writes to sqlite. + r *bufio.Reader // where to read the output from the sqlite process + w io.WriteCloser // where to write queries/statements to the sqlite process +} + +func NewSQLiteHaveCache(gen string) *SQLiteHaveCache { + checkCmdInstalled() + filename := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)+".db") + out, err := exec.Command(cmdName, filename, testTable).Output() + if err != nil { + log.Fatalf("Failed to test for %v table existence: %v", haveTableName, err) + } + if len(out) == 0 { + // file or table does not exist + err = exec.Command(cmdName, filename, createHaveTable).Run() + if err != nil { + log.Fatalf("Failed to create %v table for have cache: %v", haveTableName, err) + } + } else { + if string(out) != haveTableName+"\n" { + log.Fatalf("Wrong table name for have cache; was expecting %v, got %q", + haveTableName, out) + } + } + return &SQLiteHaveCache{ + filename: filename, + } +} + +func (c *SQLiteHaveCache) startSQLiteChild() error { + if c.proc != nil { + return nil + } + ci, err := startChild(c.filename) + if err != nil { + return err + } + go func() { + for { + errStr, err := ci.er.ReadString('\n') + if err != nil { + log.Fatal(err) + } + if !strings.HasSuffix(errStr, brefNotUnique) { + log.Fatalf("Error on have cache: %v", errStr) + } + } + }() + c.r = ci.r + c.w = ci.w + c.proc = ci.proc + return nil +} + +func (c *SQLiteHaveCache) StatBlobCache(br *blobref.BlobRef) (size int64, ok bool) { + if br == nil { + return + } + // TODO(mpl): is it enough that we know it's a valid blobref to avoid any injection risk ? + query := blobSizeQuery + fmt.Sprintf("'%v';\n", br.String()) + c.mu.Lock() + defer c.mu.Unlock() + err := c.startSQLiteChild() + if err != nil { + log.Fatalf("Could not start sqlite child process: %v", err) + } + _, err = c.w.Write([]byte(query)) + if err != nil { + log.Fatalf("failed to query have cache: %v", err) + } + out, err := c.r.ReadString('\n') + if err != nil { + log.Fatalf("failed to read have cache query result: %v", err) + } + out = strings.TrimRight(out, "\n") + if out == noResult { + return + } + size, err = strconv.ParseInt(out, 10, 64) + if err != nil { + log.Fatalf("Bogus blob size in %v table: %v", haveTableName, err) + } + return size, true +} + +func (c *SQLiteHaveCache) NoteBlobExists(br *blobref.BlobRef, size int64) { + if size < 0 { + log.Fatalf("Got a negative blob size to note in have cache") + } + if br == nil { + return + } + repl := strings.NewReplacer("?1", br.String(), "?2", fmt.Sprint(size)) + query := repl.Replace(noteHaveStmt) + c.mu.Lock() + defer c.mu.Unlock() + err := c.startSQLiteChild() + if err != nil { + log.Fatalf("Could not start sqlite child process: %v", err) + } + _, err = c.w.Write([]byte(query)) + if err != nil { + log.Fatalf("failed to write to have cache: %v", err) + } +}