Merge "camput: have cache and stat cache based on github.com/cznic/kv"

This commit is contained in:
Brad Fitzpatrick 2013-08-31 07:36:22 +00:00 committed by Gerrit Code Review
commit 1da2e15f80
6 changed files with 385 additions and 659 deletions

View File

@ -27,6 +27,7 @@ import (
type HaveCache interface {
StatBlobCache(br blob.Ref) (size int64, ok bool)
NoteBlobExists(br blob.Ref, size int64)
Close() error
}
// UploadCache is the "stat cache" for regular files. Given a current
@ -45,4 +46,5 @@ type UploadCache interface {
// for this file when it was uploaded (with -filenodes), and the cache entry
// will reflect that.
AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool)
Close() error
}

View File

@ -209,6 +209,12 @@ func (c *fileCmd) RunCommand(args []string) error {
if len(args) == 0 {
return cmdmain.UsageError("No files or directories given.")
}
if c.havecache {
defer up.haveCache.Close()
}
if c.statcache {
defer up.statCache.Close()
}
for _, filename := range args {
fi, err := os.Stat(filename)
if err != nil {
@ -266,23 +272,11 @@ func (c *fileCmd) initCaches(up *Uploader) {
return
}
if c.statcache {
var cache UploadCache
if flagUseSQLiteChildCache {
cache = NewSQLiteStatCache(gen)
} else {
cache = NewFlatStatCache(gen)
}
up.statCache = cache
up.statCache = NewKvStatCache(gen)
}
if c.havecache {
var cache HaveCache
if flagUseSQLiteChildCache {
cache = NewSQLiteHaveCache(gen)
} else {
cache = NewFlatHaveCache(gen)
}
up.haveCache = cache
up.Client.SetHaveCache(cache)
up.haveCache = NewKvHaveCache(gen)
up.Client.SetHaveCache(up.haveCache)
}
}

View File

@ -1,269 +0,0 @@
/*
Copyright 2011 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bufio"
"bytes"
"errors"
"fmt"
"hash/crc32"
"io"
"log"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/client"
"camlistore.org/pkg/osutil"
)
type statFingerprint string
var cleanSysStat func(v interface{}) interface{}
func fileInfoToFingerprint(fi os.FileInfo) statFingerprint {
// We calculate the CRC32 of the underlying system stat structure to get
// ctime, owner, group, etc. This is overkill (e.g. we don't care about
// the inode or device number probably), but works.
sysHash := uint32(0)
if sys := fi.Sys(); sys != nil {
if clean := cleanSysStat; clean != nil {
// TODO: don't clean bad fields, but provide a
// portable way to extract all good fields.
// This is a Linux+Mac-specific hack for now.
sys = clean(sys)
}
var buf bytes.Buffer
fmt.Fprintf(&buf, "%#v", sys)
sysHash = crc32.ChecksumIEEE(buf.Bytes())
}
return statFingerprint(fmt.Sprintf("%dB/%dMOD/sys-%d", fi.Size(), fi.ModTime().UnixNano(), sysHash))
}
type fileInfoPutRes struct {
Fingerprint statFingerprint
Result client.PutResult
}
// FlatStatCache is an ugly hack, until leveldb-go is ready
// (http://code.google.com/p/leveldb-go/)
type FlatStatCache struct {
mu sync.RWMutex
filename string
m map[string]fileInfoPutRes
af *os.File // for appending
}
func escapeGen(gen string) string {
// Good enough:
return url.QueryEscape(gen)
}
func NewFlatStatCache(gen string) *FlatStatCache {
filename := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen))
fc := &FlatStatCache{
filename: filename,
m: make(map[string]fileInfoPutRes),
}
f, err := os.Open(filename)
if os.IsNotExist(err) {
return fc
}
if err != nil {
log.Fatalf("opening camput stat cache: %v", filename, err)
}
defer f.Close()
br := bufio.NewReader(f)
for {
ln, err := br.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
log.Printf("Warning: (ignoring) reading stat cache: %v", err)
break
}
ln = strings.TrimSpace(ln)
f := strings.Split(ln, "\t")
if len(f) < 3 {
continue
}
filename, fp, putres := f[0], statFingerprint(f[1]), f[2]
f = strings.Split(putres, "/")
if len(f) != 2 {
continue
}
blobrefStr := f[0]
blobSize, err := strconv.ParseInt(f[1], 10, 64)
if err != nil {
continue
}
fc.m[filename] = fileInfoPutRes{
Fingerprint: fp,
Result: client.PutResult{
BlobRef: blob.ParseOrZero(blobrefStr),
Size: blobSize,
Skipped: true, // is this used?
},
}
}
vlog.Printf("Flatcache read %d entries from %s", len(fc.m), filename)
return fc
}
var _ UploadCache = (*FlatStatCache)(nil)
var errCacheMiss = errors.New("not in cache")
// flatCacheKey returns the key used for a stat entry in the flat cache.
// It is the cleaned absolute path of joining pwd and filename, to which
// "|Perm" is appended if -filenodes is being used.
func flatCacheKey(pwd, filename string, withPermanode bool) string {
var fullPath string
if filepath.IsAbs(filename) {
fullPath = filepath.Clean(filename)
} else {
fullPath = filepath.Join(pwd, filename)
}
if withPermanode {
return fmt.Sprintf("%v|Perm", fullPath)
}
return fullPath
}
func (c *FlatStatCache) CachedPutResult(pwd, filename string, fi os.FileInfo, withPermanode bool) (*client.PutResult, error) {
c.mu.RLock()
defer c.mu.RUnlock()
fp := fileInfoToFingerprint(fi)
key := flatCacheKey(pwd, filename, withPermanode)
val, ok := c.m[key]
if !ok {
cachelog.Printf("cache MISS on %q: not in cache", key)
return nil, errCacheMiss
}
if val.Fingerprint != fp {
cachelog.Printf("cache MISS on %q: stats not equal:\n%#v\n%#v", key, val.Fingerprint, fp)
return nil, errCacheMiss
}
pr := val.Result
return &pr, nil
}
func (c *FlatStatCache) AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool) {
c.mu.Lock()
defer c.mu.Unlock()
key := flatCacheKey(pwd, filename, withPermanode)
val := fileInfoPutRes{fileInfoToFingerprint(fi), *pr}
cachelog.Printf("Adding to stat cache %q: %v", key, val)
c.m[key] = val
if c.af == nil {
var err error
c.af, err = os.OpenFile(c.filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
log.Printf("opening stat cache for append: %v", err)
return
}
}
// TODO: flocking. see leveldb-go.
c.af.Seek(0, os.SEEK_END)
c.af.Write([]byte(fmt.Sprintf("%s\t%s\t%s/%d\n", key, val.Fingerprint, val.Result.BlobRef.String(), val.Result.Size)))
}
type FlatHaveCache struct {
mu sync.RWMutex
filename string
m map[string]int64 // blobref string -> size
af *os.File // appending file
}
func NewFlatHaveCache(gen string) *FlatHaveCache {
filename := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen))
c := &FlatHaveCache{
filename: filename,
m: make(map[string]int64),
}
f, err := os.Open(filename)
if os.IsNotExist(err) {
return c
}
if err != nil {
log.Fatalf("opening camput have-cache: %v", filename, err)
}
br := bufio.NewReader(f)
for {
ln, err := br.ReadString('\n')
if err == io.EOF {
break
}
if err != nil {
log.Printf("Warning: (ignoring) reading have-cache: %v", err)
break
}
f := strings.Fields(strings.TrimSpace(ln))
if len(f) == 2 {
br, sizea := f[0], f[1]
if size, err := strconv.ParseInt(sizea, 10, 64); err == nil && size >= 0 {
c.m[br] = size
}
}
}
return c
}
func (c *FlatHaveCache) StatBlobCache(br blob.Ref) (size int64, ok bool) {
c.mu.RLock()
defer c.mu.RUnlock()
size, ok = c.m[br.String()]
return
}
func (c *FlatHaveCache) NoteBlobExists(br blob.Ref, size int64) {
c.mu.Lock()
defer c.mu.Unlock()
if size < 0 {
panic("negative size")
}
k := br.String()
if c.m[k] == size {
// dup
return
}
c.m[k] = size
if c.af == nil {
var err error
c.af, err = os.OpenFile(c.filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600)
if err != nil {
log.Printf("opening have-cache for append: %v", err)
return
}
}
// TODO: flocking. see leveldb-go.
c.af.Seek(0, os.SEEK_END)
c.af.Write([]byte(fmt.Sprintf("%s %d\n", k, size)))
}

328
cmd/camput/kvcache.go Normal file
View File

@ -0,0 +1,328 @@
/*
Copyright 2013 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"hash/crc32"
"log"
"net/url"
"os"
"path/filepath"
"strconv"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/client"
"camlistore.org/pkg/kvutil"
"camlistore.org/pkg/osutil"
"camlistore.org/third_party/github.com/cznic/kv"
)
var errCacheMiss = errors.New("not in cache")
// KvHaveCache is a HaveCache on top of a single
// mutable database file on disk using github.com/cznic/kv.
// It stores the blobref in binary as the key, and
// the blobsize in binary as the value.
// Access to the cache is restricted to one process
// at a time with a lock file. Close should be called
// to remove the lock.
type KvHaveCache struct {
filename string
db *kv.DB
}
func NewKvHaveCache(gen string) *KvHaveCache {
fullPath := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)+".kv")
db, err := kvutil.Open(fullPath, nil)
if err != nil {
log.Fatalf("Could not create/open new have cache at %v, %v", fullPath, err)
}
return &KvHaveCache{
filename: fullPath,
db: db,
}
}
// Close should be called to commit all the writes
// to the db and to unlock the file.
func (c *KvHaveCache) Close() error {
return c.db.Close()
}
func (c *KvHaveCache) StatBlobCache(br blob.Ref) (size int64, ok bool) {
if !br.Valid() {
return
}
binBr, _ := br.MarshalBinary()
binVal, err := c.db.Get(nil, binBr)
if err != nil {
log.Fatalf("Could not query have cache %v for %v: %v", c.filename, br, err)
}
if binVal == nil {
cachelog.Printf("have cache MISS on %v", br)
return
}
val, err := strconv.Atoi(string(binVal))
if err != nil {
log.Fatalf("Could not decode have cache binary value for %v: %v", br, err)
}
cachelog.Printf("have cache HIT on %v", br)
return int64(val), true
}
func (c *KvHaveCache) NoteBlobExists(br blob.Ref, size int64) {
if !br.Valid() {
return
}
if size < 0 {
log.Fatalf("Got a negative blob size to note in have cache for %v", br)
}
binBr, _ := br.MarshalBinary()
binVal := []byte(strconv.Itoa(int(size)))
cachelog.Printf("Adding to have cache %v: %q", br, binVal)
_, _, err := c.db.Put(nil, binBr,
func(binBr, old []byte) ([]byte, bool, error) {
// We do not overwrite dups
if old != nil {
return nil, false, nil
}
return binVal, true, nil
})
if err != nil {
log.Fatalf("Could not write %v in have cache: %v", br, err)
}
}
// KvStatCache is an UploadCache on top of a single
// mutable database file on disk using github.com/cznic/kv.
// It stores a binary combination of an os.FileInfo fingerprint and
// a client.Putresult as the key, and the blobsize in binary as
// the value.
// Access to the cache is restricted to one process
// at a time with a lock file. Close should be called
// to remove the lock.
type KvStatCache struct {
filename string
db *kv.DB
}
func NewKvStatCache(gen string) *KvStatCache {
fullPath := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)+".kv")
db, err := kvutil.Open(fullPath, nil)
if err != nil {
log.Fatalf("Could not create/open new stat cache at %v, %v", fullPath, err)
}
return &KvStatCache{
filename: fullPath,
db: db,
}
}
// Close should be called to commit all the writes
// to the db and to unlock the file.
func (c *KvStatCache) Close() error {
return c.db.Close()
}
func (c *KvStatCache) CachedPutResult(pwd, filename string, fi os.FileInfo, withPermanode bool) (*client.PutResult, error) {
fullPath := fullpath(pwd, filename)
cacheKey := &statCacheKey{
Filepath: fullPath,
Permanode: withPermanode,
}
binKey, err := cacheKey.marshalBinary()
binVal, err := c.db.Get(nil, binKey)
if err != nil {
log.Fatalf("Could not query stat cache %v for %q: %v", binKey, fullPath, err)
}
if binVal == nil {
cachelog.Printf("stat cache MISS on %q", binKey)
return nil, errCacheMiss
}
val := &statCacheValue{}
if err = val.unmarshalBinary(binVal); err != nil {
return nil, fmt.Errorf("Bogus stat cached value for %q: %v", binKey, err)
}
fp := fileInfoToFingerprint(fi)
if val.Fingerprint != fp {
cachelog.Printf("cache MISS on %q: stats not equal:\n%#v\n%#v", binKey, val.Fingerprint, fp)
return nil, errCacheMiss
}
cachelog.Printf("stat cache HIT on %q", binKey)
return &(val.Result), nil
}
func (c *KvStatCache) AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool) {
fullPath := fullpath(pwd, filename)
cacheKey := &statCacheKey{
Filepath: fullPath,
Permanode: withPermanode,
}
val := &statCacheValue{fileInfoToFingerprint(fi), *pr}
binKey, err := cacheKey.marshalBinary()
if err != nil {
log.Fatalf("Could not add %q to stat cache: %v", binKey, err)
}
binVal, err := val.marshalBinary()
if err != nil {
log.Fatalf("Could not add %q to stat cache: %v", binKey, err)
}
cachelog.Printf("Adding to stat cache %q: %q", binKey, binVal)
_, _, err = c.db.Put(nil, binKey,
func(binKey, old []byte) ([]byte, bool, error) {
// We do not overwrite dups
if old != nil {
return nil, false, nil
}
return binVal, true, nil
})
if err != nil {
log.Fatalf("Could not add %q to stat cache: %v", binKey, err)
}
}
type statCacheKey struct {
Filepath string
Permanode bool // whether -filenodes is being used.
}
// marshalBinary returns a more compact binary
// representation of the contents of sk.
func (sk *statCacheKey) marshalBinary() ([]byte, error) {
if sk == nil {
return nil, errors.New("Can not marshal from a nil stat cache key")
}
data := make([]byte, 0, len(sk.Filepath)+3)
data = append(data, 1) // version number
data = append(data, sk.Filepath...)
data = append(data, '|')
if sk.Permanode {
data = append(data, 1)
}
return data, nil
}
type statFingerprint string
type statCacheValue struct {
Fingerprint statFingerprint
Result client.PutResult
}
// marshalBinary returns a more compact binary
// representation of the contents of scv.
func (scv *statCacheValue) marshalBinary() ([]byte, error) {
if scv == nil {
return nil, errors.New("Can not marshal from a nil stat cache value")
}
binBr, _ := scv.Result.BlobRef.MarshalBinary()
// Blob size fits on 4 bytes when binary encoded
data := make([]byte, 0, len(scv.Fingerprint)+1+4+1+len(binBr))
buf := bytes.NewBuffer(data)
_, err := buf.WriteString(string(scv.Fingerprint))
if err != nil {
return nil, fmt.Errorf("Could not write fingerprint %v: %v", scv.Fingerprint, err)
}
err = buf.WriteByte('|')
if err != nil {
return nil, fmt.Errorf("Could not write '|': %v", err)
}
err = binary.Write(buf, binary.BigEndian, int32(scv.Result.Size))
if err != nil {
return nil, fmt.Errorf("Could not write blob size %d: %v", scv.Result.Size, err)
}
err = buf.WriteByte('|')
if err != nil {
return nil, fmt.Errorf("Could not write '|': %v", err)
}
_, err = buf.Write(binBr)
if err != nil {
return nil, fmt.Errorf("Could not write binary blobref %q: %v", binBr, err)
}
return buf.Bytes(), nil
}
func (scv *statCacheValue) unmarshalBinary(data []byte) error {
if scv == nil {
return errors.New("Can't unmarshalBinary into a nil stat cache value")
}
if scv.Fingerprint != "" {
return errors.New("Can't unmarshalBinary into a non empty stat cache value")
}
parts := bytes.SplitN(data, []byte("|"), 3)
fingerprint := string(parts[0])
buf := bytes.NewReader(parts[1])
var size int32
err := binary.Read(buf, binary.BigEndian, &size)
if err != nil {
return fmt.Errorf("Could not decode blob size from stat cache value part %q: %v", parts[1], err)
}
br := new(blob.Ref)
if err := br.UnmarshalBinary(parts[2]); err != nil {
return fmt.Errorf("Could not unmarshalBinary for %q: %v", parts[2], err)
}
scv.Fingerprint = statFingerprint(fingerprint)
scv.Result = client.PutResult{
BlobRef: *br,
Size: int64(size),
Skipped: true,
}
return nil
}
func fullpath(pwd, filename string) string {
var fullPath string
if filepath.IsAbs(filename) {
fullPath = filepath.Clean(filename)
} else {
fullPath = filepath.Join(pwd, filename)
}
return fullPath
}
func escapeGen(gen string) string {
// Good enough:
return url.QueryEscape(gen)
}
var cleanSysStat func(v interface{}) interface{}
func fileInfoToFingerprint(fi os.FileInfo) statFingerprint {
// We calculate the CRC32 of the underlying system stat structure to get
// ctime, owner, group, etc. This is overkill (e.g. we don't care about
// the inode or device number probably), but works.
sysHash := uint32(0)
if sys := fi.Sys(); sys != nil {
if clean := cleanSysStat; clean != nil {
// TODO: don't clean bad fields, but provide a
// portable way to extract all good fields.
// This is a Linux+Mac-specific hack for now.
sys = clean(sys)
}
c32 := crc32.NewIEEE()
fmt.Fprintf(c32, "%#v", sys)
sysHash = c32.Sum32()
}
return statFingerprint(fmt.Sprintf("%dB/%dMOD/sys-%d", fi.Size(), fi.ModTime().UnixNano(), sysHash))
}

View File

@ -1,375 +0,0 @@
/*
Copyright 2013 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bufio"
"fmt"
"io"
"log"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"sync"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/client"
"camlistore.org/pkg/osutil"
)
const (
cmdName = "sqlite3"
noResult = "no row"
haveTableName = "have"
statTableName = "stat"
testTable = `.tables`
createHaveTable = `CREATE TABLE ` + haveTableName +
` (blobref VARCHAR(255) NOT NULL PRIMARY KEY,size INT)`
createStatTable = `CREATE TABLE ` + statTableName +
` (key TEXT NOT NULL PRIMARY KEY, val TEXT)`
// Because of blocking reads on the output, we want to print
// something even when a query returns no result,
// hence the ugly joins.
// TODO(mpl): there's probably a way to do non blocking reads
// on the stdout pipe of the sqlite process, so we would not
// have to use these ugly requests. Suggestion?
blobSizeQuery = `SELECT COALESCE(size, fake.filler) as size
FROM (SELECT '` + noResult + `' AS [filler]) fake
LEFT JOIN ` + haveTableName +
` ON blobref = `
statKeyQuery = `SELECT COALESCE(val, fake.filler) as val
FROM (SELECT '` + noResult + `' AS [filler]) fake
LEFT JOIN ` + statTableName +
` ON key = `
noteHaveStmt = `INSERT INTO ` + haveTableName +
` VALUES ('?1', ?2)` + ";\n"
noteStatStmt = `INSERT INTO ` + statTableName +
` VALUES ('?1', '?2')` + ";\n"
keyNotUnique = "column key is not unique\n"
brefNotUnique = "column blobref is not unique\n"
)
func checkCmdInstalled() {
_, err := exec.LookPath(cmdName)
if err != nil {
hint := `The binary is not in your $PATH or most likely not installed.` +
` On debian based distributions, it is usually provided by the sqlite3 package.`
log.Fatalf("%v command could not be found: %v\n"+hint, cmdName, err)
}
}
type childInfo struct {
r *bufio.Reader // to read the child's stdout
w io.WriteCloser // to write to the child's stdin
proc *os.Process
er *bufio.Reader // to read the child's stderr
}
func startChild(filename string) (*childInfo, error) {
cmdPath, err := exec.LookPath(cmdName)
if err != nil {
return nil, err
}
pr1, pw1, err := os.Pipe()
if err != nil {
return nil, err
}
pr2, pw2, err := os.Pipe()
if err != nil {
return nil, err
}
pr3, pw3, err := os.Pipe()
if err != nil {
return nil, err
}
args := []string{cmdPath, filename}
fds := []*os.File{pr1, pw2, pw3}
p, err := os.StartProcess(cmdPath, args, &os.ProcAttr{Dir: "/", Files: fds})
if err != nil {
return nil, err
}
return &childInfo{
r: bufio.NewReader(pr2),
w: pw1,
proc: p,
er: bufio.NewReader(pr3),
}, nil
}
// SQLiteStatCache is an UploadCache based on sqlite.
// sqlite3 is called as a child process so we can still
// cross-compile static ARM binaries for Android, and
// use the android system sqlite, rather than having to
// include a big copy of the sqlite libs.
// It stores rows with (key, value) pairs, where
// key = filepath|statFingerprint and
// value = PutResult.BlobRef.String()|PutResult.Size
type SQLiteStatCache struct {
filename string
proc *os.Process
mu sync.Mutex // Guards reads and writes to sqlite.
r *bufio.Reader // where to read the output from the sqlite process
w io.WriteCloser // where to write queries/statements to the sqlite process
}
func NewSQLiteStatCache(gen string) *SQLiteStatCache {
checkCmdInstalled()
filename := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)+".db")
out, err := exec.Command(cmdName, filename, testTable).Output()
if err != nil {
log.Fatalf("Failed to test for %v table existence: %v", statTableName, err)
}
if len(out) == 0 {
// file or table does not exist
err = exec.Command(cmdName, filename, createStatTable).Run()
if err != nil {
log.Fatalf("Failed to create %v table for stat cache: %v", statTableName, err)
}
} else {
if string(out) != statTableName+"\n" {
log.Fatalf("Wrong table name for stat cache; was expecting %v, got %q",
haveTableName, out)
}
}
return &SQLiteStatCache{
filename: filename,
}
}
func (c *SQLiteStatCache) startSQLiteChild() error {
if c.proc != nil {
return nil
}
ci, err := startChild(c.filename)
if err != nil {
return err
}
go func() {
for {
errStr, err := ci.er.ReadString('\n')
if err != nil {
log.Fatal(err)
}
if !strings.HasSuffix(errStr, keyNotUnique) {
log.Fatalf("Error on stat cache: %v", errStr)
}
}
}()
c.r = ci.r
c.w = ci.w
c.proc = ci.proc
return nil
}
// sqliteCacheKey returns the key used for a stat entry in the sqlite cache.
// It is the cleaned absolute path of joining pwd and filename,
// concatenated with a fingerprint based on the file's info. If
// -filenodes is being used, the suffix "|Perm" is also appended.
func sqliteCacheKey(pwd, filename string, fi os.FileInfo, withPermanode bool) string {
var fullPath string
if filepath.IsAbs(filename) {
fullPath = filepath.Clean(filename)
} else {
fullPath = filepath.Join(pwd, filename)
}
key := fmt.Sprintf("%v|%v", fullPath, string(fileInfoToFingerprint(fi)))
if withPermanode {
return fmt.Sprintf("%v|Perm", key)
}
return key
}
func (c *SQLiteStatCache) CachedPutResult(pwd, filename string, fi os.FileInfo, withPermanode bool) (*client.PutResult, error) {
key := sqliteCacheKey(pwd, filename, fi, withPermanode)
query := fmt.Sprintf("%v'%v';\n", statKeyQuery, key)
c.mu.Lock()
err := c.startSQLiteChild()
if err != nil {
c.mu.Unlock()
return nil, fmt.Errorf("Could not start sqlite child process: %v", err)
}
_, err = c.w.Write([]byte(query))
if err != nil {
c.mu.Unlock()
return nil, fmt.Errorf("failed to query stat cache: %v", err)
}
out, err := c.r.ReadString('\n')
if err != nil {
c.mu.Unlock()
return nil, fmt.Errorf("failed to read stat cache query result: %v", err)
}
out = strings.TrimRight(out, "\n")
c.mu.Unlock()
if out == noResult {
return nil, errCacheMiss
}
fields := strings.Split(out, "|")
if len(fields) > 2 {
return nil, fmt.Errorf("Invalid stat cache value; was expecting \"bref|size\", got %q", out)
}
br, ok := blob.Parse(fields[0])
if !ok {
return nil, fmt.Errorf("Invalid blobref in stat cache: %q", fields[0])
}
blobSize, err := strconv.ParseInt(fields[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("Invalid blob size %q in stat cache: %v", fields[1], err)
}
return &client.PutResult{
BlobRef: br,
Size: blobSize,
Skipped: true,
}, nil
}
func (c *SQLiteStatCache) AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool) {
key := sqliteCacheKey(pwd, filename, fi, withPermanode)
val := pr.BlobRef.String() + "|" + strconv.FormatInt(pr.Size, 10)
repl := strings.NewReplacer("?1", key, "?2", val)
query := repl.Replace(noteStatStmt)
c.mu.Lock()
defer c.mu.Unlock()
err := c.startSQLiteChild()
if err != nil {
log.Fatalf("Could not start sqlite child process: %v", err)
}
cachelog.Printf("Adding to stat cache %v: %v", key, val)
_, err = c.w.Write([]byte(query))
if err != nil {
log.Fatalf("failed to write to stat cache: %v", err)
}
}
// SQLiteHacheCache is a HaveCache based on sqlite.
// sqlite3 is called as a child process so we can still
// cross-compile static ARM binaries for Android, and
// use the android system sqlite, rather than having to
// include a big copy of the sqlite libs.
// It stores rows with (key,value) pairs, where
// key = blobref and
// value = blobsize
type SQLiteHaveCache struct {
filename string
proc *os.Process
mu sync.Mutex // Guards reads and writes to sqlite.
r *bufio.Reader // where to read the output from the sqlite process
w io.WriteCloser // where to write queries/statements to the sqlite process
}
func NewSQLiteHaveCache(gen string) *SQLiteHaveCache {
checkCmdInstalled()
filename := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)+".db")
out, err := exec.Command(cmdName, filename, testTable).Output()
if err != nil {
log.Fatalf("Failed to test for %v table existence: %v", haveTableName, err)
}
if len(out) == 0 {
// file or table does not exist
err = exec.Command(cmdName, filename, createHaveTable).Run()
if err != nil {
log.Fatalf("Failed to create %v table for have cache: %v", haveTableName, err)
}
} else {
if string(out) != haveTableName+"\n" {
log.Fatalf("Wrong table name for have cache; was expecting %v, got %q",
haveTableName, out)
}
}
return &SQLiteHaveCache{
filename: filename,
}
}
func (c *SQLiteHaveCache) startSQLiteChild() error {
if c.proc != nil {
return nil
}
ci, err := startChild(c.filename)
if err != nil {
return err
}
go func() {
for {
errStr, err := ci.er.ReadString('\n')
if err != nil {
log.Fatal(err)
}
if !strings.HasSuffix(errStr, brefNotUnique) {
log.Fatalf("Error on have cache: %v", errStr)
}
}
}()
c.r = ci.r
c.w = ci.w
c.proc = ci.proc
return nil
}
func (c *SQLiteHaveCache) StatBlobCache(br blob.Ref) (size int64, ok bool) {
if !br.Valid() {
return
}
// TODO(mpl): is it enough that we know it's a valid blobref to avoid any injection risk ?
query := blobSizeQuery + fmt.Sprintf("'%v';\n", br.String())
c.mu.Lock()
defer c.mu.Unlock()
err := c.startSQLiteChild()
if err != nil {
log.Fatalf("Could not start sqlite child process: %v", err)
}
_, err = c.w.Write([]byte(query))
if err != nil {
log.Fatalf("failed to query have cache: %v", err)
}
out, err := c.r.ReadString('\n')
if err != nil {
log.Fatalf("failed to read have cache query result: %v", err)
}
out = strings.TrimRight(out, "\n")
if out == noResult {
return
}
size, err = strconv.ParseInt(out, 10, 64)
if err != nil {
log.Fatalf("Bogus blob size in %v table: %v", haveTableName, err)
}
return size, true
}
func (c *SQLiteHaveCache) NoteBlobExists(br blob.Ref, size int64) {
if size < 0 {
log.Fatalf("Got a negative blob size to note in have cache")
}
if !br.Valid() {
return
}
repl := strings.NewReplacer("?1", br.String(), "?2", fmt.Sprint(size))
query := repl.Replace(noteHaveStmt)
c.mu.Lock()
defer c.mu.Unlock()
err := c.startSQLiteChild()
if err != nil {
log.Fatalf("Could not start sqlite child process: %v", err)
}
_, err = c.w.Write([]byte(query))
if err != nil {
log.Fatalf("failed to write to have cache: %v", err)
}
}

46
pkg/kvutil/kvutil.go Normal file
View File

@ -0,0 +1,46 @@
/*
Copyright 2013 The Camlistore Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package kvutil contains helpers related to
// github.com/cznic/kv.
package kvutil
import (
"io"
"os"
"camlistore.org/third_party/github.com/camlistore/lock"
"camlistore.org/third_party/github.com/cznic/kv"
)
// Open opens the named kv DB file for reading/writing. It
// creates the file if it does not exist yet.
func Open(filePath string, opts *kv.Options) (*kv.DB, error) {
// TODO(mpl): use it in index pkg and such
createOpen := kv.Open
if _, err := os.Stat(filePath); os.IsNotExist(err) {
createOpen = kv.Create
}
if opts == nil {
opts = &kv.Options{}
}
if opts.Locker == nil {
opts.Locker = func(fullPath string) (io.Closer, error) {
return lock.Lock(filePath + ".lock")
}
}
return createOpen(filePath, opts)
}