sync: delay copy retry on specific errors

http://camlistore.org/issue/206

Change-Id: I1dd07149352e3af6b39bcb86ed2312f19c3bae30
commit e036f96488 (parent f2dd7c1328)
Author: mpl
Date: 2013-08-27 23:30:02 +02:00

3 changed files with 80 additions and 8 deletions

pkg/camerrors/errors.go (new file, +25)

@@ -0,0 +1,25 @@
+/*
+Copyright 2013 The Camlistore Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+// Package camerrors defines specific errors that are used to
+// decide how to deal with some failure cases.
+package camerrors
+
+import (
+	"errors"
+)
+
+var MissingKeyBlob = errors.New("key blob not found")
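
MissingKeyBlob follows the pre-Go-1.13 sentinel-error pattern: callers detect the condition by direct comparison (err == camerrors.MissingKeyBlob), as copyBlob does further down in this commit.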

pkg/jsonsign/verify.go

@@ -22,9 +22,11 @@ import (
	"encoding/json"
	"errors"
	"fmt"
+	"log"
	"strings"

	"camlistore.org/pkg/blob"
+	"camlistore.org/pkg/camerrors"
	"camlistore.org/third_party/code.google.com/p/go.crypto/openpgp/armor"
	"camlistore.org/third_party/code.google.com/p/go.crypto/openpgp/packet"
)
@@ -136,7 +138,12 @@ func (vr *VerifyRequest) ParsePayloadMap() bool {
func (vr *VerifyRequest) FindAndParsePublicKeyBlob() bool {
	reader, _, err := vr.fetcher.FetchStreaming(vr.CamliSigner)
	if err != nil {
-		return vr.fail(fmt.Sprintf("error fetching public key blob: %v", err))
+		log.Printf("error fetching public key blob: %v", err)
+		// TODO(mpl): we're losing some info here, so maybe
+		// create an error type that contains the reason,
+		// instead of logging the reason.
+		vr.Err = camerrors.MissingKeyBlob
+		return false
	}
	defer reader.Close()
	pk, err := openArmoredPublicKeyFile(reader)
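
The TODO above asks for an error type that keeps the underlying fetch error instead of dropping it. A minimal sketch of what that could look like, with a hypothetical name that is not part of this commit (in 2013 Go, callers would detect it with a type assertion rather than errors.As):

	package camerrors

	import "fmt"

	// MissingKeyBlobError says the public key blob could not be
	// fetched, and records why.
	type MissingKeyBlobError struct {
		Reason error // the original fetch error
	}

	func (e *MissingKeyBlobError) Error() string {
		return fmt.Sprintf("key blob not found: %v", e.Reason)
	}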

pkg/server/sync.go

@@ -27,13 +27,17 @@ import (
	"camlistore.org/pkg/blob"
	"camlistore.org/pkg/blobserver"
+	"camlistore.org/pkg/camerrors"
	"camlistore.org/pkg/jsonconfig"
	"camlistore.org/pkg/readerutil"
)

var queueSyncInterval = 5 * time.Second

-const maxErrors = 20
+const (
+	maxErrors    = 20
+	maxCopyTries = 17 // ~36 hours with retryCopyLoop(time.Second ...)
+)

// TODO: rate control + tunable
// TODO: expose copierPoolSize as tunable
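
Where the "~36 hours" estimate comes from: retryCopyLoop doubles a one-second interval after each failed attempt, so the waits form a geometric series; seventeen terms (1 s up to 2^16 s) sum to 2^17-1 = 131071 s, about 36.4 hours. (Since tryCount starts at 1, the loop as written sleeps sixteen times, closer to 18 hours.) A quick way to check the arithmetic:

	package main

	import (
		"fmt"
		"time"
	)

	func main() {
		total, interval := time.Duration(0), time.Second
		for try := 1; try < 17; try++ { // mirrors tryCount in retryCopyLoop below
			total += interval // one sleep after each failed attempt
			interval *= 2     // exponential backoff
		}
		fmt.Println(total) // 18h12m15s; one more doubling lands near 36h
	}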
@@ -271,6 +275,8 @@ func (sh *SyncHandler) runSync(srcName string, enumSrc blobserver.Storage, longP
	for i := 0; i < toCopy; i++ {
		sh.setStatus("Copied %d/%d of batch of queued blobs", nCopied, toCopy)
		res := <-resch
+		// TODO(mpl): why is nCopied incremented while res.err hasn't been checked
+		// yet? Maybe it should be renamed to nTried?
		nCopied++
		sh.lk.Lock()
		if res.err == nil {
@@ -301,7 +307,33 @@ func (sh *SyncHandler) syncQueueLoop() {
func (sh *SyncHandler) copyWorker(res chan<- copyResult, work <-chan blob.SizedRef) {
	for sb := range work {
-		res <- copyResult{sb, sh.copyBlob(sb)}
+		res <- copyResult{sb, sh.copyBlob(sb, 0)}
	}
}
+
+func (sh *SyncHandler) retryCopyLoop(initialInterval time.Duration, sb blob.SizedRef) {
+	interval := initialInterval
+	tryCount := 1
+	for {
+		if tryCount >= maxCopyTries {
+			break
+		}
+		t1 := time.Now()
+		err := sh.copyBlob(sb, tryCount)
+		sh.lk.Lock()
+		if err == nil {
+			sh.totalCopies++
+			sh.totalCopyBytes += sb.Size
+			sh.recentCopyTime = time.Now().UTC()
+			sh.lk.Unlock()
+			break
+		} else {
+			sh.totalErrors++
+		}
+		sh.lk.Unlock()
+		time.Sleep(t1.Add(interval).Sub(time.Now()))
+		interval = interval * 2
+		tryCount++
+	}
+}
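
Note the sleep target: t1.Add(interval).Sub(time.Now()) counts the interval from the start of the attempt, so time already spent inside copyBlob is deducted from the wait. Each retried blob gets its own goroutine; the TODO in copyBlob below considers replacing that with a single retry queue.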
@@ -313,7 +345,7 @@ type status string
func (s status) String() string { return string(s) }

-func (sh *SyncHandler) copyBlob(sb blob.SizedRef) error {
+func (sh *SyncHandler) copyBlob(sb blob.SizedRef, tryCount int) error {
	key := sb.Ref.String()
	set := func(s fmt.Stringer) {
		sh.setBlobStatus(key, s)
@@ -345,6 +377,17 @@ func (sh *SyncHandler) copyBlob(sb blob.SizedRef) error {
	}))
	newsb, err := sh.to.ReceiveBlob(sb.Ref, readerutil.CountingReader{rc, &bytesCopied})
	if err != nil {
+		if err == camerrors.MissingKeyBlob && tryCount == 0 {
+			err := sh.fromq.RemoveBlobs([]blob.Ref{sb.Ref})
+			if err != nil {
+				return errorf("source queue delete: %v", err)
+			}
+			// TODO(mpl): instead of doing one goroutine per blob, maybe transfer
+			// the "faulty" blobs in a retry queue, and do one goroutine per queue?
+			// Also we probably will want to deal with part of this problem in the
+			// index layer anyway: http://camlistore.org/issue/102
+			go sh.retryCopyLoop(time.Second, sb)
+		}
		return errorf("dest write: %v", err)
	}
	if newsb.Size != sb.Size {
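
The TODO sketches an alternative: instead of one retryCopyLoop goroutine per failing blob, failed blobs could be funneled into one retry queue drained by a single goroutine. A rough illustration of that idea, with hypothetical names (retryq, retryInterval) that are not part of this commit:

	// retryQueueLoop collects failed blobs and periodically retries
	// them all, keeping whatever still fails for the next round.
	func (sh *SyncHandler) retryQueueLoop(retryInterval time.Duration, retryq <-chan blob.SizedRef) {
		var pending []blob.SizedRef
		ticker := time.NewTicker(retryInterval)
		defer ticker.Stop()
		for {
			select {
			case sb := <-retryq:
				pending = append(pending, sb)
			case <-ticker.C:
				var still []blob.SizedRef
				for _, sb := range pending {
					if err := sh.copyBlob(sb, 1); err != nil {
						still = append(still, sb) // still failing; keep it
					}
				}
				pending = still
			}
		}
	}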
@@ -362,9 +405,6 @@ func every(interval time.Duration, f func()) {
	for {
		t1 := time.Now()
		f()
-		sleepUntil := t1.Add(interval)
-		if sleep := sleepUntil.Sub(time.Now()); sleep > 0 {
-			time.Sleep(sleep)
-		}
+		time.Sleep(t1.Add(interval).Sub(time.Now()))
	}
}
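
The simplification in every() is safe because time.Sleep returns immediately for a negative or zero duration, so the explicit sleep > 0 guard was redundant; retryCopyLoop above relies on the same behavior.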