65_數(shù)據(jù)建模實戰(zhàn)_基于document鎖實現(xiàn)悲觀鎖并發(fā)控制

<meta charset="utf-8">

65_數(shù)據(jù)建模實戰(zhàn)_基于document鎖實現(xiàn)悲觀鎖并發(fā)控制

1、對document level鎖,詳細的講解

全局鎖,一次性就鎖整個index,對這個index的所有增刪改操作都會被block住,如果上鎖不頻繁,還可以,比較簡單

細粒度的一個鎖,document鎖,顧名思義,每次就鎖你要操作的,你要執(zhí)行增刪改的那些doc,doc鎖了,其他線程就不能對這些doc執(zhí)行增刪改操作了

但是你只是鎖了部分doc,其他線程對其他的doc還是可以上鎖和執(zhí)行增刪改操作的

document鎖,是用腳本進行上鎖,

要先創(chuàng)建一個judge-lock.groovy文件放入es的config文件夾下,在上鎖語法中調(diào)用即可

image.png
POST /fs/lock/1/_update { "upsert": { "process_id": 123 }, "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';" "params": { "process_id": 123 } }

語法詳解:

/fs/lock,是固定的,就是說fs下的lock type,專門用于進行上鎖

/fs/lock/id,比如1,id其實就是你要上鎖的那個doc的id,代表了某個doc數(shù)據(jù)對應(yīng)的lock(也是一個doc)

_update + upsert:執(zhí)行upsert操作

params,里面有個process_id,process_id,是你的要執(zhí)行增刪改操作的進程的唯一id,比如說可以在java系統(tǒng),啟動的時候,給你的每個線程都用UUID自動生成一個thread id,你的系統(tǒng)進程啟動的時候給整個進程也分配一個UUID。process_id + thread_id就代表了某一個進程下的某個線程的唯一標(biāo)識??梢宰约河肬UID生成一個唯一ID

process_id很重要,會在lock中,設(shè)置對對應(yīng)的doc加鎖的進程的id,這樣其他進程過來的時候,才知道,這條數(shù)據(jù)已經(jīng)被別人給鎖了

assert false,不是當(dāng)前進程加鎖的話,則拋出異常

ctx.op='noop',不做任何修改

如果該document之前沒有被鎖,/fs/lock/1之前不存在,也就是doc id=1沒有被別人上過鎖; upsert的語法,那么執(zhí)行index操作,創(chuàng)建一個/fs/lock/id這條數(shù)據(jù),而且用params中的數(shù)據(jù)作為這個lock的數(shù)據(jù)。process_id被設(shè)置為123,script不執(zhí)行。這個時候象征著process_id=123的進程已經(jīng)鎖了一個doc了。

如果document被鎖了,就是說/fs/lock/1已經(jīng)存在了,代表doc id=1已經(jīng)被某個進程給鎖了。那么執(zhí)行update操作,script,此時會比對process_id,如果相同,就是說,某個進程,之前鎖了這個doc,然后這次又過來,就可以直接對這個doc執(zhí)行操作,說明是該進程之前鎖的doc,則不報錯,不執(zhí)行任何操作,返回success; 如果process_id比對不上,說明doc被其他doc給鎖了,此時報錯

/fs/lock/1

{

"process_id": 123

}

情況一; 如果兩個process_id相同,說明是一個進程先加鎖,然后又過來嘗試加鎖,可能是要執(zhí)行另外一個操作,此時就不會block,對同一個process_id是不會block,ctx.op= 'noop',什么都不做,返回一個success

情況二: process_id不相等,說明這個doc之前已經(jīng)被別人上鎖了,process_id=123上鎖了; process_id=234過來再次嘗試上鎖,失敗,assert false,就會報錯

此時遇到報錯的process,就應(yīng)該嘗試重新上鎖,直到上鎖成功

有報錯的話,如果有些doc被鎖了,那么需要重試

直到所有鎖定都成功,執(zhí)行自己的操作。。。

釋放所有的鎖

2、上document鎖的完整實驗過程

2.1 上鎖語句:我們在script中創(chuàng)建一個文件,寫入如下語法,代表悲觀鎖的并發(fā)控制.

scripts/judge-lock.groovy: if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';

2.2 進行process_id 對doc1進行上鎖

POST /fs/lock/1/_update { "upsert": { "process_id": 123 }, "script": { "lang": "groovy", "file": "judge-lock", "params": { "process_id": 123 } } }

上鎖成功

{ "_index": "fs", "_type": "lock", "_id": "1", "_version": 1, "result": "created", "_shards": { "total": 2, "successful": 1, "failed": 0 } }

2.3獲得當(dāng)前鎖的信息

GET /fs/lock/1

查詢信息,即process_id:123

{ "_index": "fs", "_type": "lock", "_id": "1", "_version": 1, "found": true, "_source": { "process_id": 123 } }

2.4此時另外一個進程來對doc1文件更新,會報錯

POST /fs/lock/1/_update { "upsert": { "process_id": 234 }, "script": { "lang": "groovy", "file": "judge-lock", "params": { "process_id": 234 } } }

如下報錯信息

{ "error": { "root_cause": [ { "type": "remote_transport_exception", "reason": "[4onsTYV][127.0.0.1:9300][indices:data/write/update[s]]" } ], "type": "illegal_argument_exception", "reason": "failed to execute script", "caused_by": { "type": "script_exception", "reason": "error evaluating judge-lock", "caused_by": { "type": "power_assertion_error", "reason": "assert false\n" }, "script_stack": [], "script": "", "lang": "groovy" } }, "status": 400 }

2.5而原進程對文件重新加鎖即process_id相同則返回noop,即不做任何操作

POST /fs/lock/1/_update { "upsert": { "process_id": 123 }, "script": { "lang": "groovy", "file": "judge-lock", "params": { "process_id": 123 } } }

如下信息返回noop

{ "_index": "fs", "_type": "lock", "_id": "1", "_version": 1, "result": "noop", "_shards": { "total": 0, "successful": 0, "failed": 0 } }

2.6原進程對文件進行更新,成功

POST /fs/file/1/_update { "doc": { "name": "README1.txt" } }

成功信息

{ "_index": "fs", "_type": "file", "_id": "1", "_version": 4, "result": "updated", "_shards": { "total": 2, "successful": 1, "failed": 0 } }

2.7更新成功后,進行刷新操作

POST /fs/_refresh

2.8查詢鎖信息操作

GET /fs/lock/_search?scroll=1m { "query": { "term": { "process_id": 123 } } }

獲取信息如下

{ "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBQAAAAAAACPkFjRvbnNUWVZaVGpHdklqOV9zcFd6MncAAAAAAAAj5RY0b25zVFlWWlRqR3ZJajlfc3BXejJ3AAAAAAAAI-YWNG9uc1RZVlpUakd2SWo5X3NwV3oydwAAAAAAACPnFjRvbnNUWVZaVGpHdklqOV9zcFd6MncAAAAAAAAj6BY0b25zVFlWWlRqR3ZJajlfc3BXejJ3", "took": 51, "timed_out": false, "_shards": { "total": 5, "successful": 5, "failed": 0 }, "hits": { "total": 1, "max_score": 1, "hits": [ { "_index": "fs", "_type": "lock", "_id": "1", "_score": 1, "_source": { "process_id": 123 } } ] } }

2.9批量刪除鎖操作

PUT /fs/lock/_bulk { "delete": { "_id": 1}}

刪除鎖信息如下

{ "took": 20, "errors": false, "items": [ { "delete": { "found": true, "_index": "fs", "_type": "lock", "_id": "1", "_version": 2, "result": "deleted", "_shards": { "total": 2, "successful": 1, "failed": 0 }, "status": 200 } } ] }

2.10此時另外一個線程再對此文件上鎖就成功了

POST /fs/lock/1/_update { "upsert": { "process_id": 234 }, "script": { "lang": "groovy", "file": "judge-lock", "params": { "process_id": 234 } } }

process_id=234上鎖就成功了

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容