JanusGraph---Transaction Log

事務(wù)日志

記錄事務(wù)日志

tx = graph.buildTransaction().logIdentifier('addedPerson').start()
u = tx.addVertex(label, 'human')
u.property('name', 'proteros')
u.property('age', 36)
tx.commit()
  • 添加事務(wù)日志,使用addedPerson標(biāo)識(shí)。
  • 事務(wù)中改變都會(huì)被記錄到用戶日志系統(tǒng)并以id區(qū)別。
  • 日志保存在存儲(chǔ)后端中。

處理日志log processor framework

  • 使用日志處理器對(duì)特定的事務(wù)日志(指定要處理的事務(wù)日志的id標(biāo)識(shí))進(jìn)行處理
  • 為addedPerson事務(wù)日志配置日志處理器進(jìn)行處理
import java.util.concurrent.atomic.*;
import org.janusgraph.core.log.*;
import java.util.concurrent.*;
//打開日志處理器
logProcessor = JanusGraphFactory.openTransactionLog(g);
totalHumansAdded = new AtomicInteger(0);
totalGodsAdded = new AtomicInteger(0);
//指定要處理的事務(wù)日志名稱
logProcessor.addLogProcessor("addedPerson").
//事務(wù)處理器名稱
        setProcessorIdentifier("addedPersonCounter").
//事務(wù)處理器讀取日志的位置(通過時(shí)間指定)
        setStartTime(System.currentTimeMillis(), TimeUnit.MILLISECONDS).
//添加ChangeProcessor處理器1進(jìn)行處理
        addProcessor(new ChangeProcessor() {
            @Override
            public void process(JanusGraphTransaction tx, TransactionId txId, ChangeState changeState) {
                for (v in changeState.getVertices(Change.ADDED)) {
                    if (v.label().equals("human")) totalHumansAdded.incrementAndGet();
                }
            }
        }).
//添加ChangeProcessor處理器2處理:多個(gè)處理器并發(fā)執(zhí)行
        addProcessor(new ChangeProcessor() {
            @Override
            public void process(JanusGraphTransaction tx, TransactionId txId, ChangeState changeState) {
                for (v in changeState.getVertices(Change.ADDED)) {
                    if (v.label().equals("god")) totalGodsAdded.incrementAndGet();
                }
            }
        }).
        build();
  • addedPersonCounter日志處理器順序讀取addedPerson事務(wù)日志記錄的日志,但是注冊(cè)的兩個(gè)ChangeProcessor處理器會(huì)并發(fā)執(zhí)行讀取到的每一條日志。
  • 日志處理的的標(biāo)識(shí)addedPersonCounter有重要作用:定期維護(hù)日志處理器的狀態(tài)。即其會(huì)在最后一個(gè)日志記錄上打上一個(gè)標(biāo)記,如果日志處理器在執(zhí)行很長時(shí)候掛掉了,當(dāng)其重新開始并使用相同的名稱addedPersonCounter,那么其就會(huì)從上次最后標(biāo)記的日志位置開始讀取。

ChangeProcessor接口

  • 其有一個(gè)process方法
public void process(JanusGraphTransaction tx, TransactionId txId, ChangeState changeState) {
    for (v in changeState.getVertices(Change.ADDED)) {
         if (v.label().equals("god")) totalGodsAdded.incrementAndGet();
     }
 }
  • JanusGraphTransaction代表正常的圖事務(wù),可以利用執(zhí)行正常的操作。
  • TransactionId txId:事務(wù)id值,通過其和獲得產(chǎn)生日志的原事務(wù)實(shí)例id,和執(zhí)行原事務(wù)的圖對(duì)象實(shí)例JanusGraph instance 。
    • 原文:The provided transaction id can be used to investigate the origin of the transaction which is uniquely identified by the combination of the id of the JanusGraph instance that executed the transaction (txId.getInstanceId()) and the instance specific transaction id (txId.getTransactionId()).

Transaction Log Use Cases

Record of Change

  • 統(tǒng)計(jì)改變記錄

Downstream Updates

  • 事務(wù)日志和日志處理框架提供了一些必要工具,用來將一些改變信息通知到其它系統(tǒng)。

Triggers

  • 在原事務(wù)中不建議實(shí)現(xiàn)觸發(fā)器(慢呀),而是通過日志處理框架在稍后處理觸發(fā)器。

Log Configuration

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、平臺(tái)架構(gòu) 1.1 接入層 1.1.1 設(shè)備采集(IoT)數(shù)據(jù)采集(DAQ),是指從傳感器和其它待測設(shè)備等模擬和...
    玄鳥西閱讀 3,409評(píng)論 0 4
  • Kafka高級(jí)特性解析(三) 物理存儲(chǔ) 日志存儲(chǔ)概述 Kafka 消息是以主題為單位進(jìn)行歸類,各個(gè)主題之間是彼此獨(dú)...
    奮斗的蛐蛐閱讀 791評(píng)論 0 0
  • 事務(wù)不是一個(gè)天然存在的東西,它是被人為創(chuàng)造出來的,目的是簡化應(yīng)用層的編程模型。有了事務(wù),應(yīng)用程序不用考慮并發(fā)或各種...
    wangshanshi閱讀 747評(píng)論 0 1
  • 上節(jié)回顧及尾巴 =============== 聚集索引 與 輔助索引 區(qū)別 1. 聚集索引 只能有一個(gè),輔助索引...
    張鑫澤_2109閱讀 429評(píng)論 0 0
  • 我是黑夜里大雨紛飛的人啊 1 “又到一年六月,有人笑有人哭,有人歡樂有人憂愁,有人驚喜有人失落,有的覺得收獲滿滿有...
    陌忘宇閱讀 8,814評(píng)論 28 54

友情鏈接更多精彩內(nèi)容