六,mongo事務(wù)

一,writeConcern

writeConcern 決定一個寫操作落到多少個節(jié)點(diǎn)上才算成功。writeConcern 的取值包括:
? 0:發(fā)起寫操作,不關(guān)心是否成功;
? 1~集群最大數(shù)據(jù)節(jié)點(diǎn)數(shù):寫操作需要被復(fù)制到指定節(jié)點(diǎn)數(shù)才算成功;
? majority:寫操作需要被復(fù)制到大多數(shù)節(jié)點(diǎn)上才算成功。
發(fā)起寫操作的程序?qū)⒆枞綄懖僮鞯竭_(dá)指定的節(jié)點(diǎn)數(shù)為止

1.1 writeConcern測試

//進(jìn)入主節(jié)點(diǎn),執(zhí)行
conf=rs.conf()
conf.members[2].slaveDelay = 5//延遲5秒
conf.members[2].priority = 0//投票優(yōu)先級設(shè)置為0
rs.reconfig(conf)

db.test.insertOne({count: 1}, {writeConcern: {w: 3}})//5秒后執(zhí)行完成
//返回
{
        "acknowledged" : true,
        "insertedId" : ObjectId("60072bacd28a60ee717d761e")
}

db.test.insertOne({count:2},{writeConcern: {w: 4, wtimeout:3000}})//超過節(jié)點(diǎn)數(shù)
//返回
WriteConcernError({
        "code" : 100,
        "codeName" : "UnsatisfiableWriteConcern",
        "errmsg" : "Not enough data-bearing nodes",
        "errInfo" : {
                "writeConcern" : {
                        "w" : 4,
                        "wtimeout" : 3000,
                        "provenance" : "clientSupplied"
                }
        }
})
//查詢
db.test.find({count:2})
//返回
{ "_id" : ObjectId("60072f63d28a60ee717d7623"), "count" : 2 }

db.test.insertOne({count:3},{writeConcern: {w: 3, wtimeout:3000}})//設(shè)置3秒超時,返回會報錯
//返回
WriteConcernError({
        "code" : 64,
        "codeName" : "WriteConcernFailed",
        "errmsg" : "waiting for replication timed out",
        "errInfo" : {
                "wtimeout" : true,
                "writeConcern" : {
                        "w" : 3,
                        "wtimeout" : 3000,
                        "provenance" : "clientSupplied"
                }
        }
})

//主節(jié)點(diǎn)查詢
db.test.find({count:3})
//返回
{ "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }

//過了5秒,進(jìn)入第二個從節(jié)點(diǎn)執(zhí)行
db.test.find({count:3})
//返回
{ "_id" : ObjectId("60072c34d28a60ee717d7620"), "count" : 3 }

writeConcern大于總節(jié)點(diǎn)數(shù),或者等于總節(jié)點(diǎn)數(shù)但有一個節(jié)點(diǎn)故障,寫入會報錯,但數(shù)據(jù)還是會寫入,但這種錯誤沒有必要,重要數(shù)據(jù)設(shè)置成majority就可以了

設(shè)置超時,雖然會報錯,當(dāng)數(shù)據(jù)實(shí)際上是主節(jié)點(diǎn)先寫入,然后再等待其他節(jié)點(diǎn)同步數(shù)據(jù)時發(fā)生超時,數(shù)據(jù)還是已經(jīng)入庫了,也不會因?yàn)槌瑫r錯誤停止同步

通常應(yīng)對重要數(shù)據(jù)應(yīng)用 {w: “majority”},普通數(shù)據(jù)可以應(yīng)用 {w: 1} 以確保最佳性能。

writeConcern寫操作返回耗時受到同步及從節(jié)點(diǎn)性能影響,但并不會顯著增加集群壓力,因此無論是否等待,寫操作最終都會復(fù)制到所有節(jié)點(diǎn)上。設(shè)置 writeConcern 只是讓寫操作等待復(fù)制后再返回而已

二,讀數(shù)據(jù)

作為集群數(shù)據(jù)庫讀取時需要關(guān)注2個問題:
1,去哪讀-readPreference
2,數(shù)據(jù)隔離性-readConcern

2.1 readPreference

readPreference 決定使用哪一個節(jié)點(diǎn)來滿足正在發(fā)起的讀請求。可選值包括:
? primary: 只選擇主節(jié)點(diǎn);
? primaryPreferred:優(yōu)先選擇主節(jié)點(diǎn),如果不可用則選擇從節(jié)點(diǎn);
? secondary:只選擇從節(jié)點(diǎn);
? secondaryPreferred:優(yōu)先選擇從節(jié)點(diǎn),如果從節(jié)點(diǎn)不可用則選擇主節(jié)點(diǎn);
? nearest:選擇最近的節(jié)點(diǎn);
除此之外還可以通過標(biāo)簽來選擇讀取節(jié)點(diǎn)

2.1 readConcern

? available: 讀取所有可用數(shù)據(jù);
? local:讀取屬于當(dāng)前分片的所有可用數(shù)據(jù);
? majority:讀取大部分已提交的;
? linearizable:線性讀;
? snapshot:快照讀;

available與local的區(qū)別

在復(fù)制集下面local和available沒有區(qū)別,區(qū)別在于分片集遷移數(shù)據(jù)時,當(dāng)chunk x需要從shard1遷移到shard2上面的過程中,在shard1,shard2上面都有chunk x的數(shù)據(jù),但shard1為負(fù)責(zé)方。所有對chunk x的讀寫操作都會進(jìn)入shard1,如果指定對shard2讀取,available會讀取包含chunk x的數(shù)據(jù),而local只會讀取當(dāng)前分片負(fù)責(zé)的數(shù)據(jù)

majority

首先主節(jié)點(diǎn)寫入數(shù)據(jù),然后同步給從節(jié)點(diǎn),當(dāng)主節(jié)點(diǎn)收到大部分節(jié)點(diǎn)的寫入確認(rèn)后,再同步從節(jié)點(diǎn)數(shù)據(jù)已大部分節(jié)點(diǎn)寫入,收到的從節(jié)點(diǎn)就認(rèn)為這個數(shù)據(jù)已經(jīng)是majority的,可被majority讀取的。當(dāng)有個節(jié)點(diǎn)沒有收到數(shù)據(jù)或者majority確認(rèn),但是大部分節(jié)點(diǎn)已經(jīng)寫入了,當(dāng)前節(jié)點(diǎn)的視角是沒有該數(shù)據(jù)的

majority作用:
當(dāng)主節(jié)點(diǎn)掛掉前將數(shù)據(jù)x=1,改為x=2,數(shù)據(jù)還沒同步給從節(jié)點(diǎn),沒有設(shè)置majority,主節(jié)點(diǎn)讀到x=2,主節(jié)點(diǎn)就掛掉了,x=2就再也無法獲取,原來的就變成了臟讀。

linearizable

majority大部分時候都能保證不會出現(xiàn)臟讀,有一種特殊的情況,majority并不能很好支持。
存在server1,server2連接著node1,node1為主節(jié)點(diǎn),當(dāng)node1與其他節(jié)點(diǎn)失聯(lián),當(dāng)server2可以連接node1。這時server1通過選舉重新連接到node2作為主節(jié)點(diǎn),往node2寫入數(shù)據(jù)。但是server2卻認(rèn)為node1是主節(jié)點(diǎn),獲取不到對應(yīng)數(shù)據(jù),導(dǎo)致數(shù)據(jù)不一致。這種情況需要設(shè)置linearizable,這個配置在獲取數(shù)據(jù)前,或從其他節(jié)點(diǎn)檢查數(shù)據(jù)是否是真的最新。

三,因果一致性

有的博客包括極客時間上面提到,寫入mongo時,設(shè)置writeConcern=majority,讀取時設(shè)置readPreference=secondary,readConcern=majority就可以得到很好的性能且數(shù)據(jù)一致

其實(shí)不是的,先解釋下這樣子設(shè)置的意思,首先寫保證大部分節(jié)點(diǎn)已經(jīng)寫入,讀取時選擇從節(jié)點(diǎn),且是被大部分從節(jié)點(diǎn)有被寫入的數(shù)據(jù)才能被讀取。

這種情況會出現(xiàn)數(shù)據(jù)不一致:當(dāng)前有一個一主二從的集群,當(dāng)主節(jié)點(diǎn)與從節(jié)點(diǎn)1數(shù)據(jù)被寫入了,當(dāng)時從節(jié)點(diǎn)2沒有被寫入,在主節(jié)點(diǎn)和從節(jié)點(diǎn)1的視角看,這條數(shù)據(jù)已經(jīng)被寫入,且是mayjority的,但是如果訪問的從節(jié)點(diǎn)2視角看,是不存在這條數(shù)據(jù)的,當(dāng)我們設(shè)置readPreference=secondary時,剛好驅(qū)動剛好選擇的是從節(jié)點(diǎn)2,數(shù)據(jù)會不一致

實(shí)驗(yàn):
首先搭建一個一主二從的集群,進(jìn)入從節(jié)點(diǎn)2的shell,執(zhí)行

/*
把節(jié)點(diǎn)2鎖住,用來模擬從節(jié)點(diǎn)延遲
不要設(shè)置延遲節(jié)點(diǎn)來模擬節(jié)點(diǎn)延遲,在golang的驅(qū)動庫中就算延遲節(jié)點(diǎn)沒有設(shè)置隱藏,也不會選為讀取節(jié)點(diǎn)
*/
db.fsyncLock()

下面這段代碼有三組測試:
1,沒有使用session,會出現(xiàn)數(shù)據(jù)不一致
2,多個session,CausalConsistency為true,會出現(xiàn)不一致
3,單個session,CausalConsistency為false,會出現(xiàn)不一致,當(dāng)把SetCausalConsistency(false)這句刪除,驅(qū)動默認(rèn)為true,數(shù)據(jù)一致

所以在不考慮事務(wù)的情況,需要寫入或者修改的數(shù)據(jù)能被立馬讀到需要滿足:
1,寫與讀在同一個session
2,session的CausalConsistency為true

更多CausalConsistency的介紹:
1,https://docs.mongodb.com/manual/reference/server-sessions/
2,https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency
3,https://docs.mongodb.com/manual/core/transactions/

package main

import (
    "context"
    "fmt"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
    "go.mongodb.org/mongo-driver/mongo/readconcern"
    "go.mongodb.org/mongo-driver/mongo/readpref"
    "go.mongodb.org/mongo-driver/mongo/writeconcern"
    "sync"
    "time"
)

func main() {
    noSessionTest()
    multiSessionTest()
    singleSessionTest()
}

func newCollection(client *mongo.Client) *mongo.Collection {
    option := options.CollectionOptions{}
    return client.Database("alex").
        Collection("test",
            option.SetWriteConcern(writeconcern.New(writeconcern.WMajority())).
                SetReadPreference(readpref.Secondary()).
                SetReadConcern(readconcern.Majority()))
}

func newClient() *mongo.Client {
    uri := "mongodb://member1.example.com:28017,member2.example.com:28018,member3.example.com:28019/admin?replicaSet=rs0"
    ctx := context.Background()
    client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
    if err != nil {
        panic(err)
    }

    err = client.Ping(ctx, nil)
    if err != nil {
        panic(err)
    }

    fmt.Println("Successfully connected and pinged.")
    return client
}

func multiSessionTest() {
    ctx := context.TODO()
    client := newClient()
    opts := options.Session().
        SetDefaultReadConcern(readconcern.Majority()).
        SetDefaultReadPreference(readpref.Secondary())

    sess, err := client.StartSession(opts)
    if err != nil {
        panic(err)
    }

    var insertId interface{}
    err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
        coll :=newCollection(client)
        insertId = insert(sessionContext,coll)
        return nil
    })

    sess, err = client.StartSession(opts)
    if err != nil {
        panic(err)
    }
    err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
        coll :=newCollection(client)
        find(sessionContext,coll,insertId)
        return nil
    })
}

func singleSessionTest()  {
    ctx := context.TODO()
    client := newClient()
    opts := options.Session().
        SetDefaultReadConcern(readconcern.Majority()).
        SetDefaultReadPreference(readpref.Secondary()).
        SetCausalConsistency(false)

    sess, err := client.StartSession(opts)
    if err != nil {
        panic(err)
    }

    var insertId interface{}
    err = mongo.WithSession(ctx, sess, func(sessionContext mongo.SessionContext) error {
        coll :=newCollection(client)
        insertId = insert(sessionContext,coll)
        find(sessionContext,coll,insertId)
        return nil
    })
}

func noSessionTest() {
    ctx := context.TODO()
    client := newClient()
    coll := newCollection(client)
    insertId := insert(ctx, coll)
    find(ctx,coll,insertId)
}

func insert(ctx context.Context, coll *mongo.Collection) (insertId interface{}) {
    res, err := coll.InsertOne(ctx, bson.M{"time": time.Now().Unix()})
    if err != nil {
        panic(err)
    }

    return res.InsertedID
}

func find(ctx context.Context, coll *mongo.Collection, insertId interface{}) {
    wait := sync.WaitGroup{}
    wait.Add(100)
    for i := 0; i < 100; i++ {
        go func() {
            result := coll.FindOne(ctx, bson.M{"_id": insertId})
            if result.Err() != nil {
                fmt.Println(result.Err())
            } else {
                fmt.Println(true)
            }
            wait.Done()
        }()
    }
    wait.Wait()
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容