From 1773eb0859edafd2cd07511f4e26b4119dd1af64 Mon Sep 17 00:00:00 2001 From: mpl Date: Wed, 28 Oct 2015 00:11:58 +0100 Subject: [PATCH] server/synchandler: add exported constructor and IdleWait method This change allows (for example) to hook a blobserver and an indexer together in a way similar as they would be in a default Camlistore setup. This is particularly useful to setup tests that require a realistic indexer, without having to create a heavier test with e.g. test.World. This actual goal of this change is described below. I kept these changes together because the one described below illustrates the need for the synchandler changes. pkg/index/stress/: stress test the indexer with more data The goal of these benchmkarks is to help us compare the various sorted implementations, so we can pick the most efficient one as a default. A second, harder, goal would be to add tests that help us estimate the reliability of the various kv stores, again so we can rule out the buggy ones, or help fix them. Change-Id: I8a1fe50398a4ba09c03786502b68b6c2599e5984 --- pkg/index/stress/index_test.go | 631 +++++++++++++++++++++++++++++++++ pkg/server/sync.go | 40 ++- pkg/sorted/sqlite/sqlitekv.go | 6 + 3 files changed, 676 insertions(+), 1 deletion(-) create mode 100644 pkg/index/stress/index_test.go diff --git a/pkg/index/stress/index_test.go b/pkg/index/stress/index_test.go new file mode 100644 index 000000000..f426b7c3c --- /dev/null +++ b/pkg/index/stress/index_test.go @@ -0,0 +1,631 @@ +/* +Copyright 2016 The Camlistore Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package stress + +import ( + "bytes" + "crypto/sha1" + "flag" + "fmt" + "hash" + "io" + "io/ioutil" + "log" + "os" + "os/exec" + "path/filepath" + "strings" + "testing" + "time" + + "camlistore.org/pkg/blob" + "camlistore.org/pkg/blobserver" + "camlistore.org/pkg/blobserver/localdisk" + "camlistore.org/pkg/index" + "camlistore.org/pkg/jsonsign" + "camlistore.org/pkg/osutil" + "camlistore.org/pkg/schema" + "camlistore.org/pkg/server" + "camlistore.org/pkg/sorted" + "camlistore.org/pkg/sorted/kvfile" + "camlistore.org/pkg/sorted/leveldb" + "camlistore.org/pkg/sorted/sqlite" + "camlistore.org/pkg/types/camtypes" +) + +var ( + flagTempDir = flag.String("tempDir", os.TempDir(), "dir where we'll write all the benchmarks dirs. In case the default is on too small a partition since we may use lots of data.") + flagBenchDir = flag.String("benchDir", "", "the directory with a prepopulated blob server, needed by any benchmark that does not start with populating a blob server & index. Run a populating bench with -nowipe to obtain such a directory.") + flagNoWipe = flag.Bool("nowipe", false, "do not wipe the test dir at the end of the run.") +) + +func benchmarkPopulate(b *testing.B, dbname string, sortedProvider func(dbfile string) (sorted.KeyValue, error)) { + tempDir, err := ioutil.TempDir(*flagTempDir, "camli-index-stress") + if err != nil { + b.Fatal(err) + } + defer func() { + if *flagNoWipe { + return + } + os.RemoveAll(tempDir) + }() + dbfile := filepath.Join(tempDir, dbname) + idx := populate(b, dbfile, sortedProvider) + if _, err := idx.KeepInMemory(); err != nil { + b.Fatal(err) + } + if err := idx.Close(); err != nil { + b.Fatal(err) + } + b.Logf("size of %v: %v", dbfile, size(b, dbfile)) +} + +func BenchmarkPopulateLevelDB(b *testing.B) { + benchmarkPopulate(b, "leveldb.db", func(dbfile string) (sorted.KeyValue, error) { + return leveldb.NewStorage(dbfile) + }) +} + +func BenchmarkPopulateCznic(b *testing.B) { + benchmarkPopulate(b, "kvfile.db", func(dbfile string) (sorted.KeyValue, error) { + return kvfile.NewStorage(dbfile) + }) +} + +func BenchmarkPopulateSQLite(b *testing.B) { + benchmarkPopulate(b, "sqlite.db", func(dbfile string) (sorted.KeyValue, error) { + return sqlite.NewStorage(dbfile) + }) +} + +func benchmarkReindex(b *testing.B, dbname string, sortedProvider func(dbfile string) (sorted.KeyValue, error)) { + if *flagBenchDir == "" { + b.Skip("Reindexing benchmark needs -benchDir") + } + dbfile := filepath.Join(*flagBenchDir, dbname) + + idx := reindex(b, dbfile, sortedProvider) + if _, err := idx.KeepInMemory(); err != nil { + b.Fatal(err) + } + if err := idx.Close(); err != nil { + b.Fatal(err) + } +} + +func BenchmarkReindexLevelDB(b *testing.B) { + benchmarkReindex(b, "leveldb.db", func(dbfile string) (sorted.KeyValue, error) { + return leveldb.NewStorage(dbfile) + }) +} + +func BenchmarkReindexCznic(b *testing.B) { + benchmarkReindex(b, "kvfile.db", func(dbfile string) (sorted.KeyValue, error) { + return kvfile.NewStorage(dbfile) + }) +} + +func BenchmarkReindexSQLite(b *testing.B) { + benchmarkReindex(b, "sqlite.db", func(dbfile string) (sorted.KeyValue, error) { + return sqlite.NewStorage(dbfile) + }) +} + +// Testing EnumerateBlobMeta because that's one of the few non-corpus index reading ops we still actually use. +func BenchmarkEnumerateBlobMetaLevelDB(b *testing.B) { + if *flagBenchDir == "" { + b.Skip("Enumerating benchmark needs -benchDir") + } + dbfile := filepath.Join(*flagBenchDir, "leveldb.db") + enumerateMeta(b, dbfile, func(dbfile string) (sorted.KeyValue, error) { + return leveldb.NewStorage(dbfile) + }) +} + +// Testing EnumerateBlobMeta because that's one of the few non-corpus index reading ops we still actually use. +func BenchmarkEnumerateBlobMetaCznic(b *testing.B) { + if *flagBenchDir == "" { + b.Skip("Enumerating benchmark needs -benchDir") + } + dbfile := filepath.Join(*flagBenchDir, "kvfile.db") + enumerateMeta(b, dbfile, func(dbfile string) (sorted.KeyValue, error) { + return kvfile.NewStorage(dbfile) + }) +} + +// Testing EnumerateBlobMeta because that's one of the few non-corpus index reading ops we still actually use. +func BenchmarkEnumerateBlobMetaSQLite(b *testing.B) { + if *flagBenchDir == "" { + b.Skip("Enumerating benchmark needs -benchDir") + } + dbfile := filepath.Join(*flagBenchDir, "sqlite.db") + enumerateMeta(b, dbfile, func(dbfile string) (sorted.KeyValue, error) { + return sqlite.NewStorage(dbfile) + }) +} + +// TODO(mpl): allow for setting killTime with a flag ? + +func BenchmarkInterruptLevelDB(b *testing.B) { + if *flagBenchDir == "" { + b.Skip("Interrupt benchmark needs -benchDir") + } + dbfile := filepath.Join(*flagBenchDir, "leveldb.db") + + benchmarkKillReindex(b, 1, dbfile, func(dbfile string) (sorted.KeyValue, error) { + return leveldb.NewStorage(dbfile) + }) +} + +func BenchmarkInterruptCznic(b *testing.B) { + if *flagBenchDir == "" { + b.Skip("Interrupt benchmark needs -benchDir") + } + dbfile := filepath.Join(*flagBenchDir, "kvfile.db") + + // since cznic is much slower than levelDB at reindexing, we interrupt + // it way less often. otherwise we might even blow up the max test run time + // (10m) anyway. + benchmarkKillReindex(b, 10, dbfile, func(dbfile string) (sorted.KeyValue, error) { + return kvfile.NewStorage(dbfile) + }) +} + +func BenchmarkInterruptSQLite(b *testing.B) { + if *flagBenchDir == "" { + b.Skip("Interrupt benchmark needs -benchDir") + } + dbfile := filepath.Join(*flagBenchDir, "sqlite.db") + + benchmarkKillReindex(b, 15, dbfile, func(dbfile string) (sorted.KeyValue, error) { + return sqlite.NewStorage(dbfile) + }) +} + +func benchmarkAll(b *testing.B, dbname string, sortedProvider func(dbfile string) (sorted.KeyValue, error)) { + tempDir, err := ioutil.TempDir(*flagTempDir, "camli-index-stress") + if err != nil { + b.Fatal(err) + } + defer func() { + if *flagNoWipe { + return + } + os.RemoveAll(tempDir) + }() + dbfile := filepath.Join(tempDir, dbname) + stress(b, dbfile, sortedProvider) +} + +func BenchmarkAllLevelDB(b *testing.B) { + benchmarkAll(b, "leveldb.db", func(dbfile string) (sorted.KeyValue, error) { + return leveldb.NewStorage(dbfile) + }) +} + +func BenchmarkAllCznic(b *testing.B) { + benchmarkAll(b, "kvfile.db", func(dbfile string) (sorted.KeyValue, error) { + return kvfile.NewStorage(dbfile) + }) +} + +func BenchmarkAllSQLite(b *testing.B) { + benchmarkAll(b, "sqlite.db", func(dbfile string) (sorted.KeyValue, error) { + return sqlite.NewStorage(dbfile) + }) +} + +func size(b *testing.B, dbfile string) int64 { + fi, err := os.Stat(dbfile) + if err != nil { + b.Fatal(err) + } + if !fi.IsDir() { + return fi.Size() + } + dbdir, err := os.Open(dbfile) + if err != nil { + b.Fatal(err) + } + names, err := dbdir.Readdirnames(-1) + if err != nil { + b.Fatal(err) + } + defer dbdir.Close() + var totalSize int64 + for _, name := range names { + // TODO(mpl): works for leveldb, but what about others ? + if !strings.HasSuffix(name, ".log") { + continue + } + fi, err := os.Stat(filepath.Join(dbfile, name)) + if err != nil { + b.Fatal(err) + } + totalSize += fi.Size() + } + return totalSize +} + +// Populates the bs, and the index at the same time through the sync handler +func populate(b *testing.B, dbfile string, + sortedProvider func(dbfile string) (sorted.KeyValue, error)) *index.Index { + b.Logf("populating %v", dbfile) + kv, err := sortedProvider(dbfile) + if err != nil { + b.Fatal(err) + } + bsRoot := filepath.Join(filepath.Dir(dbfile), "bs") + if err := os.MkdirAll(bsRoot, 0700); err != nil { + b.Fatal(err) + } + dataDir, err := os.Open("testdata") + if err != nil { + b.Fatal(err) + } + fis, err := dataDir.Readdir(-1) + if err != nil { + b.Fatal(err) + } + if len(fis) == 0 { + b.Fatalf("no files in %s dir", "testdata") + } + + ks := doKeyStuff(b) + + bs, err := localdisk.New(bsRoot) + if err != nil { + b.Fatal(err) + } + if _, err := blobserver.Receive(bs, ks.pubKeyRef, strings.NewReader(ks.pubKey)); err != nil { + b.Fatal(err) + } + idx, err := index.New(kv) + if err != nil { + b.Fatal(err) + } + idx.InitBlobSource(bs) + sh := server.NewSyncHandler("/bs/", "/index/", bs, idx, sorted.NewMemoryKeyValue()) + + b.ResetTimer() + for _, v := range fis { + f, err := os.Open(filepath.Join(dataDir.Name(), v.Name())) + if err != nil { + b.Fatal(err) + } + td := &trackDigestReader{r: f} + fm := schema.NewFileMap(v.Name()) + fm.SetModTime(v.ModTime()) + fileRef, err := schema.WriteFileMap(bs, fm, td) + if err != nil { + b.Fatal(err) + } + f.Close() + + unsigned := schema.NewPlannedPermanode(td.Sum()) + unsigned.SetSigner(ks.pubKeyRef) + sr := &jsonsign.SignRequest{ + UnsignedJSON: unsigned.Blob().JSON(), + // TODO(mpl): if we make a bs that discards, replace this with a memory bs that has only the pubkey + Fetcher: bs, + EntityFetcher: ks.entityFetcher, + SignatureTime: time.Unix(0, 0), + } + signed, err := sr.Sign() + if err != nil { + b.Fatal("problem signing: " + err.Error()) + } + pn := blob.SHA1FromString(signed) + // N.B: use blobserver.Receive so that the blob hub gets notified, and the blob gets enqueued into the index + if _, err := blobserver.Receive(bs, pn, strings.NewReader(signed)); err != nil { + b.Fatal(err) + } + + contentAttr := schema.NewSetAttributeClaim(pn, "camliContent", fileRef.String()) + claimTime, ok := fm.ModTime() + if !ok { + b.Fatal(err) + } + contentAttr.SetClaimDate(claimTime) + contentAttr.SetSigner(ks.pubKeyRef) + sr = &jsonsign.SignRequest{ + UnsignedJSON: contentAttr.Blob().JSON(), + // TODO(mpl): if we make a bs that discards, replace this with a memory bs that has only the pubkey + Fetcher: bs, + EntityFetcher: ks.entityFetcher, + SignatureTime: claimTime, + } + signed, err = sr.Sign() + if err != nil { + b.Fatal("problem signing: " + err.Error()) + } + cl := blob.SHA1FromString(signed) + if _, err := blobserver.Receive(bs, cl, strings.NewReader(signed)); err != nil { + b.Fatal(err) + } + } + sh.IdleWait() + + return idx +} + +type keyStuff struct { + secretRingFile string + pubKey string + pubKeyRef blob.Ref + entityFetcher jsonsign.EntityFetcher +} + +func doKeyStuff(b *testing.B) keyStuff { + camliRootPath, err := osutil.GoPackagePath("camlistore.org") + if err != nil { + b.Fatal("Package camlistore.org no found in $GOPATH or $GOPATH not defined") + } + secretRingFile := filepath.Join(camliRootPath, "pkg", "jsonsign", "testdata", "test-secring.gpg") + pubKey := `-----BEGIN PGP PUBLIC KEY BLOCK----- + +xsBNBEzgoVsBCAC/56aEJ9BNIGV9FVP+WzenTAkg12k86YqlwJVAB/VwdMlyXxvi +bCT1RVRfnYxscs14LLfcMWF3zMucw16mLlJCBSLvbZ0jn4h+/8vK5WuAdjw2YzLs +WtBcjWn3lV6tb4RJz5gtD/o1w8VWxwAnAVIWZntKAWmkcChCRgdUeWso76+plxE5 +aRYBJqdT1mctGqNEISd/WYPMgwnWXQsVi3x4z1dYu2tD9uO1dkAff12z1kyZQIBQ +rexKYRRRh9IKAayD4kgS0wdlULjBU98aeEaMz1ckuB46DX3lAYqmmTEL/Rl9cOI0 +Enpn/oOOfYFa5h0AFndZd1blMvruXfdAobjVABEBAAE= +=28/7 +-----END PGP PUBLIC KEY BLOCK-----` + return keyStuff{ + secretRingFile: secretRingFile, + pubKey: pubKey, + pubKeyRef: blob.SHA1FromString(pubKey), + entityFetcher: &jsonsign.CachingEntityFetcher{ + Fetcher: &jsonsign.FileEntityFetcher{File: secretRingFile}, + }, + } +} + +func reindex(b *testing.B, dbfile string, + sortedProvider func(dbfile string) (sorted.KeyValue, error)) *index.Index { + b.Logf("reindexing") + if err := os.RemoveAll(dbfile); err != nil { + b.Fatal(err) + } + kv, err := sortedProvider(dbfile) + if err != nil { + b.Fatal(err) + } + bs, err := localdisk.New(filepath.Join(filepath.Dir(dbfile), "bs")) + if err != nil { + b.Fatal(err) + } + idx, err := index.New(kv) + if err != nil { + b.Fatal(err) + } + idx.InitBlobSource(bs) + + b.ResetTimer() + if err := idx.Reindex(); err != nil { + b.Fatal(err) + } + return idx +} + +func enumerateMeta(b *testing.B, dbfile string, + sortedProvider func(dbfile string) (sorted.KeyValue, error)) int { + b.Logf("enumerating meta blobs") + kv, err := sortedProvider(dbfile) + if err != nil { + b.Fatal(err) + } + bs, err := localdisk.New(filepath.Join(filepath.Dir(dbfile), "bs")) + if err != nil { + b.Fatal(err) + } + idx, err := index.New(kv) + if err != nil { + b.Fatal(err) + } + idx.InitBlobSource(bs) + defer idx.Close() + + ch := make(chan camtypes.BlobMeta, 100) + go func() { + if err := idx.EnumerateBlobMeta(nil, ch); err != nil { + b.Fatal(err) + } + }() + n := 0 + for range ch { + n++ + } + b.Logf("Enumerated %d meta blobs", n) + return n +} + +func benchmarkKillReindex(b *testing.B, killTimeFactor int, dbfile string, + sortedProvider func(dbfile string) (sorted.KeyValue, error)) { + cmd := exec.Command("go", "test", "-c") + if strings.HasSuffix(dbfile, "sqlite.db") { + cmd = exec.Command("go", "test", "--tags", "with_sqlite", "-c") + } + if err := cmd.Run(); err != nil { + b.Fatal(err) + } + i := 2 + for { + // We start and kill the reindexing, with an increasing killTime. Until we get a full index. + if killReindex(b, dbfile, time.Duration(killTimeFactor)*time.Duration(i)*time.Second, sortedProvider) { + break + } + i++ + } +} + +// TODO(mpl): sync from each partial index to another one (the same dest at +// every loop). See at the end if dest indexer is "complete". Or anything else that +// is proof that the (incomplete) index is not corrupted. + +// killReindex starts a reindexing in a new process, and kills that process +// after killTime. It then (naively for now ?) verifies that the kv store file is +// not corrupted by reinitializing an (possibly incomplete) index (with a corpus) +// with it. If the indexing was completed before we could kill the process, it +// returns true, false otherwise. +func killReindex(b *testing.B, dbfile string, killTime time.Duration, + sortedProvider func(dbfile string) (sorted.KeyValue, error)) bool { + cmd := exec.Command(os.Args[0], "-test.run=TestChildIndexer") + cmd.Env = append(cmd.Env, "TEST_BE_CHILD=1", "TEST_BE_CHILD_DBFILE="+dbfile) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + if err := cmd.Start(); err != nil { + b.Fatal(err) + } + + waitc := make(chan error) + go func() { + waitc <- cmd.Wait() + }() + fullIndex := false + select { + case err := <-waitc: + if err == nil { + // indexer finished before we killed it + fullIndex = true + b.Logf("Finished indexing before being killed at %v", killTime) + break + } + // TODO(mpl): do better + if err.Error() != "signal: killed" { + b.Fatalf("unexpected (not killed) error from indexer process: %v %v %v", err, stdout.String(), stderr.String()) + } + case <-time.After(killTime): + if err := cmd.Process.Kill(); err != nil { + b.Fatal(err) + } + err := <-waitc + // TODO(mpl): do better + if err != nil && err.Error() != "signal: killed" { + b.Fatalf("unexpected (not killed) error from indexer process: %v %v %v", err, stdout.String(), stderr.String()) + } + } + + kv, err := sortedProvider(dbfile) + if err != nil { + b.Fatal(err) + } + idx, err := index.New(kv) + if err != nil { + b.Fatal(err) + } + bs, err := localdisk.New(filepath.Join(filepath.Dir(dbfile), "bs")) + if err != nil { + b.Fatal(err) + } + idx.InitBlobSource(bs) + if _, err := idx.KeepInMemory(); err != nil { + b.Fatal(err) + } + if err := idx.Close(); err != nil { + b.Fatal(err) + } + return fullIndex +} + +// Does all the tests (currently, populating, then reindexing) +func stress(b *testing.B, dbfile string, sortedProvider func(dbfile string) (sorted.KeyValue, error)) { + idx := populate(b, dbfile, sortedProvider) + if _, err := idx.KeepInMemory(); err != nil { + b.Fatal(err) + } + if err := idx.Close(); err != nil { + b.Fatal(err) + } + b.Logf("size of %v: %v", dbfile, size(b, dbfile)) + + idx = reindex(b, dbfile, sortedProvider) + if _, err := idx.KeepInMemory(); err != nil { + b.Fatal(err) + } + if err := idx.Close(); err != nil { + b.Fatal(err) + } + enumerateMeta(b, dbfile, sortedProvider) +} + +// trackDigestReader is an io.Reader wrapper which records the digest of what it reads. +type trackDigestReader struct { + r io.Reader + h hash.Hash +} + +func (t *trackDigestReader) Read(p []byte) (n int, err error) { + if t.h == nil { + t.h = sha1.New() + } + n, err = t.r.Read(p) + t.h.Write(p[:n]) + return +} + +func (t *trackDigestReader) Sum() string { + return fmt.Sprintf("sha1-%x", t.h.Sum(nil)) +} + +func TestChildIndexer(t *testing.T) { + if os.Getenv("TEST_BE_CHILD") != "1" { + t.Skip("not a real test; used as a child process by the benchmarks") + } + dbfile := os.Getenv("TEST_BE_CHILD_DBFILE") + if dbfile == "" { + log.Fatal("empty TEST_BE_CHILD_DBFILE") + } + if err := os.RemoveAll(dbfile); err != nil { + log.Fatal(err) + } + var kv sorted.KeyValue + var err error + switch { + case strings.HasSuffix(dbfile, "leveldb.db"): + kv, err = leveldb.NewStorage(dbfile) + case strings.HasSuffix(dbfile, "kvfile.db"): + kv, err = kvfile.NewStorage(dbfile) + case strings.HasSuffix(dbfile, "sqlite.db"): + kv, err = sqlite.NewStorage(dbfile) + default: + log.Fatalf("unknown sorted provider for %v", dbfile) + } + if err != nil { + log.Fatal(err) + } + bs, err := localdisk.New(filepath.Join(filepath.Dir(dbfile), "bs")) + if err != nil { + log.Fatal(err) + } + idx, err := index.New(kv) + if err != nil { + log.Fatal(err) + } + idx.InitBlobSource(bs) + defer func() { + if err := idx.Close(); err != nil { + log.Fatal(err) + } + }() + if err := idx.Reindex(); err != nil { + log.Fatal(err) + } +} diff --git a/pkg/server/sync.go b/pkg/server/sync.go index 284c35037..8a4623090 100644 --- a/pkg/server/sync.go +++ b/pkg/server/sync.go @@ -93,6 +93,11 @@ type SyncHandler struct { vdestBytes int64 // number of blob bytes seen on dest during validate vsrcCount int // number of blobs seen on src during validate vsrcBytes int64 // number of blob bytes seen on src during validate + + // syncLoop tries to send on alarmIdlec each time we've slept for a full + // queueSyncInterval. Initialized as a synchronous chan if we're not an + // idle sync handler, otherwise nil. + alarmIdlec chan struct{} } var ( @@ -214,7 +219,7 @@ func newSyncHandler(fromName, toName string, from blobserver.Storage, to blobReceiverEnumerator, queue sorted.KeyValue) *SyncHandler { return &SyncHandler{ - copierPoolSize: 2, + copierPoolSize: 5, from: from, to: to, fromName: fromName, @@ -225,6 +230,38 @@ func newSyncHandler(fromName, toName string, needCopy: make(map[blob.Ref]uint32), lastFail: make(map[blob.Ref]failDetail), copying: make(map[blob.Ref]*copyStatus), + alarmIdlec: make(chan struct{}), + } +} + +// NewSyncHandler returns a handler that will asynchronously and continuously +// copy blobs from src to dest, if missing on dest. +// Blobs waiting to be copied are stored on pendingQueue. srcName and destName are +// only used for status and debugging messages. +// N.B: blobs should be added to src with a method that notifies the blob hub, +// such as blobserver.Receive. +func NewSyncHandler(srcName, destName string, + src blobserver.Storage, dest blobReceiverEnumerator, + pendingQueue sorted.KeyValue) *SyncHandler { + sh := newSyncHandler(srcName, destName, src, dest, pendingQueue) + go sh.syncLoop() + blobserver.GetHub(sh.from).AddReceiveHook(sh.enqueue) + return sh +} + +// IdleWait waits until the sync handler has finished processing the currently +// queued blobs. +func (sh *SyncHandler) IdleWait() { + if sh.idle { + return + } + <-sh.alarmIdlec +} + +func (sh *SyncHandler) signalIdle() { + select { + case sh.alarmIdlec <- struct{}{}: + default: } } @@ -536,6 +573,7 @@ func (sh *SyncHandler) syncLoop() { d := queueSyncInterval - time.Since(t0) select { case <-time.After(d): + sh.signalIdle() case <-sh.wakec: } } diff --git a/pkg/sorted/sqlite/sqlitekv.go b/pkg/sorted/sqlite/sqlitekv.go index 04f928027..79fdc7ea5 100644 --- a/pkg/sorted/sqlite/sqlitekv.go +++ b/pkg/sorted/sqlite/sqlitekv.go @@ -34,6 +34,12 @@ func init() { sorted.RegisterKeyValue("sqlite", newKeyValueFromConfig) } +// NewStorage is a convenience that calls newKeyValueFromConfig +// with file as the sqlite storage file. +func NewStorage(file string) (sorted.KeyValue, error) { + return newKeyValueFromConfig(jsonconfig.Obj{"file": file}) +} + func newKeyValueFromConfig(cfg jsonconfig.Obj) (sorted.KeyValue, error) { if !compiled { return nil, ErrNotCompiled