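NiFi in Practice

Requirements

1. Use NiFi to run once a day and sync yesterday's Confluence records of who wrote which page titles into a new table.
2. Set up a dashboard in Superset to view each person's daily contribution over the last two weeks.

Related data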

confluence.CONTENT

CREATE TABLE `CONTENT` (
`CONTENTID`  bigint(20) NOT NULL ,
`HIBERNATEVERSION`  int(11) NOT NULL DEFAULT 0 ,
`CONTENTTYPE`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL ,
`TITLE`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`LOWERTITLE`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`VERSION`  int(11) NULL DEFAULT NULL ,
`CREATOR`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`CREATIONDATE`  datetime NULL DEFAULT NULL ,
`LASTMODIFIER`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`LASTMODDATE`  datetime NULL DEFAULT NULL ,
`VERSIONCOMMENT`  mediumtext CHARACTER SET utf8 COLLATE utf8_bin NULL ,
`PREVVER`  bigint(20) NULL DEFAULT NULL ,
`CONTENT_STATUS`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`PAGEID`  bigint(20) NULL DEFAULT NULL ,
`SPACEID`  bigint(20) NULL DEFAULT NULL ,
`CHILD_POSITION`  int(11) NULL DEFAULT NULL ,
`PARENTID`  bigint(20) NULL DEFAULT NULL ,
`MESSAGEID`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`PLUGINKEY`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`PLUGINVER`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`PARENTCCID`  bigint(20) NULL DEFAULT NULL ,
`DRAFTPAGEID`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`DRAFTSPACEKEY`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`DRAFTTYPE`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
`DRAFTPAGEVERSION`  int(11) NULL DEFAULT NULL ,
`PARENTCOMMENTID`  bigint(20) NULL DEFAULT NULL ,
`USERNAME`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
PRIMARY KEY (`CONTENTID`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_bin
ROW_FORMAT=DYNAMIC
;

confluence.user_mapping

CREATE TABLE `user_mapping` (
`user_key`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL ,
`username`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NOT NULL ,
`lower_username`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL ,
PRIMARY KEY (`user_key`),
UNIQUE INDEX `unq_lwr_username` (`lower_username`) USING BTREE 
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_bin
ROW_FORMAT=DYNAMIC
;

nifi_db.Commitments

CREATE TABLE `Commitments` (
`ContentId`  bigint(20) NOT NULL COMMENT 'Content ID' ,
`WeekOfYear`  varchar(255) CHARACTER SET latin1 COLLATE latin1_swedish_ci NULL DEFAULT NULL COMMENT 'Year and week number' ,
`Title`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT 'Content title' ,
`Modifier`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT 'Last modifier' ,
`LastModDate`  datetime NOT NULL COMMENT 'Last modification time' ,
`Creator`  varchar(255) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT 'Creator' ,
`CreateDate`  datetime NULL DEFAULT NULL COMMENT 'Creation time' ,
PRIMARY KEY (`ContentId`, `LastModDate`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=latin1 COLLATE=latin1_swedish_ci
ROW_FORMAT=DYNAMIC
;
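
Requirement 2 needs a per-person, per-day aggregate over this table. The following is a minimal sketch of that aggregation, not the dashboard's actual query: it uses Python's sqlite3 with an in-memory copy of Commitments as a stand-in for MySQL, the helper names make_demo_db and daily_contributions are hypothetical, and counting distinct pages per day as the "contribution" is an assumption.

```python
import sqlite3

def make_demo_db():
    """Build an in-memory stand-in for nifi_db.Commitments with sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        """
        CREATE TABLE Commitments (
            ContentId   INTEGER NOT NULL,
            WeekOfYear  TEXT,
            Title       TEXT,
            Modifier    TEXT,
            LastModDate TEXT NOT NULL,
            Creator     TEXT,
            CreateDate  TEXT,
            PRIMARY KEY (ContentId, LastModDate)
        )
        """
    )
    rows = [
        # (ContentId, WeekOfYear, Title, Modifier, LastModDate, Creator, CreateDate)
        (1, None, "Page A", "alice", "2024-03-14 09:00:00", "alice", "2024-03-01 08:00:00"),
        (1, None, "Page A", "alice", "2024-03-14 17:30:00", "alice", "2024-03-01 08:00:00"),
        (2, None, "Page B", "alice", "2024-03-14 10:00:00", "bob", "2024-03-02 08:00:00"),
        (3, None, "Page C", "bob", "2024-03-15 11:00:00", "bob", "2024-03-15 11:00:00"),
        (4, None, "Old page", "bob", "2024-02-01 11:00:00", "bob", "2024-02-01 11:00:00"),
    ]
    conn.executemany("INSERT INTO Commitments VALUES (?, ?, ?, ?, ?, ?, ?)", rows)
    return conn

def daily_contributions(conn, today, days=14):
    """Count distinct pages touched per modifier per day in the last `days` days."""
    return conn.execute(
        """
        SELECT Modifier, date(LastModDate) AS Day, COUNT(DISTINCT ContentId) AS Pages
        FROM Commitments
        WHERE date(LastModDate) > date(?, ?)
          AND date(LastModDate) <= date(?)
        GROUP BY Modifier, Day
        ORDER BY Day, Modifier
        """,
        (today, f"-{days} days", today),
    ).fetchall()

if __name__ == "__main__":
    for row in daily_contributions(make_demo_db(), "2024-03-15"):
        print(row)
```

In MySQL the same shape would use DATE(LastModDate) and a CURRENT_DATE() - INTERVAL 14 DAY lower bound; the result can then feed a Superset chart grouped by Day and Modifier.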

Configuring the Controller Services

DBCPForConfluence(DBCPConnectionPool)

PROPERTIES

Property                        Value
Database Connection URL         jdbc:mysql://47.96.97.244:3306/confluence?useUnicode=true&characterEncoding=utf8
Database Driver Class Name      com.mysql.jdbc.Driver
Database Driver Location(s)     /usr/share/java/mysql-connector-java.jar
Database User                   root
Password                        ***

DBCPForNiFi_db(DBCPConnectionPool)

PROPERTIES

Property                        Value
Database Connection URL         jdbc:mysql://gateway001:3306/nifi_db?useUnicode=true&characterEncoding=utf8
Database Driver Class Name      com.mysql.jdbc.Driver
Database Driver Location(s)     /usr/share/java/mysql-connector-java.jar
Database User                   root
Password                        ***

Full import of historical contribution records

Fetching data (ExecuteSQL)

SCHEDULING

Scheduling Strategy     Timer driven
Run Schedule            1 days
Execution               Primary node
Concurrent Tasks        1

PROPERTIES

Property                                Value
Database Connection Pooling Service     DBCPForConfluence (see Controller Service above)
SQL select query                        see the query below
Max Wait Time                           0 seconds
Normalize Table/Column Names            false
Use Avro Logical Types                  false
Default Decimal Precision               10
Default Decimal Scale                   0

SQL select query:

SELECT
    a.CONTENTID,
    a.TITLE,
    b.USERNAME AS CREATOR,
    c.USERNAME AS MODIFIER,
    a.CREATIONDATE AS CREATEDATE,
    a.LASTMODDATE,
    YEARWEEK(a.LASTMODDATE) AS WeekOfYear
FROM
    CONTENT a
LEFT JOIN user_mapping b ON a.CREATOR = b.user_key
LEFT JOIN user_mapping c ON a.LASTMODIFIER = c.user_key
WHERE
    a.CONTENTTYPE = 'PAGE'
AND a.spaceid = 98306
AND a.title IS NOT NULL
AND a.PARENTID IS NOT NULL
ORDER BY
    WeekOfYear DESC,
    modifier DESC,
    LASTMODDATE;
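
The WeekOfYear column is filled by YEARWEEK(a.LASTMODDATE). As a cross-check of what that value looks like, here is a small Python sketch that mirrors MySQL's single-argument YEARWEEK as I understand it (default mode 0: weeks start on Sunday, and days before the first Sunday of a year belong to the last week of the previous year). The function name yearweek_mode0 is hypothetical.

```python
from datetime import date, timedelta

def yearweek_mode0(d):
    """Return year*100 + week, with Sunday-based weeks as in YEARWEEK(d)."""
    # date.weekday() is Monday=0 ... Sunday=6; shift so that Sunday=0.
    days_since_sunday = (d.weekday() + 1) % 7
    week_start = d - timedelta(days=days_since_sunday)
    # The week is numbered by the position of its Sunday within that Sunday's year.
    week = (week_start.timetuple().tm_yday - 1) // 7 + 1
    return week_start.year * 100 + week

if __name__ == "__main__":
    print(yearweek_mode0(date(2024, 3, 14)))
```

Because the year is part of the value, rows from the same week number in different years do not collide, which a bare WEEK() comparison would not guarantee.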

Format conversion (ConvertAvroToJSON)

Just add this processor; the default configuration is sufficient.

SQL generation (ConvertJSONToSQL)

PROPERTIES

Property                  Value
JDBC Connection Pool      DBCPForNiFi_db (see above)
Statement Type            INSERT
Table Name                Commitments
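
To make the role of this step concrete, the sketch below shows the general idea of turning one flat JSON record into a parameterized INSERT plus positional arguments. It is an illustration only, not NiFi's implementation: the function json_to_insert is hypothetical, and the real processor also consults the table metadata and, to my understanding, emits sql.args.N.type attributes alongside sql.args.N.value.

```python
import json

def json_to_insert(table, record_json):
    """Turn a flat JSON object into (sql, attributes) for a parameterized INSERT."""
    record = json.loads(record_json)
    columns = list(record)
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    # Positional arguments, numbered from 1 like JDBC parameters.
    attributes = {
        f"sql.args.{i}.value": None if value is None else str(value)
        for i, value in enumerate(record.values(), start=1)
    }
    return sql, attributes

if __name__ == "__main__":
    sql, attrs = json_to_insert("Commitments", '{"CONTENTID": 1, "TITLE": "Page A"}')
    print(sql)
    print(attrs)
```

The statement travels as the FlowFile content and the values as attributes, which is why a downstream processor can either run the generated statement or substitute its own while reusing the same values.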

SQL write (PutSQL)

PROPERTIES

Property                            Value
JDBC Connection Pool                DBCPForNiFi_db
SQL Statement                       see below
Support Fragmented Transactions     true
Transaction Timeout                 No value set
Batch Size                          100
Obtain Generated Keys               false
Rollback On Failure                 true

SQL Statement

Note:

This property may be left empty.

When it is empty, PutSQL executes the SQL produced by the ConvertJSONToSQL processor.

When it is set, the SQL produced by ConvertJSONToSQL is ignored and only its data is used.

For this use case, leaving it empty is recommended. The statement, if used, is:

REPLACE INTO Commitments (
    ContentId,
    Title,
    Creator,
    Modifier,
    CreateDate,
    LastModDate,
    WeekOfYear
)
VALUES
    (?, ?, ?, ?, ?, ?, ?)
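
The practical effect of REPLACE INTO with the composite primary key (ContentId, LastModDate) is that re-running an import does not duplicate rows, while a later modification of the same page adds a new row. A minimal sketch of that behaviour follows, using sqlite3 in memory as a stand-in for MySQL (SQLite also accepts REPLACE INTO); the names DDL, UPSERT and load are hypothetical.

```python
import sqlite3

DDL = """
CREATE TABLE Commitments (
    ContentId   INTEGER NOT NULL,
    WeekOfYear  TEXT,
    Title       TEXT,
    Modifier    TEXT,
    LastModDate TEXT NOT NULL,
    Creator     TEXT,
    CreateDate  TEXT,
    PRIMARY KEY (ContentId, LastModDate)
)
"""

UPSERT = """
REPLACE INTO Commitments (
    ContentId, Title, Creator, Modifier, CreateDate, LastModDate, WeekOfYear
)
VALUES (?, ?, ?, ?, ?, ?, ?)
"""

def load(conn, rows):
    """Write a batch in one transaction and return the resulting row count."""
    with conn:
        conn.executemany(UPSERT, rows)
    return conn.execute("SELECT COUNT(*) FROM Commitments").fetchone()[0]

if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute(DDL)
    first = [(1, "Page A", "alice", "alice", "2024-03-01 08:00:00", "2024-03-14 09:00:00", "202410")]
    later = [(1, "Page A", "alice", "bob", "2024-03-01 08:00:00", "2024-03-15 10:00:00", "202410")]
    print(load(conn, first))  # first import
    print(load(conn, first))  # same batch again: row is replaced, not duplicated
    print(load(conn, later))  # newer LastModDate: a second row for the same page
```

A plain INSERT would instead fail on the second run with a duplicate-key error, which matters when the daily flow overlaps with rows already loaded by the full import.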

The overall flow is shown in the figure:

Scheduled (daily) import of contribution records

Fetching data (ExecuteSQL)

SCHEDULING

Scheduling Strategy     Timer driven
Run Schedule            1 days
Execution               Primary node
Concurrent Tasks        1

PROPERTIES

Property                                Value
Database Connection Pooling Service     DBCPForConfluence (see Controller Service above)
SQL select query                        see the query below
Max Wait Time                           0 seconds
Normalize Table/Column Names            false
Use Avro Logical Types                  false
Default Decimal Precision               10
Default Decimal Scale                   0

SQL select query:

SELECT
    a.CONTENTID,
    a.TITLE,
    b.USERNAME AS CREATOR,
    c.USERNAME AS MODIFIER,
    a.CREATIONDATE AS CREATEDATE,
    a.LASTMODDATE,
    YEARWEEK(a.LASTMODDATE) AS WeekOfYear
FROM
    CONTENT a
LEFT JOIN user_mapping b ON a.CREATOR = b.user_key
LEFT JOIN user_mapping c ON a.LASTMODIFIER = c.user_key
WHERE
    a.CONTENTTYPE = 'PAGE'
AND a.spaceid = 98306
AND a.title IS NOT NULL
AND a.PARENTID IS NOT NULL
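-- only rows last modified yesterday: [yesterday 00:00, today 00:00)
AND a.LASTMODDATE >= CURRENT_DATE() - INTERVAL 1 DAY
AND a.LASTMODDATE < CURRENT_DATE()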
ORDER BY
    WeekOfYear DESC,
    modifier DESC,
    LASTMODDATE;
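
Requirement 1 asks for yesterday's records, which is easiest to reason about as a half-open time window: from yesterday at midnight up to, but not including, today at midnight. The sketch below computes that window and applies it to sample rows; the helper names yesterday_window and modified_yesterday are hypothetical, and the window is assumed to be evaluated in the database server's local time.

```python
from datetime import date, datetime, time, timedelta

def yesterday_window(today):
    """Return (start, end) covering all of yesterday as a half-open interval."""
    start = datetime.combine(today - timedelta(days=1), time.min)
    end = datetime.combine(today, time.min)
    return start, end

def modified_yesterday(rows, today):
    """Keep (content_id, last_mod) pairs whose last_mod falls inside the window."""
    start, end = yesterday_window(today)
    return [row for row in rows if start <= row[1] < end]

if __name__ == "__main__":
    sample = [
        (1, datetime(2024, 3, 13, 23, 59, 59)),  # two days ago: excluded
        (2, datetime(2024, 3, 14, 0, 0, 0)),     # start of yesterday: included
        (3, datetime(2024, 3, 14, 18, 30, 0)),   # yesterday: included
        (4, datetime(2024, 3, 15, 0, 0, 0)),     # today at midnight: excluded
    ]
    print(modified_yesterday(sample, date(2024, 3, 15)))
```

Using a half-open interval means consecutive daily runs neither overlap nor leave gaps at midnight.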

The downstream processors are configured the same way as in the full import (see above).

The overall flow is shown in the figure below:
