/* Copyright 2011 Google Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package server import ( "bytes" "errors" "fmt" "html" "io" "io/ioutil" "log" "net/http" "os" "sort" "strconv" "strings" "sync" "time" "camlistore.org/pkg/auth" "camlistore.org/pkg/blob" "camlistore.org/pkg/blobserver" "camlistore.org/pkg/constants" "camlistore.org/pkg/index" "camlistore.org/pkg/sorted" "camlistore.org/pkg/types/camtypes" "code.google.com/p/xsrftoken" "go4.org/jsonconfig" "golang.org/x/net/context" "go4.org/syncutil" ) const ( maxRecentErrors = 20 queueSyncInterval = 5 * time.Second ) type blobReceiverEnumerator interface { blobserver.BlobReceiver blobserver.BlobEnumerator } // The SyncHandler handles async replication in one direction between // a pair storage targets, a source and target. // // SyncHandler is a BlobReceiver but doesn't actually store incoming // blobs; instead, it records blobs it has received and queues them // for async replication soon, or whenever it can. type SyncHandler struct { // TODO: rate control tunables fromName, toName string from blobserver.Storage to blobReceiverEnumerator queue sorted.KeyValue toIndex bool // whether this sync is from a blob storage to an index idle bool // if true, the handler does nothing other than providing the discovery. copierPoolSize int // wakec wakes up the blob syncer loop when a blob is received. wakec chan bool mu sync.Mutex // protects following status string copying map[blob.Ref]*copyStatus // to start time needCopy map[blob.Ref]uint32 // blobs needing to be copied. some might be in lastFail too. lastFail map[blob.Ref]failDetail // subset of needCopy that previously failed, and why bytesRemain int64 // sum of needCopy values recentErrors []blob.Ref // up to maxRecentErrors, recent first. valid if still in lastFail. recentCopyTime time.Time totalCopies int64 totalCopyBytes int64 totalErrors int64 vshards []string // validation shards. if 0, validation not running vshardDone int // shards validated vshardErrs []string vmissing int64 // missing blobs found during validat vdestCount int // number of blobs seen on dest during validate vdestBytes int64 // number of blob bytes seen on dest during validate vsrcCount int // number of blobs seen on src during validate vsrcBytes int64 // number of blob bytes seen on src during validate // syncLoop tries to send on alarmIdlec each time we've slept for a full // queueSyncInterval. Initialized as a synchronous chan if we're not an // idle sync handler, otherwise nil. alarmIdlec chan struct{} } var ( _ blobserver.Storage = (*SyncHandler)(nil) _ blobserver.HandlerIniter = (*SyncHandler)(nil) ) func (sh *SyncHandler) String() string { return fmt.Sprintf("[SyncHandler %v -> %v]", sh.fromName, sh.toName) } func (sh *SyncHandler) logf(format string, args ...interface{}) { log.Printf(sh.String()+" "+format, args...) } func init() { blobserver.RegisterHandlerConstructor("sync", newSyncFromConfig) } // TODO: this is is temporary. should delete, or decide when it's on by default (probably always). // Then need genconfig option to disable it. var validateOnStartDefault, _ = strconv.ParseBool(os.Getenv("CAMLI_SYNC_VALIDATE")) func newSyncFromConfig(ld blobserver.Loader, conf jsonconfig.Obj) (http.Handler, error) { var ( from = conf.RequiredString("from") to = conf.RequiredString("to") fullSync = conf.OptionalBool("fullSyncOnStart", false) blockFullSync = conf.OptionalBool("blockingFullSyncOnStart", false) idle = conf.OptionalBool("idle", false) queueConf = conf.OptionalObject("queue") copierPoolSize = conf.OptionalInt("copierPoolSize", 5) validate = conf.OptionalBool("validateOnStart", validateOnStartDefault) ) if err := conf.Validate(); err != nil { return nil, err } if idle { return newIdleSyncHandler(from, to), nil } if len(queueConf) == 0 { return nil, errors.New(`Missing required "queue" object`) } q, err := sorted.NewKeyValue(queueConf) if err != nil { return nil, err } isToIndex := false fromBs, err := ld.GetStorage(from) if err != nil { return nil, err } toBs, err := ld.GetStorage(to) if err != nil { return nil, err } if _, ok := fromBs.(*index.Index); !ok { if _, ok := toBs.(*index.Index); ok { isToIndex = true } } sh := newSyncHandler(from, to, fromBs, toBs, q) sh.toIndex = isToIndex sh.copierPoolSize = copierPoolSize if err := sh.readQueueToMemory(); err != nil { return nil, fmt.Errorf("Error reading sync queue to memory: %v", err) } if fullSync || blockFullSync { sh.logf("Doing full sync") didFullSync := make(chan bool, 1) go func() { for { n := sh.runSync("pending blobs queue", sh.enumeratePendingBlobs) if n > 0 { sh.logf("Queue sync copied %d blobs", n) continue } break } n := sh.runSync("full", blobserverEnumerator(context.TODO(), fromBs)) sh.logf("Full sync copied %d blobs", n) didFullSync <- true sh.syncLoop() }() if blockFullSync { sh.logf("Blocking startup, waiting for full sync from %q to %q", from, to) <-didFullSync sh.logf("Full sync complete.") } } else { go sh.syncLoop() } if validate { go sh.startFullValidation() } blobserver.GetHub(fromBs).AddReceiveHook(sh.enqueue) return sh, nil } func (sh *SyncHandler) InitHandler(hl blobserver.FindHandlerByTyper) error { _, h, err := hl.FindHandlerByType("root") if err == blobserver.ErrHandlerTypeNotFound { // It's optional. We register ourselves if it's there. return nil } if err != nil { return err } h.(*RootHandler).registerSyncHandler(sh) return nil } func newSyncHandler(fromName, toName string, from blobserver.Storage, to blobReceiverEnumerator, queue sorted.KeyValue) *SyncHandler { return &SyncHandler{ copierPoolSize: 5, from: from, to: to, fromName: fromName, toName: toName, queue: queue, wakec: make(chan bool), status: "not started", needCopy: make(map[blob.Ref]uint32), lastFail: make(map[blob.Ref]failDetail), copying: make(map[blob.Ref]*copyStatus), alarmIdlec: make(chan struct{}), } } // NewSyncHandler returns a handler that will asynchronously and continuously // copy blobs from src to dest, if missing on dest. // Blobs waiting to be copied are stored on pendingQueue. srcName and destName are // only used for status and debugging messages. // N.B: blobs should be added to src with a method that notifies the blob hub, // such as blobserver.Receive. func NewSyncHandler(srcName, destName string, src blobserver.Storage, dest blobReceiverEnumerator, pendingQueue sorted.KeyValue) *SyncHandler { sh := newSyncHandler(srcName, destName, src, dest, pendingQueue) go sh.syncLoop() blobserver.GetHub(sh.from).AddReceiveHook(sh.enqueue) return sh } // IdleWait waits until the sync handler has finished processing the currently // queued blobs. func (sh *SyncHandler) IdleWait() { if sh.idle { return } <-sh.alarmIdlec } func (sh *SyncHandler) signalIdle() { select { case sh.alarmIdlec <- struct{}{}: default: } } func newIdleSyncHandler(fromName, toName string) *SyncHandler { return &SyncHandler{ fromName: fromName, toName: toName, idle: true, status: "disabled", } } func (sh *SyncHandler) discovery() camtypes.SyncHandlerDiscovery { return camtypes.SyncHandlerDiscovery{ From: sh.fromName, To: sh.toName, ToIndex: sh.toIndex, } } // syncStatus is a snapshot of the current status, for display by the // status handler (status.go) in both JSON and HTML forms. type syncStatus struct { sh *SyncHandler From string `json:"from"` FromDesc string `json:"fromDesc"` To string `json:"to"` ToDesc string `json:"toDesc"` DestIsIndex bool `json:"destIsIndex,omitempty"` BlobsToCopy int `json:"blobsToCopy"` BytesToCopy int64 `json:"bytesToCopy"` LastCopySecAgo int `json:"lastCopySecondsAgo,omitempty"` } func (sh *SyncHandler) currentStatus() syncStatus { sh.mu.Lock() defer sh.mu.Unlock() ago := 0 if !sh.recentCopyTime.IsZero() { ago = int(time.Now().Sub(sh.recentCopyTime).Seconds()) } return syncStatus{ sh: sh, From: sh.fromName, FromDesc: storageDesc(sh.from), To: sh.toName, ToDesc: storageDesc(sh.to), DestIsIndex: sh.toIndex, BlobsToCopy: len(sh.needCopy), BytesToCopy: sh.bytesRemain, LastCopySecAgo: ago, } } // readQueueToMemory slurps in the pending queue from disk (or // wherever) to memory. Even with millions of blobs, it's not much // memory. The point of the persistent queue is to survive restarts if // the "fullSyncOnStart" option is off. With "fullSyncOnStart" set to // true, this is a little pointless (we'd figure out what's missing // eventually), but this might save us a few minutes (let us start // syncing missing blobs a few minutes earlier) since we won't have to // wait to figure out what the destination is missing. func (sh *SyncHandler) readQueueToMemory() error { errc := make(chan error, 1) blobs := make(chan blob.SizedRef, 16) intr := make(chan struct{}) defer close(intr) go func() { errc <- sh.enumerateQueuedBlobs(blobs, intr) }() n := 0 for sb := range blobs { sh.addBlobToCopy(sb) n++ } sh.logf("Added %d pending blobs from sync queue to pending list", n) return <-errc } func (sh *SyncHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { if req.Method == "POST" { if req.FormValue("mode") == "validate" { token := req.FormValue("token") if xsrftoken.Valid(token, auth.Token(), "user", "runFullValidate") { sh.startFullValidation() http.Redirect(rw, req, "./", http.StatusFound) return } } http.Error(rw, "Bad POST request", http.StatusBadRequest) return } // TODO: remove this lock and instead just call currentStatus, // and transition to using that here. sh.mu.Lock() defer sh.mu.Unlock() f := func(p string, a ...interface{}) { fmt.Fprintf(rw, p, a...) } now := time.Now() f("

Sync Status (for %s to %s)

", sh.fromName, sh.toName) f("

Current status: %s

", html.EscapeString(sh.status)) if sh.idle { return } f("

Stats:

") f("

Validation

") if len(sh.vshards) == 0 { f("Validation disabled") token := xsrftoken.Generate(auth.Token(), "user", "runFullValidate") f("
", token) } else { f("

Background scan of source and destination to ensure that the destination has everything the source does, or is at least enqueued to sync.

") f("