本文章主要是對開源的kafka-connect-cdc-mssql進(jìn)行編譯并集成到confluent平臺(tái)中,鑒于還不太熟悉部分平臺(tái)的功能,僅簡單介紹此次實(shí)施的步驟。因
1.安裝sql-server的docker image
安裝步驟參考
注意版本以及ubuntu的docker需要root用戶
安裝結(jié)束后使用如下命令進(jìn)入sql server客戶端操作
root@xh:# sqlcmd -S localhost -U SA -P '密碼' //注意密碼必須是大小寫加特殊字符,否則無法創(chuàng)建dcoker鏡像
- 下載并編譯源碼
使用maven進(jìn)行編譯,你需要提前下載JDBC driver,可自行搜索,下載地址,注意自己的sql-server的JDBC driver版本即可
之后maven insatll:
cd ~/Projects/IdeaProjects/kafka-connect-cdc-mssql //x項(xiàng)目所在位置
$ mvn install:install-file -DgroupId=com.microsoft.sqlserver -DartifactId=sqljdbc4 -Dversion=6.0.7130 -Dpackaging=jar -Dfile=<path to the download> //注意你自己的sqljdbc版本即可
然后編譯,因?yàn)閠est環(huán)境不同,易出錯(cuò),可使用如下命令:
$ mvn clean package -Dmaven.test.skip=true
$ cp ./target/kafka-connect-cdc-mssql-0.0.1-SNAPSHOT.jar ~/workspace/confluent-3.3.0/share/java/kafka-connect-mssql //把生成的jar包放到confluent下的/share/java/自己創(chuàng)建的文件夾下 - 啟動(dòng)kafka
首先啟動(dòng)zookeeper
cd ~/worksapce/kafka/zookeeper-3.3.6/ //zookeeper的目錄
bin/zkServer.sh start
啟動(dòng)kafka
cd .. //接上,kafka的目錄
sudo bin/kafka-server-start.sh config/server.properties
以下命令因不涉及到schema-registry,可暫時(shí)忽略不考慮
cd ~/workspace/confluent-3.3.0
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
- 配置sqlserver.properties
之后配置mssql-kafka-connector的相關(guān)文件,并在confluent下的etc中創(chuàng)建目錄sqlserver,且在該目錄下創(chuàng)建sqlserver.properties,其內(nèi)容如下:
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.cdc.mssql.MsSqlSourceConnector
#connector.class=/home/xh/workspace/confluent-3.3.0/share/java/kafka-connect-mssql/kafka-connect-cdc-mssql-0.0.1-SNAPSHOT.jar
# Set these required values
initial.database=Test
server.name=10.19.138.199
password=Xyj123456.
server.port=1433
username=SA
#topicFormat.format=kafka-mssql
change.tracking.tables=dbo.Inventory2 //注意dbo是schemaName,不要寫成Test(DatabaseName)
- 啟動(dòng)mssql-kafka-connector
$ cd ~/workspace/confluent-3.3.0
$ bin/connect-standalone etc/kafka/connect-standalone.properties etc/sqlserver/sqlserver.properties
根據(jù)錯(cuò)誤提示到相應(yīng)的行去找相應(yīng)的maven依賴(~/.m2/repository/下) 并把該依賴jar包c(diǎn)opy到~/workspace/confluent-3.3.0/share/java/kafka-connect-mssql/下,分別如下:

需要的jar包.png
注意其依賴的父類kafka-connector-cdc也需要下載下來且使用maven編譯好,同在該作者的github上。
6.在sql-server中允許database和table的change tracking
參考網(wǎng)址
命令如下:
1>ALTER DATABASE 數(shù)據(jù)庫名 SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)2>go1>ALTER TABLE 表名 ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)2>go另外會(huì)出現(xiàn)一個(gè)提示要求你允許數(shù)據(jù)庫snapshot隔離,命令如下:
1>ALTER DATABASE 數(shù)據(jù)庫名 SET ALLOW_SNAPSHOT_ISOLATION ON2>go
- 使用該mssql-kafka-connector
在sql server客戶端插入一條數(shù)據(jù),另開一個(gè)kafka消費(fèi)者的端口,可以看到相應(yīng)的binlog輸出

kafka消費(fèi)者端內(nèi)容.png
接下來要去測試吞吐量和時(shí)延了~Come on!