mirror of https://github.com/perkeep/perkeep.git

commit c1892b5ae5 (parent 023fba5629)

Rewritten sync handler.

Fix deadlock, much better status page, show per-blob status & errors,
clear errors when they've resolved themselves, fix known data race.

Change-Id: I968de0de4f308ff0a410adceb181a0712800d401
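The heart of the rewrite shows up early in the diff below: the lossy blobc channel, which carried both work and wakeups, is replaced by an in-memory needCopy map (the real source of truth) plus a wakec channel used purely as a wakeup signal with non-blocking sends. A minimal, self-contained sketch of that pattern, assuming simplified stand-in types: only the needCopy/wakec shape comes from the commit, while the syncer type and its methods are illustrative, not code from the diff.

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type syncer struct {
        wakec chan bool // wakeup only; never carries work (mirrors SyncHandler.wakec)

        mu       sync.Mutex
        needCopy map[string]uint32 // pending blobs; the actual source of truth
    }

    // add records pending work, then pokes the loop. The select/default makes
    // the send non-blocking: if the loop is busy or mid-batch, the signal is
    // dropped harmlessly, because the loop re-reads needCopy on every pass
    // and the periodic poll below covers any dropped signal.
    func (s *syncer) add(ref string, size uint32) {
        s.mu.Lock()
        s.needCopy[ref] = size
        s.mu.Unlock()
        select {
        case s.wakec <- true:
        default:
        }
    }

    func (s *syncer) loop() {
        for {
            s.mu.Lock()
            n := len(s.needCopy)
            for ref := range s.needCopy {
                delete(s.needCopy, ref) // stand-in for actually copying the blob
            }
            s.mu.Unlock()
            if n > 0 {
                fmt.Printf("synced %d blobs\n", n)
                continue // drain fully before sleeping
            }
            select {
            case <-time.After(5 * time.Second): // periodic poll
            case <-s.wakec: // or early wakeup from add
            }
        }
    }

    func main() {
        s := &syncer{wakec: make(chan bool), needCopy: make(map[string]uint32)}
        s.add("sha1-0000000000000000000000000000000000000000", 123)
        go s.loop()
        time.Sleep(50 * time.Millisecond)
    }

Because producers never block and the loop always re-reads the map, a slow copier can no longer wedge the receive path, which is the deadlock the commit message refers to.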
@@ -17,6 +17,7 @@ limitations under the License.
 package server
 
 import (
+    "bytes"
     "errors"
     "fmt"
     "html"
@@ -24,23 +25,24 @@ import (
     "io/ioutil"
     "log"
     "net/http"
+    "sort"
     "strconv"
     "sync"
     "time"
 
     "camlistore.org/pkg/blob"
     "camlistore.org/pkg/blobserver"
+    "camlistore.org/pkg/constants"
     "camlistore.org/pkg/context"
     "camlistore.org/pkg/index"
     "camlistore.org/pkg/jsonconfig"
-    "camlistore.org/pkg/readerutil"
     "camlistore.org/pkg/sorted"
     "camlistore.org/pkg/types"
 )
 
-var queueSyncInterval = 5 * time.Second
 const (
-    maxErrors = 20
+    maxRecentErrors   = 20
+    queueSyncInterval = 5 * time.Second
 )
 
 // The SyncHandler handles async replication in one direction between
@@ -50,36 +52,36 @@ const (
 // blobs; instead, it records blobs it has received and queues them
 // for async replication soon, or whenever it can.
 type SyncHandler struct {
-    // TODO: rate control + tunable
-    // TODO: expose copierPoolSize as tunable
-
-    blobserver.NoImplStorage
-
+    // TODO: rate control tunables
     fromName, toName string
     from             blobserver.Storage
     to               blobserver.BlobReceiver
     queue            sorted.KeyValue
     toIndex          bool // whether this sync is from a blob storage to an index
-    idle             bool // if true, the handler does nothing other than providing the discovery.
-    copierPoolSize   int
+
+    idle bool // if true, the handler does nothing other than providing the discovery.
+
+    copierPoolSize int
 
-    // blobc receives a blob to copy. It's an optimization only to wake up
-    // the syncer from idle sleep periods and sends are non-blocking and may
-    // drop blobs. The queue is the actual source of truth.
-    blobc chan blob.SizedRef
+    // wakec wakes up the blob syncer loop when a blob is received.
+    wakec chan bool
 
     mu             sync.Mutex // protects following
     status         string
-    blobStatus     map[string]fmt.Stringer // stringer called with mu held
-    recentErrors   []timestampedError
+    copying        map[blob.Ref]*copyStatus // to start time
+    needCopy       map[blob.Ref]uint32      // blobs needing to be copied. some might be in lastFail too.
+    lastFail       map[blob.Ref]failDetail  // subset of needCopy that previously failed, and why
+    bytesRemain    int64                    // sum of needCopy values
+    recentErrors   []blob.Ref               // up to maxRecentErrors, recent first. valid if still in lastFail.
     recentCopyTime time.Time
     totalCopies    int64
     totalCopyBytes int64
     totalErrors    int64
 }
 
+var (
+    _ blobserver.Storage       = (*SyncHandler)(nil)
+    _ blobserver.HandlerIniter = (*SyncHandler)(nil)
+)
+
 func (sh *SyncHandler) String() string {
     return fmt.Sprintf("[SyncHandler %v -> %v]", sh.fromName, sh.toName)
 }
@@ -88,33 +90,25 @@ func (sh *SyncHandler) logf(format string, args ...interface{}) {
     log.Printf(sh.String()+" "+format, args...)
 }
 
-var (
-    _ blobserver.Storage       = (*SyncHandler)(nil)
-    _ blobserver.HandlerIniter = (*SyncHandler)(nil)
-)
-
 func init() {
     blobserver.RegisterHandlerConstructor("sync", newSyncFromConfig)
 }
 
 func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) {
     var (
-        from          = conf.RequiredString("from")
-        to            = conf.RequiredString("to")
-        fullSync      = conf.OptionalBool("fullSyncOnStart", false)
-        blockFullSync = conf.OptionalBool("blockingFullSyncOnStart", false)
-        idle          = conf.OptionalBool("idle", false)
-        queueConf     = conf.OptionalObject("queue")
+        from           = conf.RequiredString("from")
+        to             = conf.RequiredString("to")
+        fullSync       = conf.OptionalBool("fullSyncOnStart", false)
+        blockFullSync  = conf.OptionalBool("blockingFullSyncOnStart", false)
+        idle           = conf.OptionalBool("idle", false)
+        queueConf      = conf.OptionalObject("queue")
+        copierPoolSize = conf.OptionalInt("copierPoolSize", 5)
     )
     if err := conf.Validate(); err != nil {
         return nil, err
     }
     if idle {
-        synch, err := createIdleSyncHandler(from, to)
-        if err != nil {
-            return nil, err
-        }
-        return synch, nil
+        return newIdleSyncHandler(from, to), nil
     }
     if len(queueConf) == 0 {
         return nil, errors.New(`Missing required "queue" object`)
@@ -139,20 +133,29 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
         }
     }
 
-    sh, err := createSyncHandler(from, to, fromBs, toBs, q, isToIndex)
-    if err != nil {
-        return nil, err
+    sh := newSyncHandler(from, to, fromBs, toBs, q)
+    sh.toIndex = isToIndex
+    sh.copierPoolSize = copierPoolSize
+    if err := sh.readQueueToMemory(); err != nil {
+        return nil, fmt.Errorf("Error reading sync queue to memory: %v", err)
     }
 
     if fullSync || blockFullSync {
         sh.logf("Doing full sync")
         didFullSync := make(chan bool, 1)
         go func() {
-            n := sh.runSync("queue", sh.enumerateQueuedBlobs)
-            sh.logf("Queue sync copied %d blobs", n)
-            n = sh.runSync("full", blobserverEnumerator(context.TODO(), fromBs))
+            for {
+                n := sh.runSync("queue", sh.enumeratePendingBlobs)
+                if n > 0 {
+                    sh.logf("Queue sync copied %d blobs", n)
+                    continue
+                }
+                break
+            }
+            n := sh.runSync("full", blobserverEnumerator(context.TODO(), fromBs))
             sh.logf("Full sync copied %d blobs", n)
             didFullSync <- true
-            sh.syncQueueLoop()
+            sh.syncLoop()
         }()
         if blockFullSync {
             sh.logf("Blocking startup, waiting for full sync from %q to %q", from, to)
@@ -160,7 +163,7 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
             sh.logf("Full sync complete.")
         }
     } else {
-        go sh.syncQueueLoop()
+        go sh.syncLoop()
     }
 
     blobserver.GetHub(fromBs).AddReceiveHook(sh.enqueue)
@@ -180,38 +183,31 @@ func (sh *SyncHandler) InitHandler(hl blobserver.FindHandlerByTyper) error {
     return nil
 }
 
-type timestampedError struct {
-    t   time.Time
-    err error
-}
-
-func createSyncHandler(fromName, toName string,
+func newSyncHandler(fromName, toName string,
     from blobserver.Storage, to blobserver.BlobReceiver,
-    queue sorted.KeyValue, isToIndex bool) (*SyncHandler, error) {
-
-    h := &SyncHandler{
-        copierPoolSize: 3,
+    queue sorted.KeyValue) *SyncHandler {
+    return &SyncHandler{
+        copierPoolSize: 2,
         from:           from,
         to:             to,
         fromName:       fromName,
        toName:         toName,
         queue:          queue,
-        toIndex:        isToIndex,
-        blobc:          make(chan blob.SizedRef, 8),
+        wakec:          make(chan bool),
         status:         "not started",
-        blobStatus:     make(map[string]fmt.Stringer),
+        needCopy:       make(map[blob.Ref]uint32),
+        lastFail:       make(map[blob.Ref]failDetail),
+        copying:        make(map[blob.Ref]*copyStatus),
     }
-    return h, nil
 }
 
-func createIdleSyncHandler(fromName, toName string) (*SyncHandler, error) {
-    h := &SyncHandler{
+func newIdleSyncHandler(fromName, toName string) *SyncHandler {
+    return &SyncHandler{
         fromName: fromName,
         toName:   toName,
         idle:     true,
         status:   "disabled",
     }
-    return h, nil
 }
 
 func (sh *SyncHandler) discoveryMap() map[string]interface{} {
@@ -223,74 +219,102 @@ func (sh *SyncHandler) discoveryMap() map[string]interface{} {
     }
 }
 
+// readQueueToMemory slurps in the pending queue from disk (or
+// wherever) to memory. Even with millions of blobs, it's not much
+// memory. The point of the persistent queue is to survive restarts if
+// the "fullSyncOnStart" option is off. With "fullSyncOnStart" set to
+// true, this is a little pointless (we'd figure out what's missing
+// eventually), but this might save us a few minutes (let us start
+// syncing missing blobs a few minutes earlier) since we won't have to
+// wait to figure out what the destination is missing.
+func (sh *SyncHandler) readQueueToMemory() error {
+    errc := make(chan error, 1)
+    blobs := make(chan blob.SizedRef, 16)
+    intr := make(chan struct{})
+    defer close(intr)
+    go func() {
+        errc <- sh.enumerateQueuedBlobs(blobs, intr)
+    }()
+    n := 0
+    for sb := range blobs {
+        sh.addBlobToCopy(sb)
+        n++
+    }
+    sh.logf("Added %d pending blobs from sync queue to pending list", n)
+    return <-errc
+}
+
 func (sh *SyncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
     sh.mu.Lock()
     defer sh.mu.Unlock()
 
-    fmt.Fprintf(rw, "<h1>%s to %s Sync Status</h1><p><b>Current status: </b>%s</p>",
-        sh.fromName, sh.toName, html.EscapeString(sh.status))
+    f := func(p string, a ...interface{}) {
+        fmt.Fprintf(rw, p, a...)
+    }
+    now := time.Now()
+    f("<h1>Sync Status (for %s to %s)</h1>", sh.fromName, sh.toName)
+    f("<p><b>Current status: </b>%s</p>", html.EscapeString(sh.status))
     if sh.idle {
         return
     }
 
-    fmt.Fprintf(rw, "<h2>Stats:</h2><ul>")
-    fmt.Fprintf(rw, "<li>Blobs copied: %d</li>", sh.totalCopies)
-    fmt.Fprintf(rw, "<li>Bytes copied: %d</li>", sh.totalCopyBytes)
+    f("<h2>Stats:</h2><ul>")
+    f("<li>Source: %s</li>", html.EscapeString(storageDesc(sh.from)))
+    f("<li>Target: %s</li>", html.EscapeString(storageDesc(sh.to)))
+    f("<li>Blobs synced: %d</li>", sh.totalCopies)
+    f("<li>Bytes synced: %d</li>", sh.totalCopyBytes)
+    f("<li>Blobs yet to copy: %d</li>", len(sh.needCopy))
+    f("<li>Bytes yet to copy: %d</li>", sh.bytesRemain)
     if !sh.recentCopyTime.IsZero() {
-        fmt.Fprintf(rw, "<li>Most recent copy: %s</li>", sh.recentCopyTime.Format(time.RFC3339))
+        f("<li>Most recent copy: %s (%v ago)</li>", sh.recentCopyTime.Format(time.RFC3339), now.Sub(sh.recentCopyTime))
     }
-    fmt.Fprintf(rw, "<li>Copy errors: %d</li>", sh.totalErrors)
-    fmt.Fprintf(rw, "</ul>")
+    clarification := ""
+    if len(sh.needCopy) == 0 && sh.totalErrors > 0 {
+        clarification = "(all since resolved)"
+    }
+    f("<li>Previous copy errors: %d %s</li>", sh.totalErrors, clarification)
+    f("</ul>")
 
-    if len(sh.blobStatus) > 0 {
-        fmt.Fprintf(rw, "<h2>Current Copies:</h2><ul>")
-        for blobstr, sfn := range sh.blobStatus {
-            fmt.Fprintf(rw, "<li>%s: %s</li>\n",
-                blobstr, html.EscapeString(sfn.String()))
+    if len(sh.copying) > 0 {
+        f("<h2>Currently Copying</h2><ul>")
+        copying := make([]blob.Ref, 0, len(sh.copying))
+        for br := range sh.copying {
+            copying = append(copying, br)
         }
-        fmt.Fprintf(rw, "</ul>")
+        sort.Sort(blob.ByRef(copying))
+        for _, br := range copying {
+            f("<li>%s</li>\n", sh.copying[br])
+        }
+        f("</ul>")
     }
 
-    if len(sh.recentErrors) > 0 {
-        fmt.Fprintf(rw, "<h2>Recent Errors:</h2><ul>")
-        for _, te := range sh.recentErrors {
-            fmt.Fprintf(rw, "<li>%s: %s</li>\n",
-                te.t.Format(time.RFC3339),
-                html.EscapeString(te.err.Error()))
+    recentErrors := make([]blob.Ref, 0, len(sh.recentErrors))
+    for _, br := range sh.recentErrors {
+        if _, ok := sh.needCopy[br]; ok {
+            // Only show it in the web UI if it's still a problem. Blobs that
+            // have since succeeded just confused people.
+            recentErrors = append(recentErrors, br)
         }
-        fmt.Fprintf(rw, "</ul>")
     }
+    if len(recentErrors) > 0 {
+        f("<h2>Recent Errors</h2><p>Blobs that haven't successfully copied over yet, and their last errors:</p><ul>")
+        for _, br := range recentErrors {
+            fail := sh.lastFail[br]
+            f("<li>%s: %s: %s</li>\n",
+                br,
+                fail.when.Format(time.RFC3339),
+                html.EscapeString(fail.err.Error()))
+        }
+        f("</ul>")
+    }
 }
 
-func (sh *SyncHandler) setStatus(s string, args ...interface{}) {
+func (sh *SyncHandler) setStatusf(s string, args ...interface{}) {
     s = time.Now().UTC().Format(time.RFC3339) + ": " + fmt.Sprintf(s, args...)
     sh.mu.Lock()
     defer sh.mu.Unlock()
     sh.status = s
 }
 
-func (sh *SyncHandler) setBlobStatus(blobref string, s fmt.Stringer) {
-    sh.mu.Lock()
-    defer sh.mu.Unlock()
-    if s != nil {
-        sh.blobStatus[blobref] = s
-    } else {
-        delete(sh.blobStatus, blobref)
-    }
-}
-
-func (sh *SyncHandler) addErrorToLog(err error) {
-    sh.logf("%v", err)
-    sh.mu.Lock()
-    defer sh.mu.Unlock()
-    sh.recentErrors = append(sh.recentErrors, timestampedError{time.Now().UTC(), err})
-    if len(sh.recentErrors) > maxErrors {
-        // Kinda lame, but whatever. Only for errors, rare.
-        copy(sh.recentErrors[:maxErrors], sh.recentErrors[1:maxErrors+1])
-        sh.recentErrors = sh.recentErrors[:maxErrors]
-    }
-}
-
 type copyResult struct {
     sb  blob.SizedRef
     err error
@@ -309,6 +333,39 @@ func blobserverEnumerator(ctx *context.Context, src blobserver.BlobEnumerator) f
     }
 }
 
+// enumeratePendingBlobs yields blobs from the in-memory pending list (needCopy).
+// This differs from enumerateQueuedBlobs, which pulls in the on-disk sorted.KeyValue store.
+func (sh *SyncHandler) enumeratePendingBlobs(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
+    defer close(dst)
+    sh.mu.Lock()
+    var toSend []blob.SizedRef
+    {
+        n := len(sh.needCopy)
+        const maxBatch = 1000
+        if n > maxBatch {
+            n = maxBatch
+        }
+        toSend = make([]blob.SizedRef, 0, n)
+        for br, size := range sh.needCopy {
+            toSend = append(toSend, blob.SizedRef{br, size})
+            if len(toSend) == n {
+                break
+            }
+        }
+    }
+    sh.mu.Unlock()
+    for _, sb := range toSend {
+        select {
+        case dst <- sb:
+        case <-intr:
+            return nil
+        }
+    }
+    return nil
+}
+
+// enumerateQueuedBlobs yields blobs from the on-disk sorted.KeyValue store.
+// This differs from enumeratePendingBlobs, which sends from the in-memory pending list.
 func (sh *SyncHandler) enumerateQueuedBlobs(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
     defer close(dst)
     it := sh.queue.Find("", "")
@@ -328,21 +385,6 @@ func (sh *SyncHandler) enumerateQueuedBlobs(dst chan<- blob.SizedRef, intr <-cha
     return it.Close()
 }
 
-func (sh *SyncHandler) enumerateBlobc(first blob.SizedRef) func(chan<- blob.SizedRef, <-chan struct{}) error {
-    return func(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
-        defer close(dst)
-        dst <- first
-        for {
-            select {
-            case sb := <-sh.blobc:
-                dst <- sb
-            default:
-                return nil
-            }
-        }
-    }
-}
-
 func (sh *SyncHandler) runSync(srcName string, enumSrc func(chan<- blob.SizedRef, <-chan struct{}) error) int {
     enumch := make(chan blob.SizedRef, 8)
     errch := make(chan error, 1)
@@ -355,17 +397,21 @@ func (sh *SyncHandler) runSync(srcName string, enumSrc func(chan<- blob.SizedRef
 
     workch := make(chan blob.SizedRef, 1000)
     resch := make(chan copyResult, 8)
+FeedWork:
     for sb := range enumch {
-        toCopy++
-        workch <- sb
-        if toCopy <= sh.copierPoolSize {
+        if toCopy < sh.copierPoolSize {
             go sh.copyWorker(resch, workch)
         }
-        sh.setStatus("Enumerating queued blobs: %d", toCopy)
+        select {
+        case workch <- sb:
+            toCopy++
+        default:
+            break FeedWork
+        }
     }
     close(workch)
     for i := 0; i < toCopy; i++ {
-        sh.setStatus("Copied %d/%d of batch of queued blobs", nCopied, toCopy)
+        sh.setStatusf("Copying blobs")
         res := <-resch
         sh.mu.Lock()
         if res.err == nil {
@@ -380,83 +426,80 @@ func (sh *SyncHandler) runSync(srcName string, enumSrc func(chan<- blob.SizedRef
     }
 
     if err := <-errch; err != nil {
-        sh.addErrorToLog(fmt.Errorf("replication error for source %q, enumerate from source: %v", srcName, err))
+        sh.logf("error enumerating from source: %v", err)
     }
     return nCopied
 }
 
-func (sh *SyncHandler) syncQueueLoop() {
+func (sh *SyncHandler) syncLoop() {
     for {
         t0 := time.Now()
 
-        for sh.runSync(sh.fromName, sh.enumerateQueuedBlobs) > 0 {
+        for sh.runSync(sh.fromName, sh.enumeratePendingBlobs) > 0 {
             // Loop, before sleeping.
         }
-        sh.setStatus("Sleeping briefly before next long poll.")
+        sh.setStatusf("Sleeping briefly before next long poll.")
 
         d := queueSyncInterval - time.Since(t0)
         select {
         case <-time.After(d):
-        case sb := <-sh.blobc:
-            // Blob arrived.
-            sh.runSync(sh.fromName, sh.enumerateBlobc(sb))
+        case <-sh.wakec:
         }
     }
 }
 
 func (sh *SyncHandler) copyWorker(res chan<- copyResult, work <-chan blob.SizedRef) {
     for sb := range work {
-        res <- copyResult{sb, sh.copyBlob(sb, 0)}
+        res <- copyResult{sb, sh.copyBlob(sb)}
     }
 }
 
-type statusFunc func() string
+func (sh *SyncHandler) copyBlob(sb blob.SizedRef) (err error) {
+    cs := sh.newCopyStatus(sb)
+    defer func() { cs.setError(err) }()
+    br := sb.Ref
 
-func (sf statusFunc) String() string { return sf() }
+    sh.mu.Lock()
+    sh.copying[br] = cs
+    sh.mu.Unlock()
 
-type status string
-
-func (s status) String() string { return string(s) }
-
-func (sh *SyncHandler) copyBlob(sb blob.SizedRef, tryCount int) error {
-    key := sb.Ref.String()
-    set := func(s fmt.Stringer) {
-        sh.setBlobStatus(key, s)
-    }
-    defer set(nil)
-
-    errorf := func(s string, args ...interface{}) error {
-        // TODO: increment error stats
-        err := fmt.Errorf("replication error for blob %s: "+s,
-            append([]interface{}{sb.Ref}, args...)...)
-        sh.addErrorToLog(err)
-        return err
+    if sb.Size > constants.MaxBlobSize {
+        return fmt.Errorf("blob size %d too large; max blob size is %d", sb.Size, constants.MaxBlobSize)
     }
 
-    set(status("sending GET to source"))
-    rc, fromSize, err := sh.from.FetchStreaming(sb.Ref)
+    cs.setStatus(statusFetching)
+    rc, fromSize, err := sh.from.FetchStreaming(br)
     if err != nil {
-        return errorf("source fetch: %v", err)
+        return fmt.Errorf("source fetch: %v", err)
     }
-    defer rc.Close()
     if fromSize != sb.Size {
-        return errorf("source fetch size mismatch: get=%d, enumerate=%d", fromSize, sb.Size)
+        rc.Close()
+        return fmt.Errorf("source fetch size mismatch: get=%d, enumerate=%d", fromSize, sb.Size)
     }
 
-    bytesCopied := int64(0) // TODO: data race, accessed without locking in statusFunc below.
-    set(statusFunc(func() string {
-        return fmt.Sprintf("copying: %d/%d bytes", bytesCopied, sb.Size)
-    }))
-    newsb, err := sh.to.ReceiveBlob(sb.Ref, readerutil.CountingReader{rc, &bytesCopied})
+    buf := make([]byte, fromSize)
+    hash := br.Hash()
+    cs.setStatus(statusReading)
+    n, err := io.ReadFull(io.TeeReader(rc,
+        io.MultiWriter(
+            incrWriter{cs, &cs.nread},
+            hash,
+        )), buf)
+    rc.Close()
     if err != nil {
-        return errorf("dest write: %v", err)
+        return fmt.Errorf("Read error after %d/%d bytes: %v", n, fromSize, err)
+    }
+    if !br.HashMatches(hash) {
+        return fmt.Errorf("Read data has unexpected digest %x", hash.Sum(nil))
     }
+
+    cs.setStatus(statusWriting)
+    newsb, err := sh.to.ReceiveBlob(br, io.TeeReader(bytes.NewReader(buf), incrWriter{cs, &cs.nwrite}))
+    if err != nil {
+        return fmt.Errorf("dest write: %v", err)
+    }
     if newsb.Size != sb.Size {
-        return errorf("write size mismatch: source_read=%d but dest_write=%d", sb.Size, newsb.Size)
-    }
-    set(status("copied; removing from queue"))
-    if err := sh.queue.Delete(sb.Ref.String()); err != nil {
-        return errorf("queue delete: %v", err)
+        return fmt.Errorf("write size mismatch: source_read=%d but dest_write=%d", sb.Size, newsb.Size)
     }
     return nil
 }
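The rewritten copyBlob above trades the old streaming copy (a readerutil.CountingReader piped straight into ReceiveBlob, with the racy bytesCopied counter called out in the removed TODO) for a read-everything, verify, then write sequence. A standalone sketch of that verify-before-write step, using plain standard-library stand-ins for the blob ref and storage interfaces; copyVerified and its parameters are illustrative, not the handler's API:

    package main

    import (
        "bytes"
        "crypto/sha1"
        "fmt"
        "io"
        "strings"
    )

    // copyVerified reads all of src (of known size), checks the SHA-1 digest
    // against want, and only then writes the buffered bytes to dst.
    func copyVerified(dst io.Writer, src io.Reader, size int64, want string) error {
        buf := make([]byte, size)
        h := sha1.New()
        // TeeReader hashes the bytes as ReadFull pulls them into buf,
        // mirroring the io.ReadFull(io.TeeReader(...)) call in copyBlob.
        if _, err := io.ReadFull(io.TeeReader(src, h), buf); err != nil {
            return fmt.Errorf("read: %v", err)
        }
        if got := fmt.Sprintf("sha1-%x", h.Sum(nil)); got != want {
            return fmt.Errorf("digest mismatch: got %s, want %s", got, want)
        }
        _, err := io.Copy(dst, bytes.NewReader(buf))
        return err
    }

    func main() {
        data := "hello"
        want := fmt.Sprintf("sha1-%x", sha1.Sum([]byte(data)))
        var dst bytes.Buffer
        if err := copyVerified(&dst, strings.NewReader(data), int64(len(data)), want); err != nil {
            fmt.Println("copy failed:", err)
            return
        }
        fmt.Println("copied:", dst.String())
    }

Buffering the whole blob in memory is affordable here because blob size is capped, which is what the new constants.MaxBlobSize check at the top of copyBlob guarantees before any fetch happens.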
@@ -470,7 +513,29 @@ func (sh *SyncHandler) ReceiveBlob(br blob.Ref, r io.Reader) (sb blob.SizedRef,
     return sb, sh.enqueue(sb)
 }
 
+// addBlobToCopy adds a blob to copy to memory (not to disk: that's enqueue).
+// It returns true if it was added, or false if it was a duplicate.
+func (sh *SyncHandler) addBlobToCopy(sb blob.SizedRef) bool {
+    sh.mu.Lock()
+    defer sh.mu.Unlock()
+
+    sh.needCopy[sb.Ref] = sb.Size
+    sh.bytesRemain += int64(sb.Size)
+
+    // Non-blocking send to wake up looping goroutine if it's
+    // sleeping...
+    select {
+    case sh.wakec <- true:
+    default:
+    }
+    return true
+}
+
 func (sh *SyncHandler) enqueue(sb blob.SizedRef) error {
+    if !sh.addBlobToCopy(sb) {
+        // Dup
+        return nil
+    }
     // TODO: include current time in encoded value, to attempt to
     // do in-order delivery to remote side later? Possible
     // friendly optimization later. Might help peer's indexer have
@@ -478,15 +543,149 @@ func (sh *SyncHandler) enqueue(sb blob.SizedRef) error {
     if err := sh.queue.Set(sb.Ref.String(), fmt.Sprint(sb.Size)); err != nil {
         return err
     }
-    // Non-blocking send to wake up looping goroutine if it's
-    // sleeping...
-    select {
-    case sh.blobc <- sb:
-    default:
-    }
     return nil
 }
 
+func (sh *SyncHandler) newCopyStatus(sb blob.SizedRef) *copyStatus {
+    now := time.Now()
+    return &copyStatus{
+        sh:    sh,
+        sb:    sb,
+        state: statusStarting,
+        start: now,
+        t:     now,
+    }
+}
+
+// copyStatus is an in-progress copy.
+type copyStatus struct {
+    sh    *SyncHandler
+    sb    blob.SizedRef
+    start time.Time
+
+    mu     sync.Mutex
+    state  string    // one of statusFoo, below
+    t      time.Time // last status update time
+    nread  uint32
+    nwrite uint32
+}
+
+const (
+    statusStarting = "starting"
+    statusFetching = "fetching source"
+    statusReading  = "reading"
+    statusWriting  = "writing"
+)
+
+func (cs *copyStatus) setStatus(s string) {
+    now := time.Now()
+    cs.mu.Lock()
+    defer cs.mu.Unlock()
+    cs.state = s
+    cs.t = now
+}
+
+func (cs *copyStatus) setError(err error) {
+    now := time.Now()
+    sh := cs.sh
+    br := cs.sb.Ref
+    if err == nil {
+        // This is somewhat slow, so do it before we acquire the lock.
+        // The queue is thread-safe.
+        if derr := sh.queue.Delete(br.String()); derr != nil {
+            sh.logf("queue delete of %v error: %v", cs.sb.Ref, derr)
+        }
+    }
+
+    sh.mu.Lock()
+    defer sh.mu.Unlock()
+    if _, needCopy := sh.needCopy[br]; !needCopy {
+        sh.logf("IGNORING DUPLICATE UPLOAD of %v = %v", br, err)
+        return
+    }
+    delete(sh.copying, br)
+    if err == nil {
+        delete(sh.needCopy, br)
+        delete(sh.lastFail, br)
+        sh.recentCopyTime = now
+        sh.totalCopies++
+        sh.totalCopyBytes += int64(cs.sb.Size)
+        sh.bytesRemain -= int64(cs.sb.Size)
+        return
+    }
+
+    sh.logf("error copying %v: %v", br, err)
+    sh.lastFail[br] = failDetail{
+        when: now,
+        err:  err,
+    }
+
+    // Kinda lame. TODO: use a ring buffer or container/list instead.
+    if len(sh.recentErrors) == maxRecentErrors {
+        copy(sh.recentErrors[1:], sh.recentErrors)
+        sh.recentErrors = sh.recentErrors[:maxRecentErrors-1]
+    }
+    sh.recentErrors = append(sh.recentErrors, br)
+}
+
+func (cs *copyStatus) String() string {
+    var buf bytes.Buffer
+    now := time.Now()
+    buf.WriteString(cs.sb.Ref.String())
+    buf.WriteString(": ")
+
+    cs.mu.Lock()
+    defer cs.mu.Unlock()
+    sinceStart := now.Sub(cs.start)
+    sinceLast := now.Sub(cs.t)
+
+    switch cs.state {
+    case statusReading:
+        buf.WriteString(cs.state)
+        fmt.Fprintf(&buf, " (%d/%dB)", cs.nread, cs.sb.Size)
+    case statusWriting:
+        if cs.nwrite == cs.sb.Size {
+            buf.WriteString("wrote all, waiting ack")
+        } else {
+            buf.WriteString(cs.state)
+            fmt.Fprintf(&buf, " (%d/%dB)", cs.nwrite, cs.sb.Size)
+        }
+    default:
+        buf.WriteString(cs.state)
+    }
+    if sinceLast > 5*time.Second {
+        fmt.Fprintf(&buf, ", last change %v ago (total elapsed %v)", sinceLast, sinceStart)
+    }
+    return buf.String()
+}
+
+type failDetail struct {
+    when time.Time
+    err  error
+}
+
+// incrWriter is an io.Writer that locks mu and increments *n.
+type incrWriter struct {
+    cs *copyStatus
+    n  *uint32
+}
+
+func (w incrWriter) Write(p []byte) (n int, err error) {
+    w.cs.mu.Lock()
+    *w.n += uint32(len(p))
+    w.cs.t = time.Now()
+    w.cs.mu.Unlock()
+    return len(p), nil
+}
+
+func storageDesc(v interface{}) string {
+    if s, ok := v.(fmt.Stringer); ok {
+        return s.String()
+    }
+    return fmt.Sprintf("%T", v)
+}
+
 // TODO(bradfitz): implement these? what do they mean? possibilities:
 // a) proxy to sh.from
 // b) proxy to sh.to
@@ -495,7 +694,26 @@ func (sh *SyncHandler) enqueue(sb blob.SizedRef) error {
 // sh.from has, sh.to doesn't have, and isn't in the queue to be replicated.
 //
 // For now, don't implement them. Wait until we need them.
-//
-// func (sh *SyncHandler) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
-// func (sh *SyncHandler) FetchStreaming(br blob.Ref) (io.ReadCloser, uint32, error) {
-// func (sh *SyncHandler) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
+
+func (sh *SyncHandler) Fetch(blob.Ref) (file types.ReadSeekCloser, size uint32, err error) {
+    panic("Unimplemented blobserver.Fetch called")
+}
+
+func (sh *SyncHandler) FetchStreaming(blob.Ref) (file io.ReadCloser, size uint32, err error) {
+    panic("Unimplemented blobserver.FetchStreaming called")
+}
+
+func (sh *SyncHandler) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
+    sh.logf("Unexpected StatBlobs call")
+    return nil
+}
+
+func (sh *SyncHandler) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
+    defer close(dest)
+    sh.logf("Unexpected EnumerateBlobs call")
+    return nil
+}
+
+func (sh *SyncHandler) RemoveBlobs(blobs []blob.Ref) error {
+    panic("Unimplemented RemoveBlobs")
+}
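The copyStatus and incrWriter types above are the per-blob status machinery the commit message promises: every byte counted and every status string formatted goes through the same per-copy mutex, so the status page renders a consistent snapshot instead of racing the copy (the known data race in the old statusFunc). A reduced sketch of that pattern, assuming a simplified progress type; the names here are illustrative, and only the lock-on-both-sides shape mirrors the handler:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type progress struct {
        ref   string
        total uint32

        mu    sync.Mutex
        nread uint32
        t     time.Time // last update
    }

    // Write-side: an io.Writer that bumps the counter under the lock,
    // like incrWriter in the diff above.
    func (p *progress) Write(b []byte) (int, error) {
        p.mu.Lock()
        p.nread += uint32(len(b))
        p.t = time.Now()
        p.mu.Unlock()
        return len(b), nil
    }

    // Read-side: String takes the same lock, so a status page always
    // sees a consistent (nread, t) pair.
    func (p *progress) String() string {
        p.mu.Lock()
        defer p.mu.Unlock()
        return fmt.Sprintf("%s: reading (%d/%dB)", p.ref, p.nread, p.total)
    }

    func main() {
        p := &progress{ref: "sha1-deadbeef", total: 100}
        p.Write(make([]byte, 40))
        fmt.Println(p) // prints "sha1-deadbeef: reading (40/100B)"
    }

The commit also stubs out the 0.8 release notes, below, with a bullet for the new sync handler.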
@@ -0,0 +1,37 @@
+<h1>Release 0.8 ("XXXXXX")</h1>
+
+<p><b>2014-mm-dd</b></p>
+
+<p></p>
+
+<center>
+<div style='font-size:18pt;font-weight:bold;'><a href="https://github.com/bradfitz/camlistore/archive/0.8.zip">camlistore-0.8.zip</a></div>
+<div style='margin-top:0.5em;'>
+Or browse at Github: <a href="https://github.com/bradfitz/camlistore/tree/0.8">github.com/bradfitz/camlistore/tree/0.8</a></div>
+</center>
+
+<h2>Release stats</h2>
+<p>n total committers over n commits since <a href="/docs/release/0.7">0.7</a> (just %v ago), including TODO</p>
+
+<p>Thank you!</p>
+
+<h2>New in 0.8</h2>
+<h3>Apps</h3>
+<ul>
+</ul>
+<h3>Web UI</h3>
+<ul>
+</ul>
+<h3>Storage</h3>
+<ul>
+<li>Rewritten blob sync handler (the server's background async replication)</li>
+</ul>
+<h3>Search</h3>
+<ul>
+</ul>
+<h3>Tools</h3>
+<ul>
+</ul>
+<h3>General</h3>
+<ul>
+</ul>