環境
macOS
Go 1.13.4
MongoDB 4.2.2
前言 對 MongoDB 有多個操作時,可以使用 bulkWrite()
方法提高效能。此方法將每 100,000 個請求做為一個批次發送至服務器,而不是每一次發送一個請求。
建立專案 建立專案目錄。
1 mkdir -p $GOPATH /src/github.com/memochou1993/mongo-bulk-example
進到專案目錄。
1 cd $GOPATH /src/github.com/memochou1993/mongo-bulk-example
初始化 Go Modules。
1 go mod init github.com/memochou1993/mongo-bulk-example
安裝套件 安裝 go.mongodb.org/mongo-driver
套件。
1 go get go.mongodb.org/mongo-driver
做法 新增 main.go
檔:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package mainimport ( "context" "fmt" "log" "strconv" "time" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) const ( uri = "mongodb://localhost:27017" database = "mongo" collection = "items" ) var ( err error client *mongo.Client item Item duration time.Duration times int64 = 10 amount int = 100 method int ) type Item struct { Value string } func main () { ctx, cancel := context.WithTimeout(context.Background(), 30 *time.Minute) defer cancel() opts := options.Client().ApplyURI(uri) if client, err = mongo.Connect(ctx, opts); err != nil { log.Fatalln(err.Error()) } fmt.Println(duration / (time.Duration(times) * time.Millisecond) * time.Millisecond) }
新增一個 upsert()
方法,用來更新或新增記錄。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 func upsert (ctx context.Context, c *mongo.Collection, amount int ) { defer measure(time.Now()) for i := 0 ; i <= amount; i++ { query := bson.M{"id" : i} update := bson.M{"$set" : Item{Value: "New Item " + strconv.Itoa(i)}} opts := options.Update().SetUpsert(true ) _, err := c.UpdateOne(ctx, query, update, opts) if err != nil { log.Fatalln(err.Error()) } } if err := c.Drop(ctx); err != nil { log.Fatalln(err.Error()) } }
新增一個 bulkUpsert()
方法,用來批量更新或新增記錄。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 func bulkUpsert (ctx context.Context, c *mongo.Collection, amount int ) { defer measure(time.Now()) models := []mongo.WriteModel{} for i := 0 ; i <= amount; i++ { query := bson.M{"id" : i} update := bson.M{"$set" : Item{Value: "New Item " + strconv.Itoa(i)}} model := mongo.NewUpdateOneModel() models = append (models, model.SetFilter(query).SetUpdate(update).SetUpsert(true )) } opts := options.BulkWrite().SetOrdered(false ) _, err := c.BulkWrite(ctx, models, opts) if err != nil { log.Fatalln(err.Error()) } if err = c.Drop(ctx); err != nil { log.Fatalln(err.Error()) } }
新增一個 measure
方法,用來計算經過時間:
1 2 3 4 5 6 7 func measure (start time.Time) { duration += time.Since(start) log.Printf("Execution time: %s" , time.Since(start)) log.Printf("Elapsed time: %s " , duration) }
測試 將 main()
方法修改如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 func main () { ctx, cancel := context.WithTimeout(context.Background(), 30 *time.Minute) defer cancel() opts := options.Client().ApplyURI(uri) if client, err = mongo.Connect(ctx, opts); err != nil { log.Fatalln(err.Error()) } c := client.Database(database).Collection(collection) fmt.Scan(&method) var i int64 for i = 0 ; i < times; i++ { if method == 1 { upsert(ctx, c, amount) continue } if method == 2 { bulkUpsert(ctx, c, amount) continue } break } log.Printf("Average time: %s" , duration/(time.Duration(times)*time.Millisecond)*time.Millisecond) }
測試 upsert()
方法:
測試 bulkUpsert()
方法:
結果 執行時間比較如下:
測試次數
資料筆數
非批量寫入(秒)
批量寫入(秒)
10
10
0.024
0.016
10
100
0.088
0.029
10
1,000
1.1
0.424
5
10,000
42.3
34.8
3
5,0000
478.3
415.6
寫入速度比較如下:
測試次數
資料筆數
非批量寫入(筆/秒)
批量寫入(筆/秒)
10
10
416.7
625.0
10
100
1136.4
3448.3
10
1,000
909.0
2358.5
5
10,000
236.4
287.4
3
5,0000
104.529
120.3
程式碼
參考資料