// mgo - MongoDB driver for Go
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
//    list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
//    this list of conditions and the following disclaimer in the documentation
//    and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

package mgo

import (
	"errors"
	"math/rand"
	"sync"
	"time"
)

// ---------------------------------------------------------------------------
// Mongo cluster encapsulation.
//
// A cluster enables communication with one or more servers participating
// in a mongo cluster. This works with individual servers, a replica set,
// a replica pair, one or multiple mongos routers, etc.

type mongoCluster struct {
	sync.RWMutex
	serverSynced sync.Cond
	userSeeds    []string
	dynaSeeds    []string
	servers      mongoServers
	masters      mongoServers
	slaves       mongoServers
	references   int
	syncing      bool
	direct       bool
	cachedIndex  map[string]bool
	sync         chan bool
}

func newCluster(userSeeds []string, direct bool) *mongoCluster {
	cluster := &mongoCluster{userSeeds: userSeeds, references: 1, direct: direct}
	cluster.serverSynced.L = cluster.RWMutex.RLocker()
	cluster.sync = make(chan bool, 1)
	go cluster.syncServersLoop()
	return cluster
}

// Acquire increases the reference count for the cluster.
func (cluster *mongoCluster) Acquire() {
	cluster.Lock()
	cluster.references++
	debugf("Cluster %p acquired (refs=%d)", cluster, cluster.references)
	cluster.Unlock()
}

// Release decreases the reference count for the cluster. Once
// it reaches zero, all servers will be closed.
func (cluster *mongoCluster) Release() {
	cluster.Lock()
	if cluster.references == 0 {
		panic("cluster.Release() with references == 0")
	}
	cluster.references--
	debugf("Cluster %p released (refs=%d)", cluster, cluster.references)
	if cluster.references == 0 {
		for _, server := range cluster.servers.Slice() {
			server.Close()
		}
		// Wake up the sync loop so it can die.
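		// This poke cannot block while the cluster lock is held:
		// cluster.sync is buffered (capacity 1) and syncServers sends
		// with a select/default, so the request is best-effort.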
		cluster.syncServers()
	}
	cluster.Unlock()
}

// LiveServers returns the addresses of all servers currently known to the
// cluster.
func (cluster *mongoCluster) LiveServers() (servers []string) {
	cluster.RLock()
	for _, serv := range cluster.servers.Slice() {
		servers = append(servers, serv.Addr)
	}
	cluster.RUnlock()
	return servers
}

// removeServer drops server from the cluster's master, slave, and server
// lists, and closes it.
func (cluster *mongoCluster) removeServer(server *mongoServer) {
	cluster.Lock()
	cluster.masters.Remove(server)
	cluster.slaves.Remove(server)
	other := cluster.servers.Remove(server)
	cluster.Unlock()
	if other != nil {
		other.Close()
		log("Removed server ", server.Addr, " from cluster.")
	}
	server.Close()
}

type isMasterResult struct {
	IsMaster  bool
	Secondary bool
	Primary   string
	Hosts     []string
	Passives  []string
}

// syncServer runs the ismaster command against server to establish its role
// within the cluster, and returns the list of peers the server knows about.
// On error, the server is removed from the cluster.
func (cluster *mongoCluster) syncServer(server *mongoServer) (hosts []string, err error) {
	addr := server.Addr
	log("SYNC Processing ", addr, "...")

	defer func() {
		if err != nil {
			cluster.removeServer(server)
		}
	}()

	socket, err := server.AcquireSocket()
	if err != nil {
		log("SYNC Failed to get socket to ", addr, ": ", err.Error())
		return
	}

	// Monotonic will let us talk to a slave and still hold the socket.
	session := newSession(Monotonic, cluster, socket, 10 * time.Second)
	defer session.Close()

	// session holds the socket now.
	socket.Release()

	result := isMasterResult{}
	err = session.Run("ismaster", &result)
	if err != nil {
		log("SYNC Command 'ismaster' to ", addr, " failed: ", err.Error())
		return
	}
	debugf("SYNC Result of 'ismaster' from %s: %#v", addr, result)

	if result.IsMaster {
		log("SYNC ", addr, " is a master.")
		// Made an incorrect assumption above, so fix stats.
		stats.conn(-1, false)
		server.SetMaster(true)
		stats.conn(+1, true)
	} else if result.Secondary {
		log("SYNC ", addr, " is a slave.")
	} else {
		log("SYNC ", addr, " is neither a master nor a slave.")
		// Made an incorrect assumption above, so fix stats.
		stats.conn(-1, false)
		return nil, errors.New(addr + " is neither a master nor a slave")
	}

	hosts = make([]string, 0, 1+len(result.Hosts)+len(result.Passives))
	if result.Primary != "" {
		// First in the list to speed up master discovery.
		hosts = append(hosts, result.Primary)
	}
	hosts = append(hosts, result.Hosts...)
	hosts = append(hosts, result.Passives...)
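	// Illustrative reply shape feeding the list above (values invented for
	// the example): isMasterResult{Secondary: true, Primary: "a:27017",
	// Hosts: []string{"a:27017", "b:27017"}}. The primary typically appears
	// in Hosts too; the resulting duplicate is harmless because
	// syncServersIteration deduplicates by resolved address.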
	debugf("SYNC %s knows about the following peers: %#v", addr, hosts)
	return hosts, nil
}

// mergeServer adds server to the cluster, or merges its state into a
// previously known entry for the same server, moving it between the master
// and slave lists if its role has changed.
func (cluster *mongoCluster) mergeServer(server *mongoServer) {
	cluster.Lock()
	previous := cluster.servers.Search(server)
	isMaster := server.IsMaster()
	if previous == nil {
		cluster.servers.Add(server)
		if isMaster {
			log("SYNC Adding ", server.Addr, " to cluster as a master.")
			cluster.masters.Add(server)
		} else {
			log("SYNC Adding ", server.Addr, " to cluster as a slave.")
			cluster.slaves.Add(server)
		}
	} else {
		if isMaster != previous.IsMaster() {
			if isMaster {
				log("SYNC Server ", server.Addr, " is now a master.")
				cluster.slaves.Remove(previous)
				cluster.masters.Add(previous)
			} else {
				log("SYNC Server ", server.Addr, " is now a slave.")
				cluster.masters.Remove(previous)
				cluster.slaves.Add(previous)
			}
		}
		previous.Merge(server)
	}
	debugf("SYNC Broadcasting availability of server %s", server.Addr)
	cluster.serverSynced.Broadcast()
	cluster.Unlock()
}

// getKnownAddrs returns the deduplicated union of the user-provided seeds,
// the dynamically discovered seeds, and the addresses of known servers.
func (cluster *mongoCluster) getKnownAddrs() []string {
	cluster.RLock()
	max := len(cluster.userSeeds) + len(cluster.dynaSeeds) + cluster.servers.Len()
	seen := make(map[string]bool, max)
	known := make([]string, 0, max)

	add := func(addr string) {
		if _, found := seen[addr]; !found {
			seen[addr] = true
			known = append(known, addr)
		}
	}

	for _, addr := range cluster.userSeeds {
		add(addr)
	}
	for _, addr := range cluster.dynaSeeds {
		add(addr)
	}
	for _, serv := range cluster.servers.Slice() {
		add(serv.Addr)
	}
	cluster.RUnlock()

	return known
}

// syncServers injects a value into the cluster.sync channel to force
// an iteration of the syncServersLoop function.
func (cluster *mongoCluster) syncServers() {
	select {
	case cluster.sync <- true:
	default:
	}
}

// How long to wait for a checkup of the cluster topology if nothing
// else kicks a synchronization first.
const syncServersDelay = 5 * time.Minute

// syncServersLoop loops while the cluster is alive to keep its idea of
// the server topology up-to-date. It must be called just once from
// newCluster. The loop iterates once syncServersDelay has passed, or
// if somebody injects a value into the cluster.sync channel to force a
// synchronization. A loop iteration will contact all servers in
// parallel, ask them about known peers and their own role within the
// cluster, and then attempt to do the same with all the peers
// retrieved.
func (cluster *mongoCluster) syncServersLoop() {
	for {
		debugf("SYNC Cluster %p is starting a sync loop iteration.", cluster)

		cluster.Lock()
		if cluster.references == 0 {
			cluster.Unlock()
			break
		}
		cluster.references++ // Keep alive while syncing.
		direct := cluster.direct
		cluster.Unlock()

		cluster.syncServersIteration(direct)

		// We just synchronized, so consume any outstanding requests.
		select {
		case <-cluster.sync:
		default:
		}

		cluster.Release()

		// Hold off before allowing another sync. No point in
		// burning CPU looking for down servers.
		time.Sleep(500 * time.Millisecond)

		cluster.Lock()
		if cluster.references == 0 {
			cluster.Unlock()
			break
		}
		// Poke all waiters so they have a chance to timeout or
		// restart syncing if they wish to.
		cluster.serverSynced.Broadcast()
		// Check if we have to restart immediately either way.
		restart := (!direct && cluster.masters.Empty()) || cluster.servers.Empty()
		cluster.Unlock()

		if restart {
			log("SYNC No masters found. Will synchronize again.")
			continue
		}

		debugf("SYNC Cluster %p waiting for next requested or scheduled sync.", cluster)

		// Hold off until somebody explicitly requests a synchronization
		// or it's time to check for a cluster topology change again.
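		// Since cluster.sync holds at most one value, any number of
		// syncServers pokes made meanwhile collapse into a single wakeup
		// here, while time.After supplies the periodic fallback.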
		select {
		case <-cluster.sync:
		case <-time.After(syncServersDelay):
		}
	}
	debugf("SYNC Cluster %p is stopping its sync loop.", cluster)
}

// syncServersIteration performs a single synchronization pass: it contacts
// every known address in parallel, follows the peers each server reports,
// and merges the results into the cluster's server lists.
func (cluster *mongoCluster) syncServersIteration(direct bool) {
	log("SYNC Starting full topology synchronization...")

	var wg sync.WaitGroup
	var m sync.Mutex
	mergePending := make(map[string]*mongoServer)
	mergeRequested := make(map[string]bool)
	seen := make(map[string]bool)
	goodSync := false

	var spawnSync func(addr string, byMaster bool)
	spawnSync = func(addr string, byMaster bool) {
		wg.Add(1)
		go func() {
			defer wg.Done()

			server, err := newServer(addr, cluster.sync)
			if err != nil {
				log("SYNC Failed to start sync of ", addr, ": ", err.Error())
				return
			}

			m.Lock()
			if byMaster {
				if s, found := mergePending[server.ResolvedAddr]; found {
					delete(mergePending, server.ResolvedAddr)
					m.Unlock()
					cluster.mergeServer(s)
					return
				}
				mergeRequested[server.ResolvedAddr] = true
			}
			if seen[server.ResolvedAddr] {
				m.Unlock()
				return
			}
			seen[server.ResolvedAddr] = true
			m.Unlock()

			hosts, err := cluster.syncServer(server)
			if err == nil {
				isMaster := server.IsMaster()
				if !direct {
					for _, addr := range hosts {
						spawnSync(addr, isMaster)
					}
				}

				m.Lock()
				merge := direct || isMaster
				if mergeRequested[server.ResolvedAddr] {
					merge = true
				} else if !merge {
					mergePending[server.ResolvedAddr] = server
				}
				if merge {
					goodSync = true
				}
				m.Unlock()

				if merge {
					cluster.mergeServer(server)
				}
			}
		}()
	}

	for _, addr := range cluster.getKnownAddrs() {
		spawnSync(addr, false)
	}
	wg.Wait()

	for _, server := range mergePending {
		if goodSync {
			cluster.removeServer(server)
		} else {
			server.Close()
		}
	}

	cluster.Lock()
	log("SYNC Synchronization completed: ", cluster.masters.Len(),
		" master(s) and ", cluster.slaves.Len(), " slave(s) alive.")

	// Update dynamic seeds, but only if we have any good servers. Otherwise,
	// leave them alone for better chances of a successful sync in the future.
	if goodSync {
		dynaSeeds := make([]string, cluster.servers.Len())
		for i, server := range cluster.servers.Slice() {
			dynaSeeds[i] = server.Addr
		}
		cluster.dynaSeeds = dynaSeeds
		debugf("SYNC New dynamic seeds: %#v\n", dynaSeeds)
	}
	cluster.Unlock()
}

// AcquireSocket returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
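//
// A minimal, illustrative call (the error handling shown is a sketch, not
// a prescription from this package):
//
//     socket, err := cluster.AcquireSocket(true, 10*time.Second)
//     if err != nil {
//         return err // "no reachable servers" once syncTimeout elapses
//     }
//     defer socket.Release()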
func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Duration) (s *mongoSocket, err error) {
	started := time.Now()
	for {
		cluster.RLock()
		for {
			debugf("Cluster has %d known masters and %d known slaves.", cluster.masters.Len(), cluster.slaves.Len())
			if !cluster.masters.Empty() || (slaveOk && !cluster.slaves.Empty()) {
				break
			}
			if syncTimeout != 0 && started.Before(time.Now().Add(-syncTimeout)) {
				cluster.RUnlock()
				return nil, errors.New("no reachable servers")
			}
			log("Waiting for servers to synchronize...")
			cluster.syncServers()
			cluster.serverSynced.Wait()
		}

		var server *mongoServer
		if !slaveOk || cluster.slaves.Empty() {
			i := rand.Intn(cluster.masters.Len())
			server = cluster.masters.Get(i)
		} else {
			i := rand.Intn(cluster.slaves.Len())
			server = cluster.slaves.Get(i)
		}
		cluster.RUnlock()

		s, err = server.AcquireSocket()
		if err != nil {
			cluster.removeServer(server)
			cluster.syncServers()
			continue
		}
		return s, nil
	}
}

// CacheIndex records in the cluster-wide index cache whether the index
// identified by cacheKey is known to exist.
func (cluster *mongoCluster) CacheIndex(cacheKey string, exists bool) {
	cluster.Lock()
	if cluster.cachedIndex == nil {
		cluster.cachedIndex = make(map[string]bool)
	}
	if exists {
		cluster.cachedIndex[cacheKey] = true
	} else {
		delete(cluster.cachedIndex, cacheKey)
	}
	cluster.Unlock()
}

// HasCachedIndex reports whether the index identified by cacheKey is known
// to exist.
func (cluster *mongoCluster) HasCachedIndex(cacheKey string) (result bool) {
	cluster.RLock()
	if cluster.cachedIndex != nil {
		result = cluster.cachedIndex[cacheKey]
	}
	cluster.RUnlock()
	return
}

// ResetIndexCache forgets all cached index information.
func (cluster *mongoCluster) ResetIndexCache() {
	cluster.Lock()
	cluster.cachedIndex = make(map[string]bool)
	cluster.Unlock()
}
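// A hedged sketch of the index cache above in use. The "db.collection:key"
// cache key format is an assumption made up for this illustration; the
// actual key format is chosen by callers elsewhere in the package:
//
//     cluster.CacheIndex("mydb.people:name_1", true)
//     if cluster.HasCachedIndex("mydb.people:name_1") {
//         // Skip re-creating an index that is already known to exist.
//     }
//     cluster.ResetIndexCache() // Drop all cached index knowledge.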