etl engine 實現(xiàn) redis與mysql之間的數(shù)據(jù)同步

etl engine 實現(xiàn) redis與mysql之間的數(shù)據(jù)同步

Redis是一個開源的使用C語言編寫、支持網(wǎng)絡(luò)、可基于內(nèi)存亦可持久化的日志型、Key-Value數(shù)據(jù)庫,
因其讀取速度快、也可用于消息隊列使用等場景,已經(jīng)成為項目中不可缺少的一部分。
本案例是通過etl engine實現(xiàn)redis與mysql之間的數(shù)據(jù)同步。

需求

讀redis寫mysql; 讀mysql寫redis

前置條件

事先準備一個可讀寫redis服務(wù)器;一個可讀寫mysql服務(wù)器;
讀redis的key寫到mysql的t_redis_info表;讀mysql的t_redis_info表記錄寫到redis

  • MySQL模擬數(shù)據(jù)

CREATE TABLE t_redis_info (
  id VARCHAR(32) NOT NULL,
  caption VARCHAR(50),
  tag VARCHAR(50),
  memo VARCHAR(100),
  writetime VARCHAR(19),
  PRIMARY KEY (id)
);
INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('1','herbin_beer_550','啤酒','哈爾濱雪花550ML','2023-01-01 11:12:13');
INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('2','qingdao_beer_550','啤酒','青島純生550ML','2023-01-02 01:02:03');
INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('3','qingdao_beer_330','啤酒','青島干啤330ML','2023-02-03 01:02:03');
INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('4','herbin_beer_330','啤酒','哈爾濱勇闖天涯330ML','2023-02-03 01:02:03');
INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('5','budweiser_beer_330','啤酒','美國百威330ML','2023-03-04 01:02:03');
INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('6','wahaha_water_600','純凈水','娃哈哈600ML','2023-03-04 01:02:03');
INSERT INTO t_redis_info(id,caption,tag,memo,writetime) VALUES ('7','nongfushanquan_water_600','純凈水','農(nóng)夫山泉600ML','2023-03-05 01:02:03');

配置模型圖

redis與mysql同步.png

配置文件內(nèi)容

<?xml version="1.0" encoding="UTF-8"?>
<Graph runMode="1">

    <Node id="DB_INPUT_TABLE_1" type="DB_INPUT_TABLE" fetchSize="1000" dbConnection="CONNECT_1"  desc="讀數(shù)據(jù)表" >
        <Script name="sqlScript">
            <![CDATA[ SELECT caption AS k  ,CONCAT(id,';',caption,';',memo,';', tag) AS v  FROM t_redis_info]]>
        </Script>
    </Node>
    <Node id="DB_OUTPUT_TABLE_1" type="DB_OUTPUT_TABLE"  dbConnection="CONNECT_1"  outputFields="id;caption;memo;tag;writetime" renameOutputFields="id;caption;memo;tag;writetime"     desc="寫數(shù)據(jù)表" >
        <Script name="sqlScript">
            <![CDATA[INSERT INTO t_redis_info (id,caption,memo,tag,writetime) VALUES(?,?,?,?,?);]]>
        </Script>
        <BeforeOut>
            <![CDATA[package ext
import (
    "errors"
    "fmt"
    "strconv"
    "strings"
    "time"
    "github.com/tidwall/gjson"
    "github.com/tidwall/sjson"
    "etl-engine/etl/tool/extlibs/common"
)
func RunScript(dataValue string) (result string, topErr error) {
    newRows := ""
    rows := gjson.Get(dataValue, "rows")
    for index, row := range rows.Array() {
      
        //增加一個字段名稱為id的列
        tmpStr, _ := sjson.Set(row.String(), "id",  common.GetUUID() )
        
        //將系統(tǒng)默認輸出的value字段拆分,并創(chuàng)建多個字段
        values := gjson.Get(row.String(),"value").String()
        vArr := strings.Split(values, ";")
        caption := vArr[1]
        memo := vArr[2]
        tag := vArr[3]
        tmpStr, _ = sjson.Set(tmpStr, "caption",  caption )
        
        tmpStr, _ = sjson.Set(tmpStr, "memo",  memo )
        tmpStr, _ = sjson.Set(tmpStr, "tag",  tag )
            tmpStr, _ = sjson.Set(tmpStr, "writetime", time.Now().Format("2006-01-02 15:04:05"))
        common.GetLogger().Infoln("新行數(shù)據(jù)結(jié)構(gòu)tmpStr:",tmpStr)
        newRows, _ = sjson.SetRaw(newRows, "rows.-1", tmpStr)
    }
    return newRows, nil
}]]>
        </BeforeOut>
    </Node>
    <Node id="REDIS_WRITER_1" type="REDIS_WRITER" nameServer="127.0.0.1:16379" password="******" db="1" isGetTTL="false" patternMatchKey="true" outputFields="k;v" renameOutputFields="key;value" desc="寫redis"  />
    <Node id="REDIS_READER_1" type="REDIS_READER" nameServer="127.0.0.1:16379" password="******" db="1" isGetTTL="false" patternMatchKey="true" keys="*" desc="讀redis"  />
    <Line from="DB_INPUT_TABLE_1" to="REDIS_WRITER_1" type="STANDARD" order="0" metadata="METADATA_1"   id="LINE_STANDARD_1"/>
    <Line from="REDIS_READER_1" to="DB_OUTPUT_TABLE_1" type="STANDARD" order="1" metadata="METADATA_2"   id="LINE_STANDARD_2"/>
    <Metadata id="METADATA_2" >
        <Field name="id" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="caption" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="memo" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="tag" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="writetime" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
    </Metadata>
    <Metadata id="METADATA_1" >
        <Field name="key" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
        <Field name="value" type="string" default="" nullable="true" errDefault="" dataFormat="" dataLen=""/>
    </Metadata>
    <Connection   id="CONNECT_1" type="MYSQL" dbURL="127.0.0.1:3306" database="db1" username="root" password="******"  />
</Graph>

總結(jié)主要配置環(huán)節(jié)

  1. 配置串行執(zhí)行任務(wù)
    Graph標簽中 設(shè)置 runMode="1" ,使下面兩個任務(wù)流可以按order配置的順序執(zhí)行。
  2. 畫兩個任務(wù)流
    兩個連接線中order屬性分別設(shè)置0 和 1,任務(wù)執(zhí)行行先執(zhí)行order為0的任務(wù),再執(zhí)行order為1的任務(wù)。
    第1個任務(wù)流(讀mysql -> 寫redis)
    第2個任務(wù)流(讀redis -> 寫mysql)
  3. 第1個任務(wù)流
  • 讀數(shù)據(jù)表節(jié)點設(shè)置
    script 屬性
    SELECT caption AS k ,CONCAT(id,';',caption,';',memo,';', tag) AS v FROM t_redis_info
    caption為redis中的鍵名稱,組合的v為redis中的鍵值內(nèi)容.
  • 寫redis節(jié)點設(shè)置
    patternMatchKey="true"
    outputFields 設(shè)置 k;v
    renameOutputFields 設(shè)置key;value
    系統(tǒng)默認會為redis的輸出數(shù)據(jù)流生成key和value兩個字段的數(shù)據(jù)結(jié)構(gòu)
  • 創(chuàng)建元數(shù)據(jù)
    METADATA_0 結(jié)構(gòu)是兩個字段 key和value
    連接線中order屬性設(shè)置0 ,元數(shù)據(jù)選擇 METADATA_0
    該元數(shù)據(jù)用于寫redis節(jié)點輸出數(shù)據(jù)流時使用。
    1. 第2個任務(wù)流
    • 讀redis節(jié)點設(shè)置
      patternMatchKey="true"
      keys="*"
    • 寫數(shù)據(jù)表節(jié)點設(shè)置
      script 屬性
      INSERT INTO t_redis_info (id,caption,memo,tag,writetime) VALUES(?,?,?,?,?);
      outputFields 設(shè)置
      id;caption;memo;tag;writetime
      注意,通過嵌入go腳本來重新處理輸入數(shù)據(jù)流中的各字段,因此outputFields中設(shè)置的字段名稱要跟腳本中創(chuàng)建的字段名稱相符
      renameOutputFields 設(shè)置
      id;caption;memo;tag;writetime
      注意outputFields和renameOutputFields字段個數(shù)保持一致
    • 嵌入go腳本,增加一個字段名稱為id,調(diào)用了內(nèi)置函數(shù)生成uuid
      BeforeOut標簽中嵌入go腳本,目的是將輸入數(shù)據(jù)流結(jié)構(gòu)轉(zhuǎn)換成目標表中的各字段結(jié)構(gòu)。
    • 創(chuàng)建元數(shù)據(jù)METADATA_1 結(jié)構(gòu)是5個字段 id,caption,memo,tag,writetime
      連接線中order屬性設(shè)置1 ,元數(shù)據(jù)選擇 METADATA_1
      該元數(shù)據(jù)用于寫數(shù)據(jù)表節(jié)點輸出數(shù)據(jù)流時使用。

輸出結(jié)果圖

image.png
image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容