third_party/mgo: updated to rev 253

Change-Id: I995db6e123b1ef3d6061774aa333c7dbe645a744
This commit is contained in:
mpl 2013-12-13 00:51:30 +01:00
parent 22101b61d8
commit 96f0e39700
21 changed files with 3806 additions and 961 deletions

View File

@ -1,6 +1,6 @@
mgo - MongoDB driver for Go
Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
Copyright (c) 2010-2013 - Gustavo Niemeyer <gustavo@niemeyer.net>
All rights reserved.

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -27,11 +27,11 @@
package mgo
import (
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"crypto/md5"
"encoding/hex"
"errors"
"fmt"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"sync"
)
@ -91,13 +91,13 @@ func (socket *mongoSocket) resetNonce() {
op.limit = -1
op.replyFunc = func(err error, reply *replyOp, docNum int, docData []byte) {
if err != nil {
socket.kill(errors.New("getNonce: " + err.Error()), true)
socket.kill(errors.New("getNonce: "+err.Error()), true)
return
}
result := &getNonceResult{}
err = bson.Unmarshal(docData, &result)
if err != nil {
socket.kill(errors.New("Failed to unmarshal nonce: " + err.Error()), true)
socket.kill(errors.New("Failed to unmarshal nonce: "+err.Error()), true)
return
}
debugf("Socket %p to %s: nonce unmarshalled: %#v", socket, socket.addr, result)
@ -125,7 +125,7 @@ func (socket *mongoSocket) resetNonce() {
}
err := socket.Query(op)
if err != nil {
socket.kill(errors.New("resetNonce: " + err.Error()), true)
socket.kill(errors.New("resetNonce: "+err.Error()), true)
}
}

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -27,54 +27,62 @@
package mgo_test
import (
. "camlistore.org/third_party/launchpad.net/gocheck"
"camlistore.org/third_party/labix.org/v2/mgo"
. "camlistore.org/third_party/launchpad.net/gocheck"
"fmt"
"sync"
"time"
)
func (s *S) TestAuthLogin(c *C) {
session, err := mgo.Dial("localhost:40002")
c.Assert(err, IsNil)
defer session.Close()
// Test both with a normal database and with an authenticated shard.
for _, addr := range []string{"localhost:40002", "localhost:40203"} {
session, err := mgo.Dial(addr)
c.Assert(err, IsNil)
defer session.Close()
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized|need to login|not authorized .*")
admindb := session.DB("admin")
admindb := session.DB("admin")
err = admindb.Login("root", "wrong")
c.Assert(err, ErrorMatches, "auth fails")
err = admindb.Login("root", "wrong")
c.Assert(err, ErrorMatches, "auth fails")
err = admindb.Login("root", "rapadura")
c.Assert(err, IsNil)
err = admindb.Login("root", "rapadura")
c.Assert(err, IsNil)
err = coll.Insert(M{"n": 1})
c.Assert(err, IsNil)
err = coll.Insert(M{"n": 1})
c.Assert(err, IsNil)
}
}
func (s *S) TestAuthLoginLogout(c *C) {
session, err := mgo.Dial("localhost:40002")
c.Assert(err, IsNil)
defer session.Close()
// Test both with a normal database and with an authenticated shard.
for _, addr := range []string{"localhost:40002", "localhost:40203"} {
session, err := mgo.Dial(addr)
c.Assert(err, IsNil)
defer session.Close()
admindb := session.DB("admin")
err = admindb.Login("root", "rapadura")
c.Assert(err, IsNil)
admindb := session.DB("admin")
err = admindb.Login("root", "rapadura")
c.Assert(err, IsNil)
admindb.Logout()
admindb.Logout()
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized|need to login|not authorized .*")
// Must have dropped auth from the session too.
session = session.Copy()
defer session.Close()
// Must have dropped auth from the session too.
session = session.Copy()
defer session.Close()
coll = session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
coll = session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized|need to login|not authorized .*")
}
}
func (s *S) TestAuthLoginLogoutAll(c *C) {
@ -90,7 +98,7 @@ func (s *S) TestAuthLoginLogoutAll(c *C) {
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
c.Assert(err, ErrorMatches, "unauthorized|need to login|not authorized .*")
// Must have dropped auth from the session too.
session = session.Copy()
@ -98,7 +106,181 @@ func (s *S) TestAuthLoginLogoutAll(c *C) {
coll = session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
c.Assert(err, ErrorMatches, "unauthorized|need to login|not authorized .*")
}
func (s *S) TestAuthUpsertUserErrors(c *C) {
session, err := mgo.Dial("localhost:40002")
c.Assert(err, IsNil)
defer session.Close()
admindb := session.DB("admin")
err = admindb.Login("root", "rapadura")
c.Assert(err, IsNil)
mydb := session.DB("mydb")
err = mydb.UpsertUser(&mgo.User{})
c.Assert(err, ErrorMatches, "user has no Username")
err = mydb.UpsertUser(&mgo.User{Username: "user", Password: "pass", UserSource: "source"})
c.Assert(err, ErrorMatches, "user has both Password/PasswordHash and UserSource set")
err = mydb.UpsertUser(&mgo.User{Username: "user", Password: "pass", OtherDBRoles: map[string][]mgo.Role{"db": nil}})
c.Assert(err, ErrorMatches, "user with OtherDBRoles is only supported in admin database")
}
func (s *S) TestAuthUpsertUser(c *C) {
if !s.versionAtLeast(2, 4) {
c.Skip("UpsertUser only works on 2.4+")
}
session, err := mgo.Dial("localhost:40002")
c.Assert(err, IsNil)
defer session.Close()
admindb := session.DB("admin")
err = admindb.Login("root", "rapadura")
c.Assert(err, IsNil)
mydb := session.DB("mydb")
myotherdb := session.DB("myotherdb")
ruser := &mgo.User{
Username: "myruser",
Password: "mypass",
Roles: []mgo.Role{mgo.RoleRead},
}
rwuser := &mgo.User{
Username: "myrwuser",
Password: "mypass",
Roles: []mgo.Role{mgo.RoleReadWrite},
}
rwuserother := &mgo.User{
Username: "myrwuser",
UserSource: "mydb",
Roles: []mgo.Role{mgo.RoleRead},
}
err = mydb.UpsertUser(ruser)
c.Assert(err, IsNil)
err = mydb.UpsertUser(rwuser)
c.Assert(err, IsNil)
err = myotherdb.UpsertUser(rwuserother)
c.Assert(err, IsNil)
err = mydb.Login("myruser", "mypass")
c.Assert(err, IsNil)
admindb.Logout()
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized|not authorized .*")
err = mydb.Login("myrwuser", "mypass")
c.Assert(err, IsNil)
err = coll.Insert(M{"n": 1})
c.Assert(err, IsNil)
// Test indirection via UserSource: we can't write to it, because
// the roles for myrwuser are different there.
othercoll := myotherdb.C("myothercoll")
err = othercoll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized|not authorized .*")
// Reading works, though.
err = othercoll.Find(nil).One(nil)
c.Assert(err, Equals, mgo.ErrNotFound)
// Can't login directly into the database using UserSource, though.
err = myotherdb.Login("myrwuser", "mypass")
c.Assert(err, ErrorMatches, "auth fails")
}
func (s *S) TestAuthUpserUserOtherDBRoles(c *C) {
if !s.versionAtLeast(2, 4) {
c.Skip("UpsertUser only works on 2.4+")
}
session, err := mgo.Dial("localhost:40002")
c.Assert(err, IsNil)
defer session.Close()
admindb := session.DB("admin")
err = admindb.Login("root", "rapadura")
c.Assert(err, IsNil)
ruser := &mgo.User{
Username: "myruser",
Password: "mypass",
OtherDBRoles: map[string][]mgo.Role{"mydb": []mgo.Role{mgo.RoleRead}},
}
err = admindb.UpsertUser(ruser)
c.Assert(err, IsNil)
defer admindb.RemoveUser("myruser")
admindb.Logout()
err = admindb.Login("myruser", "mypass")
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized|not authorized .*")
err = coll.Find(nil).One(nil)
c.Assert(err, Equals, mgo.ErrNotFound)
}
func (s *S) TestAuthUpserUserUnsetFields(c *C) {
if !s.versionAtLeast(2, 4) {
c.Skip("UpsertUser only works on 2.4+")
}
session, err := mgo.Dial("localhost:40002")
c.Assert(err, IsNil)
defer session.Close()
admindb := session.DB("admin")
err = admindb.Login("root", "rapadura")
c.Assert(err, IsNil)
// Insert a user with most fields set.
user := &mgo.User{
Username: "myruser",
Password: "mypass",
Roles: []mgo.Role{mgo.RoleRead},
OtherDBRoles: map[string][]mgo.Role{"mydb": []mgo.Role{mgo.RoleRead}},
}
err = admindb.UpsertUser(user)
c.Assert(err, IsNil)
defer admindb.RemoveUser("myruser")
// Now update the user with few things set.
user = &mgo.User{
Username: "myruser",
UserSource: "mydb",
}
err = admindb.UpsertUser(user)
c.Assert(err, IsNil)
// Everything that was unset must have been dropped.
var userm M
err = admindb.C("system.users").Find(M{"user": "myruser"}).One(&userm)
c.Assert(err, IsNil)
delete(userm, "_id")
c.Assert(userm, DeepEquals, M{"user": "myruser", "userSource": "mydb", "roles": []interface{}{}})
// Now set password again...
user = &mgo.User{
Username: "myruser",
Password: "mypass",
}
err = admindb.UpsertUser(user)
c.Assert(err, IsNil)
// ... and assert that userSource has been dropped.
err = admindb.C("system.users").Find(M{"user": "myruser"}).One(&userm)
_, found := userm["userSource"]
c.Assert(found, Equals, false)
}
func (s *S) TestAuthAddUser(c *C) {
@ -123,7 +305,7 @@ func (s *S) TestAuthAddUser(c *C) {
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
c.Assert(err, ErrorMatches, "unauthorized|not authorized .*")
err = mydb.Login("mywuser", "mypass")
c.Assert(err, IsNil)
@ -156,7 +338,7 @@ func (s *S) TestAuthAddUserReplaces(c *C) {
// ReadOnly flag was changed too.
err = mydb.C("mycoll").Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
c.Assert(err, ErrorMatches, "unauthorized|not authorized .*")
}
func (s *S) TestAuthRemoveUser(c *C) {
@ -233,7 +415,7 @@ func (s *S) TestAuthLoginSwitchUser(c *C) {
// Can't write.
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
c.Assert(err, ErrorMatches, "unauthorized|not authorized .*")
// But can read.
result := struct{ N int }{}
@ -268,7 +450,7 @@ func (s *S) TestAuthLoginChangePassword(c *C) {
// The second login must be in effect, which means read-only.
err = mydb.C("mycoll").Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
c.Assert(err, ErrorMatches, "unauthorized|not authorized .*")
}
func (s *S) TestAuthLoginCachingWithSessionRefresh(c *C) {
@ -335,7 +517,7 @@ func (s *S) TestAuthLoginCachingWithNewSession(c *C) {
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
c.Assert(err, ErrorMatches, "unauthorized|need to login|not authorized for .*")
}
func (s *S) TestAuthLoginCachingAcrossPool(c *C) {
@ -432,7 +614,7 @@ func (s *S) TestAuthLoginCachingAcrossPoolWithLogout(c *C) {
// Can't write, since root has been implicitly logged out
// when the collection went into the pool, and not revalidated.
err = other.DB("mydb").C("mycoll").Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized")
c.Assert(err, ErrorMatches, "unauthorized|not authorized .*")
// But can read due to the revalidated myuser login.
result := struct{ N int }{}
@ -515,3 +697,82 @@ func (s *S) TestAuthURLWithNewSession(c *C) {
err = session.DB("mydb").C("mycoll").Insert(M{"n": 1})
c.Assert(err, IsNil)
}
func (s *S) TestAuthURLWithDatabase(c *C) {
session, err := mgo.Dial("mongodb://root:rapadura@localhost:40002")
c.Assert(err, IsNil)
defer session.Close()
mydb := session.DB("mydb")
err = mydb.AddUser("myruser", "mypass", true)
c.Assert(err, IsNil)
usession, err := mgo.Dial("mongodb://myruser:mypass@localhost:40002/mydb")
c.Assert(err, IsNil)
defer usession.Close()
ucoll := usession.DB("mydb").C("mycoll")
err = ucoll.FindId(0).One(nil)
c.Assert(err, Equals, mgo.ErrNotFound)
err = ucoll.Insert(M{"n": 1})
c.Assert(err, ErrorMatches, "unauthorized|not authorized .*")
}
func (s *S) TestDefaultDatabase(c *C) {
tests := []struct{ url, db string }{
{"mongodb://root:rapadura@localhost:40002", "test"},
{"mongodb://root:rapadura@localhost:40002/admin", "admin"},
{"mongodb://localhost:40001", "test"},
{"mongodb://localhost:40001/", "test"},
{"mongodb://localhost:40001/mydb", "mydb"},
}
for _, test := range tests {
session, err := mgo.Dial(test.url)
c.Assert(err, IsNil)
defer session.Close()
c.Logf("test: %#v", test)
c.Assert(session.DB("").Name, Equals, test.db)
scopy := session.Copy()
c.Check(scopy.DB("").Name, Equals, test.db)
scopy.Close()
}
}
func (s *S) TestAuthDirect(c *C) {
// Direct connections must work to the master and slaves.
for _, port := range []string{"40031", "40032", "40033"} {
url := fmt.Sprintf("mongodb://root:rapadura@localhost:%s/?connect=direct", port)
session, err := mgo.Dial(url)
c.Assert(err, IsNil)
defer session.Close()
session.SetMode(mgo.Monotonic, true)
var result struct{}
err = session.DB("mydb").C("mycoll").Find(nil).One(&result)
c.Assert(err, Equals, mgo.ErrNotFound)
}
}
func (s *S) TestAuthDirectWithLogin(c *C) {
// Direct connections must work to the master and slaves.
for _, port := range []string{"40031", "40032", "40033"} {
url := fmt.Sprintf("mongodb://localhost:%s/?connect=direct", port)
session, err := mgo.Dial(url)
c.Assert(err, IsNil)
defer session.Close()
session.SetMode(mgo.Monotonic, true)
session.SetSyncTimeout(3 * time.Second)
err = session.DB("admin").Login("root", "rapadura")
c.Assert(err, IsNil)
var result struct{}
err = session.DB("mydb").C("mycoll").Find(nil).One(&result)
c.Assert(err, Equals, mgo.ErrNotFound)
}
}

View File

@ -1,18 +1,18 @@
// BSON library for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -24,14 +24,22 @@
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Package bson is an implementation of the BSON specification for Go:
//
// http://bsonspec.org
//
// It was created as part of the mgo MongoDB driver for Go, but is standalone
// and may be used on its own without the driver.
package bson
import (
"crypto/md5"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"reflect"
"runtime"
@ -58,11 +66,12 @@ type Getter interface {
// value via the SetBSON method during unmarshaling, and the object
// itself will not be changed as usual.
//
// If setting the value works, the method should return nil. If it returns
// a bson.TypeError value, the BSON value will be omitted from a map or
// slice being decoded and the unmarshalling will continue. If it returns
// any other non-nil error, the unmarshalling procedure will stop and error
// out with the provided value.
// If setting the value works, the method should return nil or alternatively
// bson.SetZero to set the respective field to its zero value (nil for
// pointer types). If SetBSON returns a value of type bson.TypeError, the
// BSON value will be omitted from a map or slice being decoded and the
// unmarshalling will continue. If it returns any other non-nil error, the
// unmarshalling procedure will stop and error out with the provided value.
//
// This interface is generally useful in pointer receivers, since the method
// will want to change the receiver. A type field that implements the Setter
@ -84,6 +93,11 @@ type Setter interface {
SetBSON(raw Raw) error
}
// SetZero may be returned from a SetBSON method to have the value set to
// its respective zero value. When used in pointer values, this will set the
// field to nil rather than to the pre-allocated value.
var SetZero = errors.New("set to zero")
// M is a convenient alias for a map[string]interface{} map, useful for
// dealing with BSON in a native way. For instance:
//
@ -94,23 +108,30 @@ type Setter interface {
// undefined ordered. See also the bson.D type for an ordered alternative.
type M map[string]interface{}
// D is a type for dealing with documents containing ordered elements in a
// native fashion. For instance:
// D represents a BSON document containing ordered elements. For example:
//
// bson.D{{"a", 1}, {"b", true}}
//
// In some situations, such as when creating indexes for MongoDB, the order in
// which the elements are defined is important. If the order is not important,
// using a map is generally more comfortable. See the bson.M type and the
// Map() method for D.
// using a map is generally more comfortable. See bson.M and bson.RawD.
type D []DocElem
// See the bson.D type.
// See the D type.
type DocElem struct {
Name string
Value interface{}
}
// Map returns a map out of the ordered element name/value pairs in d.
func (d D) Map() (m M) {
m = make(M, len(d))
for _, item := range d {
m[item.Name] = item.Value
}
return m
}
// The Raw type represents raw unprocessed BSON documents and elements.
// Kind is the kind of element as defined per the BSON specification, and
// Data is the raw unprocessed data for the respective element.
@ -125,13 +146,16 @@ type Raw struct {
Data []byte
}
// Map returns a map out of the ordered element name/value pairs in d.
func (d D) Map() (m M) {
m = make(M, len(d))
for _, item := range d {
m[item.Name] = item.Value
}
return m
// RawD represents a BSON document containing raw unprocessed elements.
// This low-level representation may be useful when lazily processing
// documents of uncertain content, or when manipulating the raw content
// documents in general.
type RawD []RawDocElem
// See the RawD type.
type RawDocElem struct {
Name string
Value Raw
}
// ObjectId is a unique ID identifying a BSON value. It must be exactly 12 bytes
@ -143,7 +167,7 @@ type ObjectId string
// ObjectIdHex returns an ObjectId from the provided hex representation.
// Calling this function with an invalid hex representation will
// cause a runtime panic.
// cause a runtime panic. See the IsObjectIdHex function.
func ObjectIdHex(s string) ObjectId {
d, err := hex.DecodeString(s)
if err != nil || len(d) != 12 {
@ -152,40 +176,52 @@ func ObjectIdHex(s string) ObjectId {
return ObjectId(d)
}
// IsObjectIdHex returns whether s is a valid hex representation of
// an ObjectId. See the ObjectIdHex function.
func IsObjectIdHex(s string) bool {
if len(s) != 24 {
return false
}
_, err := hex.DecodeString(s)
return err == nil
}
// objectIdCounter is atomically incremented when generating a new ObjectId
// using NewObjectId() function. It's used as a counter part of an id.
var objectIdCounter uint32 = 0
// machineId stores machine id generated once and used in subsequent calls
// to NewObjectId function.
var machineId []byte
var machineId = readMachineId()
// initMachineId generates machine id and puts it into the machineId global
// variable. If this function fails to get the hostname, it will cause
// a runtime error.
func initMachineId() {
func readMachineId() []byte {
var sum [3]byte
hostname, err := os.Hostname()
if err != nil {
panic("Failed to get hostname: " + err.Error())
id := sum[:]
hostname, err1 := os.Hostname()
if err1 != nil {
_, err2 := io.ReadFull(rand.Reader, id)
if err2 != nil {
panic(fmt.Errorf("cannot get hostname: %v; %v", err1, err2))
}
return id
}
hw := md5.New()
hw.Write([]byte(hostname))
copy(sum[:3], hw.Sum(nil))
machineId = sum[:]
copy(id, hw.Sum(nil))
return id
}
// NewObjectId returns a new unique ObjectId.
// This function causes a runtime error if it fails to get the hostname
// of the current machine.
func NewObjectId() ObjectId {
b := make([]byte, 12)
var b [12]byte
// Timestamp, 4 bytes, big endian
binary.BigEndian.PutUint32(b, uint32(time.Now().Unix()))
binary.BigEndian.PutUint32(b[:], uint32(time.Now().Unix()))
// Machine, first 3 bytes of md5(hostname)
if machineId == nil {
initMachineId()
}
b[4] = machineId[0]
b[5] = machineId[1]
b[6] = machineId[2]
@ -198,7 +234,7 @@ func NewObjectId() ObjectId {
b[9] = byte(i >> 16)
b[10] = byte(i >> 8)
b[11] = byte(i)
return ObjectId(b)
return ObjectId(b[:])
}
// NewObjectIdWithTime returns a dummy ObjectId with the timestamp part filled
@ -294,7 +330,7 @@ type Symbol string
// why this function exists. Using the time.Now function also works fine
// otherwise.
func Now() time.Time {
return time.Unix(0, time.Now().UnixNano() / 1e6 * 1e6)
return time.Unix(0, time.Now().UnixNano()/1e6*1e6)
}
// MongoTimestamp is a special internal type used by MongoDB that for some
@ -383,16 +419,16 @@ func handleErr(err *error) {
//
// The following flags are currently supported:
//
// omitempty Only include the field if it's not set to the zero
// value for the type or to empty slices or maps.
// Does not apply to zero valued structs.
// omitempty Only include the field if it's not set to the zero
// value for the type or to empty slices or maps.
//
// minsize Marshal an int64 value as an int32, if that's feasible
// while preserving the numeric value.
// minsize Marshal an int64 value as an int32, if that's feasible
// while preserving the numeric value.
//
// inline Inline the field, which must be a struct, causing all
// of its fields to be processed as if they were part of
// the outer struct.
// inline Inline the field, which must be a struct or a map,
// causing all of its fields or keys to be processed as if
// they were part of the outer struct. For maps, keys must
// not conflict with the bson keys of other struct fields.
//
// Some examples:
//
@ -404,7 +440,7 @@ func handleErr(err *error) {
// E int64 ",minsize"
// F int64 "myf,omitempty,minsize"
// }
//
//
func Marshal(in interface{}) (out []byte, err error) {
defer handleErr(&err)
e := &encoder{make([]byte, 0, initialBufferSize)}
@ -413,10 +449,24 @@ func Marshal(in interface{}) (out []byte, err error) {
}
// Unmarshal deserializes data from in into the out value. The out value
// must be a map or a pointer to a struct (or a pointer to a struct pointer).
// must be a map, a pointer to a struct, or a pointer to a bson.D value.
// The lowercased field name is used as the key for each exported field,
// but this behavior may be changed using the respective field tag.
// Uninitialized pointer values are properly initialized only when necessary.
// The tag may also contain flags to tweak the marshalling behavior for
// the field. The tag formats accepted are:
//
// "[<key>][,<flag1>[,<flag2>]]"
//
// `(...) bson:"[<key>][,<flag1>[,<flag2>]]" (...)`
//
// The following flags are currently supported during unmarshal (see the
// Marshal method for other flags):
//
// inline Inline the field, which must be a struct or a map.
// Inlined structs are handled as if its fields were part
// of the outer struct. An inlined map causes keys that do
// not match any other struct field to be inserted in the
// map rather than being discarded as usual.
//
// The target field or element types of out may not necessarily match
// the BSON values of the provided data. The following conversions are
@ -428,14 +478,16 @@ func Marshal(in interface{}) (out []byte, err error) {
// - Numeric types are converted to bools as true if not 0 or false otherwise
// - Binary and string BSON data is converted to a string, array or byte slice
//
// If the value would not fit the type and cannot be converted, it's silently
// skipped.
// If the value would not fit the type and cannot be converted, it's
// silently skipped.
//
// Pointer values are initialized when necessary.
func Unmarshal(in []byte, out interface{}) (err error) {
defer handleErr(&err)
v := reflect.ValueOf(out)
switch v.Kind() {
case reflect.Map, reflect.Ptr:
d := &decoder{in: in}
d := newDecoder(in)
d.readDocTo(v)
case reflect.Struct:
return errors.New("Unmarshal can't deal with struct values. Use a pointer.")
@ -458,7 +510,7 @@ func (raw Raw) Unmarshal(out interface{}) (err error) {
v = v.Elem()
fallthrough
case reflect.Map:
d := &decoder{in: raw.Data}
d := newDecoder(raw.Data)
good := d.readElemTo(v, raw.Kind)
if !good {
return &TypeError{v.Type(), raw.Kind}
@ -486,6 +538,7 @@ func (e *TypeError) Error() string {
type structInfo struct {
FieldsMap map[string]fieldInfo
FieldsList []fieldInfo
InlineMap int
Zero reflect.Value
}
@ -516,6 +569,7 @@ func getStructInfo(st reflect.Type) (*structInfo, error) {
n := st.NumField()
fieldsMap := make(map[string]fieldInfo)
fieldsList := make([]fieldInfo, 0, n)
inlineMap := -1
for i := 0; i != n; i++ {
field := st.Field(i)
if field.PkgPath != "" {
@ -570,25 +624,35 @@ func getStructInfo(st reflect.Type) (*structInfo, error) {
}
if inline {
if field.Type.Kind() != reflect.Struct {
panic("Option ,inline needs a struct value field")
}
sinfo, err := getStructInfo(field.Type)
if err != nil {
return nil, err
}
for _, finfo := range sinfo.FieldsList {
if _, found := fieldsMap[finfo.Key]; found {
msg := "Duplicated key '" + finfo.Key + "' in struct " + st.String()
return nil, errors.New(msg)
switch field.Type.Kind() {
case reflect.Map:
if inlineMap >= 0 {
return nil, errors.New("Multiple ,inline maps in struct " + st.String())
}
if finfo.Inline == nil {
finfo.Inline = []int{i, finfo.Num}
} else {
finfo.Inline = append([]int{i}, finfo.Inline...)
if field.Type.Key() != reflect.TypeOf("") {
return nil, errors.New("Option ,inline needs a map with string keys in struct " + st.String())
}
fieldsMap[finfo.Key] = finfo
fieldsList = append(fieldsList, finfo)
inlineMap = info.Num
case reflect.Struct:
sinfo, err := getStructInfo(field.Type)
if err != nil {
return nil, err
}
for _, finfo := range sinfo.FieldsList {
if _, found := fieldsMap[finfo.Key]; found {
msg := "Duplicated key '" + finfo.Key + "' in struct " + st.String()
return nil, errors.New(msg)
}
if finfo.Inline == nil {
finfo.Inline = []int{i, finfo.Num}
} else {
finfo.Inline = append([]int{i}, finfo.Inline...)
}
fieldsMap[finfo.Key] = finfo
fieldsList = append(fieldsList, finfo)
}
default:
panic("Option ,inline needs a struct value or map field")
}
continue
}
@ -609,7 +673,8 @@ func getStructInfo(st reflect.Type) (*structInfo, error) {
}
sinfo = &structInfo{
fieldsMap,
fieldsList[:len(fieldsMap)],
fieldsList,
inlineMap,
reflect.New(st).Elem(),
}
structMapMutex.Lock()

View File

@ -1,18 +1,18 @@
// BSON library for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -28,11 +28,11 @@
package bson_test
import (
"camlistore.org/third_party/labix.org/v2/mgo/bson"
. "camlistore.org/third_party/launchpad.net/gocheck"
"encoding/binary"
"encoding/json"
"errors"
. "camlistore.org/third_party/launchpad.net/gocheck"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"net/url"
"reflect"
"testing"
@ -59,12 +59,17 @@ func wrapInDoc(data string) string {
func makeZeroDoc(value interface{}) (zero interface{}) {
v := reflect.ValueOf(value)
t := v.Type()
if t.Kind() == reflect.Map {
switch t.Kind() {
case reflect.Map:
mv := reflect.MakeMap(t)
zero = mv.Interface()
} else {
case reflect.Ptr:
pv := reflect.New(v.Type().Elem())
zero = pv.Interface()
case reflect.Slice:
zero = reflect.New(t).Interface()
default:
panic("unsupported doc type")
}
return zero
}
@ -225,6 +230,18 @@ func (s *S) TestUnmarshalZeroesMap(c *C) {
c.Assert(m, DeepEquals, bson.M{"b": 2})
}
func (s *S) TestUnmarshalNonNilInterface(c *C) {
data, err := bson.Marshal(bson.M{"b": 2})
c.Assert(err, IsNil)
m := bson.M{"a": 1}
var i interface{}
i = m
err = bson.Unmarshal(data, &i)
c.Assert(err, IsNil)
c.Assert(i, DeepEquals, bson.M{"b": 2})
c.Assert(m, DeepEquals, bson.M{"a": 1})
}
// --------------------------------------------------------------------------
// Some one way marshaling operations which would unmarshal differently.
@ -336,6 +353,20 @@ func (s *S) TestUnmarshalStructSampleItems(c *C) {
}
}
func (s *S) Test64bitInt(c *C) {
var i int64 = (1 << 31)
if int(i) > 0 {
data, err := bson.Marshal(bson.M{"i": int(i)})
c.Assert(err, IsNil)
c.Assert(string(data), Equals, wrapInDoc("\x12i\x00\x00\x00\x00\x80\x00\x00\x00\x00"))
var result struct{ I int }
err = bson.Unmarshal(data, &result)
c.Assert(err, IsNil)
c.Assert(int64(result.I), Equals, i)
}
}
// --------------------------------------------------------------------------
// Generic two-way struct marshaling tests.
@ -429,8 +460,18 @@ var marshalItems = []testItemType{
// Ordered document dump. Will unmarshal as a dictionary by default.
{bson.D{{"a", nil}, {"c", nil}, {"b", nil}, {"d", nil}, {"f", nil}, {"e", true}},
"\x0Aa\x00\x0Ac\x00\x0Ab\x00\x0Ad\x00\x0Af\x00\x08e\x00\x01"},
{MyD{{"a", nil}, {"c", nil}, {"b", nil}, {"d", nil}, {"f", nil}, {"e", true}},
"\x0Aa\x00\x0Ac\x00\x0Ab\x00\x0Ad\x00\x0Af\x00\x08e\x00\x01"},
{&dOnIface{bson.D{{"a", nil}, {"c", nil}, {"b", nil}, {"d", true}}},
"\x03d\x00" + wrapInDoc("\x0Aa\x00\x0Ac\x00\x0Ab\x00\x08d\x00\x01")},
{bson.RawD{{"a", bson.Raw{0x0A, nil}}, {"c", bson.Raw{0x0A, nil}}, {"b", bson.Raw{0x08, []byte{0x01}}}},
"\x0Aa\x00" + "\x0Ac\x00" + "\x08b\x00\x01"},
{MyRawD{{"a", bson.Raw{0x0A, nil}}, {"c", bson.Raw{0x0A, nil}}, {"b", bson.Raw{0x08, []byte{0x01}}}},
"\x0Aa\x00" + "\x0Ac\x00" + "\x08b\x00\x01"},
{&dOnIface{bson.RawD{{"a", bson.Raw{0x0A, nil}}, {"c", bson.Raw{0x0A, nil}}, {"b", bson.Raw{0x08, []byte{0x01}}}}},
"\x03d\x00" + wrapInDoc("\x0Aa\x00"+"\x0Ac\x00"+"\x08b\x00\x01")},
{&ignoreField{"before", "ignore", "after"},
"\x02before\x00\a\x00\x00\x00before\x00\x02after\x00\x06\x00\x00\x00after\x00"},
@ -491,9 +532,17 @@ var unmarshalItems = []testItemType{
{&bson.Raw{0x03, []byte(wrapInDoc("\x10byte\x00\x08\x00\x00\x00"))},
"\x10byte\x00\x08\x00\x00\x00"},
// RawD document.
{&struct{ bson.RawD }{bson.RawD{{"a", bson.Raw{0x0A, []byte{}}}, {"c", bson.Raw{0x0A, []byte{}}}, {"b", bson.Raw{0x08, []byte{0x01}}}}},
"\x03rawd\x00" + wrapInDoc("\x0Aa\x00\x0Ac\x00\x08b\x00\x01")},
// Decode old binary.
{bson.M{"_": []byte("old")},
"\x05_\x00\x07\x00\x00\x00\x02\x03\x00\x00\x00old"},
// Decode old binary without length. According to the spec, this shouldn't happen.
{bson.M{"_": []byte("old")},
"\x05_\x00\x03\x00\x00\x00\x02old"},
}
func (s *S) TestUnmarshalOneWayItems(c *C) {
@ -533,9 +582,15 @@ var marshalErrorItems = []testItemType{
{bson.Raw{0x0A, []byte{}},
"Attempted to unmarshal Raw kind 10 as a document"},
{&inlineCantPtr{&struct{ A, B int }{1, 2}},
"Option ,inline needs a struct value field"},
"Option ,inline needs a struct value or map field"},
{&inlineDupName{1, struct{ A, B int }{2, 3}},
"Duplicated key 'a' in struct bson_test.inlineDupName"},
{&inlineDupMap{},
"Multiple ,inline maps in struct bson_test.inlineDupMap"},
{&inlineBadKeyMap{},
"Option ,inline needs a map with string keys in struct bson_test.inlineBadKeyMap"},
{&inlineMap{A: 1, M: map[string]interface{}{"a": 1}},
`Can't have key "a" in inlined map; conflicts with struct field`},
}
func (s *S) TestMarshalErrorItems(c *C) {
@ -753,9 +808,7 @@ func (s *S) TestUnmarshalSetterOmits(c *C) {
func (s *S) TestUnmarshalSetterErrors(c *C) {
boom := errors.New("BOOM")
setterResult["2"] = boom
defer func() {
delete(setterResult, "2")
}()
defer delete(setterResult, "2")
m := map[string]*setterType{}
data := wrapInDoc("\x02abc\x00\x02\x00\x00\x001\x00" +
@ -775,6 +828,22 @@ func (s *S) TestDMap(c *C) {
c.Assert(d.Map(), DeepEquals, bson.M{"a": 1, "b": 2})
}
func (s *S) TestUnmarshalSetterSetZero(c *C) {
setterResult["foo"] = bson.SetZero
defer delete(setterResult, "field")
data, err := bson.Marshal(bson.M{"field": "foo"})
c.Assert(err, IsNil)
m := map[string]*setterType{}
err = bson.Unmarshal([]byte(data), m)
c.Assert(err, IsNil)
value, ok := m["field"]
c.Assert(ok, Equals, true)
c.Assert(value, IsNil)
}
// --------------------------------------------------------------------------
// Getter test cases.
@ -869,6 +938,9 @@ type condInt struct {
type condUInt struct {
V uint ",omitempty"
}
type condFloat struct {
V float64 ",omitempty"
}
type condIface struct {
V interface{} ",omitempty"
}
@ -887,6 +959,9 @@ type namedCondStr struct {
type condTime struct {
V time.Time ",omitempty"
}
type condStruct struct {
V struct{ A []int } ",omitempty"
}
type shortInt struct {
V int64 ",minsize"
@ -914,17 +989,44 @@ type inlineDupName struct {
A int
V struct{ A, B int } ",inline"
}
type inlineMap struct {
A int
M map[string]interface{} ",inline"
}
type inlineMapInt struct {
A int
M map[string]int ",inline"
}
type inlineMapMyM struct {
A int
M MyM ",inline"
}
type inlineDupMap struct {
M1 map[string]interface{} ",inline"
M2 map[string]interface{} ",inline"
}
type inlineBadKeyMap struct {
M map[int]int ",inline"
}
type MyBytes []byte
type MyBool bool
type (
MyString string
MyBytes []byte
MyBool bool
MyD []bson.DocElem
MyRawD []bson.RawDocElem
MyM map[string]interface{}
)
var truevar = true
var falsevar = false
var (
truevar = true
falsevar = false
var int64var = int64(42)
var int64ptr = &int64var
var intvar = int(42)
var intptr = &intvar
int64var = int64(42)
int64ptr = &int64var
intvar = int(42)
intptr = &intvar
)
func parseURL(s string) *url.URL {
u, err := url.Parse(s)
@ -1040,6 +1142,7 @@ var twoWayCrossItems = []crossTypeItem{
{&condInt{}, map[string]int{}},
{&condUInt{1}, map[string]uint{"v": 1}},
{&condUInt{}, map[string]uint{}},
{&condFloat{}, map[string]int{}},
{&condStr{"yo"}, map[string]string{"v": "yo"}},
{&condStr{}, map[string]string{}},
{&condStrNS{"yo"}, map[string]string{"v": "yo"}},
@ -1058,6 +1161,9 @@ var twoWayCrossItems = []crossTypeItem{
{&condTime{time.Unix(123456789, 123e6)}, map[string]time.Time{"v": time.Unix(123456789, 123e6)}},
{&condTime{}, map[string]string{}},
{&condStruct{struct{ A []int }{[]int{1}}}, bson.M{"v": bson.M{"a": []interface{}{1}}}},
{&condStruct{struct{ A []int }{}}, bson.M{}},
{&namedCondStr{"yo"}, map[string]string{"myv": "yo"}},
{&namedCondStr{}, map[string]string{}},
@ -1074,6 +1180,11 @@ var twoWayCrossItems = []crossTypeItem{
{&shortNonEmptyInt{}, map[string]interface{}{}},
{&inlineInt{struct{ A, B int }{1, 2}}, map[string]interface{}{"a": 1, "b": 2}},
{&inlineMap{A: 1, M: map[string]interface{}{"b": 2}}, map[string]interface{}{"a": 1, "b": 2}},
{&inlineMap{A: 1, M: nil}, map[string]interface{}{"a": 1}},
{&inlineMapInt{A: 1, M: map[string]int{"b": 2}}, map[string]int{"a": 1, "b": 2}},
{&inlineMapInt{A: 1, M: nil}, map[string]int{"a": 1}},
{&inlineMapMyM{A: 1, M: MyM{"b": MyM{"c": 3}}}, map[string]interface{}{"a": 1, "b": map[string]interface{}{"c": 3}}},
// []byte <=> MyBytes
{&struct{ B MyBytes }{[]byte("abc")}, map[string]string{"b": "abc"}},
@ -1096,13 +1207,30 @@ var twoWayCrossItems = []crossTypeItem{
// zero time + 1 second + 1 millisecond; overflows int64 as nanoseconds
{&struct{ V time.Time }{time.Unix(-62135596799, 1e6).Local()},
map[string]interface{}{"v": time.Unix(-62135596799, 1e6).Local()}},
// bson.D <=> []DocElem
{&bson.D{{"a", bson.D{{"b", 1}, {"c", 2}}}}, &bson.D{{"a", bson.D{{"b", 1}, {"c", 2}}}}},
{&bson.D{{"a", bson.D{{"b", 1}, {"c", 2}}}}, &MyD{{"a", MyD{{"b", 1}, {"c", 2}}}}},
// bson.RawD <=> []RawDocElem
{&bson.RawD{{"a", bson.Raw{0x08, []byte{0x01}}}}, &bson.RawD{{"a", bson.Raw{0x08, []byte{0x01}}}}},
{&bson.RawD{{"a", bson.Raw{0x08, []byte{0x01}}}}, &MyRawD{{"a", bson.Raw{0x08, []byte{0x01}}}}},
// bson.M <=> map
{bson.M{"a": bson.M{"b": 1, "c": 2}}, MyM{"a": MyM{"b": 1, "c": 2}}},
{bson.M{"a": bson.M{"b": 1, "c": 2}}, map[string]interface{}{"a": map[string]interface{}{"b": 1, "c": 2}}},
// bson.M <=> map[MyString]
{bson.M{"a": bson.M{"b": 1, "c": 2}}, map[MyString]interface{}{"a": map[MyString]interface{}{"b": 1, "c": 2}}},
}
// Same thing, but only one way (obj1 => obj2).
var oneWayCrossItems = []crossTypeItem{
// map <=> struct
{map[string]interface{}{"a": 1, "b": "2", "c": 3},
map[string]int{"a": 1, "c": 3}},
{map[string]interface{}{"a": 1, "b": "2", "c": 3}, map[string]int{"a": 1, "c": 3}},
// inline map elides badly typed values
{map[string]interface{}{"a": 1, "b": "2", "c": 3}, &inlineMapInt{A: 1, M: map[string]int{"c": 3}}},
// Can't decode int into struct.
{bson.M{"a": bson.M{"b": 2}}, &struct{ A bool }{}},
@ -1146,6 +1274,21 @@ func (s *S) TestObjectIdHex(c *C) {
c.Assert(id.Hex(), Equals, "4d88e15b60f486e428412dc9")
}
func (s *S) TestIsObjectIdHex(c *C) {
test := []struct {
id string
valid bool
}{
{"4d88e15b60f486e428412dc9", true},
{"4d88e15b60f486e428412dc", false},
{"4d88e15b60f486e428412dc9e", false},
{"4d88e15b60f486e428412dcx", false},
}
for _, t := range test {
c.Assert(bson.IsObjectIdHex(t.id), Equals, t.valid)
}
}
// --------------------------------------------------------------------------
// ObjectId parts extraction tests.

View File

@ -1,18 +1,18 @@
// BSON library for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -37,8 +37,15 @@ import (
)
type decoder struct {
in []byte
i int
in []byte
i int
docType reflect.Type
}
var typeM = reflect.TypeOf(M{})
func newDecoder(in []byte) *decoder {
return &decoder{in, 0, typeM}
}
// --------------------------------------------------------------------------
@ -106,6 +113,13 @@ func getSetter(outt reflect.Type, out reflect.Value) Setter {
return out.Interface().(Setter)
}
func clearMap(m reflect.Value) {
var none reflect.Value
for _, k := range m.MapKeys() {
m.SetMapIndex(k, none)
}
}
func (d *decoder) readDocTo(out reflect.Value) {
var elemType reflect.Type
outt := out.Type()
@ -134,31 +148,44 @@ func (d *decoder) readDocTo(out reflect.Value) {
}
var fieldsMap map[string]fieldInfo
var inlineMap reflect.Value
start := d.i
switch outk {
case reflect.Interface:
if !out.IsNil() {
panic("Found non-nil interface. Please contact the developers.")
origout := out
if outk == reflect.Interface {
if d.docType.Kind() == reflect.Map {
mv := reflect.MakeMap(d.docType)
out.Set(mv)
out = mv
} else {
dv := reflect.New(d.docType).Elem()
out.Set(dv)
out = dv
}
mv := reflect.ValueOf(make(M))
out.Set(mv)
out = mv
outt = out.Type()
outk = outt.Kind()
fallthrough
}
docType := d.docType
keyType := typeString
convertKey := false
switch outk {
case reflect.Map:
if outt.Key().Kind() != reflect.String {
keyType = outt.Key()
if keyType.Kind() != reflect.String {
panic("BSON map must have string keys. Got: " + outt.String())
}
if keyType != typeString {
convertKey = true
}
elemType = outt.Elem()
if elemType == typeIface {
d.docType = outt
}
if out.IsNil() {
out.Set(reflect.MakeMap(out.Type()))
} else if out.Len() > 0 {
var none reflect.Value
for _, k := range out.MapKeys() {
out.SetMapIndex(k, none)
}
clearMap(out)
}
case reflect.Struct:
if outt != typeRaw {
@ -168,12 +195,33 @@ func (d *decoder) readDocTo(out reflect.Value) {
}
fieldsMap = sinfo.FieldsMap
out.Set(sinfo.Zero)
if sinfo.InlineMap != -1 {
inlineMap = out.Field(sinfo.InlineMap)
if !inlineMap.IsNil() && inlineMap.Len() > 0 {
clearMap(inlineMap)
}
elemType = inlineMap.Type().Elem()
if elemType == typeIface {
d.docType = inlineMap.Type()
}
}
}
case reflect.Slice:
switch outt.Elem() {
case typeDocElem:
origout.Set(d.readDocElems(outt))
return
case typeRawDocElem:
origout.Set(d.readRawDocElems(outt))
return
}
fallthrough
default:
panic("Unsupported document type for unmarshalling: " + out.Type().String())
}
end := d.i - 4 + int(d.readInt32())
end := int(d.readInt32())
end += d.i - 4
if end <= d.i || end > len(d.in) || d.in[end-1] != '\x00' {
corrupted()
}
@ -188,11 +236,15 @@ func (d *decoder) readDocTo(out reflect.Value) {
case reflect.Map:
e := reflect.New(elemType).Elem()
if d.readElemTo(e, kind) {
out.SetMapIndex(reflect.ValueOf(name), e)
k := reflect.ValueOf(name)
if convertKey {
k = k.Convert(keyType)
}
out.SetMapIndex(k, e)
}
case reflect.Struct:
if outt == typeRaw {
d.readElemTo(blackHole, kind)
d.dropElem(kind)
} else {
if info, ok := fieldsMap[name]; ok {
if info.Inline == nil {
@ -200,10 +252,19 @@ func (d *decoder) readDocTo(out reflect.Value) {
} else {
d.readElemTo(out.FieldByIndex(info.Inline), kind)
}
} else if inlineMap.IsValid() {
if inlineMap.IsNil() {
inlineMap.Set(reflect.MakeMap(inlineMap.Type()))
}
e := reflect.New(elemType).Elem()
if d.readElemTo(e, kind) {
inlineMap.SetMapIndex(reflect.ValueOf(name), e)
}
} else {
d.dropElem(kind)
}
}
case reflect.Slice:
}
if d.i >= end {
@ -214,17 +275,16 @@ func (d *decoder) readDocTo(out reflect.Value) {
if d.i != end {
corrupted()
}
d.docType = docType
switch outk {
case reflect.Struct:
if outt == typeRaw {
out.Set(reflect.ValueOf(Raw{0x03, d.in[start:d.i]}))
}
if outt == typeRaw {
out.Set(reflect.ValueOf(Raw{0x03, d.in[start:d.i]}))
}
}
func (d *decoder) readArrayDocTo(out reflect.Value) {
end := d.i - 4 + int(d.readInt32())
end := int(d.readInt32())
end += d.i - 4
if end <= d.i || end > len(d.in) || d.in[end-1] != '\x00' {
corrupted()
}
@ -261,7 +321,8 @@ func (d *decoder) readSliceDoc(t reflect.Type) interface{} {
tmp := make([]reflect.Value, 0, 8)
elemType := t.Elem()
end := d.i - 4 + int(d.readInt32())
end := int(d.readInt32())
end += d.i - 4
if end <= d.i || end > len(d.in) || d.in[end-1] != '\x00' {
corrupted()
}
@ -295,11 +356,13 @@ func (d *decoder) readSliceDoc(t reflect.Type) interface{} {
return slice.Interface()
}
var typeD = reflect.TypeOf(D{})
var typeSlice = reflect.TypeOf([]interface{}{})
var typeIface = typeSlice.Elem()
func (d *decoder) readDocD() interface{} {
slice := make(D, 0, 8)
func (d *decoder) readDocElems(typ reflect.Type) reflect.Value {
docType := d.docType
d.docType = typ
slice := make([]DocElem, 0, 8)
d.readDocWith(func(kind byte, name string) {
e := DocElem{Name: name}
v := reflect.ValueOf(&e.Value)
@ -307,11 +370,32 @@ func (d *decoder) readDocD() interface{} {
slice = append(slice, e)
}
})
return slice
slicev := reflect.New(typ).Elem()
slicev.Set(reflect.ValueOf(slice))
d.docType = docType
return slicev
}
func (d *decoder) readRawDocElems(typ reflect.Type) reflect.Value {
docType := d.docType
d.docType = typ
slice := make([]RawDocElem, 0, 8)
d.readDocWith(func(kind byte, name string) {
e := RawDocElem{Name: name}
v := reflect.ValueOf(&e.Value)
if d.readElemTo(v.Elem(), kind) {
slice = append(slice, e)
}
})
slicev := reflect.New(typ).Elem()
slicev.Set(reflect.ValueOf(slice))
d.docType = docType
return slicev
}
func (d *decoder) readDocWith(f func(kind byte, name string)) {
end := d.i - 4 + int(d.readInt32())
end := int(d.readInt32())
end += d.i - 4
if end <= d.i || end > len(d.in) || d.in[end-1] != '\x00' {
corrupted()
}
@ -354,9 +438,12 @@ func (d *decoder) readElemTo(out reflect.Value, kind byte) (good bool) {
case reflect.Interface, reflect.Ptr, reflect.Struct, reflect.Map:
d.readDocTo(out)
default:
if _, ok := out.Interface().(D); ok {
out.Set(reflect.ValueOf(d.readDocD()))
} else {
switch out.Interface().(type) {
case D:
out.Set(d.readDocElems(out.Type()))
case RawD:
out.Set(d.readRawDocElems(out.Type()))
default:
d.readDocTo(blackHole)
}
}
@ -443,9 +530,14 @@ func (d *decoder) readElemTo(out reflect.Value, kind byte) (good bool) {
if setter := getSetter(outt, out); setter != nil {
err := setter.SetBSON(Raw{kind, d.in[start:d.i]})
if err == SetZero {
out.Set(reflect.Zero(outt))
return true
}
if err == nil {
return true
} else if _, ok := err.(*TypeError); !ok {
}
if _, ok := err.(*TypeError); !ok {
panic(err)
}
return false
@ -621,7 +713,7 @@ func (d *decoder) readBinary() Binary {
b := Binary{}
b.Kind = d.readByte()
b.Data = d.readBytes(l)
if b.Kind == 0x02 {
if b.Kind == 0x02 && len(b.Data) >= 4 {
// Weird obsolete format with redundant length.
b.Data = b.Data[4:]
}

View File

@ -1,18 +1,18 @@
// BSON library for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -28,6 +28,7 @@
package bson
import (
"fmt"
"math"
"net/url"
"reflect"
@ -45,9 +46,11 @@ var (
typeMongoTimestamp = reflect.TypeOf(MongoTimestamp(0))
typeOrderKey = reflect.TypeOf(MinKey)
typeDocElem = reflect.TypeOf(DocElem{})
typeRawDocElem = reflect.TypeOf(RawDocElem{})
typeRaw = reflect.TypeOf(Raw{})
typeURL = reflect.TypeOf(url.URL{})
typeTime = reflect.TypeOf(time.Time{})
typeString = reflect.TypeOf("")
)
const itoaCacheSize = 32
@ -130,6 +133,18 @@ func (e *encoder) addStruct(v reflect.Value) {
panic(err)
}
var value reflect.Value
if sinfo.InlineMap >= 0 {
m := v.Field(sinfo.InlineMap)
if m.Len() > 0 {
for _, k := range m.MapKeys() {
ks := k.String()
if _, found := sinfo.FieldsMap[ks]; found {
panic(fmt.Sprintf("Can't have key %q in inlined map; conflicts with struct field", ks))
}
e.addElem(ks, m.MapIndex(k), false)
}
}
}
for _, info := range sinfo.FieldsList {
if info.Inline == nil {
value = v.Field(info.Num)
@ -157,25 +172,56 @@ func isZero(v reflect.Value) bool {
return v.Int() == 0
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
return v.Uint() == 0
case reflect.Float32, reflect.Float64:
return v.Float() == 0
case reflect.Bool:
return !v.Bool()
case reflect.Struct:
if v.Type() == typeTime {
return v.Interface().(time.Time).IsZero()
}
for i := v.NumField() - 1; i >= 0; i-- {
if !isZero(v.Field(i)) {
return false
}
}
return true
}
return false
}
func (e *encoder) addSlice(v reflect.Value) {
if d, ok := v.Interface().(D); ok {
vi := v.Interface()
if d, ok := vi.(D); ok {
for _, elem := range d {
e.addElem(elem.Name, reflect.ValueOf(elem.Value), false)
}
} else {
for i := 0; i != v.Len(); i++ {
e.addElem(itoa(i), v.Index(i), false)
return
}
if d, ok := vi.(RawD); ok {
for _, elem := range d {
e.addElem(elem.Name, reflect.ValueOf(elem.Value), false)
}
return
}
l := v.Len()
et := v.Type().Elem()
if et == typeDocElem {
for i := 0; i < l; i++ {
elem := v.Index(i).Interface().(DocElem)
e.addElem(elem.Name, reflect.ValueOf(elem.Value), false)
}
return
}
if et == typeRawDocElem {
for i := 0; i < l; i++ {
elem := v.Index(i).Interface().(RawDocElem)
e.addElem(elem.Name, reflect.ValueOf(elem.Value), false)
}
return
}
for i := 0; i < l; i++ {
e.addElem(itoa(i), v.Index(i), false)
}
}
@ -247,32 +293,27 @@ func (e *encoder) addElem(name string, v reflect.Value, minSize bool) {
}
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
if v.Type().Kind() <= reflect.Int32 {
e.addElemName('\x10', name)
e.addInt32(int32(v.Int()))
} else {
switch v.Type() {
case typeMongoTimestamp:
e.addElemName('\x11', name)
e.addInt64(v.Int())
switch v.Type() {
case typeMongoTimestamp:
e.addElemName('\x11', name)
e.addInt64(v.Int())
case typeOrderKey:
if v.Int() == int64(MaxKey) {
e.addElemName('\x7F', name)
} else {
e.addElemName('\xFF', name)
}
case typeOrderKey:
if v.Int() == int64(MaxKey) {
e.addElemName('\x7F', name)
} else {
e.addElemName('\xFF', name)
}
default:
i := v.Int()
if minSize && i >= math.MinInt32 && i <= math.MaxInt32 {
// It fits into an int32, encode as such.
e.addElemName('\x10', name)
e.addInt32(int32(i))
} else {
e.addElemName('\x12', name)
e.addInt64(i)
}
default:
i := v.Int()
if (minSize || v.Type().Kind() != reflect.Int64) && i >= math.MinInt32 && i <= math.MaxInt32 {
// It fits into an int32, encode as such.
e.addElemName('\x10', name)
e.addInt32(int32(i))
} else {
e.addElemName('\x12', name)
e.addInt64(i)
}
}
@ -294,7 +335,7 @@ func (e *encoder) addElem(name string, v reflect.Value, minSize bool) {
if et.Kind() == reflect.Uint8 {
e.addElemName('\x05', name)
e.addBinary('\x00', v.Bytes())
} else if et == typeDocElem {
} else if et == typeDocElem || et == typeRawDocElem {
e.addElemName('\x03', name)
e.addDoc(v)
} else {
@ -347,7 +388,7 @@ func (e *encoder) addElem(name string, v reflect.Value, minSize bool) {
case time.Time:
// MongoDB handles timestamps as milliseconds.
e.addElemName('\x09', name)
e.addInt64(s.Unix() * 1000 + int64(s.Nanosecond() / 1e6))
e.addInt64(s.Unix()*1000 + int64(s.Nanosecond()/1e6))
case url.URL:
e.addElemName('\x02', name)

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -27,7 +27,9 @@
package mgo
import (
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"errors"
"net"
"sync"
"time"
)
@ -51,12 +53,19 @@ type mongoCluster struct {
direct bool
cachedIndex map[string]bool
sync chan bool
dial dialer
}
func newCluster(userSeeds []string, direct bool) *mongoCluster {
cluster := &mongoCluster{userSeeds: userSeeds, references: 1, direct: direct}
func newCluster(userSeeds []string, direct bool, dial dialer) *mongoCluster {
cluster := &mongoCluster{
userSeeds: userSeeds,
references: 1,
direct: direct,
dial: dial,
}
cluster.serverSynced.L = cluster.RWMutex.RLocker()
cluster.sync = make(chan bool, 1)
stats.cluster(+1)
go cluster.syncServersLoop()
return cluster
}
@ -84,6 +93,7 @@ func (cluster *mongoCluster) Release() {
}
// Wake up the sync loop so it can die.
cluster.syncServers()
stats.cluster(-1)
}
cluster.Unlock()
}
@ -115,34 +125,55 @@ type isMasterResult struct {
Primary string
Hosts []string
Passives []string
Tags bson.D
Msg string
}
func (cluster *mongoCluster) syncServer(server *mongoServer) (hosts []string, err error) {
func (cluster *mongoCluster) isMaster(socket *mongoSocket, result *isMasterResult) error {
// Monotonic let's it talk to a slave and still hold the socket.
session := newSession(Monotonic, cluster, 10*time.Second)
session.setSocket(socket)
err := session.Run("ismaster", result)
session.Close()
return err
}
type possibleTimeout interface {
Timeout() bool
}
var syncSocketTimeout = 5 * time.Second
func (cluster *mongoCluster) syncServer(server *mongoServer) (info *mongoServerInfo, hosts []string, err error) {
addr := server.Addr
log("SYNC Processing ", addr, "...")
// Retry a few times to avoid knocking a server down for a hiccup.
var result isMasterResult
var tryerr error
for retry := 0; ; retry++ {
if retry == 3 {
return nil, tryerr
return nil, nil, tryerr
}
if retry > 0 {
// Don't abuse the server needlessly if there's something actually wrong.
if err, ok := tryerr.(possibleTimeout); ok && err.Timeout() {
// Give a chance for waiters to timeout as well.
cluster.serverSynced.Broadcast()
}
time.Sleep(500 * time.Millisecond)
}
socket, err := server.AcquireSocket(0)
// It's not clear what would be a good timeout here. Is it
// better to wait longer or to retry?
socket, _, err := server.AcquireSocket(0, syncSocketTimeout)
if err != nil {
tryerr = err
logf("SYNC Failed to get socket to %s: %v", addr, err)
continue
}
// Monotonic will let us talk to a slave and still hold the socket.
session := newSession(Monotonic, cluster, socket, 10 * time.Second)
// session holds the socket now.
err = cluster.isMaster(socket, &result)
socket.Release()
err = session.Run("ismaster", &result)
session.Close()
if err != nil {
tryerr = err
logf("SYNC Command 'ismaster' to %s failed: %v", addr, err)
@ -156,15 +187,22 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (hosts []string, er
debugf("SYNC %s is a master.", addr)
// Made an incorrect assumption above, so fix stats.
stats.conn(-1, false)
server.SetMaster(true)
stats.conn(+1, true)
} else if result.Secondary {
debugf("SYNC %s is a slave.", addr)
} else if cluster.direct {
logf("SYNC %s in unknown state. Pretending it's a slave due to direct connection.", addr)
} else {
logf("SYNC %s is neither a master nor a slave.", addr)
// Made an incorrect assumption above, so fix stats.
stats.conn(-1, false)
return nil, errors.New(addr + " is not a master nor slave")
return nil, nil, errors.New(addr + " is not a master nor slave")
}
info = &mongoServerInfo{
Master: result.IsMaster,
Mongos: result.Msg == "isdbgrid",
Tags: result.Tags,
}
hosts = make([]string, 0, 1+len(result.Hosts)+len(result.Passives))
@ -176,33 +214,48 @@ func (cluster *mongoCluster) syncServer(server *mongoServer) (hosts []string, er
hosts = append(hosts, result.Passives...)
debugf("SYNC %s knows about the following peers: %#v", addr, hosts)
return hosts, nil
return info, hosts, nil
}
func (cluster *mongoCluster) mergeServer(server *mongoServer) {
type syncKind bool
const (
completeSync syncKind = true
partialSync syncKind = false
)
func (cluster *mongoCluster) addServer(server *mongoServer, info *mongoServerInfo, syncKind syncKind) {
cluster.Lock()
previous := cluster.servers.Search(server)
isMaster := server.IsMaster()
if previous == nil {
current := cluster.servers.Search(server.ResolvedAddr)
if current == nil {
if syncKind == partialSync {
cluster.Unlock()
server.Close()
log("SYNC Discarding unknown server ", server.Addr, " due to partial sync.")
return
}
cluster.servers.Add(server)
if isMaster {
if info.Master {
cluster.masters.Add(server)
log("SYNC Adding ", server.Addr, " to cluster as a master.")
} else {
log("SYNC Adding ", server.Addr, " to cluster as a slave.")
}
} else {
if isMaster != previous.IsMaster() {
if isMaster {
if server != current {
panic("addServer attempting to add duplicated server")
}
if server.Info().Master != info.Master {
if info.Master {
log("SYNC Server ", server.Addr, " is now a master.")
cluster.masters.Add(previous)
cluster.masters.Add(server)
} else {
log("SYNC Server ", server.Addr, " is now a slave.")
cluster.masters.Remove(previous)
cluster.masters.Remove(server)
}
}
previous.Merge(server)
}
server.SetInfo(info)
debugf("SYNC Broadcasting availability of server %s", server.Addr)
cluster.serverSynced.Broadcast()
cluster.Unlock()
@ -246,7 +299,7 @@ func (cluster *mongoCluster) syncServers() {
// How long to wait for a checkup of the cluster topology if nothing
// else kicks a synchronization before that.
const syncServersDelay = 3 * time.Minute
const syncServersDelay = 30 * time.Second
// syncServersLoop loops while the cluster is alive to keep its idea of
// the server topology up-to-date. It must be called just once from
@ -281,7 +334,7 @@ func (cluster *mongoCluster) syncServersLoop() {
// Hold off before allowing another sync. No point in
// burning CPU looking for down servers.
time.Sleep(5e8)
time.Sleep(500 * time.Millisecond)
cluster.Lock()
if cluster.references == 0 {
@ -312,15 +365,42 @@ func (cluster *mongoCluster) syncServersLoop() {
debugf("SYNC Cluster %p is stopping its sync loop.", cluster)
}
func (cluster *mongoCluster) server(addr string, tcpaddr *net.TCPAddr) *mongoServer {
cluster.RLock()
server := cluster.servers.Search(tcpaddr.String())
cluster.RUnlock()
if server != nil {
return server
}
return newServer(addr, tcpaddr, cluster.sync, cluster.dial)
}
func resolveAddr(addr string) (*net.TCPAddr, error) {
tcpaddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
log("SYNC Failed to resolve ", addr, ": ", err.Error())
return nil, err
}
if tcpaddr.String() != addr {
debug("SYNC Address ", addr, " resolved as ", tcpaddr.String())
}
return tcpaddr, nil
}
type pendingAdd struct {
server *mongoServer
info *mongoServerInfo
}
func (cluster *mongoCluster) syncServersIteration(direct bool) {
log("SYNC Starting full topology synchronization...")
var wg sync.WaitGroup
var m sync.Mutex
mergePending := make(map[string]*mongoServer)
mergeRequested := make(map[string]bool)
notYetAdded := make(map[string]pendingAdd)
addIfFound := make(map[string]bool)
seen := make(map[string]bool)
goodSync := false
syncKind := partialSync
var spawnSync func(addr string, byMaster bool)
spawnSync = func(addr string, byMaster bool) {
@ -328,66 +408,71 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
go func() {
defer wg.Done()
server, err := newServer(addr, cluster.sync)
tcpaddr, err := resolveAddr(addr)
if err != nil {
log("SYNC Failed to start sync of ", addr, ": ", err.Error())
return
}
resolvedAddr := tcpaddr.String()
m.Lock()
if byMaster {
if s, found := mergePending[server.ResolvedAddr]; found {
delete(mergePending, server.ResolvedAddr)
if pending, ok := notYetAdded[resolvedAddr]; ok {
delete(notYetAdded, resolvedAddr)
m.Unlock()
cluster.mergeServer(s)
cluster.addServer(pending.server, pending.info, completeSync)
return
}
mergeRequested[server.ResolvedAddr] = true
addIfFound[resolvedAddr] = true
}
if seen[server.ResolvedAddr] {
if seen[resolvedAddr] {
m.Unlock()
return
}
seen[server.ResolvedAddr] = true
seen[resolvedAddr] = true
m.Unlock()
hosts, err := cluster.syncServer(server)
if err == nil {
isMaster := server.IsMaster()
if !direct {
for _, addr := range hosts {
spawnSync(addr, isMaster)
}
}
server := cluster.server(addr, tcpaddr)
info, hosts, err := cluster.syncServer(server)
if err != nil {
cluster.removeServer(server)
return
}
m.Lock()
merge := direct || isMaster
if mergeRequested[server.ResolvedAddr] {
merge = true
} else if !merge {
mergePending[server.ResolvedAddr] = server
}
if merge {
goodSync = true
}
m.Unlock()
if merge {
cluster.mergeServer(server)
m.Lock()
add := direct || info.Master || addIfFound[resolvedAddr]
if add {
syncKind = completeSync
} else {
notYetAdded[resolvedAddr] = pendingAdd{server, info}
}
m.Unlock()
if add {
cluster.addServer(server, info, completeSync)
}
if !direct {
for _, addr := range hosts {
spawnSync(addr, info.Master)
}
}
}()
}
for _, addr := range cluster.getKnownAddrs() {
knownAddrs := cluster.getKnownAddrs()
for _, addr := range knownAddrs {
spawnSync(addr, false)
}
wg.Wait()
for _, server := range mergePending {
if goodSync {
cluster.removeServer(server)
} else {
server.Close()
if syncKind == completeSync {
logf("SYNC Synchronization was complete (got data from primary).")
for _, pending := range notYetAdded {
cluster.removeServer(pending.server)
}
} else {
logf("SYNC Synchronization was partial (cannot talk to primary).")
for _, pending := range notYetAdded {
cluster.addServer(pending.server, pending.info, partialSync)
}
}
@ -397,7 +482,7 @@ func (cluster *mongoCluster) syncServersIteration(direct bool) {
// Update dynamic seeds, but only if we have any good servers. Otherwise,
// leave them alone for better chances of a successful sync in the future.
if goodSync {
if syncKind == completeSync {
dynaSeeds := make([]string, cluster.servers.Len())
for i, server := range cluster.servers.Slice() {
dynaSeeds[i] = server.Addr
@ -413,7 +498,7 @@ var socketsPerServer = 4096
// AcquireSocket returns a socket to a server in the cluster. If slaveOk is
// true, it will attempt to return a socket to a slave server. If it is
// false, the socket will necessarily be to a master server.
func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Duration) (s *mongoSocket, err error) {
func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Duration, socketTimeout time.Duration, serverTags []bson.D) (s *mongoSocket, err error) {
var started time.Time
warnedLimit := false
for {
@ -440,13 +525,19 @@ func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Durati
var server *mongoServer
if slaveOk {
server = cluster.servers.MostAvailable()
server = cluster.servers.BestFit(serverTags)
} else {
server = cluster.masters.MostAvailable()
server = cluster.masters.BestFit(nil)
}
cluster.RUnlock()
s, err = server.AcquireSocket(socketsPerServer)
if server == nil {
// Must have failed the requested tags. Sleep to avoid spinning.
time.Sleep(1e8)
continue
}
s, abended, err := server.AcquireSocket(socketsPerServer, socketTimeout)
if err == errSocketLimit {
if !warnedLimit {
log("WARNING: Per-server connection limit reached.")
@ -459,6 +550,17 @@ func (cluster *mongoCluster) AcquireSocket(slaveOk bool, syncTimeout time.Durati
cluster.syncServers()
continue
}
if abended && !slaveOk {
var result isMasterResult
err := cluster.isMaster(s, &result)
if err != nil || !result.IsMaster {
logf("Cannot confirm server %s as master (%v)", server.Addr, err)
s.Release()
cluster.syncServers()
time.Sleep(1e8)
continue
}
}
return s, nil
}
panic("unreached")

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -27,11 +27,14 @@
package mgo_test
import (
"io"
. "camlistore.org/third_party/launchpad.net/gocheck"
"camlistore.org/third_party/labix.org/v2/mgo"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
. "camlistore.org/third_party/launchpad.net/gocheck"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
)
@ -81,7 +84,7 @@ func (s *S) TestNewSession(c *C) {
m := M{}
ok := iter.Next(m)
c.Assert(ok, Equals, true)
err = iter.Err()
err = iter.Close()
c.Assert(err, IsNil)
// If Batch(-1) is in effect, a single document must have been received.
@ -146,7 +149,7 @@ func (s *S) TestCloneSession(c *C) {
m := M{}
ok := iter.Next(m)
c.Assert(ok, Equals, true)
err = iter.Err()
err = iter.Close()
c.Assert(err, IsNil)
// If Batch(-1) is in effect, a single document must have been received.
@ -226,7 +229,7 @@ func (s *S) TestSetModeMonotonic(c *C) {
stats := mgo.GetStats()
c.Assert(stats.MasterConns, Equals, 1)
c.Assert(stats.SlaveConns, Equals, 2)
c.Assert(stats.SocketsInUse, Equals, 1)
c.Assert(stats.SocketsInUse, Equals, 2)
session.SetMode(mgo.Monotonic, true)
@ -307,6 +310,51 @@ func (s *S) TestSetModeStrongAfterMonotonic(c *C) {
c.Assert(result["ismaster"], Equals, true)
}
func (s *S) TestSetModeMonotonicWriteOnIteration(c *C) {
// Must necessarily connect to a slave, otherwise the
// master connection will be available first.
session, err := mgo.Dial("localhost:40012")
c.Assert(err, IsNil)
defer session.Close()
session.SetMode(mgo.Monotonic, false)
c.Assert(session.Mode(), Equals, mgo.Monotonic)
coll1 := session.DB("mydb").C("mycoll1")
coll2 := session.DB("mydb").C("mycoll2")
ns := []int{40, 41, 42, 43, 44, 45, 46}
for _, n := range ns {
err := coll1.Insert(M{"n": n})
c.Assert(err, IsNil)
}
// Release master so we can grab a slave again.
session.Refresh()
// Wait until synchronization is done.
for {
n, err := coll1.Count()
c.Assert(err, IsNil)
if n == len(ns) {
break
}
}
iter := coll1.Find(nil).Batch(2).Iter()
i := 0
m := M{}
for iter.Next(&m) {
i++
if i > 3 {
err := coll2.Insert(M{"n": 47 + i})
c.Assert(err, IsNil)
}
}
c.Assert(i, Equals, len(ns))
}
func (s *S) TestSetModeEventual(c *C) {
// Must necessarily connect to a slave, otherwise the
// master connection will be available first.
@ -417,6 +465,61 @@ func (s *S) TestPrimaryShutdownStrong(c *C) {
err = session.Run("serverStatus", result)
c.Assert(err, IsNil)
c.Assert(result.Host, Not(Equals), host)
// Insert some data to confirm it's indeed a master.
err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
c.Assert(err, IsNil)
}
func (s *S) TestPrimaryHiccup(c *C) {
if *fast {
c.Skip("-fast")
}
session, err := mgo.Dial("localhost:40021")
c.Assert(err, IsNil)
defer session.Close()
// With strong consistency, this will open a socket to the master.
result := &struct{ Host string }{}
err = session.Run("serverStatus", result)
c.Assert(err, IsNil)
// Establish a few extra sessions to create spare sockets to
// the master. This increases a bit the chances of getting an
// incorrect cached socket.
var sessions []*mgo.Session
for i := 0; i < 20; i++ {
sessions = append(sessions, session.Copy())
err = sessions[len(sessions)-1].Run("serverStatus", result)
c.Assert(err, IsNil)
}
for i := range sessions {
sessions[i].Close()
}
// Kill the master, but bring it back immediatelly.
host := result.Host
s.Stop(host)
s.StartAll()
// This must fail, since the connection was broken.
err = session.Run("serverStatus", result)
c.Assert(err, Equals, io.EOF)
// With strong consistency, it fails again until reset.
err = session.Run("serverStatus", result)
c.Assert(err, Equals, io.EOF)
session.Refresh()
// Now we should be able to talk to the new master.
// Increase the timeout since this may take quite a while.
session.SetSyncTimeout(3 * time.Minute)
// Insert some data to confirm it's indeed a master.
err = session.DB("mydb").C("mycoll").Insert(M{"n": 42})
c.Assert(err, IsNil)
}
func (s *S) TestPrimaryShutdownMonotonic(c *C) {
@ -435,6 +538,9 @@ func (s *S) TestPrimaryShutdownMonotonic(c *C) {
err = coll.Insert(M{"a": 1})
c.Assert(err, IsNil)
// Wait a bit for this to be synchronized to slaves.
time.Sleep(3 * time.Second)
result := &struct{ Host string }{}
err = session.Run("serverStatus", result)
c.Assert(err, IsNil)
@ -559,6 +665,9 @@ func (s *S) TestPrimaryShutdownEventual(c *C) {
err = coll.Insert(M{"a": 1})
c.Assert(err, IsNil)
// Wait a bit for this to be synchronized to slaves.
time.Sleep(3 * time.Second)
// Kill the master.
s.Stop(master)
@ -664,7 +773,7 @@ func (s *S) TestTopologySyncWithSlaveSeed(c *C) {
c.Assert(result.Ok, Equals, true)
// One connection to each during discovery. Master
// socket recycled for insert.
// socket recycled for insert.
stats := mgo.GetStats()
c.Assert(stats.MasterConns, Equals, 1)
c.Assert(stats.SlaveConns, Equals, 2)
@ -721,6 +830,75 @@ func (s *S) TestDialWithTimeout(c *C) {
c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
}
func (s *S) TestSocketTimeout(c *C) {
if *fast {
c.Skip("-fast")
}
session, err := mgo.Dial("localhost:40001")
c.Assert(err, IsNil)
defer session.Close()
s.Freeze("localhost:40001")
timeout := 3 * time.Second
session.SetSocketTimeout(timeout)
started := time.Now()
// Do something.
result := struct{ Ok bool }{}
err = session.Run("getLastError", &result)
c.Assert(err, ErrorMatches, ".*: i/o timeout")
c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
c.Assert(started.After(time.Now().Add(-timeout*2)), Equals, true)
}
func (s *S) TestSocketTimeoutOnDial(c *C) {
if *fast {
c.Skip("-fast")
}
timeout := 1 * time.Second
defer mgo.HackSyncSocketTimeout(timeout)()
s.Freeze("localhost:40001")
started := time.Now()
session, err := mgo.DialWithTimeout("localhost:40001", timeout)
c.Assert(err, ErrorMatches, "no reachable servers")
c.Assert(session, IsNil)
c.Assert(started.Before(time.Now().Add(-timeout)), Equals, true)
c.Assert(started.After(time.Now().Add(-20*time.Second)), Equals, true)
}
func (s *S) TestSocketTimeoutOnInactiveSocket(c *C) {
if *fast {
c.Skip("-fast")
}
session, err := mgo.Dial("localhost:40001")
c.Assert(err, IsNil)
defer session.Close()
timeout := 2 * time.Second
session.SetSocketTimeout(timeout)
// Do something that relies on the timeout and works.
c.Assert(session.Ping(), IsNil)
// Freeze and wait for the timeout to go by.
s.Freeze("localhost:40001")
time.Sleep(timeout + 500*time.Millisecond)
s.Thaw("localhost:40001")
// Do something again. The timeout above should not have killed
// the socket as there was nothing to be done.
c.Assert(session.Ping(), IsNil)
}
func (s *S) TestDirect(c *C) {
session, err := mgo.Dial("localhost:40012?connect=direct")
c.Assert(err, IsNil)
@ -746,13 +924,51 @@ func (s *S) TestDirect(c *C) {
err = coll.Insert(M{"test": 1})
c.Assert(err, ErrorMatches, "no reachable servers")
// Slave is still reachable.
// Writing to the local database is okay.
coll = session.DB("local").C("mycoll")
defer coll.RemoveAll(nil)
id := bson.NewObjectId()
err = coll.Insert(M{"_id": id})
c.Assert(err, IsNil)
// Data was stored in the right server.
n, err := coll.Find(M{"_id": id}).Count()
c.Assert(err, IsNil)
c.Assert(n, Equals, 1)
// Server hasn't changed.
result.Host = ""
err = session.Run("serverStatus", result)
c.Assert(err, IsNil)
c.Assert(strings.HasSuffix(result.Host, ":40012"), Equals, true)
}
func (s *S) TestDirectToUnknownStateMember(c *C) {
session, err := mgo.Dial("localhost:40041?connect=direct")
c.Assert(err, IsNil)
defer session.Close()
session.SetMode(mgo.Monotonic, true)
result := &struct{ Host string }{}
err = session.Run("serverStatus", result)
c.Assert(err, IsNil)
c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
// We've got no master, so it'll timeout.
session.SetSyncTimeout(5e8 * time.Nanosecond)
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(M{"test": 1})
c.Assert(err, ErrorMatches, "no reachable servers")
// Slave is still reachable.
result.Host = ""
err = session.Run("serverStatus", result)
c.Assert(err, IsNil)
c.Assert(strings.HasSuffix(result.Host, ":40041"), Equals, true)
}
type OpCounters struct {
Insert int
Query int
@ -862,7 +1078,10 @@ func (s *S) TestRemovalOfClusterMember(c *C) {
c.Fatalf("Test started with bad cluster state: %v", master.LiveServers())
}
result := &struct{ IsMaster bool; Me string }{}
result := &struct {
IsMaster bool
Me string
}{}
slave := master.Copy()
slave.SetMode(mgo.Monotonic, true) // Monotonic can hold a non-master socket persistently.
err = slave.Run("isMaster", result)
@ -875,10 +1094,6 @@ func (s *S) TestRemovalOfClusterMember(c *C) {
master.Run(bson.D{{"$eval", `rs.add("` + slaveAddr + `")`}}, nil)
master.Close()
slave.Close()
s.Stop(slaveAddr)
// For some reason it remains FATAL if we don't wait.
time.Sleep(3e9)
}()
c.Logf("========== Removing slave: %s ==========", slaveAddr)
@ -909,6 +1124,8 @@ func (s *S) TestRemovalOfClusterMember(c *C) {
if len(live) != 2 {
c.Errorf("Removed server still considered live: %#s", live)
}
c.Log("========== Test succeeded. ==========")
}
func (s *S) TestSocketLimit(c *C) {
@ -955,3 +1172,366 @@ func (s *S) TestSocketLimit(c *C) {
c.Assert(delay > 3e9, Equals, true)
c.Assert(delay < 6e9, Equals, true)
}
func (s *S) TestSetModeEventualIterBug(c *C) {
session1, err := mgo.Dial("localhost:40011")
c.Assert(err, IsNil)
defer session1.Close()
session1.SetMode(mgo.Eventual, false)
coll1 := session1.DB("mydb").C("mycoll")
const N = 100
for i := 0; i < N; i++ {
err = coll1.Insert(M{"_id": i})
c.Assert(err, IsNil)
}
c.Logf("Waiting until secondary syncs")
for {
n, err := coll1.Count()
c.Assert(err, IsNil)
if n == N {
c.Logf("Found all")
break
}
}
session2, err := mgo.Dial("localhost:40011")
c.Assert(err, IsNil)
defer session2.Close()
session2.SetMode(mgo.Eventual, false)
coll2 := session2.DB("mydb").C("mycoll")
i := 0
iter := coll2.Find(nil).Batch(10).Iter()
var result struct{}
for iter.Next(&result) {
i++
}
c.Assert(iter.Close(), Equals, nil)
c.Assert(i, Equals, N)
}
func (s *S) TestCustomDialOld(c *C) {
dials := make(chan bool, 16)
dial := func(addr net.Addr) (net.Conn, error) {
tcpaddr, ok := addr.(*net.TCPAddr)
if !ok {
return nil, fmt.Errorf("unexpected address type: %T", addr)
}
dials <- true
return net.DialTCP("tcp", nil, tcpaddr)
}
info := mgo.DialInfo{
Addrs: []string{"localhost:40012"},
Dial: dial,
}
// Use hostname here rather than IP, to make things trickier.
session, err := mgo.DialWithInfo(&info)
c.Assert(err, IsNil)
defer session.Close()
const N = 3
for i := 0; i < N; i++ {
select {
case <-dials:
case <-time.After(5 * time.Second):
c.Fatalf("expected %d dials, got %d", N, i)
}
}
select {
case <-dials:
c.Fatalf("got more dials than expected")
case <-time.After(100 * time.Millisecond):
}
}
func (s *S) TestCustomDialNew(c *C) {
dials := make(chan bool, 16)
dial := func(addr *mgo.ServerAddr) (net.Conn, error) {
dials <- true
if addr.TCPAddr().Port == 40012 {
c.Check(addr.String(), Equals, "localhost:40012")
}
return net.DialTCP("tcp", nil, addr.TCPAddr())
}
info := mgo.DialInfo{
Addrs: []string{"localhost:40012"},
DialServer: dial,
}
// Use hostname here rather than IP, to make things trickier.
session, err := mgo.DialWithInfo(&info)
c.Assert(err, IsNil)
defer session.Close()
const N = 3
for i := 0; i < N; i++ {
select {
case <-dials:
case <-time.After(5 * time.Second):
c.Fatalf("expected %d dials, got %d", N, i)
}
}
select {
case <-dials:
c.Fatalf("got more dials than expected")
case <-time.After(100 * time.Millisecond):
}
}
func (s *S) TestPrimaryShutdownOnAuthShard(c *C) {
if *fast {
c.Skip("-fast")
}
// Dial the shard.
session, err := mgo.Dial("localhost:40203")
c.Assert(err, IsNil)
defer session.Close()
// Login and insert something to make it more realistic.
session.DB("admin").Login("root", "rapadura")
coll := session.DB("mydb").C("mycoll")
err = coll.Insert(bson.M{"n": 1})
c.Assert(err, IsNil)
// Dial the replica set to figure the master out.
rs, err := mgo.Dial("root:rapadura@localhost:40031")
c.Assert(err, IsNil)
defer rs.Close()
// With strong consistency, this will open a socket to the master.
result := &struct{ Host string }{}
err = rs.Run("serverStatus", result)
c.Assert(err, IsNil)
// Kill the master.
host := result.Host
s.Stop(host)
// This must fail, since the connection was broken.
err = rs.Run("serverStatus", result)
c.Assert(err, Equals, io.EOF)
// This won't work because the master just died.
err = coll.Insert(bson.M{"n": 2})
c.Assert(err, NotNil)
// Refresh session and wait for re-election.
session.Refresh()
for i := 0; i < 60; i++ {
err = coll.Insert(bson.M{"n": 3})
if err == nil {
break
}
c.Logf("Waiting for replica set to elect a new master. Last error: %v", err)
time.Sleep(500 * time.Millisecond)
}
c.Assert(err, IsNil)
count, err := coll.Count()
c.Assert(count > 1, Equals, true)
}
func (s *S) TestNearestSecondary(c *C) {
defer mgo.HackPingDelay(3 * time.Second)()
rs1a := "127.0.0.1:40011"
rs1b := "127.0.0.1:40012"
rs1c := "127.0.0.1:40013"
s.Freeze(rs1b)
session, err := mgo.Dial(rs1a)
c.Assert(err, IsNil)
defer session.Close()
// Wait for the sync up to run through the first couple of servers.
for len(session.LiveServers()) != 2 {
c.Log("Waiting for two servers to be alive...")
time.Sleep(100 * time.Millisecond)
}
// Extra delay to ensure the third server gets penalized.
time.Sleep(500 * time.Millisecond)
// Release third server.
s.Thaw(rs1b)
// Wait for it to come up.
for len(session.LiveServers()) != 3 {
c.Log("Waiting for all servers to be alive...")
time.Sleep(100 * time.Millisecond)
}
session.SetMode(mgo.Monotonic, true)
var result struct{ Host string }
// See which slave picks the line, several times to avoid chance.
for i := 0; i < 10; i++ {
session.Refresh()
err = session.Run("serverStatus", &result)
c.Assert(err, IsNil)
c.Assert(hostPort(result.Host), Equals, hostPort(rs1c))
}
if *fast {
// Don't hold back for several seconds.
return
}
// Now hold the other server for long enough to penalize it.
s.Freeze(rs1c)
time.Sleep(5 * time.Second)
s.Thaw(rs1c)
// Wait for the ping to be processed.
time.Sleep(500 * time.Millisecond)
// Repeating the test should now pick the former server consistently.
for i := 0; i < 10; i++ {
session.Refresh()
err = session.Run("serverStatus", &result)
c.Assert(err, IsNil)
c.Assert(hostPort(result.Host), Equals, hostPort(rs1b))
}
}
func (s *S) TestConnectCloseConcurrency(c *C) {
restore := mgo.HackPingDelay(500 * time.Millisecond)
defer restore()
var wg sync.WaitGroup
const n = 500
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
defer wg.Done()
session, err := mgo.Dial("localhost:40001")
if err != nil {
c.Fatal(err)
}
time.Sleep(1)
session.Close()
}()
}
wg.Wait()
}
func (s *S) TestSelectServers(c *C) {
if !s.versionAtLeast(2, 2) {
c.Skip("read preferences introduced in 2.2")
}
session, err := mgo.Dial("localhost:40011")
c.Assert(err, IsNil)
defer session.Close()
session.SetMode(mgo.Eventual, true)
var result struct{ Host string }
session.Refresh()
session.SelectServers(bson.D{{"rs1", "b"}})
err = session.Run("serverStatus", &result)
c.Assert(err, IsNil)
c.Assert(hostPort(result.Host), Equals, "40012")
session.Refresh()
session.SelectServers(bson.D{{"rs1", "c"}})
err = session.Run("serverStatus", &result)
c.Assert(err, IsNil)
c.Assert(hostPort(result.Host), Equals, "40013")
}
func (s *S) TestSelectServersWithMongos(c *C) {
if !s.versionAtLeast(2, 2) {
c.Skip("read preferences introduced in 2.2")
}
session, err := mgo.Dial("localhost:40021")
c.Assert(err, IsNil)
defer session.Close()
ssresult := &struct{ Host string }{}
imresult := &struct{ IsMaster bool }{}
// Figure the master while still using the strong session.
err = session.Run("serverStatus", ssresult)
c.Assert(err, IsNil)
err = session.Run("isMaster", imresult)
c.Assert(err, IsNil)
master := ssresult.Host
c.Assert(imresult.IsMaster, Equals, true, Commentf("%s is not the master", master))
var slave1, slave2 string
switch hostPort(master) {
case "40021":
slave1, slave2 = "b", "c"
case "40022":
slave1, slave2 = "a", "c"
case "40023":
slave1, slave2 = "a", "b"
}
// Collect op counters for everyone.
opc21a, err := getOpCounters("localhost:40021")
c.Assert(err, IsNil)
opc22a, err := getOpCounters("localhost:40022")
c.Assert(err, IsNil)
opc23a, err := getOpCounters("localhost:40023")
c.Assert(err, IsNil)
// Do a SlaveOk query through MongoS
mongos, err := mgo.Dial("localhost:40202")
c.Assert(err, IsNil)
defer mongos.Close()
mongos.SetMode(mgo.Monotonic, true)
mongos.Refresh()
mongos.SelectServers(bson.D{{"rs2", slave1}})
coll := mongos.DB("mydb").C("mycoll")
result := &struct{}{}
for i := 0; i != 5; i++ {
err := coll.Find(nil).One(result)
c.Assert(err, Equals, mgo.ErrNotFound)
}
mongos.Refresh()
mongos.SelectServers(bson.D{{"rs2", slave2}})
coll = mongos.DB("mydb").C("mycoll")
for i := 0; i != 7; i++ {
err := coll.Find(nil).One(result)
c.Assert(err, Equals, mgo.ErrNotFound)
}
// Collect op counters for everyone again.
opc21b, err := getOpCounters("localhost:40021")
c.Assert(err, IsNil)
opc22b, err := getOpCounters("localhost:40022")
c.Assert(err, IsNil)
opc23b, err := getOpCounters("localhost:40023")
c.Assert(err, IsNil)
switch hostPort(master) {
case "40021":
c.Check(opc21b.Query-opc21a.Query, Equals, 0)
c.Check(opc22b.Query-opc22a.Query, Equals, 5)
c.Check(opc23b.Query-opc23a.Query, Equals, 7)
case "40022":
c.Check(opc21b.Query-opc21a.Query, Equals, 5)
c.Check(opc22b.Query-opc22a.Query, Equals, 0)
c.Check(opc23b.Query-opc23a.Query, Equals, 7)
case "40023":
c.Check(opc21b.Query-opc21a.Query, Equals, 5)
c.Check(opc22b.Query-opc22a.Query, Equals, 7)
c.Check(opc23b.Query-opc23a.Query, Equals, 0)
default:
c.Fatal("Uh?")
}
}

View File

@ -1,5 +1,9 @@
package mgo
import (
"time"
)
func HackSocketsPerServer(newLimit int) (restore func()) {
oldLimit := newLimit
restore = func() {
@ -8,3 +12,21 @@ func HackSocketsPerServer(newLimit int) (restore func()) {
socketsPerServer = newLimit
return
}
func HackPingDelay(newDelay time.Duration) (restore func()) {
oldDelay := pingDelay
restore = func() {
pingDelay = oldDelay
}
pingDelay = newDelay
return
}
func HackSyncSocketTimeout(newTimeout time.Duration) (restore func()) {
oldTimeout := syncSocketTimeout
restore = func() {
syncSocketTimeout = oldTimeout
}
syncSocketTimeout = newTimeout
return
}

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -27,12 +27,12 @@
package mgo
import (
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"crypto/md5"
"encoding/hex"
"errors"
"hash"
"io"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"os"
"sync"
"time"
@ -72,10 +72,10 @@ type GridFile struct {
}
type gfsFile struct {
Id interface{} "_id"
ChunkSize int "chunkSize"
UploadDate time.Time "uploadDate"
Length int64 ",minsize"
Id interface{} "_id"
ChunkSize int "chunkSize"
UploadDate time.Time "uploadDate"
Length int64 ",minsize"
MD5 string
Filename string ",omitempty"
ContentType string "contentType,omitempty"
@ -157,15 +157,15 @@ func (gfs *GridFS) Create(name string) (file *GridFile, err error) {
return
}
// OpenId returns a file with the provided id in case it exists or an error
// instead. If the file isn't found, err will be set to mgo.ErrNotFound.
// OpenId returns the file with the provided id, for reading.
// If the file isn't found, err will be set to mgo.ErrNotFound.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
//
// The following example will print the first 8192 bytes from the file:
//
//
// func check(err os.Error) {
// if err != nil {
// panic(err.String())
@ -205,15 +205,16 @@ func (gfs *GridFS) OpenId(id interface{}) (file *GridFile, err error) {
return
}
// Open returns the most recent uploaded file with the provided name, or an
// error instead. If the file isn't found, err will be set to mgo.ErrNotFound.
// Open returns the most recently uploaded file with the provided
// name, for reading. If the file isn't found, err will be set
// to mgo.ErrNotFound.
//
// It's important to Close files whether they are being written to
// or read from, and to check the err result to ensure the operation
// completed successfully.
//
// The following example will print the first 8192 bytes from the file:
//
//
// file, err := db.GridFS("fs").Open("myfile.txt")
// check(err)
// b := make([]byte, 8192)
@ -248,17 +249,17 @@ func (gfs *GridFS) Open(name string) (file *GridFile, err error) {
return
}
// OpenNext opens the next file from iter, sets *file to it, and returns
// true on the success case. If no more documents are available on iter or
// an error occurred, *file is set to nil and the result is false. Errors
// will be available on iter.Err().
// OpenNext opens the next file from iter for reading, sets *file to it,
// and returns true on the success case. If no more documents are available
// on iter or an error occurred, *file is set to nil and the result is false.
// Errors will be available via iter.Err().
//
// The iter parameter must be an iterator on the GridFS files collection.
// Using the GridFS.Find method is an easy way to obtain such an iterator,
// but any iterator on the collection will work.
//
// If the provided *file is non-nil, OpenNext will close it before
// iterating to the next element. This means that in a loop one only
// If the provided *file is non-nil, OpenNext will close it before attempting
// to iterate to the next element. This means that in a loop one only
// has to worry about closing files when breaking out of the loop early
// (break, return, or panic).
//
@ -271,8 +272,8 @@ func (gfs *GridFS) Open(name string) (file *GridFile, err error) {
// for gfs.OpenNext(iter, &f) {
// fmt.Printf("Filename: %s\n", f.Name())
// }
// if iter.Err() != nil {
// panic(iter.Err())
// if iter.Close() != nil {
// panic(iter.Close())
// }
//
func (gfs *GridFS) OpenNext(iter *Iter, file **GridFile) bool {
@ -280,7 +281,7 @@ func (gfs *GridFS) OpenNext(iter *Iter, file **GridFile) bool {
// Ignoring the error here shouldn't be a big deal
// as we're reading the file and the loop iteration
// for this file is finished.
_ = file.Close()
_ = (*file).Close()
}
var doc gfsFile
if !iter.Next(&doc) {
@ -306,7 +307,7 @@ func (gfs *GridFS) OpenNext(iter *Iter, file **GridFile) bool {
//
// files := db.C("fs" + ".files")
// iter := files.Find(nil).Iter()
//
//
func (gfs *GridFS) Find(query interface{}) *Query {
return gfs.Files.Find(query)
}
@ -335,7 +336,7 @@ func (gfs *GridFS) Remove(name string) (err error) {
}
}
if err == nil {
err = iter.Err()
err = iter.Close()
}
return err
}
@ -509,6 +510,8 @@ func (file *GridFile) Close() (err error) {
//
// The file will internally cache the data so that all but the last
// chunk sent to the database have the size defined by SetChunkSize.
// This also means that errors may be deferred until a future call
// to Write or Close.
//
// The parameters and behavior of this function turn the file
// into an io.Writer.

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -27,10 +27,10 @@
package mgo_test
import (
"io"
. "camlistore.org/third_party/launchpad.net/gocheck"
"camlistore.org/third_party/labix.org/v2/mgo"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
. "camlistore.org/third_party/launchpad.net/gocheck"
"io"
"os"
"time"
)
@ -161,7 +161,7 @@ func (s *S) TestGridFSFileDetails(c *C) {
ud := file.UploadDate()
now := time.Now()
c.Assert(ud.Before(now), Equals, true)
c.Assert(ud.After(now.Add(-3 * time.Second)), Equals, true)
c.Assert(ud.After(now.Add(-3*time.Second)), Equals, true)
result := M{}
err = db.C("fs.files").Find(nil).One(result)
@ -177,7 +177,7 @@ func (s *S) TestGridFSFileDetails(c *C) {
"md5": "1e50210a0202497fb79bc38b6ade6c34",
"filename": "myfile2.txt",
"contentType": "text/plain",
"metadata": bson.M{"any": "thing"},
"metadata": M{"any": "thing"},
}
c.Assert(result, DeepEquals, expected)
}
@ -249,7 +249,7 @@ func (s *S) TestGridFSCreateWithChunking(c *C) {
}
break
}
c.Assert(iter.Err(), IsNil)
c.Assert(iter.Close(), IsNil)
result["_id"] = "<id>"
@ -589,7 +589,7 @@ func (s *S) TestGridFSOpenNext(c *C) {
ok = gfs.OpenNext(iter, &f)
c.Assert(ok, Equals, false)
c.Assert(iter.Err(), IsNil)
c.Assert(iter.Close(), IsNil)
c.Assert(f, IsNil)
// Do it again with a more restrictive query to make sure
@ -602,6 +602,6 @@ func (s *S) TestGridFSOpenNext(c *C) {
ok = gfs.OpenNext(iter, &f)
c.Assert(ok, Equals, false)
c.Assert(iter.Err(), IsNil)
c.Assert(iter.Close(), IsNil)
c.Assert(f, IsNil)
}

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -84,5 +84,8 @@ func (q *queue) expand() {
copy(elems[newpopi:], q.elems[q.popi:])
q.popi = newpopi
}
for i := range q.elems {
q.elems[i] = nil // Help GC.
}
q.elems = elems
}

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -27,10 +27,12 @@
package mgo
import (
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"errors"
"net"
"sort"
"sync"
"time"
)
// ---------------------------------------------------------------------------
@ -44,31 +46,50 @@ type mongoServer struct {
unusedSockets []*mongoSocket
liveSockets []*mongoSocket
closed bool
master bool
abended bool
sync chan bool
dial dialer
pingValue time.Duration
pingIndex int
pingCount uint32
pingWindow [6]time.Duration
info *mongoServerInfo
}
func newServer(addr string, sync chan bool) (server *mongoServer, err error) {
tcpaddr, err := net.ResolveTCPAddr("tcp", addr)
if err != nil {
log("Failed to resolve ", addr, ": ", err.Error())
return nil, err
}
type dialer struct {
old func(addr net.Addr) (net.Conn, error)
new func(addr *ServerAddr) (net.Conn, error)
}
resolvedAddr := tcpaddr.String()
if resolvedAddr != addr {
debug("Address ", addr, " resolved as ", resolvedAddr)
}
server = &mongoServer{
func (dial dialer) isSet() bool {
return dial.old != nil || dial.new != nil
}
type mongoServerInfo struct {
Master bool
Mongos bool
Tags bson.D
}
var defaultServerInfo mongoServerInfo
func newServer(addr string, tcpaddr *net.TCPAddr, sync chan bool, dial dialer) *mongoServer {
server := &mongoServer{
Addr: addr,
ResolvedAddr: resolvedAddr,
ResolvedAddr: tcpaddr.String(),
tcpaddr: tcpaddr,
sync: sync,
dial: dial,
info: &defaultServerInfo,
}
return
// Once so the server gets a ping value, then loop in background.
server.pinger(false)
go server.pinger(true)
return server
}
var errSocketLimit = errors.New("per-server connection limit reached")
var errServerClosed = errors.New("server was closed")
// AcquireSocket returns a socket for communicating with the server.
// This will attempt to reuse an old connection, if one is available. Otherwise,
@ -77,56 +98,82 @@ var errSocketLimit = errors.New("per-server connection limit reached")
// the same number of times as AcquireSocket + Acquire were called for it.
// If the limit argument is not zero, a socket will only be returned if the
// number of sockets in use for this server is under the provided limit.
func (server *mongoServer) AcquireSocket(limit int) (socket *mongoSocket, err error) {
func (server *mongoServer) AcquireSocket(limit int, timeout time.Duration) (socket *mongoSocket, abended bool, err error) {
for {
server.Lock()
abended = server.abended
if server.closed {
server.Unlock()
return nil, abended, errServerClosed
}
n := len(server.unusedSockets)
if limit > 0 && len(server.liveSockets)-n >= limit {
server.Unlock()
return nil, errSocketLimit
return nil, false, errSocketLimit
}
if n > 0 {
socket = server.unusedSockets[n-1]
server.unusedSockets[n-1] = nil // Help GC.
server.unusedSockets = server.unusedSockets[:n-1]
info := server.info
server.Unlock()
err = socket.InitialAcquire()
err = socket.InitialAcquire(info, timeout)
if err != nil {
continue
}
} else {
server.Unlock()
socket, err = server.Connect()
socket, err = server.Connect(timeout)
if err == nil {
server.Lock()
// We've waited for the Connect, see if we got
// closed in the meantime
if server.closed {
server.Unlock()
socket.Release()
socket.Close()
return nil, abended, errServerClosed
}
server.liveSockets = append(server.liveSockets, socket)
server.Unlock()
}
}
return
}
panic("unreached")
panic("unreachable")
}
// Connect establishes a new connection to the server. This should
// generally be done through server.AcquireSocket().
func (server *mongoServer) Connect() (*mongoSocket, error) {
func (server *mongoServer) Connect(timeout time.Duration) (*mongoSocket, error) {
server.RLock()
addr := server.Addr
tcpaddr := server.tcpaddr
master := server.master
master := server.info.Master
dial := server.dial
server.RUnlock()
log("Establishing new connection to ", addr, "...")
conn, err := net.DialTCP("tcp", nil, tcpaddr)
logf("Establishing new connection to %s (timeout=%s)...", server.Addr, timeout)
var conn net.Conn
var err error
switch {
case !dial.isSet():
// Cannot do this because it lacks timeout support. :-(
//conn, err = net.DialTCP("tcp", nil, server.tcpaddr)
conn, err = net.DialTimeout("tcp", server.ResolvedAddr, timeout)
case dial.old != nil:
conn, err = dial.old(server.tcpaddr)
case dial.new != nil:
conn, err = dial.new(&ServerAddr{server.Addr, server.tcpaddr})
default:
panic("dialer is set, but both dial.old and dial.new are nil")
}
if err != nil {
log("Connection to ", addr, " failed: ", err.Error())
logf("Connection to %s failed: %v", server.Addr, err.Error())
return nil, err
}
log("Connection to ", addr, " established.")
logf("Connection to %s established.", server.Addr)
stats.conn(+1, master)
return newSocket(server, conn), nil
return newSocket(server, conn, timeout), nil
}
// Close forces closing all sockets that are alive, whether
@ -138,9 +185,8 @@ func (server *mongoServer) Close() {
unusedSockets := server.unusedSockets
server.liveSockets = nil
server.unusedSockets = nil
addr := server.Addr
server.Unlock()
logf("Connections to %s closing (%d live sockets).", addr, len(liveSockets))
logf("Connections to %s closing (%d live sockets).", server.Addr, len(liveSockets))
for i, s := range liveSockets {
s.Close()
liveSockets[i] = nil
@ -176,6 +222,7 @@ func removeSocket(sockets []*mongoSocket, socket *mongoSocket) []*mongoSocket {
// abnormally, and thus should be discarded rather than cached.
func (server *mongoServer) AbendSocket(socket *mongoSocket) {
server.Lock()
server.abended = true
if server.closed {
server.Unlock()
return
@ -190,29 +237,83 @@ func (server *mongoServer) AbendSocket(socket *mongoSocket) {
}
}
// Merge other into server, which must both be communicating with
// the same server address.
func (server *mongoServer) Merge(other *mongoServer) {
func (server *mongoServer) SetInfo(info *mongoServerInfo) {
server.Lock()
server.master = other.master
server.Unlock()
// Sockets of other are ignored for the moment. Merging them
// would mean a large number of sockets being cached on longer
// recovering situations.
other.Close()
}
func (server *mongoServer) SetMaster(isMaster bool) {
server.Lock()
server.master = isMaster
server.info = info
server.Unlock()
}
func (server *mongoServer) IsMaster() bool {
server.RLock()
result := server.master
server.RUnlock()
return result
func (server *mongoServer) Info() *mongoServerInfo {
server.Lock()
info := server.info
server.Unlock()
return info
}
func (server *mongoServer) hasTags(serverTags []bson.D) bool {
NextTagSet:
for _, tags := range serverTags {
NextReqTag:
for _, req := range tags {
for _, has := range server.info.Tags {
if req.Name == has.Name {
if req.Value == has.Value {
continue NextReqTag
}
continue NextTagSet
}
}
continue NextTagSet
}
return true
}
return false
}
var pingDelay = 5 * time.Second
func (server *mongoServer) pinger(loop bool) {
op := queryOp{
collection: "admin.$cmd",
query: bson.D{{"ping", 1}},
flags: flagSlaveOk,
limit: -1,
}
for {
if loop {
time.Sleep(pingDelay)
}
op := op
socket, _, err := server.AcquireSocket(0, 3*pingDelay)
if err == nil {
start := time.Now()
_, _ = socket.SimpleQuery(&op)
delay := time.Now().Sub(start)
server.pingWindow[server.pingIndex] = delay
server.pingIndex = (server.pingIndex + 1) % len(server.pingWindow)
server.pingCount++
var max time.Duration
for i := 0; i < len(server.pingWindow) && uint32(i) < server.pingCount; i++ {
if server.pingWindow[i] > max {
max = server.pingWindow[i]
}
}
socket.Release()
server.Lock()
if server.closed {
loop = false
}
server.pingValue = max
server.Unlock()
logf("Ping for %s is %d ms", server.Addr, max/time.Millisecond)
} else if err == errServerClosed {
return
}
if !loop {
return
}
}
}
type mongoServerSlice []*mongoServer
@ -233,8 +334,7 @@ func (s mongoServerSlice) Sort() {
sort.Sort(s)
}
func (s mongoServerSlice) Search(other *mongoServer) (i int, ok bool) {
resolvedAddr := other.ResolvedAddr
func (s mongoServerSlice) Search(resolvedAddr string) (i int, ok bool) {
n := len(s)
i = sort.Search(n, func(i int) bool {
return s[i].ResolvedAddr >= resolvedAddr
@ -246,8 +346,8 @@ type mongoServers struct {
slice mongoServerSlice
}
func (servers *mongoServers) Search(other *mongoServer) (server *mongoServer) {
if i, ok := servers.slice.Search(other); ok {
func (servers *mongoServers) Search(resolvedAddr string) (server *mongoServer) {
if i, ok := servers.slice.Search(resolvedAddr); ok {
return servers.slice[i]
}
return nil
@ -259,7 +359,7 @@ func (servers *mongoServers) Add(server *mongoServer) {
}
func (servers *mongoServers) Remove(other *mongoServer) (server *mongoServer) {
if i, found := servers.slice.Search(other); found {
if i, found := servers.slice.Search(other.ResolvedAddr); found {
server = servers.slice[i]
copy(servers.slice[i:], servers.slice[i+1:])
n := len(servers.slice) - 1
@ -285,26 +385,31 @@ func (servers *mongoServers) Empty() bool {
return len(servers.slice) == 0
}
// MostAvailable returns the best guess of what would be the
// most interesting server to perform operations on at this
// point in time.
func (servers *mongoServers) MostAvailable() *mongoServer {
if len(servers.slice) == 0 {
panic("MostAvailable: can't be used on empty server list")
}
// BestFit returns the best guess of what would be the most interesting
// server to perform operations on at this point in time.
func (servers *mongoServers) BestFit(serverTags []bson.D) *mongoServer {
var best *mongoServer
for i, next := range servers.slice {
if i == 0 {
for _, next := range servers.slice {
if best == nil {
best = next
best.RLock()
if serverTags != nil && !next.info.Mongos && !best.hasTags(serverTags) {
best.RUnlock()
best = nil
}
continue
}
next.RLock()
swap := false
switch {
case next.master != best.master:
case serverTags != nil && !next.info.Mongos && !next.hasTags(serverTags):
// Must have requested tags.
case next.info.Master != best.info.Master:
// Prefer slaves.
swap = best.master
swap = best.info.Master
case absDuration(next.pingValue-best.pingValue) > 15*time.Millisecond:
// Prefer nearest server.
swap = next.pingValue < best.pingValue
case len(next.liveSockets)-len(next.unusedSockets) < len(best.liveSockets)-len(best.unusedSockets):
// Prefer servers with less connections.
swap = true
@ -316,6 +421,15 @@ func (servers *mongoServers) MostAvailable() *mongoServer {
next.RUnlock()
}
}
best.RUnlock()
if best != nil {
best.RUnlock()
}
return best
}
func absDuration(d time.Duration) time.Duration {
if d < 0 {
return -d
}
return d
}

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -27,10 +27,11 @@
package mgo
import (
"errors"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"errors"
"net"
"sync"
"time"
)
type replyFunc func(err error, reply *replyOp, docNum int, docData []byte)
@ -38,7 +39,8 @@ type replyFunc func(err error, reply *replyOp, docNum int, docData []byte)
type mongoSocket struct {
sync.Mutex
server *mongoServer // nil when cached
conn *net.TCPConn
conn net.Conn
timeout time.Duration
addr string // For debugging only.
nextRequestId uint32
replyFuncs map[uint32]replyFunc
@ -48,16 +50,59 @@ type mongoSocket struct {
cachedNonce string
gotNonce sync.Cond
dead error
serverInfo *mongoServerInfo
}
type queryOpFlags uint32
const (
_ queryOpFlags = 1 << iota
flagTailable
flagSlaveOk
flagLogReplay
flagNoCursorTimeout
flagAwaitData
)
type queryOp struct {
collection string
query interface{}
skip int32
limit int32
selector interface{}
flags uint32
flags queryOpFlags
replyFunc replyFunc
options queryWrapper
hasOptions bool
serverTags []bson.D
}
type queryWrapper struct {
Query interface{} "$query"
OrderBy interface{} "$orderby,omitempty"
Hint interface{} "$hint,omitempty"
Explain bool "$explain,omitempty"
Snapshot bool "$snapshot,omitempty"
ReadPreference bson.D "$readPreference,omitempty"
}
func (op *queryOp) finalQuery(socket *mongoSocket) interface{} {
if op.flags&flagSlaveOk != 0 && len(op.serverTags) > 0 && socket.ServerInfo().Mongos {
op.hasOptions = true
op.options.ReadPreference = bson.D{{"mode", "secondaryPreferred"}, {"tags", op.serverTags}}
}
if op.hasOptions {
if op.query == nil {
var empty bson.D
op.options.Query = empty
} else {
op.options.Query = op.query
}
debugf("final query is %#v\n", &op.options)
return &op.options
}
return op.query
}
type getMoreOp struct {
@ -92,17 +137,24 @@ type deleteOp struct {
flags uint32
}
type killCursorsOp struct {
cursorIds []int64
}
type requestInfo struct {
bufferPos int
replyFunc replyFunc
}
func newSocket(server *mongoServer, conn *net.TCPConn) *mongoSocket {
socket := &mongoSocket{conn: conn, addr: server.Addr}
func newSocket(server *mongoServer, conn net.Conn, timeout time.Duration) *mongoSocket {
socket := &mongoSocket{
conn: conn,
addr: server.Addr,
server: server,
replyFuncs: make(map[uint32]replyFunc),
}
socket.gotNonce.L = &socket.Mutex
socket.replyFuncs = make(map[uint32]replyFunc)
socket.server = server
if err := socket.InitialAcquire(); err != nil {
if err := socket.InitialAcquire(server.Info(), timeout); err != nil {
panic("newSocket: InitialAcquire returned error: " + err.Error())
}
stats.socketsAlive(+1)
@ -112,10 +164,28 @@ func newSocket(server *mongoServer, conn *net.TCPConn) *mongoSocket {
return socket
}
// Server returns the server that the socket is associated with.
// It returns nil while the socket is cached in its respective server.
func (socket *mongoSocket) Server() *mongoServer {
socket.Lock()
server := socket.server
socket.Unlock()
return server
}
// ServerInfo returns details for the server at the time the socket
// was initially acquired.
func (socket *mongoSocket) ServerInfo() *mongoServerInfo {
socket.Lock()
serverInfo := socket.serverInfo
socket.Unlock()
return serverInfo
}
// InitialAcquire obtains the first reference to the socket, either
// right after the connection is made or once a recycled socket is
// being put back in use.
func (socket *mongoSocket) InitialAcquire() error {
func (socket *mongoSocket) InitialAcquire(serverInfo *mongoServerInfo, timeout time.Duration) error {
socket.Lock()
if socket.references > 0 {
panic("Socket acquired out of cache with references")
@ -125,6 +195,8 @@ func (socket *mongoSocket) InitialAcquire() error {
return socket.dead
}
socket.references++
socket.serverInfo = serverInfo
socket.timeout = timeout
stats.socketsInUse(+1)
stats.socketRefs(+1)
socket.Unlock()
@ -134,20 +206,18 @@ func (socket *mongoSocket) InitialAcquire() error {
// Acquire obtains an additional reference to the socket.
// The socket will only be recycled when it's released as many
// times as it's been acquired.
func (socket *mongoSocket) Acquire() (isMaster bool) {
func (socket *mongoSocket) Acquire() (info *mongoServerInfo) {
socket.Lock()
if socket.references == 0 {
panic("Socket got non-initial acquire with references == 0")
}
socket.references++
stats.socketRefs(+1)
// We'll track references to dead sockets as well.
// Caller is still supposed to release the socket.
if socket.dead == nil {
isMaster = socket.server.IsMaster()
}
socket.references++
stats.socketRefs(+1)
serverInfo := socket.serverInfo
socket.Unlock()
return isMaster
return serverInfo
}
// Release decrements a socket reference. The socket will be
@ -173,6 +243,42 @@ func (socket *mongoSocket) Release() {
}
}
// SetTimeout changes the timeout used on socket operations.
func (socket *mongoSocket) SetTimeout(d time.Duration) {
socket.Lock()
socket.timeout = d
socket.Unlock()
}
type deadlineType int
const (
readDeadline deadlineType = 1
writeDeadline deadlineType = 2
)
func (socket *mongoSocket) updateDeadline(which deadlineType) {
var when time.Time
if socket.timeout > 0 {
when = time.Now().Add(socket.timeout)
}
whichstr := ""
switch which {
case readDeadline | writeDeadline:
whichstr = "read/write"
socket.conn.SetDeadline(when)
case readDeadline:
whichstr = "read"
socket.conn.SetReadDeadline(when)
case writeDeadline:
whichstr = "write"
socket.conn.SetWriteDeadline(when)
default:
panic("invalid parameter to updateDeadline")
}
debugf("Socket %p to %s: updated %s deadline to %s ahead (%s)", socket, socket.addr, whichstr, socket.timeout, when)
}
// Close terminates the socket use.
func (socket *mongoSocket) Close() {
socket.kill(errors.New("Closed explicitly"), false)
@ -279,7 +385,7 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
buf = addCString(buf, op.collection)
buf = addInt32(buf, op.skip)
buf = addInt32(buf, op.limit)
buf, err = addBSON(buf, op.query)
buf, err = addBSON(buf, op.finalQuery(socket))
if err != nil {
return err
}
@ -310,6 +416,14 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
return err
}
case *killCursorsOp:
buf = addHeader(buf, 2007)
buf = addInt32(buf, 0) // Reserved
buf = addInt32(buf, int32(len(op.cursorIds)))
for _, cursorId := range op.cursorIds {
buf = addInt64(buf, cursorId)
}
default:
panic("Internal error: unknown operation type")
}
@ -329,7 +443,7 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
socket.Lock()
if socket.dead != nil {
socket.Unlock()
debug("Socket %p to %s: failing query, already closed: %s", socket, socket.addr, socket.dead.Error())
debugf("Socket %p to %s: failing query, already closed: %s", socket, socket.addr, socket.dead.Error())
// XXX This seems necessary in case the session is closed concurrently
// with a query being performed, but it's not yet tested:
for i := 0; i != requestCount; i++ {
@ -341,6 +455,8 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
return socket.dead
}
wasWaiting := len(socket.replyFuncs) > 0
// Reserve id 0 for requests which should have no responses.
requestId := socket.nextRequestId + 1
if requestId == 0 {
@ -357,12 +473,16 @@ func (socket *mongoSocket) Query(ops ...interface{}) (err error) {
debugf("Socket %p to %s: sending %d op(s) (%d bytes)", socket, socket.addr, len(ops), len(buf))
stats.sentOps(len(ops))
socket.updateDeadline(writeDeadline)
_, err = socket.conn.Write(buf)
if !wasWaiting && requestCount > 0 {
socket.updateDeadline(readDeadline)
}
socket.Unlock()
return err
}
func fill(r *net.TCPConn, b []byte) error {
func fill(r net.Conn, b []byte) error {
l := len(b)
n, err := r.Read(b)
for n != l && err == nil {
@ -460,6 +580,12 @@ func (socket *mongoSocket) readLoop() {
if replyFuncFound {
delete(socket.replyFuncs, uint32(responseTo))
}
if len(socket.replyFuncs) == 0 {
// Nothing else to read for now. Disable deadline.
socket.conn.SetReadDeadline(time.Time{})
} else {
socket.updateDeadline(readDeadline)
}
socket.Unlock()
// XXX Do bound checking against totalLen.

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -58,6 +58,7 @@ func ResetStats() {
old := stats
stats = &Stats{}
// These are absolute values:
stats.Clusters = old.Clusters
stats.SocketsInUse = old.SocketsInUse
stats.SocketsAlive = old.SocketsAlive
stats.SocketRefs = old.SocketRefs
@ -66,6 +67,7 @@ func ResetStats() {
}
type Stats struct {
Clusters int
MasterConns int
SlaveConns int
SentOps int
@ -76,6 +78,14 @@ type Stats struct {
SocketRefs int
}
func (stats *Stats) cluster(delta int) {
if stats != nil {
statsMutex.Lock()
stats.Clusters += delta
statsMutex.Unlock()
}
}
func (stats *Stats) conn(delta int, master bool) {
if stats != nil {
statsMutex.Lock()

View File

@ -1,18 +1,18 @@
// mgo - MongoDB driver for Go
//
//
// Copyright (c) 2010-2012 - Gustavo Niemeyer <gustavo@niemeyer.net>
//
//
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// modification, are permitted provided that the following conditions are met:
//
// 1. Redistributions of source code must retain the above copyright notice, this
// list of conditions and the following disclaimer.
// list of conditions and the following disclaimer.
// 2. Redistributions in binary form must reproduce the above copyright notice,
// this list of conditions and the following disclaimer in the documentation
// and/or other materials provided with the distribution.
//
// and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
@ -27,15 +27,17 @@
package mgo_test
import (
"camlistore.org/third_party/labix.org/v2/mgo"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
. "camlistore.org/third_party/launchpad.net/gocheck"
"errors"
"flag"
"fmt"
. "camlistore.org/third_party/launchpad.net/gocheck"
"camlistore.org/third_party/labix.org/v2/mgo"
"camlistore.org/third_party/labix.org/v2/mgo/bson"
"net"
"os/exec"
"strconv"
"syscall"
"strings"
"testing"
"time"
)
@ -60,6 +62,20 @@ func TestAll(t *testing.T) {
type S struct {
session *mgo.Session
stopped bool
build mgo.BuildInfo
frozen []string
}
func (s *S) versionAtLeast(v ...int) bool {
for i := range v {
if i == len(s.build.VersionArray) {
return false
}
if s.build.VersionArray[i] < v[i] {
return false
}
}
return true
}
var _ = Suite(&S{})
@ -68,6 +84,12 @@ func (s *S) SetUpSuite(c *C) {
mgo.SetDebug(true)
mgo.SetStats(true)
s.StartAll()
session, err := mgo.Dial("localhost:40001")
c.Assert(err, IsNil)
s.build, err = session.BuildInfo()
c.Check(err, IsNil)
session.Close()
}
func (s *S) SetUpTest(c *C) {
@ -83,8 +105,14 @@ func (s *S) TearDownTest(c *C) {
if s.stopped {
s.StartAll()
}
for _, host := range s.frozen {
if host != "" {
s.Thaw(host)
}
}
var stats mgo.Stats
for i := 0; ; i++ {
stats := mgo.GetStats()
stats = mgo.GetStats()
if stats.SocketsInUse == 0 && stats.SocketsAlive == 0 {
break
}
@ -92,24 +120,70 @@ func (s *S) TearDownTest(c *C) {
c.Fatal("Test left sockets in a dirty state")
}
c.Logf("Waiting for sockets to die: %d in use, %d alive", stats.SocketsInUse, stats.SocketsAlive)
time.Sleep(5e8)
time.Sleep(500 * time.Millisecond)
}
for i := 0; ; i++ {
stats = mgo.GetStats()
if stats.Clusters == 0 {
break
}
if i == 60 {
c.Fatal("Test left clusters alive")
}
c.Logf("Waiting for clusters to die: %d alive", stats.Clusters)
time.Sleep(1 * time.Second)
}
}
func (s *S) Stop(host string) {
// Give a moment for slaves to sync and avoid getting rollback issues.
time.Sleep(2 * time.Second)
err := run("cd _testdb && supervisorctl stop " + supvName(host))
if err != nil {
panic(err.Error())
panic(err)
}
s.stopped = true
}
func (s *S) pid(host string) int {
output, err := exec.Command("lsof", "-iTCP:"+hostPort(host), "-sTCP:LISTEN", "-Fp").CombinedOutput()
if err != nil {
panic(err)
}
pidstr := string(output[1 : len(output)-1])
pid, err := strconv.Atoi(pidstr)
if err != nil {
panic("cannot convert pid to int: " + pidstr)
}
return pid
}
func (s *S) Freeze(host string) {
err := syscall.Kill(s.pid(host), syscall.SIGSTOP)
if err != nil {
panic(err)
}
s.frozen = append(s.frozen, host)
}
func (s *S) Thaw(host string) {
err := syscall.Kill(s.pid(host), syscall.SIGCONT)
if err != nil {
panic(err)
}
for i, frozen := range s.frozen {
if frozen == host {
s.frozen[i] = ""
}
}
}
func (s *S) StartAll() {
// Restart any stopped nodes.
run("cd _testdb && supervisorctl start all")
err := run("cd testdb && mongo --nodb wait.js")
if err != nil {
panic(err.Error())
panic(err)
}
s.stopped = false
}
@ -123,29 +197,44 @@ func run(command string) error {
return nil
}
var supvNames = map[string]string{
"40001": "db1",
"40002": "db2",
"40011": "rs1a",
"40012": "rs1b",
"40013": "rs1c",
"40021": "rs2a",
"40022": "rs2b",
"40023": "rs2c",
"40031": "rs3a",
"40032": "rs3b",
"40033": "rs3c",
"40041": "rs4a",
"40101": "cfg1",
"40102": "cfg2",
"40103": "cfg3",
"40201": "s1",
"40202": "s2",
"40203": "s3",
}
// supvName returns the supervisord name for the given host address.
func supvName(host string) string {
switch {
case strings.HasSuffix(host, ":40001"):
return "db1"
case strings.HasSuffix(host, ":40011"):
return "rs1a"
case strings.HasSuffix(host, ":40012"):
return "rs1b"
case strings.HasSuffix(host, ":40013"):
return "rs1c"
case strings.HasSuffix(host, ":40021"):
return "rs2a"
case strings.HasSuffix(host, ":40022"):
return "rs2b"
case strings.HasSuffix(host, ":40023"):
return "rs2c"
case strings.HasSuffix(host, ":40101"):
return "cfg1"
case strings.HasSuffix(host, ":40201"):
return "s1"
case strings.HasSuffix(host, ":40202"):
return "s2"
host, port, err := net.SplitHostPort(host)
if err != nil {
panic(err)
}
panic("Unknown host: " + host)
name, ok := supvNames[port]
if !ok {
panic("Unknown host: " + host)
}
return name
}
func hostPort(host string) string {
_, port, err := net.SplitHostPort(host)
if err != nil {
panic(err)
}
return port
}