diff --git a/pkg/server/sync.go b/pkg/server/sync.go
index 4a4405ed6..22b3a28ce 100644
--- a/pkg/server/sync.go
+++ b/pkg/server/sync.go
@@ -17,6 +17,7 @@ limitations under the License.
package server
import (
+ "bytes"
"errors"
"fmt"
"html"
@@ -24,23 +25,24 @@ import (
"io/ioutil"
"log"
"net/http"
+ "sort"
"strconv"
"sync"
"time"
"camlistore.org/pkg/blob"
"camlistore.org/pkg/blobserver"
+ "camlistore.org/pkg/constants"
"camlistore.org/pkg/context"
"camlistore.org/pkg/index"
"camlistore.org/pkg/jsonconfig"
- "camlistore.org/pkg/readerutil"
"camlistore.org/pkg/sorted"
+ "camlistore.org/pkg/types"
)
-var queueSyncInterval = 5 * time.Second
-
const (
- maxErrors = 20
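+ // maxRecentErrors bounds the recentErrors list shown on the status page;
+ // queueSyncInterval is how long syncLoop sleeps between polls when idle.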
+ maxRecentErrors = 20
+ queueSyncInterval = 5 * time.Second
)
// The SyncHandler handles async replication in one direction between
@@ -50,36 +52,36 @@ const (
// blobs; instead, it records blobs it has received and queues them
// for async replication soon, or whenever it can.
type SyncHandler struct {
- // TODO: rate control + tunable
- // TODO: expose copierPoolSize as tunable
-
- blobserver.NoImplStorage
-
+ // TODO: rate control tunables
fromName, toName string
from blobserver.Storage
to blobserver.BlobReceiver
queue sorted.KeyValue
toIndex bool // whether this sync is from a blob storage to an index
+ idle bool // if true, the handler does nothing other than providing discovery.
+ copierPoolSize int
- idle bool // if true, the handler does nothing other than providing the discovery.
-
- copierPoolSize int
-
- // blobc receives a blob to copy. It's an optimization only to wake up
- // the syncer from idle sleep periods and sends are non-blocking and may
- // drop blobs. The queue is the actual source of truth.
- blobc chan blob.SizedRef
+ // wakec wakes up the blob syncer loop when a blob is received.
+ wakec chan bool
mu sync.Mutex // protects following
status string
- blobStatus map[string]fmt.Stringer // stringer called with mu held
- recentErrors []timestampedError
+ copying map[blob.Ref]*copyStatus // to start time
+ needCopy map[blob.Ref]uint32 // blobs needing to be copied. some might be in lastFail too.
+ lastFail map[blob.Ref]failDetail // subset of needCopy that previously failed, and why
+ bytesRemain int64 // sum of needCopy values
+ recentErrors []blob.Ref // up to maxRecentErrors, recent first. valid if still in lastFail.
recentCopyTime time.Time
totalCopies int64
totalCopyBytes int64
totalErrors int64
}
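+// Compile-time checks that SyncHandler implements these interfaces.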
+var (
+ _ blobserver.Storage = (*SyncHandler)(nil)
+ _ blobserver.HandlerIniter = (*SyncHandler)(nil)
+)
+
func (sh *SyncHandler) String() string {
return fmt.Sprintf("[SyncHandler %v -> %v]", sh.fromName, sh.toName)
}
@@ -88,33 +90,25 @@ func (sh *SyncHandler) logf(format string, args ...interface{}) {
log.Printf(sh.String()+" "+format, args...)
}
-var (
- _ blobserver.Storage = (*SyncHandler)(nil)
- _ blobserver.HandlerIniter = (*SyncHandler)(nil)
-)
-
func init() {
blobserver.RegisterHandlerConstructor("sync", newSyncFromConfig)
}
func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) {
var (
- from = conf.RequiredString("from")
- to = conf.RequiredString("to")
- fullSync = conf.OptionalBool("fullSyncOnStart", false)
- blockFullSync = conf.OptionalBool("blockingFullSyncOnStart", false)
- idle = conf.OptionalBool("idle", false)
- queueConf = conf.OptionalObject("queue")
+ from = conf.RequiredString("from")
+ to = conf.RequiredString("to")
+ fullSync = conf.OptionalBool("fullSyncOnStart", false)
+ blockFullSync = conf.OptionalBool("blockingFullSyncOnStart", false)
+ idle = conf.OptionalBool("idle", false)
+ queueConf = conf.OptionalObject("queue")
+ copierPoolSize = conf.OptionalInt("copierPoolSize", 5)
)
if err := conf.Validate(); err != nil {
return nil, err
}
if idle {
- synch, err := createIdleSyncHandler(from, to)
- if err != nil {
- return nil, err
- }
- return synch, nil
+ return newIdleSyncHandler(from, to), nil
}
if len(queueConf) == 0 {
return nil, errors.New(`Missing required "queue" object`)
@@ -139,20 +133,29 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
}
}
- sh, err := createSyncHandler(from, to, fromBs, toBs, q, isToIndex)
- if err != nil {
- return nil, err
+ sh := newSyncHandler(from, to, fromBs, toBs, q)
+ sh.toIndex = isToIndex
+ sh.copierPoolSize = copierPoolSize
+ if err := sh.readQueueToMemory(); err != nil {
+ return nil, fmt.Errorf("Error reading sync queue to memory: %v", err)
}
if fullSync || blockFullSync {
+ sh.logf("Doing full sync")
didFullSync := make(chan bool, 1)
go func() {
- n := sh.runSync("queue", sh.enumerateQueuedBlobs)
- sh.logf("Queue sync copied %d blobs", n)
- n = sh.runSync("full", blobserverEnumerator(context.TODO(), fromBs))
+ for {
+ n := sh.runSync("queue", sh.enumeratePendingBlobs)
+ if n > 0 {
+ sh.logf("Queue sync copied %d blobs", n)
+ continue
+ }
+ break
+ }
+ n := sh.runSync("full", blobserverEnumerator(context.TODO(), fromBs))
sh.logf("Full sync copied %d blobs", n)
didFullSync <- true
- sh.syncQueueLoop()
+ sh.syncLoop()
}()
if blockFullSync {
sh.logf("Blocking startup, waiting for full sync from %q to %q", from, to)
@@ -160,7 +163,7 @@ func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler,
sh.logf("Full sync complete.")
}
} else {
- go sh.syncQueueLoop()
+ go sh.syncLoop()
}
blobserver.GetHub(fromBs).AddReceiveHook(sh.enqueue)
@@ -180,38 +183,31 @@ func (sh *SyncHandler) InitHandler(hl blobserver.FindHandlerByTyper) error {
return nil
}
-type timestampedError struct {
- t time.Time
- err error
-}
-
-func createSyncHandler(fromName, toName string,
+func newSyncHandler(fromName, toName string,
from blobserver.Storage, to blobserver.BlobReceiver,
- queue sorted.KeyValue, isToIndex bool) (*SyncHandler, error) {
-
- h := &SyncHandler{
- copierPoolSize: 3,
+ queue sorted.KeyValue) *SyncHandler {
+ return &SyncHandler{
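+ // copierPoolSize 2 is only a default for direct callers;
+ // newSyncFromConfig overrides it from the "copierPoolSize" config key.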
+ copierPoolSize: 2,
from: from,
to: to,
fromName: fromName,
toName: toName,
queue: queue,
- toIndex: isToIndex,
- blobc: make(chan blob.SizedRef, 8),
+ wakec: make(chan bool),
status: "not started",
- blobStatus: make(map[string]fmt.Stringer),
+ needCopy: make(map[blob.Ref]uint32),
+ lastFail: make(map[blob.Ref]failDetail),
+ copying: make(map[blob.Ref]*copyStatus),
}
- return h, nil
}
-func createIdleSyncHandler(fromName, toName string) (*SyncHandler, error) {
- h := &SyncHandler{
+func newIdleSyncHandler(fromName, toName string) *SyncHandler {
+ return &SyncHandler{
fromName: fromName,
toName: toName,
idle: true,
status: "disabled",
}
- return h, nil
}
func (sh *SyncHandler) discoveryMap() map[string]interface{} {
@@ -223,74 +219,102 @@ func (sh *SyncHandler) discoveryMap() map[string]interface{} {
}
}
+// readQueueToMemory slurps in the pending queue from disk (or
+// wherever) to memory. Even with millions of blobs, it's not much
+// memory. The point of the persistent queue is to survive restarts if
+// the "fullSyncOnStart" option is off. With "fullSyncOnStart" set to
+// true, this is a little pointless (we'd figure out what's missing
+// eventually), but this might save us a few minutes (let us start
+// syncing missing blobs a few minutes earlier) since we won't have to
+// wait to figure out what the destination is missing.
+func (sh *SyncHandler) readQueueToMemory() error {
+ errc := make(chan error, 1)
+ blobs := make(chan blob.SizedRef, 16)
+ intr := make(chan struct{})
+ defer close(intr)
+ go func() {
+ errc <- sh.enumerateQueuedBlobs(blobs, intr)
+ }()
+ n := 0
+ for sb := range blobs {
+ sh.addBlobToCopy(sb)
+ n++
+ }
+ sh.logf("Added %d pending blobs from sync queue to pending list", n)
+ return <-errc
+}
+
func (sh *SyncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
sh.mu.Lock()
defer sh.mu.Unlock()
-
- fmt.Fprintf(rw, "<h1>%s to %s Sync Status</h1><p>Current status: %s</p>",
- sh.fromName, sh.toName, html.EscapeString(sh.status))
+ f := func(p string, a ...interface{}) {
+ fmt.Fprintf(rw, p, a...)
+ }
+ now := time.Now()
+ f("<h1>Sync Status (for %s to %s)</h1>", sh.fromName, sh.toName)
+ f("<p>Current status: %s</p>", html.EscapeString(sh.status))
 if sh.idle {
 return
 }
- fmt.Fprintf(rw, "<h2>Stats:</h2><ul>")
- fmt.Fprintf(rw, "<li>Blobs copied: %d</li>", sh.totalCopies)
- fmt.Fprintf(rw, "<li>Bytes copied: %d</li>", sh.totalCopyBytes)
+ f("<h2>Stats:</h2><ul>")
+ f("<li>Source: %s</li>", html.EscapeString(storageDesc(sh.from)))
+ f("<li>Target: %s</li>", html.EscapeString(storageDesc(sh.to)))
+ f("<li>Blobs synced: %d</li>", sh.totalCopies)
+ f("<li>Bytes synced: %d</li>", sh.totalCopyBytes)
+ f("<li>Blobs yet to copy: %d</li>", len(sh.needCopy))
+ f("<li>Bytes yet to copy: %d</li>", sh.bytesRemain)
 if !sh.recentCopyTime.IsZero() {
- fmt.Fprintf(rw, "<li>Most recent copy: %s</li>", sh.recentCopyTime.Format(time.RFC3339))
+ f("<li>Most recent copy: %s (%v ago)</li>", sh.recentCopyTime.Format(time.RFC3339), now.Sub(sh.recentCopyTime))
 }
- fmt.Fprintf(rw, "<li>Copy errors: %d</li>", sh.totalErrors)
- fmt.Fprintf(rw, "</ul>")
+ clarification := ""
+ if len(sh.needCopy) == 0 && sh.totalErrors > 0 {
+ clarification = "(all since resolved)"
+ }
+ f("<li>Previous copy errors: %d %s</li>", sh.totalErrors, clarification)
+ f("</ul>")
- if len(sh.blobStatus) > 0 {
- fmt.Fprintf(rw, "<h2>Current Copies:</h2><ul>")
- for blobstr, sfn := range sh.blobStatus {
- fmt.Fprintf(rw, "<li>%s: %s</li>\n",
- blobstr, html.EscapeString(sfn.String()))
+ if len(sh.copying) > 0 {
+ f("<h2>Currently Copying</h2><ul>")
+ copying := make([]blob.Ref, 0, len(sh.copying))
+ for br := range sh.copying {
+ copying = append(copying, br)
 }
- fmt.Fprintf(rw, "</ul>")
+ sort.Sort(blob.ByRef(copying))
+ for _, br := range copying {
+ f("<li>%s</li>\n", sh.copying[br])
+ }
+ f("</ul>")
 }
- if len(sh.recentErrors) > 0 {
- fmt.Fprintf(rw, "<h2>Recent Errors:</h2><ul>")
- for _, te := range sh.recentErrors {
- fmt.Fprintf(rw, "<li>%s: %s</li>\n",
- te.t.Format(time.RFC3339),
- html.EscapeString(te.err.Error()))
+ recentErrors := make([]blob.Ref, 0, len(sh.recentErrors))
+ for _, br := range sh.recentErrors {
+ if _, ok := sh.needCopy[br]; ok {
+ // Only show it in the web UI if it's still a problem. Blobs that
+ // have since succeeded just confused people.
+ recentErrors = append(recentErrors, br)
 }
- fmt.Fprintf(rw, "</ul>")
+ }
+ if len(recentErrors) > 0 {
+ f("<h2>Recent Errors</h2><p>Blobs that haven't successfully copied over yet, and their last errors:</p><ul>")
+ for _, br := range recentErrors {
+ fail := sh.lastFail[br]
+ f("<li>%s: %s: %s</li>\n",
+ br,
+ fail.when.Format(time.RFC3339),
+ html.EscapeString(fail.err.Error()))
+ }
+ f("</ul>")
 }
 }
-func (sh *SyncHandler) setStatus(s string, args ...interface{}) {
+func (sh *SyncHandler) setStatusf(s string, args ...interface{}) {
s = time.Now().UTC().Format(time.RFC3339) + ": " + fmt.Sprintf(s, args...)
sh.mu.Lock()
defer sh.mu.Unlock()
sh.status = s
}
-func (sh *SyncHandler) setBlobStatus(blobref string, s fmt.Stringer) {
- sh.mu.Lock()
- defer sh.mu.Unlock()
- if s != nil {
- sh.blobStatus[blobref] = s
- } else {
- delete(sh.blobStatus, blobref)
- }
-}
-
-func (sh *SyncHandler) addErrorToLog(err error) {
- sh.logf("%v", err)
- sh.mu.Lock()
- defer sh.mu.Unlock()
- sh.recentErrors = append(sh.recentErrors, timestampedError{time.Now().UTC(), err})
- if len(sh.recentErrors) > maxErrors {
- // Kinda lame, but whatever. Only for errors, rare.
- copy(sh.recentErrors[:maxErrors], sh.recentErrors[1:maxErrors+1])
- sh.recentErrors = sh.recentErrors[:maxErrors]
- }
-}
-
type copyResult struct {
sb blob.SizedRef
err error
@@ -309,6 +333,39 @@ func blobserverEnumerator(ctx *context.Context, src blobserver.BlobEnumerator) f
}
}
+// enumeratePendingBlobs yields blobs from the in-memory pending list (needCopy).
+// This differs from enumerateQueuedBlobs, which reads from the on-disk sorted.KeyValue store.
+func (sh *SyncHandler) enumeratePendingBlobs(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
+ defer close(dst)
+ sh.mu.Lock()
+ var toSend []blob.SizedRef
+ {
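+ // Snapshot up to maxBatch refs while holding mu; the sends on dst
+ // below happen without the lock. Map iteration order is random, so
+ // successive batches eventually cover every pending blob.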
+ n := len(sh.needCopy)
+ const maxBatch = 1000
+ if n > maxBatch {
+ n = maxBatch
+ }
+ toSend = make([]blob.SizedRef, 0, n)
+ for br, size := range sh.needCopy {
+ toSend = append(toSend, blob.SizedRef{br, size})
+ if len(toSend) == n {
+ break
+ }
+ }
+ }
+ sh.mu.Unlock()
+ for _, sb := range toSend {
+ select {
+ case dst <- sb:
+ case <-intr:
+ return nil
+ }
+ }
+ return nil
+}
+
+// enumerateQueuedBlobs yields blobs from the on-disk sorted.KeyValue store.
+// This differs from enumeratePendingBlobs, which sends from the in-memory pending list.
func (sh *SyncHandler) enumerateQueuedBlobs(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
defer close(dst)
it := sh.queue.Find("", "")
@@ -328,21 +385,6 @@ func (sh *SyncHandler) enumerateQueuedBlobs(dst chan<- blob.SizedRef, intr <-cha
return it.Close()
}
-func (sh *SyncHandler) enumerateBlobc(first blob.SizedRef) func(chan<- blob.SizedRef, <-chan struct{}) error {
- return func(dst chan<- blob.SizedRef, intr <-chan struct{}) error {
- defer close(dst)
- dst <- first
- for {
- select {
- case sb := <-sh.blobc:
- dst <- sb
- default:
- return nil
- }
- }
- }
-}
-
func (sh *SyncHandler) runSync(srcName string, enumSrc func(chan<- blob.SizedRef, <-chan struct{}) error) int {
enumch := make(chan blob.SizedRef, 8)
errch := make(chan error, 1)
@@ -355,17 +397,21 @@ func (sh *SyncHandler) runSync(srcName string, enumSrc func(chan<- blob.SizedRef
workch := make(chan blob.SizedRef, 1000)
resch := make(chan copyResult, 8)
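+ // Feed workers with a non-blocking send: once workch's buffer fills,
+ // stop feeding and let this batch drain. Callers such as syncLoop run
+ // runSync again while it reports progress, so the rest isn't lost.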
+FeedWork:
for sb := range enumch {
- toCopy++
- workch <- sb
- if toCopy <= sh.copierPoolSize {
+ if toCopy < sh.copierPoolSize {
go sh.copyWorker(resch, workch)
}
- sh.setStatus("Enumerating queued blobs: %d", toCopy)
+ select {
+ case workch <- sb:
+ toCopy++
+ default:
+ break FeedWork
+ }
}
close(workch)
for i := 0; i < toCopy; i++ {
- sh.setStatus("Copied %d/%d of batch of queued blobs", nCopied, toCopy)
+ sh.setStatusf("Copying blobs")
res := <-resch
sh.mu.Lock()
if res.err == nil {
@@ -380,83 +426,80 @@ func (sh *SyncHandler) runSync(srcName string, enumSrc func(chan<- blob.SizedRef
}
if err := <-errch; err != nil {
- sh.addErrorToLog(fmt.Errorf("replication error for source %q, enumerate from source: %v", srcName, err))
+ sh.logf("error enumerating from source: %v", err)
}
return nCopied
}
-func (sh *SyncHandler) syncQueueLoop() {
+func (sh *SyncHandler) syncLoop() {
for {
t0 := time.Now()
- for sh.runSync(sh.fromName, sh.enumerateQueuedBlobs) > 0 {
+ for sh.runSync(sh.fromName, sh.enumeratePendingBlobs) > 0 {
// Loop, before sleeping.
}
- sh.setStatus("Sleeping briefly before next long poll.")
+ sh.setStatusf("Sleeping briefly before next long poll.")
d := queueSyncInterval - time.Since(t0)
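+ // Sleep out the rest of the interval, but wake early if a newly
+ // received blob pokes wakec (see addBlobToCopy).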
select {
case <-time.After(d):
- case sb := <-sh.blobc:
- // Blob arrived.
- sh.runSync(sh.fromName, sh.enumerateBlobc(sb))
+ case <-sh.wakec:
}
}
}
func (sh *SyncHandler) copyWorker(res chan<- copyResult, work <-chan blob.SizedRef) {
for sb := range work {
- res <- copyResult{sb, sh.copyBlob(sb, 0)}
+ res <- copyResult{sb, sh.copyBlob(sb)}
}
}
-type statusFunc func() string
+func (sh *SyncHandler) copyBlob(sb blob.SizedRef) (err error) {
+ cs := sh.newCopyStatus(sb)
+ defer func() { cs.setError(err) }()
+ br := sb.Ref
-func (sf statusFunc) String() string { return sf() }
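+ // Register this copy in sh.copying so the status page can show it.
+ // The deferred cs.setError above records the outcome and unregisters it.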
+ sh.mu.Lock()
+ sh.copying[br] = cs
+ sh.mu.Unlock()
-type status string
-
-func (s status) String() string { return string(s) }
-
-func (sh *SyncHandler) copyBlob(sb blob.SizedRef, tryCount int) error {
- key := sb.Ref.String()
- set := func(s fmt.Stringer) {
- sh.setBlobStatus(key, s)
- }
- defer set(nil)
-
- errorf := func(s string, args ...interface{}) error {
- // TODO: increment error stats
- err := fmt.Errorf("replication error for blob %s: "+s,
- append([]interface{}{sb.Ref}, args...)...)
- sh.addErrorToLog(err)
- return err
+ if sb.Size > constants.MaxBlobSize {
+ return fmt.Errorf("blob size %d too large; max blob size is %d", sb.Size, constants.MaxBlobSize)
}
- set(status("sending GET to source"))
- rc, fromSize, err := sh.from.FetchStreaming(sb.Ref)
+ cs.setStatus(statusFetching)
+ rc, fromSize, err := sh.from.FetchStreaming(br)
if err != nil {
- return errorf("source fetch: %v", err)
+ return fmt.Errorf("source fetch: %v", err)
}
- defer rc.Close()
if fromSize != sb.Size {
- return errorf("source fetch size mismatch: get=%d, enumerate=%d", fromSize, sb.Size)
+ rc.Close()
+ return fmt.Errorf("source fetch size mismatch: get=%d, enumerate=%d", fromSize, sb.Size)
}
- bytesCopied := int64(0) // TODO: data race, accessed without locking in statusFunc below.
- set(statusFunc(func() string {
- return fmt.Sprintf("copying: %d/%d bytes", bytesCopied, sb.Size)
- }))
- newsb, err := sh.to.ReceiveBlob(sb.Ref, readerutil.CountingReader{rc, &bytesCopied})
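+ // Read the entire blob into memory (bounded by the MaxBlobSize check
+ // above) so its digest can be verified before anything is written to
+ // the destination.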
+ buf := make([]byte, fromSize)
+ hash := br.Hash()
+ cs.setStatus(statusReading)
+ n, err := io.ReadFull(io.TeeReader(rc,
+ io.MultiWriter(
+ incrWriter{cs, &cs.nread},
+ hash,
+ )), buf)
+ rc.Close()
if err != nil {
- return errorf("dest write: %v", err)
+ return fmt.Errorf("Read error after %d/%d bytes: %v", n, fromSize, err)
+ }
+ if !br.HashMatches(hash) {
+ return fmt.Errorf("Read data has unexpected digest %x", hash.Sum(nil))
+ }
+
+ cs.setStatus(statusWriting)
+ newsb, err := sh.to.ReceiveBlob(br, io.TeeReader(bytes.NewReader(buf), incrWriter{cs, &cs.nwrite}))
+ if err != nil {
+ return fmt.Errorf("dest write: %v", err)
}
if newsb.Size != sb.Size {
- return errorf("write size mismatch: source_read=%d but dest_write=%d", sb.Size, newsb.Size)
- }
- set(status("copied; removing from queue"))
- if err := sh.queue.Delete(sb.Ref.String()); err != nil {
- return errorf("queue delete: %v", err)
+ return fmt.Errorf("write size mismatch: source_read=%d but dest_write=%d", sb.Size, newsb.Size)
}
return nil
}
@@ -470,7 +513,29 @@ func (sh *SyncHandler) ReceiveBlob(br blob.Ref, r io.Reader) (sb blob.SizedRef,
return sb, sh.enqueue(sb)
}
+// addBlobToCopy adds a blob to copy to memory (not to disk: that's enqueue).
+// It returns true if it was added, or false if it was a duplicate.
+func (sh *SyncHandler) addBlobToCopy(sb blob.SizedRef) bool {
+ sh.mu.Lock()
+ defer sh.mu.Unlock()
+
+ if _, dup := sh.needCopy[sb.Ref]; dup {
+ // Already pending: don't double-count bytesRemain.
+ return false
+ }
+ sh.needCopy[sb.Ref] = sb.Size
+ sh.bytesRemain += int64(sb.Size)
+
+ // Non-blocking send to wake up looping goroutine if it's
+ // sleeping...
+ select {
+ case sh.wakec <- true:
+ default:
+ }
+ return true
+}
+
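+// enqueue records a received blob in both places that track pending
+// work: the in-memory pending list (addBlobToCopy) and the persistent
+// queue, which lets the pending list survive restarts.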
func (sh *SyncHandler) enqueue(sb blob.SizedRef) error {
+ if !sh.addBlobToCopy(sb) {
+ // Dup
+ return nil
+ }
// TODO: include current time in encoded value, to attempt to
// do in-order delivery to remote side later? Possible
// friendly optimization later. Might help peer's indexer have
@@ -478,15 +543,149 @@ func (sh *SyncHandler) enqueue(sb blob.SizedRef) error {
if err := sh.queue.Set(sb.Ref.String(), fmt.Sprint(sb.Size)); err != nil {
return err
}
- // Non-blocking send to wake up looping goroutine if it's
- // sleeping...
- select {
- case sh.blobc <- sb:
- default:
- }
return nil
}
+func (sh *SyncHandler) newCopyStatus(sb blob.SizedRef) *copyStatus {
+ now := time.Now()
+ return &copyStatus{
+ sh: sh,
+ sb: sb,
+ state: statusStarting,
+ start: now,
+ t: now,
+ }
+}
+
+// copyStatus is an in-progress copy.
+type copyStatus struct {
+ sh *SyncHandler
+ sb blob.SizedRef
+ start time.Time
+
+ mu sync.Mutex
+ state string // one of statusFoo, below
+ t time.Time // last status update time
+ nread uint32
+ nwrite uint32
+}
+
+const (
+ statusStarting = "starting"
+ statusFetching = "fetching source"
+ statusReading = "reading"
+ statusWriting = "writing"
+)
+
+func (cs *copyStatus) setStatus(s string) {
+ now := time.Now()
+ cs.mu.Lock()
+ defer cs.mu.Unlock()
+ cs.state = s
+ cs.t = now
+}
+
+func (cs *copyStatus) setError(err error) {
+ now := time.Now()
+ sh := cs.sh
+ br := cs.sb.Ref
+ if err == nil {
+ // This is somewhat slow, so do it before we acquire the lock.
+ // The queue is thread-safe.
+ if derr := sh.queue.Delete(br.String()); derr != nil {
+ sh.logf("queue delete of %v error: %v", cs.sb.Ref, derr)
+ }
+ }
+
+ sh.mu.Lock()
+ defer sh.mu.Unlock()
+ if _, needCopy := sh.needCopy[br]; !needCopy {
+ sh.logf("IGNORING DUPLICATE UPLOAD of %v = %v", br, err)
+ return
+ }
+ delete(sh.copying, br)
+ if err == nil {
+ delete(sh.needCopy, br)
+ delete(sh.lastFail, br)
+ sh.recentCopyTime = now
+ sh.totalCopies++
+ sh.totalCopyBytes += int64(cs.sb.Size)
+ sh.bytesRemain -= int64(cs.sb.Size)
+ return
+ }
+
+ sh.logf("error copying %v: %v", br, err)
+ sh.lastFail[br] = failDetail{
+ when: now,
+ err: err,
+ }
+
+ // Kinda lame. TODO: use a ring buffer or container/list instead.
+ // recentErrors is recent-first, so prepend br, dropping the oldest
+ // (last) entry if the list is full.
+ if len(sh.recentErrors) == maxRecentErrors {
+ sh.recentErrors = sh.recentErrors[:maxRecentErrors-1]
+ }
+ sh.recentErrors = append(sh.recentErrors, blob.Ref{})
+ copy(sh.recentErrors[1:], sh.recentErrors)
+ sh.recentErrors[0] = br
+}
+
+func (cs *copyStatus) String() string {
+ var buf bytes.Buffer
+ now := time.Now()
+ buf.WriteString(cs.sb.Ref.String())
+ buf.WriteString(": ")
+
+ cs.mu.Lock()
+ defer cs.mu.Unlock()
+ sinceStart := now.Sub(cs.start)
+ sinceLast := now.Sub(cs.t)
+
+ switch cs.state {
+ case statusReading:
+ buf.WriteString(cs.state)
+ fmt.Fprintf(&buf, " (%d/%dB)", cs.nread, cs.sb.Size)
+ case statusWriting:
+ if cs.nwrite == cs.sb.Size {
+ buf.WriteString("wrote all, waiting ack")
+ } else {
+ buf.WriteString(cs.state)
+ fmt.Fprintf(&buf, " (%d/%dB)", cs.nwrite, cs.sb.Size)
+ }
+ default:
+ buf.WriteString(cs.state)
+
+ }
+ if sinceLast > 5*time.Second {
+ fmt.Fprintf(&buf, ", last change %v ago (total elapsed %v)", sinceLast, sinceStart)
+ }
+ return buf.String()
+}
+
+type failDetail struct {
+ when time.Time
+ err error
+}
+
+// incrWriter is an io.Writer that, with cs.mu held, increments *n and updates cs.t.
+type incrWriter struct {
+ cs *copyStatus
+ n *uint32
+}
+
+func (w incrWriter) Write(p []byte) (n int, err error) {
+ w.cs.mu.Lock()
+ *w.n += uint32(len(p))
+ w.cs.t = time.Now()
+ w.cs.mu.Unlock()
+ return len(p), nil
+}
+
+func storageDesc(v interface{}) string {
+ if s, ok := v.(fmt.Stringer); ok {
+ return s.String()
+ }
+ return fmt.Sprintf("%T", v)
+}
+
// TODO(bradfitz): implement these? what do they mean? possibilities:
// a) proxy to sh.from
// b) proxy to sh.to
@@ -495,7 +694,26 @@ func (sh *SyncHandler) enqueue(sb blob.SizedRef) error {
// sh.from has, sh.to doesn't have, and isn't in the queue to be replicated.
//
// For now, don't implement them. Wait until we need them.
-//
-// func (sh *SyncHandler) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
-// func (sh *SyncHandler) FetchStreaming(br blob.Ref) (io.ReadCloser, uint32, error) {
-// func (sh *SyncHandler) EnumerateBlobs(dest chan<- blob.SizedRef, after string, limit int) error {
+
+func (sh *SyncHandler) Fetch(blob.Ref) (file types.ReadSeekCloser, size uint32, err error) {
+ panic("Unimplemeted blobserver.Fetch called")
+}
+
+func (sh *SyncHandler) FetchStreaming(blob.Ref) (file io.ReadCloser, size uint32, err error) {
+ panic("Unimplemeted blobserver.FetchStreaming called")
+}
+
+func (sh *SyncHandler) StatBlobs(dest chan<- blob.SizedRef, blobs []blob.Ref) error {
+ sh.logf("Unexpected StatBlobs call")
+ return nil
+}
+
+func (sh *SyncHandler) EnumerateBlobs(ctx *context.Context, dest chan<- blob.SizedRef, after string, limit int) error {
+ defer close(dest)
+ sh.logf("Unexpected EnumerateBlobs call")
+ return nil
+}
+
+func (sh *SyncHandler) RemoveBlobs(blobs []blob.Ref) error {
+ panic("Unimplemeted RemoveBlobs")
+}
diff --git a/website/content/docs/release/0.8 b/website/content/docs/release/0.8
new file mode 100644
index 000000000..8783b6fc7
--- /dev/null
+++ b/website/content/docs/release/0.8
@@ -0,0 +1,37 @@
+Release 0.8 ("XXXXXX")
+
+2014-mm-dd
+
+
+
+
+
+
+
+
+Release stats
+n total committers over n commits since 0.7 (just %v ago), including TODO
+
+Thank you!
+
+New in 0.8
+Apps
+
+Web UI
+
+Storage
+
+ - Rewritten blob sync handler (the server's background async replication); see the example config below
+
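+   As a sketch, a low-level config entry for the new handler might look
+   like this (the handler name and keys come from the code; the paths
+   and queue type here are illustrative only):
+
+     "/sync/": {
+         "handler": "sync",
+         "handlerArgs": {
+             "from": "/bs/",
+             "to": "/index-mem/",
+             "queue": {"type": "memory"},
+             "copierPoolSize": 5
+         }
+     },
+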
+Search
+
+Tools
+
+General
+