From 8d71c9bb60c125f13ec9de2cf4de22fea0db778d Mon Sep 17 00:00:00 2001 From: mpl Date: Fri, 30 Aug 2013 15:21:49 +0200 Subject: [PATCH] camput: have cache and stat cache based on github.com/cznic/kv Added pkg/kvutil/kvutil.Open for convenience Change-Id: Ie8db58e2c644a7358f252548bc57cccff7627497 --- cmd/camput/cache.go | 2 + cmd/camput/files.go | 24 +-- cmd/camput/flatcache.go | 269 --------------------------- cmd/camput/kvcache.go | 328 +++++++++++++++++++++++++++++++++ cmd/camput/sqlitecache.go | 375 -------------------------------------- pkg/kvutil/kvutil.go | 46 +++++ 6 files changed, 385 insertions(+), 659 deletions(-) delete mode 100644 cmd/camput/flatcache.go create mode 100644 cmd/camput/kvcache.go delete mode 100644 cmd/camput/sqlitecache.go create mode 100644 pkg/kvutil/kvutil.go diff --git a/cmd/camput/cache.go b/cmd/camput/cache.go index 24062f856..4ca230a8c 100644 --- a/cmd/camput/cache.go +++ b/cmd/camput/cache.go @@ -27,6 +27,7 @@ import ( type HaveCache interface { StatBlobCache(br blob.Ref) (size int64, ok bool) NoteBlobExists(br blob.Ref, size int64) + Close() error } // UploadCache is the "stat cache" for regular files. Given a current @@ -45,4 +46,5 @@ type UploadCache interface { // for this file when it was uploaded (with -filenodes), and the cache entry // will reflect that. AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool) + Close() error } diff --git a/cmd/camput/files.go b/cmd/camput/files.go index d040e53c1..7e069eebd 100644 --- a/cmd/camput/files.go +++ b/cmd/camput/files.go @@ -209,6 +209,12 @@ func (c *fileCmd) RunCommand(args []string) error { if len(args) == 0 { return cmdmain.UsageError("No files or directories given.") } + if c.havecache { + defer up.haveCache.Close() + } + if c.statcache { + defer up.statCache.Close() + } for _, filename := range args { fi, err := os.Stat(filename) if err != nil { @@ -266,23 +272,11 @@ func (c *fileCmd) initCaches(up *Uploader) { return } if c.statcache { - var cache UploadCache - if flagUseSQLiteChildCache { - cache = NewSQLiteStatCache(gen) - } else { - cache = NewFlatStatCache(gen) - } - up.statCache = cache + up.statCache = NewKvStatCache(gen) } if c.havecache { - var cache HaveCache - if flagUseSQLiteChildCache { - cache = NewSQLiteHaveCache(gen) - } else { - cache = NewFlatHaveCache(gen) - } - up.haveCache = cache - up.Client.SetHaveCache(cache) + up.haveCache = NewKvHaveCache(gen) + up.Client.SetHaveCache(up.haveCache) } } diff --git a/cmd/camput/flatcache.go b/cmd/camput/flatcache.go deleted file mode 100644 index b0ba5d4ac..000000000 --- a/cmd/camput/flatcache.go +++ /dev/null @@ -1,269 +0,0 @@ -/* -Copyright 2011 Google Inc. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "bufio" - "bytes" - "errors" - "fmt" - "hash/crc32" - "io" - "log" - "net/url" - "os" - "path/filepath" - "strconv" - "strings" - "sync" - - "camlistore.org/pkg/blob" - "camlistore.org/pkg/client" - "camlistore.org/pkg/osutil" -) - -type statFingerprint string - -var cleanSysStat func(v interface{}) interface{} - -func fileInfoToFingerprint(fi os.FileInfo) statFingerprint { - // We calculate the CRC32 of the underlying system stat structure to get - // ctime, owner, group, etc. This is overkill (e.g. we don't care about - // the inode or device number probably), but works. - sysHash := uint32(0) - if sys := fi.Sys(); sys != nil { - if clean := cleanSysStat; clean != nil { - // TODO: don't clean bad fields, but provide a - // portable way to extract all good fields. - // This is a Linux+Mac-specific hack for now. - sys = clean(sys) - } - var buf bytes.Buffer - fmt.Fprintf(&buf, "%#v", sys) - sysHash = crc32.ChecksumIEEE(buf.Bytes()) - } - return statFingerprint(fmt.Sprintf("%dB/%dMOD/sys-%d", fi.Size(), fi.ModTime().UnixNano(), sysHash)) -} - -type fileInfoPutRes struct { - Fingerprint statFingerprint - Result client.PutResult -} - -// FlatStatCache is an ugly hack, until leveldb-go is ready -// (http://code.google.com/p/leveldb-go/) -type FlatStatCache struct { - mu sync.RWMutex - filename string - m map[string]fileInfoPutRes - af *os.File // for appending -} - -func escapeGen(gen string) string { - // Good enough: - return url.QueryEscape(gen) -} - -func NewFlatStatCache(gen string) *FlatStatCache { - filename := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)) - fc := &FlatStatCache{ - filename: filename, - m: make(map[string]fileInfoPutRes), - } - - f, err := os.Open(filename) - if os.IsNotExist(err) { - return fc - } - if err != nil { - log.Fatalf("opening camput stat cache: %v", filename, err) - } - defer f.Close() - br := bufio.NewReader(f) - for { - ln, err := br.ReadString('\n') - if err == io.EOF { - break - } - if err != nil { - log.Printf("Warning: (ignoring) reading stat cache: %v", err) - break - } - ln = strings.TrimSpace(ln) - f := strings.Split(ln, "\t") - if len(f) < 3 { - continue - } - filename, fp, putres := f[0], statFingerprint(f[1]), f[2] - f = strings.Split(putres, "/") - if len(f) != 2 { - continue - } - blobrefStr := f[0] - blobSize, err := strconv.ParseInt(f[1], 10, 64) - if err != nil { - continue - } - - fc.m[filename] = fileInfoPutRes{ - Fingerprint: fp, - Result: client.PutResult{ - BlobRef: blob.ParseOrZero(blobrefStr), - Size: blobSize, - Skipped: true, // is this used? - }, - } - } - vlog.Printf("Flatcache read %d entries from %s", len(fc.m), filename) - return fc -} - -var _ UploadCache = (*FlatStatCache)(nil) - -var errCacheMiss = errors.New("not in cache") - -// flatCacheKey returns the key used for a stat entry in the flat cache. -// It is the cleaned absolute path of joining pwd and filename, to which -// "|Perm" is appended if -filenodes is being used. -func flatCacheKey(pwd, filename string, withPermanode bool) string { - var fullPath string - if filepath.IsAbs(filename) { - fullPath = filepath.Clean(filename) - } else { - fullPath = filepath.Join(pwd, filename) - } - if withPermanode { - return fmt.Sprintf("%v|Perm", fullPath) - } - return fullPath -} - -func (c *FlatStatCache) CachedPutResult(pwd, filename string, fi os.FileInfo, withPermanode bool) (*client.PutResult, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - fp := fileInfoToFingerprint(fi) - - key := flatCacheKey(pwd, filename, withPermanode) - val, ok := c.m[key] - if !ok { - cachelog.Printf("cache MISS on %q: not in cache", key) - return nil, errCacheMiss - } - if val.Fingerprint != fp { - cachelog.Printf("cache MISS on %q: stats not equal:\n%#v\n%#v", key, val.Fingerprint, fp) - return nil, errCacheMiss - } - pr := val.Result - return &pr, nil -} - -func (c *FlatStatCache) AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool) { - c.mu.Lock() - defer c.mu.Unlock() - key := flatCacheKey(pwd, filename, withPermanode) - val := fileInfoPutRes{fileInfoToFingerprint(fi), *pr} - - cachelog.Printf("Adding to stat cache %q: %v", key, val) - - c.m[key] = val - if c.af == nil { - var err error - c.af, err = os.OpenFile(c.filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) - if err != nil { - log.Printf("opening stat cache for append: %v", err) - return - } - } - // TODO: flocking. see leveldb-go. - c.af.Seek(0, os.SEEK_END) - c.af.Write([]byte(fmt.Sprintf("%s\t%s\t%s/%d\n", key, val.Fingerprint, val.Result.BlobRef.String(), val.Result.Size))) -} - -type FlatHaveCache struct { - mu sync.RWMutex - filename string - m map[string]int64 // blobref string -> size - af *os.File // appending file -} - -func NewFlatHaveCache(gen string) *FlatHaveCache { - filename := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)) - c := &FlatHaveCache{ - filename: filename, - m: make(map[string]int64), - } - f, err := os.Open(filename) - if os.IsNotExist(err) { - return c - } - if err != nil { - log.Fatalf("opening camput have-cache: %v", filename, err) - } - br := bufio.NewReader(f) - for { - ln, err := br.ReadString('\n') - if err == io.EOF { - break - } - if err != nil { - log.Printf("Warning: (ignoring) reading have-cache: %v", err) - break - } - f := strings.Fields(strings.TrimSpace(ln)) - if len(f) == 2 { - br, sizea := f[0], f[1] - if size, err := strconv.ParseInt(sizea, 10, 64); err == nil && size >= 0 { - c.m[br] = size - } - } - } - return c -} - -func (c *FlatHaveCache) StatBlobCache(br blob.Ref) (size int64, ok bool) { - c.mu.RLock() - defer c.mu.RUnlock() - size, ok = c.m[br.String()] - return -} - -func (c *FlatHaveCache) NoteBlobExists(br blob.Ref, size int64) { - c.mu.Lock() - defer c.mu.Unlock() - if size < 0 { - panic("negative size") - } - k := br.String() - if c.m[k] == size { - // dup - return - } - c.m[k] = size - - if c.af == nil { - var err error - c.af, err = os.OpenFile(c.filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0600) - if err != nil { - log.Printf("opening have-cache for append: %v", err) - return - } - } - // TODO: flocking. see leveldb-go. - c.af.Seek(0, os.SEEK_END) - c.af.Write([]byte(fmt.Sprintf("%s %d\n", k, size))) -} diff --git a/cmd/camput/kvcache.go b/cmd/camput/kvcache.go new file mode 100644 index 000000000..e80c11a38 --- /dev/null +++ b/cmd/camput/kvcache.go @@ -0,0 +1,328 @@ +/* +Copyright 2013 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "bytes" + "encoding/binary" + "errors" + "fmt" + "hash/crc32" + "log" + "net/url" + "os" + "path/filepath" + "strconv" + + "camlistore.org/pkg/blob" + "camlistore.org/pkg/client" + "camlistore.org/pkg/kvutil" + "camlistore.org/pkg/osutil" + "camlistore.org/third_party/github.com/cznic/kv" +) + +var errCacheMiss = errors.New("not in cache") + +// KvHaveCache is a HaveCache on top of a single +// mutable database file on disk using github.com/cznic/kv. +// It stores the blobref in binary as the key, and +// the blobsize in binary as the value. +// Access to the cache is restricted to one process +// at a time with a lock file. Close should be called +// to remove the lock. +type KvHaveCache struct { + filename string + db *kv.DB +} + +func NewKvHaveCache(gen string) *KvHaveCache { + fullPath := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)+".kv") + db, err := kvutil.Open(fullPath, nil) + if err != nil { + log.Fatalf("Could not create/open new have cache at %v, %v", fullPath, err) + } + return &KvHaveCache{ + filename: fullPath, + db: db, + } +} + +// Close should be called to commit all the writes +// to the db and to unlock the file. +func (c *KvHaveCache) Close() error { + return c.db.Close() +} + +func (c *KvHaveCache) StatBlobCache(br blob.Ref) (size int64, ok bool) { + if !br.Valid() { + return + } + binBr, _ := br.MarshalBinary() + binVal, err := c.db.Get(nil, binBr) + if err != nil { + log.Fatalf("Could not query have cache %v for %v: %v", c.filename, br, err) + } + if binVal == nil { + cachelog.Printf("have cache MISS on %v", br) + return + } + val, err := strconv.Atoi(string(binVal)) + if err != nil { + log.Fatalf("Could not decode have cache binary value for %v: %v", br, err) + } + cachelog.Printf("have cache HIT on %v", br) + return int64(val), true +} + +func (c *KvHaveCache) NoteBlobExists(br blob.Ref, size int64) { + if !br.Valid() { + return + } + if size < 0 { + log.Fatalf("Got a negative blob size to note in have cache for %v", br) + } + binBr, _ := br.MarshalBinary() + binVal := []byte(strconv.Itoa(int(size))) + cachelog.Printf("Adding to have cache %v: %q", br, binVal) + _, _, err := c.db.Put(nil, binBr, + func(binBr, old []byte) ([]byte, bool, error) { + // We do not overwrite dups + if old != nil { + return nil, false, nil + } + return binVal, true, nil + }) + if err != nil { + log.Fatalf("Could not write %v in have cache: %v", br, err) + } +} + +// KvStatCache is an UploadCache on top of a single +// mutable database file on disk using github.com/cznic/kv. +// It stores a binary combination of an os.FileInfo fingerprint and +// a client.Putresult as the key, and the blobsize in binary as +// the value. +// Access to the cache is restricted to one process +// at a time with a lock file. Close should be called +// to remove the lock. +type KvStatCache struct { + filename string + db *kv.DB +} + +func NewKvStatCache(gen string) *KvStatCache { + fullPath := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)+".kv") + db, err := kvutil.Open(fullPath, nil) + if err != nil { + log.Fatalf("Could not create/open new stat cache at %v, %v", fullPath, err) + } + return &KvStatCache{ + filename: fullPath, + db: db, + } +} + +// Close should be called to commit all the writes +// to the db and to unlock the file. +func (c *KvStatCache) Close() error { + return c.db.Close() +} + +func (c *KvStatCache) CachedPutResult(pwd, filename string, fi os.FileInfo, withPermanode bool) (*client.PutResult, error) { + fullPath := fullpath(pwd, filename) + cacheKey := &statCacheKey{ + Filepath: fullPath, + Permanode: withPermanode, + } + binKey, err := cacheKey.marshalBinary() + binVal, err := c.db.Get(nil, binKey) + if err != nil { + log.Fatalf("Could not query stat cache %v for %q: %v", binKey, fullPath, err) + } + if binVal == nil { + cachelog.Printf("stat cache MISS on %q", binKey) + return nil, errCacheMiss + } + val := &statCacheValue{} + if err = val.unmarshalBinary(binVal); err != nil { + return nil, fmt.Errorf("Bogus stat cached value for %q: %v", binKey, err) + } + fp := fileInfoToFingerprint(fi) + if val.Fingerprint != fp { + cachelog.Printf("cache MISS on %q: stats not equal:\n%#v\n%#v", binKey, val.Fingerprint, fp) + return nil, errCacheMiss + } + cachelog.Printf("stat cache HIT on %q", binKey) + return &(val.Result), nil +} + +func (c *KvStatCache) AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool) { + fullPath := fullpath(pwd, filename) + cacheKey := &statCacheKey{ + Filepath: fullPath, + Permanode: withPermanode, + } + val := &statCacheValue{fileInfoToFingerprint(fi), *pr} + + binKey, err := cacheKey.marshalBinary() + if err != nil { + log.Fatalf("Could not add %q to stat cache: %v", binKey, err) + } + binVal, err := val.marshalBinary() + if err != nil { + log.Fatalf("Could not add %q to stat cache: %v", binKey, err) + } + cachelog.Printf("Adding to stat cache %q: %q", binKey, binVal) + _, _, err = c.db.Put(nil, binKey, + func(binKey, old []byte) ([]byte, bool, error) { + // We do not overwrite dups + if old != nil { + return nil, false, nil + } + return binVal, true, nil + }) + if err != nil { + log.Fatalf("Could not add %q to stat cache: %v", binKey, err) + } +} + +type statCacheKey struct { + Filepath string + Permanode bool // whether -filenodes is being used. +} + +// marshalBinary returns a more compact binary +// representation of the contents of sk. +func (sk *statCacheKey) marshalBinary() ([]byte, error) { + if sk == nil { + return nil, errors.New("Can not marshal from a nil stat cache key") + } + data := make([]byte, 0, len(sk.Filepath)+3) + data = append(data, 1) // version number + data = append(data, sk.Filepath...) + data = append(data, '|') + if sk.Permanode { + data = append(data, 1) + } + return data, nil +} + +type statFingerprint string + +type statCacheValue struct { + Fingerprint statFingerprint + Result client.PutResult +} + +// marshalBinary returns a more compact binary +// representation of the contents of scv. +func (scv *statCacheValue) marshalBinary() ([]byte, error) { + if scv == nil { + return nil, errors.New("Can not marshal from a nil stat cache value") + } + binBr, _ := scv.Result.BlobRef.MarshalBinary() + // Blob size fits on 4 bytes when binary encoded + data := make([]byte, 0, len(scv.Fingerprint)+1+4+1+len(binBr)) + buf := bytes.NewBuffer(data) + _, err := buf.WriteString(string(scv.Fingerprint)) + if err != nil { + return nil, fmt.Errorf("Could not write fingerprint %v: %v", scv.Fingerprint, err) + } + err = buf.WriteByte('|') + if err != nil { + return nil, fmt.Errorf("Could not write '|': %v", err) + } + err = binary.Write(buf, binary.BigEndian, int32(scv.Result.Size)) + if err != nil { + return nil, fmt.Errorf("Could not write blob size %d: %v", scv.Result.Size, err) + } + err = buf.WriteByte('|') + if err != nil { + return nil, fmt.Errorf("Could not write '|': %v", err) + } + _, err = buf.Write(binBr) + if err != nil { + return nil, fmt.Errorf("Could not write binary blobref %q: %v", binBr, err) + } + return buf.Bytes(), nil +} + +func (scv *statCacheValue) unmarshalBinary(data []byte) error { + if scv == nil { + return errors.New("Can't unmarshalBinary into a nil stat cache value") + } + if scv.Fingerprint != "" { + return errors.New("Can't unmarshalBinary into a non empty stat cache value") + } + + parts := bytes.SplitN(data, []byte("|"), 3) + fingerprint := string(parts[0]) + buf := bytes.NewReader(parts[1]) + var size int32 + err := binary.Read(buf, binary.BigEndian, &size) + if err != nil { + return fmt.Errorf("Could not decode blob size from stat cache value part %q: %v", parts[1], err) + } + br := new(blob.Ref) + if err := br.UnmarshalBinary(parts[2]); err != nil { + return fmt.Errorf("Could not unmarshalBinary for %q: %v", parts[2], err) + } + + scv.Fingerprint = statFingerprint(fingerprint) + scv.Result = client.PutResult{ + BlobRef: *br, + Size: int64(size), + Skipped: true, + } + return nil +} + +func fullpath(pwd, filename string) string { + var fullPath string + if filepath.IsAbs(filename) { + fullPath = filepath.Clean(filename) + } else { + fullPath = filepath.Join(pwd, filename) + } + return fullPath +} + +func escapeGen(gen string) string { + // Good enough: + return url.QueryEscape(gen) +} + +var cleanSysStat func(v interface{}) interface{} + +func fileInfoToFingerprint(fi os.FileInfo) statFingerprint { + // We calculate the CRC32 of the underlying system stat structure to get + // ctime, owner, group, etc. This is overkill (e.g. we don't care about + // the inode or device number probably), but works. + sysHash := uint32(0) + if sys := fi.Sys(); sys != nil { + if clean := cleanSysStat; clean != nil { + // TODO: don't clean bad fields, but provide a + // portable way to extract all good fields. + // This is a Linux+Mac-specific hack for now. + sys = clean(sys) + } + c32 := crc32.NewIEEE() + fmt.Fprintf(c32, "%#v", sys) + sysHash = c32.Sum32() + } + return statFingerprint(fmt.Sprintf("%dB/%dMOD/sys-%d", fi.Size(), fi.ModTime().UnixNano(), sysHash)) +} diff --git a/cmd/camput/sqlitecache.go b/cmd/camput/sqlitecache.go deleted file mode 100644 index bd90eb74d..000000000 --- a/cmd/camput/sqlitecache.go +++ /dev/null @@ -1,375 +0,0 @@ -/* -Copyright 2013 The Camlistore Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package main - -import ( - "bufio" - "fmt" - "io" - "log" - "os" - "os/exec" - "path/filepath" - "strconv" - "strings" - "sync" - - "camlistore.org/pkg/blob" - "camlistore.org/pkg/client" - "camlistore.org/pkg/osutil" -) - -const ( - cmdName = "sqlite3" - noResult = "no row" - haveTableName = "have" - statTableName = "stat" - testTable = `.tables` - createHaveTable = `CREATE TABLE ` + haveTableName + - ` (blobref VARCHAR(255) NOT NULL PRIMARY KEY,size INT)` - createStatTable = `CREATE TABLE ` + statTableName + - ` (key TEXT NOT NULL PRIMARY KEY, val TEXT)` - // Because of blocking reads on the output, we want to print - // something even when a query returns no result, - // hence the ugly joins. - // TODO(mpl): there's probably a way to do non blocking reads - // on the stdout pipe of the sqlite process, so we would not - // have to use these ugly requests. Suggestion? - blobSizeQuery = `SELECT COALESCE(size, fake.filler) as size - FROM (SELECT '` + noResult + `' AS [filler]) fake - LEFT JOIN ` + haveTableName + - ` ON blobref = ` - statKeyQuery = `SELECT COALESCE(val, fake.filler) as val - FROM (SELECT '` + noResult + `' AS [filler]) fake - LEFT JOIN ` + statTableName + - ` ON key = ` - noteHaveStmt = `INSERT INTO ` + haveTableName + - ` VALUES ('?1', ?2)` + ";\n" - noteStatStmt = `INSERT INTO ` + statTableName + - ` VALUES ('?1', '?2')` + ";\n" - keyNotUnique = "column key is not unique\n" - brefNotUnique = "column blobref is not unique\n" -) - -func checkCmdInstalled() { - _, err := exec.LookPath(cmdName) - if err != nil { - hint := `The binary is not in your $PATH or most likely not installed.` + - ` On debian based distributions, it is usually provided by the sqlite3 package.` - log.Fatalf("%v command could not be found: %v\n"+hint, cmdName, err) - } -} - -type childInfo struct { - r *bufio.Reader // to read the child's stdout - w io.WriteCloser // to write to the child's stdin - proc *os.Process - er *bufio.Reader // to read the child's stderr -} - -func startChild(filename string) (*childInfo, error) { - cmdPath, err := exec.LookPath(cmdName) - if err != nil { - return nil, err - } - pr1, pw1, err := os.Pipe() - if err != nil { - return nil, err - } - pr2, pw2, err := os.Pipe() - if err != nil { - return nil, err - } - pr3, pw3, err := os.Pipe() - if err != nil { - return nil, err - } - args := []string{cmdPath, filename} - fds := []*os.File{pr1, pw2, pw3} - p, err := os.StartProcess(cmdPath, args, &os.ProcAttr{Dir: "/", Files: fds}) - if err != nil { - return nil, err - } - return &childInfo{ - r: bufio.NewReader(pr2), - w: pw1, - proc: p, - er: bufio.NewReader(pr3), - }, nil -} - -// SQLiteStatCache is an UploadCache based on sqlite. -// sqlite3 is called as a child process so we can still -// cross-compile static ARM binaries for Android, and -// use the android system sqlite, rather than having to -// include a big copy of the sqlite libs. -// It stores rows with (key, value) pairs, where -// key = filepath|statFingerprint and -// value = PutResult.BlobRef.String()|PutResult.Size -type SQLiteStatCache struct { - filename string - proc *os.Process - mu sync.Mutex // Guards reads and writes to sqlite. - r *bufio.Reader // where to read the output from the sqlite process - w io.WriteCloser // where to write queries/statements to the sqlite process -} - -func NewSQLiteStatCache(gen string) *SQLiteStatCache { - checkCmdInstalled() - filename := filepath.Join(osutil.CacheDir(), "camput.statcache."+escapeGen(gen)+".db") - out, err := exec.Command(cmdName, filename, testTable).Output() - if err != nil { - log.Fatalf("Failed to test for %v table existence: %v", statTableName, err) - } - if len(out) == 0 { - // file or table does not exist - err = exec.Command(cmdName, filename, createStatTable).Run() - if err != nil { - log.Fatalf("Failed to create %v table for stat cache: %v", statTableName, err) - } - } else { - if string(out) != statTableName+"\n" { - log.Fatalf("Wrong table name for stat cache; was expecting %v, got %q", - haveTableName, out) - } - } - return &SQLiteStatCache{ - filename: filename, - } -} - -func (c *SQLiteStatCache) startSQLiteChild() error { - if c.proc != nil { - return nil - } - ci, err := startChild(c.filename) - if err != nil { - return err - } - go func() { - for { - errStr, err := ci.er.ReadString('\n') - if err != nil { - log.Fatal(err) - } - if !strings.HasSuffix(errStr, keyNotUnique) { - log.Fatalf("Error on stat cache: %v", errStr) - } - } - }() - c.r = ci.r - c.w = ci.w - c.proc = ci.proc - return nil -} - -// sqliteCacheKey returns the key used for a stat entry in the sqlite cache. -// It is the cleaned absolute path of joining pwd and filename, -// concatenated with a fingerprint based on the file's info. If -// -filenodes is being used, the suffix "|Perm" is also appended. -func sqliteCacheKey(pwd, filename string, fi os.FileInfo, withPermanode bool) string { - var fullPath string - if filepath.IsAbs(filename) { - fullPath = filepath.Clean(filename) - } else { - fullPath = filepath.Join(pwd, filename) - } - key := fmt.Sprintf("%v|%v", fullPath, string(fileInfoToFingerprint(fi))) - if withPermanode { - return fmt.Sprintf("%v|Perm", key) - } - return key -} - -func (c *SQLiteStatCache) CachedPutResult(pwd, filename string, fi os.FileInfo, withPermanode bool) (*client.PutResult, error) { - key := sqliteCacheKey(pwd, filename, fi, withPermanode) - query := fmt.Sprintf("%v'%v';\n", statKeyQuery, key) - c.mu.Lock() - err := c.startSQLiteChild() - if err != nil { - c.mu.Unlock() - return nil, fmt.Errorf("Could not start sqlite child process: %v", err) - } - _, err = c.w.Write([]byte(query)) - if err != nil { - c.mu.Unlock() - return nil, fmt.Errorf("failed to query stat cache: %v", err) - } - out, err := c.r.ReadString('\n') - if err != nil { - c.mu.Unlock() - return nil, fmt.Errorf("failed to read stat cache query result: %v", err) - } - out = strings.TrimRight(out, "\n") - c.mu.Unlock() - - if out == noResult { - return nil, errCacheMiss - } - fields := strings.Split(out, "|") - if len(fields) > 2 { - return nil, fmt.Errorf("Invalid stat cache value; was expecting \"bref|size\", got %q", out) - } - br, ok := blob.Parse(fields[0]) - if !ok { - return nil, fmt.Errorf("Invalid blobref in stat cache: %q", fields[0]) - } - blobSize, err := strconv.ParseInt(fields[1], 10, 64) - if err != nil { - return nil, fmt.Errorf("Invalid blob size %q in stat cache: %v", fields[1], err) - } - return &client.PutResult{ - BlobRef: br, - Size: blobSize, - Skipped: true, - }, nil -} - -func (c *SQLiteStatCache) AddCachedPutResult(pwd, filename string, fi os.FileInfo, pr *client.PutResult, withPermanode bool) { - key := sqliteCacheKey(pwd, filename, fi, withPermanode) - val := pr.BlobRef.String() + "|" + strconv.FormatInt(pr.Size, 10) - repl := strings.NewReplacer("?1", key, "?2", val) - query := repl.Replace(noteStatStmt) - c.mu.Lock() - defer c.mu.Unlock() - err := c.startSQLiteChild() - if err != nil { - log.Fatalf("Could not start sqlite child process: %v", err) - } - cachelog.Printf("Adding to stat cache %v: %v", key, val) - _, err = c.w.Write([]byte(query)) - if err != nil { - log.Fatalf("failed to write to stat cache: %v", err) - } -} - -// SQLiteHacheCache is a HaveCache based on sqlite. -// sqlite3 is called as a child process so we can still -// cross-compile static ARM binaries for Android, and -// use the android system sqlite, rather than having to -// include a big copy of the sqlite libs. -// It stores rows with (key,value) pairs, where -// key = blobref and -// value = blobsize -type SQLiteHaveCache struct { - filename string - proc *os.Process - mu sync.Mutex // Guards reads and writes to sqlite. - r *bufio.Reader // where to read the output from the sqlite process - w io.WriteCloser // where to write queries/statements to the sqlite process -} - -func NewSQLiteHaveCache(gen string) *SQLiteHaveCache { - checkCmdInstalled() - filename := filepath.Join(osutil.CacheDir(), "camput.havecache."+escapeGen(gen)+".db") - out, err := exec.Command(cmdName, filename, testTable).Output() - if err != nil { - log.Fatalf("Failed to test for %v table existence: %v", haveTableName, err) - } - if len(out) == 0 { - // file or table does not exist - err = exec.Command(cmdName, filename, createHaveTable).Run() - if err != nil { - log.Fatalf("Failed to create %v table for have cache: %v", haveTableName, err) - } - } else { - if string(out) != haveTableName+"\n" { - log.Fatalf("Wrong table name for have cache; was expecting %v, got %q", - haveTableName, out) - } - } - return &SQLiteHaveCache{ - filename: filename, - } -} - -func (c *SQLiteHaveCache) startSQLiteChild() error { - if c.proc != nil { - return nil - } - ci, err := startChild(c.filename) - if err != nil { - return err - } - go func() { - for { - errStr, err := ci.er.ReadString('\n') - if err != nil { - log.Fatal(err) - } - if !strings.HasSuffix(errStr, brefNotUnique) { - log.Fatalf("Error on have cache: %v", errStr) - } - } - }() - c.r = ci.r - c.w = ci.w - c.proc = ci.proc - return nil -} - -func (c *SQLiteHaveCache) StatBlobCache(br blob.Ref) (size int64, ok bool) { - if !br.Valid() { - return - } - // TODO(mpl): is it enough that we know it's a valid blobref to avoid any injection risk ? - query := blobSizeQuery + fmt.Sprintf("'%v';\n", br.String()) - c.mu.Lock() - defer c.mu.Unlock() - err := c.startSQLiteChild() - if err != nil { - log.Fatalf("Could not start sqlite child process: %v", err) - } - _, err = c.w.Write([]byte(query)) - if err != nil { - log.Fatalf("failed to query have cache: %v", err) - } - out, err := c.r.ReadString('\n') - if err != nil { - log.Fatalf("failed to read have cache query result: %v", err) - } - out = strings.TrimRight(out, "\n") - if out == noResult { - return - } - size, err = strconv.ParseInt(out, 10, 64) - if err != nil { - log.Fatalf("Bogus blob size in %v table: %v", haveTableName, err) - } - return size, true -} - -func (c *SQLiteHaveCache) NoteBlobExists(br blob.Ref, size int64) { - if size < 0 { - log.Fatalf("Got a negative blob size to note in have cache") - } - if !br.Valid() { - return - } - repl := strings.NewReplacer("?1", br.String(), "?2", fmt.Sprint(size)) - query := repl.Replace(noteHaveStmt) - c.mu.Lock() - defer c.mu.Unlock() - err := c.startSQLiteChild() - if err != nil { - log.Fatalf("Could not start sqlite child process: %v", err) - } - _, err = c.w.Write([]byte(query)) - if err != nil { - log.Fatalf("failed to write to have cache: %v", err) - } -} diff --git a/pkg/kvutil/kvutil.go b/pkg/kvutil/kvutil.go new file mode 100644 index 000000000..1c0fcb56d --- /dev/null +++ b/pkg/kvutil/kvutil.go @@ -0,0 +1,46 @@ +/* +Copyright 2013 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package kvutil contains helpers related to +// github.com/cznic/kv. +package kvutil + +import ( + "io" + "os" + + "camlistore.org/third_party/github.com/camlistore/lock" + "camlistore.org/third_party/github.com/cznic/kv" +) + +// Open opens the named kv DB file for reading/writing. It +// creates the file if it does not exist yet. +func Open(filePath string, opts *kv.Options) (*kv.DB, error) { + // TODO(mpl): use it in index pkg and such + createOpen := kv.Open + if _, err := os.Stat(filePath); os.IsNotExist(err) { + createOpen = kv.Create + } + if opts == nil { + opts = &kv.Options{} + } + if opts.Locker == nil { + opts.Locker = func(fullPath string) (io.Closer, error) { + return lock.Lock(filePath + ".lock") + } + } + return createOpen(filePath, opts) +}