/* Copyright 2011 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package server import ( "fmt" "html" "log" "net/http" "strings" "sync" "time" "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/camerrors" "camlistore.org/pkg/jsonconfig" "camlistore.org/pkg/readerutil" ) var queueSyncInterval = 5 * time.Second const ( maxErrors = 20 maxCopyTries = 17 // ~36 hours with retryCopyLoop(time.Second ...) ) // TODO: rate control + tunable // TODO: expose copierPoolSize as tunable type SyncHandler struct { fromName, fromqName, toName string from, fromq blobserver.Storage to blobserver.BlobReceiver idle bool // if true, the handler does nothing other than providing the discovery. copierPoolSize int lk sync.Mutex // protects following status string blobStatus map[string]fmt.Stringer // stringer called with lk held recentErrors []timestampedError recentCopyTime time.Time totalCopies int64 totalCopyBytes int64 totalErrors int64 } func init() { blobserver.RegisterHandlerConstructor("sync", newSyncFromConfig) } func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) { from := conf.RequiredString("from") to := conf.RequiredString("to") fullSync := conf.OptionalBool("fullSyncOnStart", false) blockFullSync := conf.OptionalBool("blockingFullSyncOnStart", false) idle := conf.OptionalBool("idle", false) queueDir := conf.OptionalString("queueDir", "") if err := conf.Validate(); err != nil { return nil, err } if idle { synch, err := createIdleSyncHandler(from, to) if err != nil { return nil, err } return synch, nil } fromBs, err := ld.GetStorage(from) if err != nil { return nil, err } toBs, err := ld.GetStorage(to) if err != nil { return nil, err } fromQsc, ok := fromBs.(blobserver.StorageQueueCreator) if !ok { if queueDir != "" { // TODO(bradfitz): finish implementing. // Should create a a localdisk target on the // queueDir and setup sync from that. Be sure // to remove item from top-level TODO too. } return nil, fmt.Errorf("Prefix %s (type %T) does not support being efficient replication source (queueing)", from, fromBs) } synch, err := createSyncHandler(from, to, fromQsc, toBs) if err != nil { return nil, err } if fullSync || blockFullSync { didFullSync := make(chan bool, 1) go func() { n := synch.runSync("queue", fromQsc, 0) log.Printf("Queue sync copied %d blobs", n) n = synch.runSync("full", fromBs, 0) log.Printf("Full sync copied %d blobs", n) didFullSync <- true synch.syncQueueLoop() }() if blockFullSync { log.Printf("Blocking startup, waiting for full sync from %q to %q", from, to) <-didFullSync log.Printf("Full sync complete.") } } else { go synch.syncQueueLoop() } rootPrefix, _, err := ld.FindHandlerByType("root") switch err { case blobserver.ErrHandlerTypeNotFound: // ignore; okay to not have a root handler. case nil: h, err := ld.GetHandler(rootPrefix) if err != nil { return nil, err } h.(*RootHandler).registerSyncHandler(synch) default: return nil, fmt.Errorf("Error looking for root handler: %v", err) } return synch, nil } type timestampedError struct { t time.Time err error } func createSyncHandler(fromName, toName string, from blobserver.StorageQueueCreator, to blobserver.BlobReceiver) (*SyncHandler, error) { h := &SyncHandler{ copierPoolSize: 3, from: from, to: to, fromName: fromName, toName: toName, status: "not started", blobStatus: make(map[string]fmt.Stringer), } h.fromqName = strings.Replace(strings.Trim(toName, "/"), "/", "-", -1) var err error h.fromq, err = from.CreateQueue(h.fromqName) if err != nil { return nil, fmt.Errorf("Prefix %s (type %T) failed to create queue %q: %v", fromName, from, h.fromqName, err) } return h, nil } func createIdleSyncHandler(fromName, toName string) (*SyncHandler, error) { h := &SyncHandler{ fromName: fromName, toName: toName, idle: true, status: "disabled", } return h, nil } func (sh *SyncHandler) discoveryMap() map[string]interface{} { // TODO(mpl): more status info return map[string]interface{}{ "from": sh.fromName, "to": sh.toName, } } func (sh *SyncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { sh.lk.Lock() defer sh.lk.Unlock() fmt.Fprintf(rw, "

%s to %s Sync Status

Current status: %s

", sh.fromName, sh.toName, html.EscapeString(sh.status)) if sh.idle { return } fmt.Fprintf(rw, "

Stats:

") if len(sh.blobStatus) > 0 { fmt.Fprintf(rw, "

Current Copies:

") } if len(sh.recentErrors) > 0 { fmt.Fprintf(rw, "

Recent Errors:

") } } func (sh *SyncHandler) setStatus(s string, args ...interface{}) { s = time.Now().UTC().Format(time.RFC3339) + ": " + fmt.Sprintf(s, args...) sh.lk.Lock() defer sh.lk.Unlock() sh.status = s } func (sh *SyncHandler) setBlobStatus(blobref string, s fmt.Stringer) { sh.lk.Lock() defer sh.lk.Unlock() if s != nil { sh.blobStatus[blobref] = s } else { delete(sh.blobStatus, blobref) } } func (sh *SyncHandler) addErrorToLog(err error) { log.Printf(err.Error()) sh.lk.Lock() defer sh.lk.Unlock() sh.recentErrors = append(sh.recentErrors, timestampedError{time.Now().UTC(), err}) if len(sh.recentErrors) > maxErrors { // Kinda lame, but whatever. Only for errors, rare. copy(sh.recentErrors[:maxErrors], sh.recentErrors[1:maxErrors+1]) sh.recentErrors = sh.recentErrors[:maxErrors] } } type copyResult struct { sb blob.SizedRef err error } func (sh *SyncHandler) runSync(srcName string, enumSrc blobserver.Storage, longPollWait time.Duration) int { if longPollWait != 0 { sh.setStatus("Idle; waiting for new blobs") // TODO: use longPollWait somehow. } enumch := make(chan blob.SizedRef) errch := make(chan error, 1) go func() { errch <- enumSrc.EnumerateBlobs(enumch, "", 1000) }() nCopied := 0 toCopy := 0 workch := make(chan blob.SizedRef, 1000) resch := make(chan copyResult, 8) for sb := range enumch { toCopy++ workch <- sb if toCopy <= sh.copierPoolSize { go sh.copyWorker(resch, workch) } sh.setStatus("Enumerating queued blobs: %d", toCopy) } close(workch) for i := 0; i < toCopy; i++ { sh.setStatus("Copied %d/%d of batch of queued blobs", nCopied, toCopy) res := <-resch // TODO(mpl): why is nCopied incremented while res.err hasn't been checked // yet? Maybe it should be renamed to nTried? nCopied++ sh.lk.Lock() if res.err == nil { sh.totalCopies++ sh.totalCopyBytes += res.sb.Size sh.recentCopyTime = time.Now().UTC() } else { sh.totalErrors++ } sh.lk.Unlock() } if err := <-errch; err != nil { sh.addErrorToLog(fmt.Errorf("replication error for source %q, enumerate from source: %v", srcName, err)) return nCopied } return nCopied } func (sh *SyncHandler) syncQueueLoop() { every(queueSyncInterval, func() { for sh.runSync(sh.fromqName, sh.fromq, queueSyncInterval) > 0 { // Loop, before sleeping. } sh.setStatus("Sleeping briefly before next long poll.") }) } func (sh *SyncHandler) copyWorker(res chan<- copyResult, work <-chan blob.SizedRef) { for sb := range work { res <- copyResult{sb, sh.copyBlob(sb, 0)} } } func (sh *SyncHandler) retryCopyLoop(initialInterval time.Duration, sb blob.SizedRef) { interval := initialInterval tryCount := 1 for { if tryCount >= maxCopyTries { break } t1 := time.Now() err := sh.copyBlob(sb, tryCount) sh.lk.Lock() if err == nil { sh.totalCopies++ sh.totalCopyBytes += sb.Size sh.recentCopyTime = time.Now().UTC() sh.lk.Unlock() break } else { sh.totalErrors++ } sh.lk.Unlock() time.Sleep(t1.Add(interval).Sub(time.Now())) interval = interval * 2 tryCount++ } } type statusFunc func() string func (sf statusFunc) String() string { return sf() } type status string func (s status) String() string { return string(s) } func (sh *SyncHandler) copyBlob(sb blob.SizedRef, tryCount int) error { key := sb.Ref.String() set := func(s fmt.Stringer) { sh.setBlobStatus(key, s) } defer set(nil) errorf := func(s string, args ...interface{}) error { // TODO: increment error stats pargs := []interface{}{sh.fromqName, sb.Ref} pargs = append(pargs, args...) err := fmt.Errorf("replication error for queue %q, blob %s: "+s, pargs...) sh.addErrorToLog(err) return err } set(status("sending GET to source")) rc, fromSize, err := sh.from.FetchStreaming(sb.Ref) if err != nil { return errorf("source fetch: %v", err) } defer rc.Close() if fromSize != sb.Size { return errorf("source fetch size mismatch: get=%d, enumerate=%d", fromSize, sb.Size) } bytesCopied := int64(0) // TODO: data race, accessed without locking in statusFunc below. set(statusFunc(func() string { return fmt.Sprintf("copying: %d/%d bytes", bytesCopied, sb.Size) })) newsb, err := sh.to.ReceiveBlob(sb.Ref, readerutil.CountingReader{rc, &bytesCopied}) if err != nil { if err == camerrors.MissingKeyBlob && tryCount == 0 { err := sh.fromq.RemoveBlobs([]blob.Ref{sb.Ref}) if err != nil { return errorf("source queue delete: %v", err) } // TODO(mpl): instead of doing one goroutine per blob, maybe transfer // the "faulty" blobs in a retry queue, and do one goroutine per queue? // Also we probably will want to deal with part of this problem in the // index layer anyway: http://camlistore.org/issue/102 go sh.retryCopyLoop(time.Second, sb) } return errorf("dest write: %v", err) } if newsb.Size != sb.Size { return errorf("write size mismatch: source_read=%d but dest_write=%d", sb.Size, newsb.Size) } set(status("copied; removing from queue")) err = sh.fromq.RemoveBlobs([]blob.Ref{sb.Ref}) if err != nil { return errorf("source queue delete: %v", err) } return nil } func every(interval time.Duration, f func()) { for { t1 := time.Now() f() time.Sleep(t1.Add(interval).Sub(time.Now())) } }