一,writeConcern
writeConcern 決定一個寫操作落到多少個節(jié)點(diǎn)上才算成功。writeConcern 的取值包括:
? 0:發(fā)起寫操作,不關(guān)心是否成功;
? 1~集群最大數(shù)據(jù)節(jié)點(diǎn)數(shù):寫操作需要被復(fù)制到指定節(jié)點(diǎn)數(shù)才算成功;
? majority:寫操作需要被復(fù)制到大多數(shù)節(jié)點(diǎn)上才算成功。
發(fā)起寫操作的程序?qū)⒆枞綄懖僮鞯竭_(dá)指定的節(jié)點(diǎn)數(shù)為止
1.1 writeConcern測試
//進(jìn)入主節(jié)點(diǎn),執(zhí)行
conf=rs.conf()
conf.members[2].slaveDelay = 5//延遲5秒
conf.members[2].priority = 0//投票優(yōu)先級設(shè)置為0
rs.reconfig(conf)
db.test.insertOne({count: 1}, {writeConcern: {w: 3}})//5秒后執(zhí)行完成
//返回
{
"acknowledged" : true,
"insertedId" : ObjectId("60072bacd28a60ee717d761e")
}
db.test.insertOne({count:2},{writeConcern: {w: 4, wtimeout:3000}})//超過節(jié)點(diǎn)數(shù)
//返回
WriteConcernError({
"code" : 100,
"codeName" : "UnsatisfiableWriteConcern",
"errmsg" : "Not enough data-bearing nodes",
"errInfo" : {
"writeConcern" : {
"w" : 4,
"wtimeout" : 3000,
"provenance" : "clientSupplied"
}
}
})
//查詢
db.test.find({count:2})
//返回
{ "_id" : ObjectId("60072f63d28a60ee717d7623"), "count" : 2 }
db.test.insertOne({count:3},{writeConcern: {w: 3, wtimeout:3000}})//設(shè)置3秒超時,返回會報錯
//返回
WriteConcernError({
"code" : 64,
"codeName" : "WriteConcernFailed",
"errmsg" : "waiting for replication timed out",
"errInfo" : {
"wtimeout" : true,
"writeConcern" : {
"w" : 3,
"wtimeout" : 3000,
"provenance" : "clientSupplied"
}
}
})
//主節(jié)點(diǎn)查詢
db.test.find({count:3})
//返回
{ "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }
//過了5秒,進(jìn)入第二個從節(jié)點(diǎn)執(zhí)行
db.test.find({count:3})
//返回
{ "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }
writeConcern大于總節(jié)點(diǎn)數(shù),或者等于總節(jié)點(diǎn)數(shù)但有一個節(jié)點(diǎn)故障,寫入會報錯,但數(shù)據(jù)還是會寫入,但這種錯誤沒有必要,重要數(shù)據(jù)設(shè)置成majority就可以了
設(shè)置超時,雖然會報錯,當(dāng)數(shù)據(jù)實(shí)際上是主節(jié)點(diǎn)先寫入,然后再等待其他節(jié)點(diǎn)同步數(shù)據(jù)時發(fā)生超時,數(shù)據(jù)還是已經(jīng)入庫了,也不會因?yàn)槌瑫r錯誤停止同步
通常應(yīng)對重要數(shù)據(jù)應(yīng)用 {w: “majority”},普通數(shù)據(jù)可以應(yīng)用 {w: 1} 以確保最佳性能。
writeConcern寫操作返回耗時受到同步及從節(jié)點(diǎn)性能影響,但并不會顯著增加集群壓力,因此無論是否等待,寫操作最終都會復(fù)制到所有節(jié)點(diǎn)上。設(shè)置 writeConcern 只是讓寫操作等待復(fù)制后再返回而已
二,讀數(shù)據(jù)
作為集群數(shù)據(jù)庫讀取時需要關(guān)注2個問題:
1,去哪讀-readPreference
2,數(shù)據(jù)隔離性-readConcern
2.1 readPreference
readPreference 決定使用哪一個節(jié)點(diǎn)來滿足正在發(fā)起的讀請求。可選值包括:
? primary: 只選擇主節(jié)點(diǎn);
? primaryPreferred:優(yōu)先選擇主節(jié)點(diǎn),如果不可用則選擇從節(jié)點(diǎn);
? secondary:只選擇從節(jié)點(diǎn);
? secondaryPreferred:優(yōu)先選擇從節(jié)點(diǎn),如果從節(jié)點(diǎn)不可用則選擇主節(jié)點(diǎn);
? nearest:選擇最近的節(jié)點(diǎn);
除此之外還可以通過標(biāo)簽來選擇讀取節(jié)點(diǎn)
2.1 readConcern
? available: 讀取所有可用數(shù)據(jù);
? local:讀取屬于當(dāng)前分片的所有可用數(shù)據(jù);
? majority:讀取大部分已提交的;
? linearizable:線性讀;
? snapshot:快照讀;
available與local的區(qū)別
在復(fù)制集下面local和available沒有區(qū)別,區(qū)別在于分片集遷移數(shù)據(jù)時,當(dāng)chunk x需要從shard1遷移到shard2上面的過程中,在shard1,shard2上面都有chunk x的數(shù)據(jù),但shard1為負(fù)責(zé)方。所有對chunk x的讀寫操作都會進(jìn)入shard1,如果指定對shard2讀取,available會讀取包含chunk x的數(shù)據(jù),而local只會讀取當(dāng)前分片負(fù)責(zé)的數(shù)據(jù)
majority
首先主節(jié)點(diǎn)寫入數(shù)據(jù),然后同步給從節(jié)點(diǎn),當(dāng)主節(jié)點(diǎn)收到大部分節(jié)點(diǎn)的寫入確認(rèn)后,再同步從節(jié)點(diǎn)數(shù)據(jù)已大部分節(jié)點(diǎn)寫入,收到的從節(jié)點(diǎn)就認(rèn)為這個數(shù)據(jù)已經(jīng)是majority的,可被majority讀取的。當(dāng)有個節(jié)點(diǎn)沒有收到數(shù)據(jù)或者majority確認(rèn),但是大部分節(jié)點(diǎn)已經(jīng)寫入了,當(dāng)前節(jié)點(diǎn)的視角是沒有該數(shù)據(jù)的
majority作用:
當(dāng)主節(jié)點(diǎn)掛掉前將數(shù)據(jù)x=1,改為x=2,數(shù)據(jù)還沒同步給從節(jié)點(diǎn),沒有設(shè)置majority,主節(jié)點(diǎn)讀到x=2,主節(jié)點(diǎn)就掛掉了,x=2就再也無法獲取,原來的就變成了臟讀。
linearizable
majority大部分時候都能保證不會出現(xiàn)臟讀,有一種特殊的情況,majority并不能很好支持。
存在server1,server2連接著node1,node1為主節(jié)點(diǎn),當(dāng)node1與其他節(jié)點(diǎn)失聯(lián),當(dāng)server2可以連接node1。這時server1通過選舉重新連接到node2作為主節(jié)點(diǎn),往node2寫入數(shù)據(jù)。但是server2卻認(rèn)為node1是主節(jié)點(diǎn),獲取不到對應(yīng)數(shù)據(jù),導(dǎo)致數(shù)據(jù)不一致。這種情況需要設(shè)置linearizable,這個配置在獲取數(shù)據(jù)前,或從其他節(jié)點(diǎn)檢查數(shù)據(jù)是否是真的最新。
三,因果一致性
有的博客包括極客時間上面提到,寫入mongo時,設(shè)置writeConcern=majority,讀取時設(shè)置readPreference=secondary,readConcern=majority就可以得到很好的性能且數(shù)據(jù)一致
其實(shí)不是的,先解釋下這樣子設(shè)置的意思,首先寫保證大部分節(jié)點(diǎn)已經(jīng)寫入,讀取時選擇從節(jié)點(diǎn),且是被大部分從節(jié)點(diǎn)有被寫入的數(shù)據(jù)才能被讀取。
這種情況會出現(xiàn)數(shù)據(jù)不一致:當(dāng)前有一個一主二從的集群,當(dāng)主節(jié)點(diǎn)與從節(jié)點(diǎn)1數(shù)據(jù)被寫入了,當(dāng)時從節(jié)點(diǎn)2沒有被寫入,在主節(jié)點(diǎn)和從節(jié)點(diǎn)1的視角看,這條數(shù)據(jù)已經(jīng)被寫入,且是mayjority的,但是如果訪問的從節(jié)點(diǎn)2視角看,是不存在這條數(shù)據(jù)的,當(dāng)我們設(shè)置readPreference=secondary時,剛好驅(qū)動剛好選擇的是從節(jié)點(diǎn)2,數(shù)據(jù)會不一致
實(shí)驗(yàn):
首先搭建一個一主二從的集群,進(jìn)入從節(jié)點(diǎn)2的shell,執(zhí)行
/*
把節(jié)點(diǎn)2鎖住,用來模擬從節(jié)點(diǎn)延遲
不要設(shè)置延遲節(jié)點(diǎn)來模擬節(jié)點(diǎn)延遲,在golang的驅(qū)動庫中就算延遲節(jié)點(diǎn)沒有設(shè)置隱藏,也不會選為讀取節(jié)點(diǎn)
*/
db.fsyncLock()
下面這段代碼有三組測試:
1,沒有使用session,會出現(xiàn)數(shù)據(jù)不一致
2,多個session,CausalConsistency為true,會出現(xiàn)不一致
3,單個session,CausalConsistency為false,會出現(xiàn)不一致,當(dāng)把SetCausalConsistency(false)這句刪除,驅(qū)動默認(rèn)為true,數(shù)據(jù)一致
所以在不考慮事務(wù)的情況,需要寫入或者修改的數(shù)據(jù)能被立馬讀到需要滿足:
1,寫與讀在同一個session
2,session的CausalConsistency為true
更多CausalConsistency的介紹:
1,https://docs.mongodb.com/manual/reference/server-sessions/
2,https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency
3,https://docs.mongodb.com/manual/core/transactions/
package main
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readconcern"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/mongo/writeconcern"
"sync"
"time"
)
func main() {
noSessionTest()
multiSessionTest()
singleSessionTest()
}
func newCollection(client *mongo.Client) *mongo.Collection {
option := options.CollectionOptions{}
return client.Database("alex").
Collection("test",
option.SetWriteConcern(writeconcern.New(writeconcern.WMajority())).
SetReadPreference(readpref.Secondary()).
SetReadConcern(readconcern.Majority()))
}
func newClient() *mongo.Client {
uri := "mongodb://member1.example.com:28017,member2.example.com:28018,member3.example.com:28019/admin?replicaSet=rs0"
ctx := context.Background()
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
panic(err)
}
err = client.Ping(ctx, nil)
if err != nil {
panic(err)
}
fmt.Println("Successfully connected and pinged.")
return client
}
func multiSessionTest() {
ctx := context.TODO()
client := newClient()
opts := options.Session().
SetDefaultReadConcern(readconcern.Majority()).
SetDefaultReadPreference(readpref.Secondary())
sess, err := client.StartSession(opts)
if err != nil {
panic(err)
}
var insertId interface{}
err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
coll :=newCollection(client)
insertId = insert(sessionContext,coll)
return nil
})
sess, err = client.StartSession(opts)
if err != nil {
panic(err)
}
err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
coll :=newCollection(client)
find(sessionContext,coll,insertId)
return nil
})
}
func singleSessionTest() {
ctx := context.TODO()
client := newClient()
opts := options.Session().
SetDefaultReadConcern(readconcern.Majority()).
SetDefaultReadPreference(readpref.Secondary()).
SetCausalConsistency(false)
sess, err := client.StartSession(opts)
if err != nil {
panic(err)
}
var insertId interface{}
err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
coll :=newCollection(client)
insertId = insert(sessionContext,coll)
find(sessionContext,coll,insertId)
return nil
})
}
func noSessionTest() {
ctx := context.TODO()
client := newClient()
coll := newCollection(client)
insertId := insert(ctx, coll)
find(ctx,coll,insertId)
}
func insert(ctx context.Context, coll *mongo.Collection) (insertId interface{}) {
res, err := coll.InsertOne(ctx, bson.M{"time": time.Now().Unix()})
if err != nil {
panic(err)
}
return res.InsertedID
}
func find(ctx context.Context, coll *mongo.Collection, insertId interface{}) {
wait := sync.WaitGroup{}
wait.Add(100)
for i := 0; i < 100; i++ {
go func() {
result := coll.FindOne(ctx, bson.M{"_id": insertId})
if result.Err() != nil {
fmt.Println(result.Err())
} else {
fmt.Println(true)
}
wait.Done()
}()
}
wait.Wait()
}