创建main

创建文件main.go并导入bson,mongo和mongo/options包:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
    "context"
    "fmt"
    "log"

    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

// You will be using this Trainer type later in the program
type Trainer struct {
    Name string
    Age  int
    City string
}

func main() {
    // Rest of the code will go here
}

此代码还导入一些标准库并定义Trainer类型。您将在本教程的后面部分使用它们。

Go类型与bson的映射

1
2
3
4
5
6
7
8
9
type Post struct {
    ID        primitive.ObjectID `"_id"`
    Title     string             `bson:"title"`
    Body      string
    Tags      []string           `bson:"tags"json:"tags"`
    Comments  uint64             `bson:"comments,omitempty"`
    CreatedAt time.Time          `bson:"created_at"`
    UpdatedAt time.Time          `bson:"updated_at"`
}

有三种规则:

  1. 什么都不写,就像上面的body一样,这种的话就默认属性名全小写作为key。
  2. 第二种就是像ID一样简写bson映射。
  3. 第三种是像Tags一样完整的写出来,这样的话可以同时做bson和json的映射。

Key规则:

必须作为第一个参数,但是可以不写。如果这个字段不想映射到bson的话,可以这样bson:"-"

其它参数:

  • omitempty —— 允许值为空,比如说在ID中使用后,MongoDB会自动为_id生成ObjectId
  • minsize —— 如果在保留数值的情况下可行,将MongoDB大于32位的整数值解析为int32。
  • truncate —— 如果在损失精度的情况下,将MongoDB的double解析为float32。
  • inline —— 可以让内联struct和外联效果一样

鉴于我们Post之前定义的结构,让我们创建另一个文档:

1
2
3
4
5
6
7
8
_, err := col.InsertOne(ctx, &Post{
    ID:        primitive.NewObjectID(),
    Title:     "post",
    tags:
-    []string{"mongodb"},
    Body:      `blog post`,
    CreatedAt: time.Now(),
})

我们得到以下内容:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
{
    "_id" : ObjectId("5c71f03ccfee587e4212ad90"),
    "title" : "post",
    "body" : "blog post",
    "tags" : [
        "mongodb"
    ],
    "comments" : NumberLong(0),
    "created_at" : ISODate("2019-02-24T01:15:40.329Z"),
    "updated_at" : null
}

如您所见,驱动程序将创建一个文档,其中包含结构中定义的所有字段,甚至是我们未传递任何值的字段。请记住,您必须使用该primitive.NewObjectID()函数来创建新的ObjectID。它还将保留结构中定义的属性的顺序。

使用Go驱动程序连接到MongoDB

导入MongoDB Go驱动程序后,您可以使用该mongo.Connect()功能连接到MongoDB部署。您必须将上下文和options.ClientOptions对象传递给mongo.Connect()。客户端选项用于设置连接字符串。它还可用于配置驱动程序设置,如写入问题,套接字超时等。选项包文档提供了有关可用客户机选项的更多信息。

在main函数中添加以下代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
// Set client options
clientOptions := options.Client().ApplyURI("mongodb://localhost:27017")

// Connect to MongoDB
client, err := mongo.Connect(context.TODO(), clientOptions)

if err != nil {
    log.Fatal(err)
}

// Check the connection
err = client.Ping(context.TODO(), nil)

if err != nil {
    log.Fatal(err)
}

fmt.Println("Connected to MongoDB!")

连接方式还有一种

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
MongoClient, err := mongo.NewClient(&options.ClientOptions{
	Auth: &options.Credential{
		AuthSource: Authsource,
		Username:   Username,
		Password:   Password,
	},
	Hosts: Hosts,
})
if err != nil {
	log.Fatal("MongoDB初始化失败", err)
	return
}
err = MongoClient.Connect(context.TODO())
if err != nil {
	log.Fatal(err)
	return
}

连接后,您现在可以通过在main函数末尾添加以下代码行来trainers获取test数据库中集合的句柄:

1
collection := client.Database("test").Collection("trainers")

以下代码将使用此集合句柄来查询trainers集合。

最佳做法是保持连接到MongoDB的客户端,以便应用程序可以使用连接池 - 您不希望为每个查询打开和关闭连接。但是,如果您的应用程序不再需要连接,则可以使用如下方式关闭连接client.Disconnect():

1
2
3
4
5
6
err = client.Disconnect(context.TODO())

if err != nil {
    log.Fatal(err)
}
fmt.Println("Connection to MongoDB closed.")

mongodb的连接也可以进行设置

1
2
3
4
5
6
opt.SetLocalThreshold(3 * time.Second)     //只使用与mongo操作耗时小于3秒的
opt.SetMaxConnIdleTime(5 * time.Second)    //指定连接可以保持空闲的最大毫秒数
opt.SetMaxPoolSize(200)                    //使用最大的连接数
opt.SetReadPreference(want)                //表示只使用辅助节点
opt.SetReadConcern(readconcern.Majority()) //指定查询应返回实例的最新数据确认为,已入副本集中的大多数成员
opt.SetWriteConcern(wc)                    //请求确认写操作传播到大多数mongod实例

在Go中使用BSON对象

在我们开始向数据库发送查询之前,了解Go Driver如何与BSON对象一起工作非常重要。MongoDB中的JSON文档存储在名为BSON(二进制编码的JSON)的二进制表示中。与将JSON数据存储为简单字符串和数字的其他数据库不同,BSON编码扩展了JSON表示以包括其他类型,如int,long,date,floating point和decimal128。这使应用程序更容易可靠地处理,排序和比较数据。

Go Driver有两类用于表示BSON数据的类型:D类型和Raw类型。

D系列

D系列类型用于使用本地Go类型简洁地构建BSON对象。 这对于构造传递给MongoDB的命令特别有用。

D系列包括四种类型:

D

D表示BSON文档。此类型可用于以简洁和可读的方式表示BSON

D通常应在序列化为BSON时使用。对于反序列化,应该使用Raw或D类型。

用法示例:

1
2
3
4
5
6
_, err := col.InsertOne(ctx, bson.D{
    {"title", "post 2"},
    {"tags", []string{"mongodb"}},
    {"body", `blog post 2`},
    {"created_at", time.Now()},
})

此操作将创建具有有序元素的文档,例如:

1
2
3
4
5
6
7
8
9
{
    "_id" : ObjectId("5c71f03ccfee587e4212ad8f"),
    "title" : "post 2",
    "tags" : [
        "mongodb"
    ],
    "body" : "blog post 2",
    "created_at" : ISODate("2019-02-24T01:15:40.328Z")
}

此类型应在顺序重要的情况下使用,例如MongoDB命令。如果顺序并不重要,map更舒适,更简洁

E

E代表D的BSON元素。它通常用在D中。

M

M是BSON文档的无序,简洁表示。它通常应该用于当BSON文档的元素顺序无关紧要时序列化BSON。如果元素的顺序重要,请改用D代替。

用法示例:

1
2
3
4
5
6
_, err := col.InsertOne(ctx, bson.M{
    "title":      "post",
    "tags":       []string{"mongodb"},
    "body":       `blog post`,
    "created_at": time.Now(),
})

上面的操作将创建一个包含无序元素的文档,例如:

1
2
3
4
5
6
7
8
9
{
    "_id" : ObjectId("5c71f03ccfee587e4212ad8e"),
    "body" : "blog post",
    "created_at" : ISODate("2019-02-24T01:15:40.326Z"),
    "title" : "post",
    "tags" : [
        "mongodb"
    ]
}

请注意,元素的顺序与提供的顺序不同bson.M。

此类型在编码器中作为常规map[string]interface{}处理。元素将是以未定义的随机顺序序列化,每次顺序都不同。

A

A表示BSON数组。这种类型可以用来简洁地表示BSON数组.A通常应在序列化为BSON时使用。对于反序列化,应该使用RawArray或Array类型。

用法示例:

1
bson.A{"bar", "world", 3.14159, primitive.D{{"qux", 12345}}}

Raw类型

Raw类型用于byte切片。您还可以使用欧路词典从Raw类型中取回单独的元素。如果您不希望花费频繁将BSON解码为另一种类型的开销,这将非常有用。本教程仅使用D系列类型。

结构体替代bson

除了传入bson,还可以传入带tag的结构体作为bson

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23

type BeforeCond struct {
	Before int64 `bson:"$lt"`
}

type DeleteCond struct {
	BeforeCond BeforeCond `bson:"timePoint.startTime"`
}

delCond = &DeleteCond{
	BeforeCond: BeforeCond{
		Before: time.Now().Unix(),
	},
}


type FindByJobName struct {
	JobName string `bson:"jobName"`
}

cond = &FindByJobName{
	JobName: "hello",
}

bson转结构体

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
//bson转结构体,需要结构体打上bson标签
//a:=bson.M{}
//x:=A{}
//Bson2Odj(a,&x)
func Bson2Odj(val interface{}, obj interface{}) error {
	//bson转[]byte
	data, err := bson.Marshal(val)
	if err != nil {
		return err
	}
	//[]byte转结构体
	err = bson.Unmarshal(data, obj)
	if err != nil {
		return err
	}
	return nil
}

mongo Shell 转bson

shell语句 转 bson

1
2
3
4
5
a:=`$and : [{"id" : "asdfzcv"}, {"num" : 6}]`
	//bb:=bson.M{"id": "asdfzcv", "num": 6}
	bb:=bson.M{}
	_=bson.Unmarshal([]byte(a),&bb)
	//bb=bson.M{"id": "asdfzcv", "num": 6}
1
2
var update bson.M
	json.Unmarshal([]byte(`{ "$set": {"year": 1998}}`), &update)

shell语句组 转 []bson

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
	toConvert := `[
        {
            "$project": {
                "_id": 0,
                "A": "$$ROOT"
            }
        },
        {
            "$lookup": {
                "localField": "A.TypeId",
                "from": "RoleType",
                "foreignField": "_id",
                "as": "B"
            }
        },
        {
            "$unwind": {
                "path": "$B",
                "preserveNullAndEmptyArrays": true
            }
        },
        {
            "$project": {
                "A.RoleId": "$A.RoleId",
                "A.RoleName": "$A.RoleName",
                "A.Valid": "$A.Valid",
                "B.TypeName": "$B.TypeName"
            }
        }
    ]`
pipeLine := mongo.Pipeline{}
	err := bson.UnmarshalExtJSON([]byte(toConvert), true, &pipeLine)

CRUD操作

连接到数据库后,就可以开始添加和操作某些数据了。该Collection类型有几种方法,允许您向数据库发送查询。

插入文件

InsertOne

要插入单个文档,请使用以下collection.InsertOne()方法:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
// create a new context with a 10 second timeout
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// insert one document
res, err := col.InsertOne(ctx, bson.M{
	"title": "Go mongodb driver cookbook",
	"tags":  []string{"golang", "mongodb"},
	"body": `this is a long post
    that goes on and on
    and have many lines`,
	"comments":   1,
	"created_at": time.Now(),
})
if err != nil {
	log.Fatal(err)
}
fmt.Printf(
	"new post created with id: %s",
	res.InsertedID.(primitive.ObjectID).Hex(),
)
// => new post created with id: 5c71caf32a346553363177ce

也可以不使用bson.M{}来传递参数,而是使用包含bson标签例如`bson:"_id"`的结构体,比如:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
type User struct {
	ID    primitive.ObjectID 	`bson:"_id,omitempty"`
	Name  string      			`bson:"dbname",json:"jsonname"`
	Phone string
	ASD   string
}

// Insert one
result, err := userColl.InsertOne(ctx, User{Name: "UserName", Phone:"1234567890"})
if err == nil {
	log.Println(result.InsertedID)
} else {
	log.Fatal(err)
}
// Insert many
{
	users := []interface{}{
		User{Name: "UserName_0", Phone: "123"},
		User{Name: "UserName_1", Phone: "456"},
		User{Name: "UserName_2", Phone: "789"},
	}
	if result, err := userColl.InsertMany(ctx, users); err == nil {
		log.Println(result)
	} else {
		log.Fatal(err)
	}
}

InsertMany

要一次插入多个文档,该collection.InsertMany()方法将采用一些对象:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
res, err := col.InsertMany(ctx, []interface{}{
	bson.M{
		"title":      "Post one",
		"tags":       []string{"golang"},
		"body":       "post one body",
		"comments":   14,
		"created_at": time.Date(2019, time.January, 10, 15, 30, 0, 0, time.UTC),
	},
	bson.M{
		"title":      "Post two",
		"tags":       []string{"nodejs"},
		"body":       "post two body",
		"comments":   2,
		"created_at": time.Now(),
	},
})
if err != nil {
	log.Fatal(err)
}
fmt.Printf("inserted ids: %v\n", res.InsertedIDs)
// => inserted ids: [ObjectID("5c71ce5c6e6d43eb6e2e93be") ObjectID("5c71ce5c6e6d43eb6e2e93bf")]

更新文件

update

UpdateOne

该collection.UpdateOne()方法允许您更新单个文档。它需要一个过滤器文档来匹配数据库中的文档和一个更新文档来描述更新操作。update需要传入bson命令

您可以使用以下bson.D类型构建这些:

这里我们还从其十六进制字符串表示中创建一个新的ObjectID:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
// create ObjectID from string
id, err := primitive.ObjectIDFromHex("5c71ce5c6e6d43eb6e2e93be")
if err != nil {
	log.Fatal(err)
}

// set filters and updates
filter := bson.M{"_id": id}
update := bson.M{"$set": bson.M{"title": "post 2 (two)"}}

// update document
res, err := col.UpdateOne(ctx, filter, update)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("modified count: %d\n", res.ModifiedCount)
// => modified count: 1

UpdateMany

要更新多个文档(相当于{multi: true}),请使用collection.UpdateMany()

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
// set filters and updates
filter := bson.M{"tags": bson.M{"$elemMatch": bson.M{"$eq": "golang"}}}
update := bson.M{"$set": bson.M{"comments": 0, "updated_at": time.Now()}}

// update documents
res, err := col.UpdateMany(ctx, filter, update)
if err != nil {
	log.Fatal(err)
}
fmt.Printf("modified count: %d\n", res.ModifiedCount)
// => modified count: 17

upsert

对于两者而言UpdateOne,UpdateMany您可以通过将适当的选项传递给函数来使其成为upsert操作:

1
2
3
4
5
6
7
res, err := col.UpdateOne(
	ctx, filter, update, options.Update().SetUpsert(true)
)

res, err := col.UpdateMany(
	ctx, filter, update, options.Update().SetUpsert(true)
)

replace

replaceone

Update是用来直接修改数据库的,ReplaceOne则是整个替换。 可以用bson传入要替换的值

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
var err error
var client *mongo.Client
var collection *mongo.Collection
var ctx = context.Background()
var doc = bson.M{"_id": primitive.NewObjectID(), "hometown": "Atlanta"}
var result *mongo.UpdateResult
client = getMongoClient()
defer client.Disconnect(ctx)
collection = client.Database(dbName).Collection(collectionExamples)
if _, err = collection.InsertOne(ctx, doc); err != nil {
	t.Fatal(err)
}
doc["year"] = 1998
if result, err = collection.ReplaceOne(ctx, bson.M{"_id": doc["_id"]}, doc); err != nil {
	t.Fatal(err)
}

replacement也可以直接传入结构体进行更新

1
2
3
4
5
6
7
8
9
// Replace one
{
	user := User{Name: "UserName_2_replaced", Phone: "789"}
	if result, err := userColl.ReplaceOne(ctx, bson.M{"phone": "789"}, user); err == nil {
		log.Println(result)
	} else {
		log.Fatal(err)
	}
}

resert

也可以通过将适当的选项传递给函数来使replace其成为upsert操作:

1
2
3
4
5
6
7
res, err := col.ReplaceOne(
	ctx, filter, update, options.Replace().SetUpsert(true)
)

res, err := col.ReplaceMany(
	ctx, filter, update, options.Replace().SetUpsert(true)v
)

查询与删改换结合

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
//查询单条数据后删除该数据
if err = collection.FindOneAndDelete(getContext(), bson.D{{"name", "howie_3"}}).Decode(&howie); err != nil {
	checkErr(err)
}
fmt.Printf("FindOneAndDelete查询到的数据:%v\n", howie)
//查询单条数据后修改该数据
if err = collection.FindOneAndUpdate(getContext(), bson.D{{"name", "howie_4"}},bson.M{"$set": bson.M{"name": "这条数据我需要修改了"}}).Decode(&howie); err != nil{
	checkErr(err)
}
fmt.Printf("FindOneAndUpdate查询到的数据:%v\n", howie)

//查询单条数据后替换该数据(以前的数据全部清空)
if err = collection.FindOneAndReplace(getContext(), bson.D{{"name", "howie_5"}} bson.M{"hero": "这条数据我替换了"}).Decode(&howie); err != nil {
	checkErr(err)
}

查找文件

FindOne

要查找文档,您需要一个过滤器文档以及一个指向可以解码结果的值的指针。要查找单个文档,请使用collection.FindOne()。此方法返回单个结果,可将其解码为值。您将使用filter在更新查询中使用的相同变量来匹配名称为Ash的文档。

1
2
3
4
5
6
7
8
9
// create a value into which the result can be decoded
var result Trainer

err = collection.FindOne(context.TODO(), filter).Decode(&result)
if err != nil {
    log.Fatal(err)
}

fmt.Printf("Found a single document: %+v\n", result)

Decode()函数也可以传参bson

1
2
var result bson.M
err := table.FindOne(ctx, filter).Decode(&result)

Find

要查找多个文档,请使用collection.Find()。这个方法返回一个Cursor。A Cursor提供了一个文档流,您可以通过它们一次迭代和解码一个文档。一旦Cursor用尽,你应该关闭Cursor。在这里,您还将使用options包设置一些操作选项。具体来说,您将设置限制,因此只返回2个文档。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// Pass these options to the Find method
findOptions := options.Find()
findOptions.SetLimit(2)

// Here's an array in which you can store the decoded documents
var results []*Trainer

// Passing bson.D{{}} as the filter matches all documents in the collection
cur, err := collection.Find(context.TODO(), bson.D{{}}, findOptions)
if err != nil {
    log.Fatal(err)
}

// Finding multiple documents returns a cursor
// Iterating through the cursor allows us to decode documents one at a time
for cur.Next(context.TODO()) {

    // create a value into which the single document can be decoded
    var elem Trainer
    err := cur.Decode(&elem)
    if err != nil {
        log.Fatal(err)
    }

    results = append(results, &elem)
}

if err := cur.Err(); err != nil {
    log.Fatal(err)
}

// Close the cursor once finished
cur.Close(context.TODO())

fmt.Printf("Found multiple documents (array of pointers): %+v\n", results)

Find也可以返回bson,如下:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
func (o *MongoUtils) FindMore(col string, filter bson.M) ([]bson.M, error){
	if o.Db == nil || o.Con == nil{
		return nil, fmt.Errorf("没有初始化连接和数据库信息!")
	}
	table := o.Db.Collection(col)
	ctx,_ := context.WithTimeout(context.Background(), 5*time.Second)
	ctx, _ = context.WithTimeout(context.Background(), 5*time.Second)
	cur, err2 := table.Find(ctx, filter)
	if err2 != nil {
		fmt.Print(err2)
		return nil, err2
	}
	defer cur.Close(ctx)
	var resultArr []bson.M
	for cur.Next(ctx){
		var result bson.M
		err3 := cur.Decode(&result)
		if err3 != nil {
			return nil, err3
		}
		resultArr = append(resultArr, result)
	}
	return resultArr, nil
}

有序find

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//一次查询多条数据
// 查询createtime>=3
// 限制取2条
// createtime从大到小排序的数据
if cursor, err = collection.Find(getContext(), bson.M{"createtime": bson.M{"$gte": 2}}, options.Find().SetLimit(2), options.Find().SetSort(bson.M{"createtime": -1})); err!= nil {
	checkErr(err)
}

//多个options.Find()可以用链式表达,比如:
//findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
//按选项查询集合 Skip 跳过 Limit 读取数量 sort

if err = cursor.Err(); err != nil {
	checkErr(err)
}
defer cursor.Close(context.Background())
for cursor.Next(context.Background()) {
	if err = cursor.Decode(&howie); err != nil {
		checkErr(err)
	}
	howieArrayEmpty = append(howieArrayEmpty, howie)
}
for _, v := range howieArrayEmpty {
	fmt.Printf("Find查询到的数据ObejectId值%s 值:%v\n", v.HowieId.Hex(), v)
}

distinct

1
2
3
4
5
6
//返回满足filter条件下的所有fieldName值,[]interface{}类型
//res, err := mymongo.Distinct("test", "id", bson.M{})
	collection := db.Collection(collectionName)
	res, err = collection.Distinct(getContext(), fieldName, filter, opts...)
	return
}

根据_id查找文件

mongodb里面"_id"不是字符串,只是方便显示,在可视化界面中展示为字符串,如"5d36fc03d3a200b7aeecaeac"

如果要在golang中通过_id查找文件,需要先将字符串转化为primitive.ObjectID,然后再查找

1
2
ID, _ := primitive.ObjectIDFromHex(stringID)
err := collection.Find(getContext(),bson.M{"_id": ID})

删除文件

DeleteOne

要删除文档,请使用collection.DeleteOne():

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
// create ObjectID from string
id, err := primitive.ObjectIDFromHex("5c71ce5c6e6d43eb6e2e93be")
if err != nil {
	log.Fatal(err)
}

// delete document
res, err := col.DeleteOne(ctx, bson.M{"_id": id})
if err != nil {
	log.Fatal(err)
}
fmt.Printf("deleted count: %d\n", res.DeletedCount)

DeleteMany

最后,您可以使用collection.DeleteOne()或删除文档collection.DeleteMany()。在这里,您将传递bson.D{{}}过滤器参数,该参数将匹配集合中的所有文档。您还可以使用collection.Drop()删除整个集合。

1
2
3
4
5
deleteResult, err := collection.DeleteMany(context.TODO(), bson.D{{}})
if err != nil {
    log.Fatal(err)
}
fmt.Printf("Deleted %v documents in the trainers collection\n", deleteResult.DeletedCount)

bulkWrite

有时您想要执行许多操作,例如一次插入,更新和删除所有操作。在这种情况下,MongoDB提供了非常有用的批量写入API,它采用一系列写入操作并执行每个操作。默认情况下,操作按顺序执行。这是一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// list of inserts
inserts := []bson.M{
	{
		"title":      "post five",
		"tags":       []string{"postgresql"},
		"created_at": time.Now(),
	},
	{
		"title":      "post six",
		"tags":       []string{"graphql"},
		"created_at": time.Now(),
	},
}

// list of updates
updates := []struct {
	filter  bson.M
	updates bson.M
}{
	{
		filter: bson.M{
			"tags": bson.M{"$elemMatch": bson.M{"$eq": "golang"}},
		},
		updates: bson.M{"$set": bson.M{"updated_at": time.Now()}},
	},
}

// list of deletes
deletes := []bson.M{
	{"_id": id1},
	{"_id": id2},
	{"_id": id3},
}

// create the slice of write models
var writes []mongo.WriteModel

// range over each list of operations and create the write model
for _, ins := range inserts {
	model := mongo.NewInsertOneModel().SetDocument(ins)
	writes = append(writes, model)
}
for _, upd := range updates {
	model := mongo.NewUpdateManyModel().
		SetFilter(upd.filter).SetUpdate(upd.updates)
	writes = append(writes, model)
}
for _, del := range deletes {
	model := mongo.NewDeleteManyModel().SetFilter(del)
	writes = append(writes, model)
}

// run bulk write
res, err := col.BulkWrite(ctx, writes)
if err != nil {
	log.Fatal(err)
}
fmt.Printf(
	"insert: %d, updated: %d, deleted: %d",
	res.InsertedCount,
	res.ModifiedCount,
	res.DeletedCount,
)
// => insert: 2, updated: 10, deleted: 3

聚合查询

count

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//查询集合里面有多少数据
	if size, err = collection.CountDocuments(getContext(), bson.D{}); err != nil {
		checkErr(err)
	}
	fmt.Printf("Count里面有多少条数据:%d\n", size)

	//查询集合里面有多少数据(查询createtime>=3的数据)
	if size, err = collection.CountDocuments(getContext(), bson.M{"createtime": bson.M{"$gte": 3}}); err != nil {
		checkErr(err)
	}
	fmt.Printf("Count里面有多少条数据:%d\n", size)

func checkErr(err error) {
	if err != nil {
		if err == mongo.ErrNoDocuments {
			fmt.Println("没有查到数据")
			os.Exit(0)
		} else {
			fmt.Println(err)
			os.Exit(0)
		}

	}
}

EstimatedDocumentCount

估计集合内有多少数据

1
num, err := collection.EstimatedDocumentCount(getContext(), opts...)

aggregate

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
func TestAggregateJSON(t *testing.T) {
	var err error
	var client *mongo.Client
	var collection *mongo.Collection
	var cur *mongo.Cursor
	var ctx = context.Background()

	client = getMongoClient()
	defer client.Disconnect(ctx)
	seedCarsData(client, dbName)

	pipeline := `[
		{"$match": { "color": "Red" }},
		{"$group": { "_id": "$brand", "count": { "$sum": 1 } }},
		{"$project": { "brand": "$_id", "_id": 0, "count": 1 }}
	]`

	collection = client.Database(dbName).Collection(collectionName)
	var brands []interface{}
	if brands, err = collection.Distinct(ctx, "brand", bson.D{{Key: "color", Value: "Red"}}); err != nil {
		t.Fatal(err)
	}
	opts := options.Aggregate()
	opts.SetAllowDiskUse(true)
	opts.SetBatchSize(5)
	pipeLine := mongo.Pipeline{}
	err := bson.UnmarshalExtJSON([]byte(pipeline), true, &pipeLine)
	if cur, err = collection.Aggregate(ctx, pipeLine, opts); err != nil {
		t.Fatal(err)
	}
	defer cur.Close(ctx)
	total := 0
	for cur.Next(ctx) {
		total++
	}
	if total == 0 || total != len(brands) {
		t.Fatal("expected", len(brands), "but got", total)
	}
}

func TestAggregatePipeline(t *testing.T) {
	var err error
	var client *mongo.Client
	var collection *mongo.Collection
	var cur *mongo.Cursor
	var ctx = context.Background()

	client = getMongoClient()
	seedCarsData(client, dbName)

	// this cause warning from go vet
	// pipeline := mongo.Pipeline{
	// 	{{"$match", bson.D{{"color", "Red"}}}},
	// 	{{"$group", bson.D{{"_id", "$brand"}, {"count", bson.D{{"$sum", 1}}}}}},
	// 	{{"$project", bson.D{{"brand", "$_id"}, {"_id", 0}, {"count", 1}}}},
	// }
	pipeline := mongo.Pipeline{
		{{Key: "$match", Value: bson.D{{Key: "color", Value: "Red"}}}},
		{{Key: "$group", Value: bson.D{{Key: "_id", Value: "$brand"}, {Key: "count", Value: bson.D{{Key: "$sum", Value: 1}}}}}},
		{{Key: "$project", Value: bson.D{{Key: "brand", Value: "$_id"}, {Key: "_id", Value: 0}, {Key: "count", Value: 1}}}},
	}
	collection = client.Database(dbName).Collection(collectionName)
	var brands []interface{}
	if brands, err = collection.Distinct(ctx, "brand", bson.D{{Key: "color", Value: "Red"}}); err != nil {
		t.Fatal(err)
	}
	opts := options.Aggregate()
	opts.SetAllowDiskUse(true)
	opts.SetBatchSize(5)
	if cur, err = collection.Aggregate(ctx, pipeline, opts); err != nil {
		t.Fatal(err)
	}
	defer cur.Close(ctx)
	total := 0
	for cur.Next(ctx) {
		total++
	}
	if total == 0 || total != len(brands) {
		t.Fatal("expected", len(brands), "but got", total)
	}
}

func TestAggregateBSOND(t *testing.T) {
	var err error
	var client *mongo.Client
	var collection *mongo.Collection
	var cur *mongo.Cursor
	var ctx = context.Background()
	size := 10
	client = getMongoClient()
	seedCarsData(client, dbName)
	pipeline := []bson.D{bson.D{{"$sample", bson.D{{"size", size}}}}}
	collection = client.Database(dbName).Collection(collectionName)
	opts := options.Aggregate()
	if cur, err = collection.Aggregate(ctx, pipeline, opts); err != nil {
		t.Fatal(err)
	}
	defer cur.Close(ctx)
	total := 0
	for cur.Next(ctx) {
		total++
	}
	if total == 0 {
		t.Fatal("expected ", size, "but got", total)
	}
}

事务机制

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// WithSession允许用户自己启动会话并进行管理
//它的一生为CRUD方法提供会话的唯一方法是
//使用mongo.SessionContext调用该CRUD方法
//关闭mongo.SessionContext可以用作常规上下文,
//所以像context.WithDeadline和context.WithTimeout这样的方法是
// 支持的。
//
//如果context.Context已经附加了mongo.Session,那就是
// mongo.Session将替换为提供的。
//
//从闭包中返回的错误是透明地返回的
//这个功能
func WithSessionctx context.Contextsess Sessionfn funcSessionContexterrorerror {
return fncontextWithSessionctxsess))
}

// UseSession创建一个默认会话,该会话仅对
//关闭的生命周期关闭会话之外没有清理
//在退出封闭时完成。这意味着一个出色的
//即使闭包返回错误,也会中止事务。
//
//如果ctx已经包含mongo.Session,那mongo.Session将是
//替换为新创建的mongo.Session。
//
//从闭包中返回的错误是透明地返回的
//这个方法
funcc * ClientUseSessionctx context.Contextfn funcSessionContexterrorerror {
return c.UseSessionWithOptionsctxoptions.Session(),fn
}

WithSession

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
{
	var err error
	var client *mongo.Client
	var collection *mongo.Collection
	var ctx = context.Background()
	var id = primitive.NewObjectID()
	var doc = bson.M{"_id": id, "hometown": "Atlanta", "year": int32(1998)}
	var result *mongo.UpdateResult
	var session mongo.Session
	var update = bson.D{{Key: "$set", Value: bson.D{{Key: "year", Value: int32(2000)}}}}
	client = getMongoClient()
	defer client.Disconnect(ctx)

	if session, err = client.StartSession(); err != nil {
		t.Fatal(err)
	}
	if err = session.StartTransaction(); err != nil {
		t.Fatal(err)
	}
	if err = mongo.WithSession(ctx, session, func(sc mongo.SessionContext) error {
		if result, err = collection.UpdateOne(sc, bson.M{"_id": id}, update); err != nil {
			t.Fatal(err)
		}
		if result.MatchedCount != 1 || result.ModifiedCount != 1 {
			t.Fatal("replace failed, expected 1 but got", result.MatchedCount)
		}
		//提交
		if err = session.CommitTransaction(sc); err != nil {
			t.Fatal(err)
		}
		//放弃
		if err = session.AbortTransaction(sc); err != nil {
			t.Fatal(err)
		}
		return nil
	}); err != nil {
		t.Fatal(err)
	}
	session.EndSession(ctx)
}

UseSession

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import (
    "context"
    "github.com/mongodb/mongo-go-driver/mongo"
    "net/url"
    "fmt"
)

func main(){
    connectString := "mongodb://127.0.0.1/test"
    dbUrl, err := url.Parse(connectString)
    if err != nil {
        panic(err)
	}

    //认证参数设置,否则连不上
	opts := &options.ClientOptions{}
	opts.SetAuth(options.Credential{
			AuthMechanism:"SCRAM-SHA-1",
			AuthSource:"test",
			Username:"test",
			Password:"123456"})

    client, err = mongo.Connect(context.Background(), connectStringopts)
    if err != nil {
        panic(err)
	}

    db := client.Database(dbUrl.Path[1:])

    ctx := context.Background()
    defer db.Client().Disconnect(ctx)

    col := db.Collection("test")

    //先在事务外写一条id为“111”的记录
    _,err = col.InsertOne(ctx, bson.M{"_id": "111", "name": "ddd", "age": 50})
    if(err != nil){
        fmt.Println(err)
        return
    }

    //第一个事务:成功执行
    db.Client().UseSession(ctx, func(sessionContext mongo.SessionContext) error {
        err = sessionContext.StartTransaction()
        if(err != nil){
            fmt.Println(err)
            return err
        }

        //在事务内写一条id为“222”的记录
        _, err = col.InsertOne(sessionContext, bson.M{"_id": "222", "name": "ddd", "age": 50})
        if(err != nil){
            fmt.Println(err)
            return err
        }

        //在事务内写一条id为“333”的记录
        _, err = col.InsertOne(sessionContext, bson.M{"_id": "333", "name": "ddd", "age": 50})
        if err != nil {
            sessionContext.AbortTransaction(sessionContext)
            return err
        }else {
            sessionContext.CommitTransaction(sessionContext)
        }
        return nil
    })

    //第二个事务:执行失败,事务没提交,因最后插入了一条重复id "111",
    err = db.Client().UseSession(ctx, func(sessionContext mongo.SessionContext) error {
        err := sessionContext.StartTransaction()
        if(err != nil){
            fmt.Println(err)
            return err
        }

        //在事务内写一条id为“222”的记录
        _, err = col.InsertOne(sessionContext, bson.M{"_id": "444", "name": "ddd", "age": 50})
        if(err != nil){
            fmt.Println(err)
            return err
        }

		//写重复id
        _, err = col.InsertOne(sessionContext, bson.M{"_id": "111", "name": "ddd", "age": 50})
        if err != nil {
            sessionContext.AbortTransaction(sessionContext)
            return err
        }else {
            sessionContext.CommitTransaction(sessionContext)
        }
        return nil
    })
}

//最终数据只有 "111","222","333" 三条,事务测试成功。

RunCommand

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
{
	var err error
	var client *mongo.Client
	var result bson.M
	client = getMongoClient()
	defer client.Disconnect(context.Background())
	command := bson.D{{Key: "isMaster", Value: 1}}
	if err = client.Database("admin").RunCommand(context.Background(), command).Decode(&result); err != nil {
		t.Fatal(err)
	}
}

gridfs

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
	var err error
	var client *mongo.Client
	var bucket *gridfs.Bucket
	var ustream *gridfs.UploadStream
	str := "This is a test file"
	client = getMongoClient()
	defer client.Disconnect(context.Background())

	if bucket, err = gridfs.NewBucket(client.Database(dbName), options.GridFSBucket().SetName("myFiles")); err != nil {
		t.Fatal(err)
	}

	opts := options.GridFSUpload()
	opts.SetMetadata(bsonx.Doc{{Key: "content-type", Value: bsonx.String("application/json")}})
	if ustream, err = bucket.OpenUploadStream("test.txt", opts); err != nil {
		t.Fatal(err)
	}

	if _, err = ustream.Write([]byte(str)); err != nil {
		t.Fatal(err)
	}

	fileID := ustream.FileID
	ustream.Close()
	var b bytes.Buffer
	w := bufio.NewWriter(&b)

	if _, err = bucket.DownloadToStream(fileID, w); err != nil {
		t.Fatal(err, ustream.FileID)
	}

	if b.String() != str {
		t.Fatal("expected", str, "but got", b.String())
	}

change stream

change_stream.go

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Copyright 2018 Kuei-chun Chen. All rights reserved.

package examples

import (
	"context"
	"fmt"
	"log"

	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

// ChangeStream defines what to watch? client, database or collection
type ChangeStream struct {
	collection string
	database   string
	pipeline   []bson.D
}

type callback func(bson.M)

// SetCollection sets collection
func (cs *ChangeStream) SetCollection(collection string) {
	cs.collection = collection
}

// SetDatabase sets database
func (cs *ChangeStream) SetDatabase(database string) {
	cs.database = database
}

// SetPipeline sets pipeline
func (cs *ChangeStream) SetPipeline(pipeline []bson.D) {
	cs.pipeline = pipeline
}

// NewChangeStream gets a new ChangeStream
func NewChangeStream() *ChangeStream {
	return &ChangeStream{}
}

// Watch prints oplogs in JSON format
func (cs *ChangeStream) Watch(client *mongo.Client, cb callback) {
	var err error
	var ctx = context.Background()
	var cur *mongo.ChangeStream
	fmt.Println("pipeline", cs.pipeline)
	opts := options.ChangeStream()
	opts.SetFullDocument("updateLookup")
	if cs.collection != "" && cs.database != "" {
		fmt.Println("Watching", cs.database+"."+cs.collection)
		var coll = client.Database(cs.database).Collection(cs.collection)
		if cur, err = coll.Watch(ctx, cs.pipeline, opts); err != nil {
			panic(err)
		}
	} else if cs.database != "" {
		fmt.Println("Watching", cs.database)
		var db = client.Database(cs.database)
		if cur, err = db.Watch(ctx, cs.pipeline, opts); err != nil {
			panic(err)
		}
	} else {
		fmt.Println("Watching all")
		if cur, err = client.Watch(ctx, cs.pipeline, opts); err != nil {
			panic(err)
		}
	}

	defer cur.Close(ctx)
	var doc bson.M
	for cur.Next(ctx) {
		if err = cur.Decode(&doc); err != nil {
			log.Fatal(err)
		}
		cb(doc)
	}
	if err = cur.Err(); err != nil {
		log.Fatal(err)
	}
}

change_stream_test.go

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright 2018 Kuei-chun Chen. All rights reserved.

package examples

import (
	"context"
	"encoding/json"
	"os"
	"testing"
	"time"

	"github.com/simagix/keyhole/mdb"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/x/network/connstring"
)

var collection = "oplogs"

// example: argos "mongodb://localhost:27017/argos?replicaSet=replset" students '[{"$match": {"operationType": "update"}}]'
func silent(doc bson.M) {
}

func TestChangeStreamClient(t *testing.T) {
	var err error
	var client *mongo.Client
	var cs connstring.ConnString
	var ctx = context.Background()
	var uri = "mongodb://localhost:27017/argos?replicaSet=replset"
	if os.Getenv("DATABASE_URL") != "" {
		uri = os.Getenv("DATABASE_URL")
	}
	if cs, err = connstring.Parse(uri); err != nil {
		t.Fatal(err)
	}
	client = getMongoClient()
	defer client.Disconnect(ctx)
	var pipeline []bson.D
	pipeline = mongo.Pipeline{}
	c := client.Database(cs.Database).Collection(collection)
	c.InsertOne(ctx, bson.M{"city": "Atlanta"})

	go func(c *mongo.Collection) {
		execute(c)
		client.Disconnect(context.Background())
	}(c)

	stream := NewChangeStream()
	stream.SetPipeline(pipeline)
	// stream.Watch(client)
}

func TestChangeStreamDatabase(t *testing.T) {
	var err error
	var client *mongo.Client
	var cs connstring.ConnString
	var ctx = context.Background()
	var uri = "mongodb://localhost:27017/argos?replicaSet=replset"
	if os.Getenv("DATABASE_URL") != "" {
		uri = os.Getenv("DATABASE_URL")
	}
	if cs, err = connstring.Parse(uri); err != nil {
		t.Fatal(err)
	}
	client = getMongoClient()
	var pipeline []bson.D
	pipeline = mongo.Pipeline{}
	c := client.Database(cs.Database).Collection(collection)
	c.InsertOne(ctx, bson.M{"city": "Atlanta"})

	go func(c *mongo.Collection) {
		execute(c)
		client.Database(cs.Database).Drop(context.Background())
	}(c)

	stream := NewChangeStream()
	stream.SetDatabase(cs.Database)
	stream.SetPipeline(pipeline)
	stream.Watch(client, silent)
}

func TestChangeStreamCollection(t *testing.T) {
	var err error
	var client *mongo.Client
	var cs connstring.ConnString
	var ctx = context.Background()
	var uri = "mongodb://localhost:27017/argos?replicaSet=replset"
	if os.Getenv("DATABASE_URL") != "" {
		uri = os.Getenv("DATABASE_URL")
	}
	if cs, err = connstring.Parse(uri); err != nil {
		t.Fatal(err)
	}
	client = getMongoClient()
	var pipeline []bson.D
	pipeline = mongo.Pipeline{}
	c := client.Database(cs.Database).Collection(collection)
	c.InsertOne(ctx, bson.M{"city": "Atlanta"})

	go func(c *mongo.Collection) {
		execute(c)
	}(c)

	stream := NewChangeStream()
	stream.SetCollection(collection)
	stream.SetDatabase(cs.Database)
	stream.SetPipeline(pipeline)
	stream.Watch(client, silent)
}

func TestChangeStreamCollectionWithPipeline(t *testing.T) {
	var err error
	var client *mongo.Client
	var cs connstring.ConnString
	var ctx = context.Background()
	var uri = "mongodb://localhost:27017/argos?replicaSet=replset"
	if os.Getenv("DATABASE_URL") != "" {
		uri = os.Getenv("DATABASE_URL")
	}
	if cs, err = connstring.Parse(uri); err != nil {
		t.Fatal(err)
	}
	client = getMongoClient()
	var pipeline = mdb.MongoPipeline(`[{"$match": {"operationType": {"$in": ["update", "delete"] } }}]`)
	c := client.Database(cs.Database).Collection(collection)
	c.InsertOne(ctx, bson.M{"city": "Atlanta"})

	go func(c *mongo.Collection) {
		execute(c)
	}(c)

	stream := NewChangeStream()
	stream.SetCollection(collection)
	stream.SetDatabase(cs.Database)
	stream.SetPipeline(pipeline)
	stream.Watch(client, silent)
}

func execute(c *mongo.Collection) {
	time.Sleep(2 * time.Second) // wait for change stream to init
	var doc = bson.M{"_id": primitive.NewObjectID(), "hometown": "Atlanta"}
	c.InsertOne(context.Background(), doc)
	var update bson.M
	json.Unmarshal([]byte(`{ "$set": {"year": 1998}}`), &update)
	c.UpdateOne(context.Background(), bson.M{"_id": doc["_id"]}, update)
	c.DeleteMany(context.Background(), bson.M{"hometown": "Atlanta"})
	time.Sleep(1 * time.Second) // wait for CS to print messages
	c.Drop(context.Background())
}

参考: https://blog.csdn.net/sdghchj/article/details/85249392 https://blog.fudenglong.site/2018/11/17/MongoDB-Go%E5%AE%A2%E6%88%B7%E7%AB%AF%E5%AD%A6%E4%B9%A0/ https://blog.csdn.net/henreash/article/details/86745584 https://github.com/simagix/mongo-go-examples/tree/master/examples