環境 
macOS 
Go 1.13.4 
MongoDB 4.2.2 
 
前言 對 MongoDB 有多個操作時,可以使用 bulkWrite() 方法提高效能。此方法將每 100,000 個請求做為一個批次發送至服務器,而不是每一次發送一個請求。
建立專案 建立專案目錄。
1 mkdir  -p $GOPATH /src/github.com/memochou1993/mongo-bulk-example
進到專案目錄。
1 cd  $GOPATH /src/github.com/memochou1993/mongo-bulk-example
初始化 Go Modules。
1 go mod init github.com/memochou1993/mongo-bulk-example 
安裝套件 安裝 go.mongodb.org/mongo-driver 套件。
1 go get go.mongodb.org/mongo-driver 
做法 新增 main.go 檔:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package  mainimport  (	"context"  	"fmt"  	"log"  	"strconv"  	"time"  	"go.mongodb.org/mongo-driver/bson"  	"go.mongodb.org/mongo-driver/mongo"  	"go.mongodb.org/mongo-driver/mongo/options"  ) const  (	uri        = "mongodb://localhost:27017"  	database   = "mongo"  	collection = "items"  ) var  (	err      error  	client   *mongo.Client 	item     Item 	duration time.Duration  	times    int64  = 10   	amount   int    = 100   	method   int   ) type  Item struct  {	Value string  } func  main () 	ctx, cancel := context.WithTimeout(context.Background(), 30 *time.Minute) 	defer  cancel() 	opts := options.Client().ApplyURI(uri) 	if  client, err = mongo.Connect(ctx, opts); err != nil  { 		log.Fatalln(err.Error()) 	} 	 	 	fmt.Println(duration / (time.Duration(times) * time.Millisecond) * time.Millisecond) } 
新增一個 upsert() 方法,用來更新或新增記錄。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func  upsert (ctx context.Context, c *mongo.Collection, amount int ) 	defer  measure(time.Now()) 	for  i := 0 ; i <= amount; i++ { 		query := bson.M{"id" : i} 		update := bson.M{"$set" : Item{Value: "New Item "  + strconv.Itoa(i)}} 		opts := options.Update().SetUpsert(true ) 		_, err := c.UpdateOne(ctx, query, update, opts) 		if  err != nil  { 			log.Fatalln(err.Error()) 		} 	} 	 	if  err := c.Drop(ctx); err != nil  { 		log.Fatalln(err.Error()) 	} } 
新增一個 bulkUpsert() 方法,用來批量更新或新增記錄。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func  bulkUpsert (ctx context.Context, c *mongo.Collection, amount int ) 	defer  measure(time.Now()) 	models := []mongo.WriteModel{} 	for  i := 0 ; i <= amount; i++ { 		query := bson.M{"id" : i} 		update := bson.M{"$set" : Item{Value: "New Item "  + strconv.Itoa(i)}} 		model := mongo.NewUpdateOneModel() 		models = append (models, model.SetFilter(query).SetUpdate(update).SetUpsert(true )) 	} 	 	opts := options.BulkWrite().SetOrdered(false ) 	_, err := c.BulkWrite(ctx, models, opts) 	if  err != nil  { 		log.Fatalln(err.Error()) 	} 	 	if  err = c.Drop(ctx); err != nil  { 		log.Fatalln(err.Error()) 	} } 
新增一個 measure 方法,用來計算經過時間:
1 2 3 4 5 6 7 func  measure (start time.Time) 	duration += time.Since(start) 	 	log.Printf("Execution time: %s" , time.Since(start)) 	 	log.Printf("Elapsed time: %s " , duration) } 
測試 將 main() 方法修改如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func  main () 	ctx, cancel := context.WithTimeout(context.Background(), 30 *time.Minute) 	defer  cancel() 	opts := options.Client().ApplyURI(uri) 	if  client, err = mongo.Connect(ctx, opts); err != nil  { 		log.Fatalln(err.Error()) 	} 	c := client.Database(database).Collection(collection) 	 	fmt.Scan(&method) 	var  i int64  	for  i = 0 ; i < times; i++ { 		if  method == 1  { 			upsert(ctx, c, amount) 			continue  		} 		if  method == 2  { 			bulkUpsert(ctx, c, amount) 			continue  		} 		break  	} 	log.Printf("Average time: %s" , duration/(time.Duration(times)*time.Millisecond)*time.Millisecond) } 
測試 upsert() 方法:
測試 bulkUpsert() 方法:
結果 執行時間比較如下:
測試次數 
資料筆數 
非批量寫入(秒) 
批量寫入(秒) 
 
 
10 
10 
0.024 
0.016 
 
10 
100 
0.088 
0.029 
 
10 
1,000 
1.1 
0.424 
 
5 
10,000 
42.3 
34.8 
 
3 
5,0000 
478.3 
415.6 
 
寫入速度比較如下:
測試次數 
資料筆數 
非批量寫入(筆/秒) 
批量寫入(筆/秒) 
 
 
10 
10 
416.7 
625.0 
 
10 
100 
1136.4 
3448.3 
 
10 
1,000 
909.0 
2358.5 
 
5 
10,000 
236.4 
287.4 
 
3 
5,0000 
104.529 
120.3 
 
程式碼 
參考資料