[TOC]
還在為開(kāi)發(fā)Flink流處理應(yīng)用程序時(shí)無(wú)法像開(kāi)發(fā)Spring Boot程序那么優(yōu)雅的分層以及裝配Bean而煩惱嗎?
你可能面臨如下苦惱:
開(kāi)發(fā)的Flink流處理應(yīng)用程序,業(yè)務(wù)邏輯全部寫(xiě)在Flink的操作符中,代碼無(wú)法服用,無(wú)法分層
要是有一天它可以像開(kāi)發(fā)Spring Boot程序那樣可以優(yōu)雅的分層,優(yōu)雅的裝配Bean,不需要自己new對(duì)象好了
可以使用各種Spring生態(tài)的框架,一些瑣碎的邏輯不再硬編碼到代碼中。
GitHub最近超火的一款開(kāi)源框架,懶松鼠Flink-Boot腳手架,該腳手架簡(jiǎn)直是Spring開(kāi)發(fā)工程師的福音,完美融合Spring生態(tài)體系,再也不需要手動(dòng)在Java類中創(chuàng)建臃腫的Java對(duì)象,簡(jiǎn)直是開(kāi)發(fā)大型流處理應(yīng)用程序的必不可少的工具。地址:懶松鼠Flink-Boot 腳手架由《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》作者開(kāi)發(fā)。

接口緩存
你的現(xiàn)狀
static Map<String,String> cache=new HashMap<String,String>();
public String findUUID(FlowData flowData) {
String value=cache.get(flowData.getSubTestItem());
if(value==null)
{
String uuid=userMapper.findUUID(flowData);
cache.put(uuid,value);
return uuid;
}
return value;
}
你想要的是這樣
@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem")
public String findUUID(FlowData flowData) {
return userMapper.findUUID(flowData);
}
重試機(jī)制
你的現(xiàn)狀
public void insertFlow(FlowData flowData) {
try{
userMapper.insertFlow(flowData);
}Cache(Exception e)
{
Thread.sleep(10000);
userMapper.insertFlow(flowData);
}
}
你想要的是這樣
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))
@Override
public void insertFlow(FlowData flowData) {
userMapper.insertFlow(flowData);
}
Bean校驗(yàn)
你的現(xiàn)狀
if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7)
{
return null;
}
if(flowData.getBillNumber()==null)
{
return null;
}
你想要的是這樣
Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData);
if (validate != null) {
System.out.println(validate);
return null;
}
public class FlowData {
private String uuid;
//聲明該參數(shù)的校驗(yàn)規(guī)則字符串長(zhǎng)度必須在7到20之間
@Size(min = 7, max = 20, message = "長(zhǎng)度必須在{min}-{max}之間")
private String subTestItem;
//聲明該參數(shù)的校驗(yàn)規(guī)則字符串不能為空
@NotBlank(message = "billNumber不能為空")
private String billNumber;
}
等等......
GitHub最近超火的一款開(kāi)源框架,懶松鼠Flink-Boot腳手架,該腳手架簡(jiǎn)直是Spring開(kāi)發(fā)工程師的福音,完美融合Spring生態(tài)體系,再也不需要手動(dòng)在Java類中創(chuàng)建臃腫的Java對(duì)象,簡(jiǎn)直是開(kāi)發(fā)大型流處理應(yīng)用程序的必不可少的工具。懶松鼠Flink-Boot 腳手架由《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》作者開(kāi)發(fā)。
它為流計(jì)算開(kāi)發(fā)工程師解決了
將所有對(duì)象的創(chuàng)建和依賴關(guān)系的維護(hù)工作都交給Spring容器的管理,降低了對(duì)象之間的耦合性,使代碼變得更簡(jiǎn)潔,拒絕臃腫。
消除在工程中對(duì)單例的過(guò)多使用。
聲明式事務(wù)處理,通過(guò)配置就可以完成對(duì)事物的管理,而無(wú)須手動(dòng)編程。
聲明式注解,可以通過(guò)注解定義方法的緩沖功能,無(wú)序手動(dòng)編程。
注解式定義Bean對(duì)象的校驗(yàn)規(guī)則,通過(guò)注解即可完成對(duì)對(duì)象的參數(shù)校驗(yàn),無(wú)序手動(dòng)編程。
集成MyBatis ORM框架,注解式維護(hù)實(shí)例對(duì)象的依賴關(guān)系。
解耦Flink SQL,SQL語(yǔ)句剝離出JAVA文件,以簡(jiǎn)潔的模式表現(xiàn)在XML文件中。
封裝Flink API,僅提供業(yè)務(wù)方法去編寫(xiě),Spring生態(tài)融合全部搞定,無(wú)需操心。
有了它你的代碼就像這樣子:
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版書(shū)籍《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》 隨書(shū)代碼
* RichFlatMapFunction為Flink框架的一個(gè)通用型操作符(算子),開(kāi)發(fā)者一般在該算子的flatMap方法中編寫(xiě)業(yè)務(wù)邏輯
* @auther: intsmaze(劉洋)
* @date: 2020/10/15 18:33
*/
public class MybatisFlatMap extends RichFlatMapFunction<String, String> {
private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
protected ApplicationContext beanFactory;
//mybatis的Service對(duì)象,操作數(shù)據(jù)庫(kù)的user表
private UserService userService;
@Override
public void open(Configuration parameters) {
ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters);
userService = beanFactory.getBean(UserServiceImpl.class);
}
@Override
public void flatMap(String value, Collector<String> out){
FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() {
}.getType());
Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData);
if (validate != null) {
System.out.println(validate);
return null;
}
//數(shù)據(jù)庫(kù)查詢,屏蔽掉獲取數(shù)據(jù)庫(kù)連接,是否數(shù)據(jù)庫(kù)連接,事務(wù)的聲明等
String flowUUID = userService.findUUID(flowData);
if (StringUtils.isBlank(flowUUID)) {
flowUUID = UUID.randomUUID().toString();
flowData.setUuid(flowUUID);
//數(shù)據(jù)庫(kù)插入,屏蔽掉獲取數(shù)據(jù)庫(kù)連接,是否數(shù)據(jù)庫(kù)連接,事務(wù)的聲明等
userService.insertFlow(flowData);
}
out.collect(gson.toJson(flowData));
}
}
public interface UserService {
String findUUID(FlowData flowData);
void insertFlow(FlowData flowData);
}
//通過(guò)注解實(shí)例化Bean對(duì)象。
@Service
//通過(guò)注解聲明進(jìn)行事務(wù)管理
@Transactional
//通過(guò)注解聲明方法具有異常重試機(jī)制
@EnableRetry
public class UserServiceImpl implements UserService {
//通過(guò)注解進(jìn)行依賴注入
@Resource
private UserMapper userMapper;
@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem")
@Override
public String findUUID(FlowData flowData) {
return userMapper.findUUID(flowData);
}
//通過(guò)注解聲明該方法異常后的重試機(jī)制,無(wú)需手動(dòng)編程
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))
@Override
public void insertFlow(FlowData flowData) {
userMapper.insertFlow(flowData);
}
}
public interface UserMapper {
String findUUID(FlowData flowData);
void insertFlow(FlowData flowData);
}
//注解式聲明參數(shù)校驗(yàn)規(guī)則
public class FlowData {
private String uuid;
//聲明該參數(shù)的校驗(yàn)規(guī)則字符串長(zhǎng)度必須在7到20之間
@Size(min = 7, max = 20, message = "長(zhǎng)度必須在{min}-{max}之間")
private String subTestItem;
//聲明該參數(shù)的校驗(yàn)規(guī)則字符串不能為空
@NotBlank(message = "billNumber不能為空")
private String billNumber;
@NotBlank(message = "barcode不能為空")
private String barcode;
private String flowName;
private String flowStatus;
......
}
倉(cāng)庫(kù)地址:懶松鼠Flink-Boot
倉(cāng)庫(kù)地址:懶松鼠Flink-Boot腳手架由《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》作者開(kāi)發(fā)。

該腳手架屏蔽掉組裝Flink API細(xì)節(jié),讓跨界變得簡(jiǎn)單,使得開(kāi)發(fā)者能以傳統(tǒng)Java WEB模式的開(kāi)發(fā)方式開(kāi)發(fā)出具備分布式計(jì)算能力的流處理程序。
開(kāi)發(fā)者完全不需要理解分布式計(jì)算的理論知識(shí)和Flink框架的細(xì)節(jié),便可以快速編寫(xiě)業(yè)務(wù)代碼實(shí)現(xiàn)。
為了進(jìn)一步提升開(kāi)發(fā)者使用該腳手架開(kāi)發(fā)大型項(xiàng)目的敏捷的程度,該腳手架工程默認(rèn)集成Spring框架進(jìn)行Bean管理,同時(shí)將微服務(wù)以及WEB開(kāi)發(fā)領(lǐng)域中經(jīng)常用到的框架集成進(jìn)來(lái),進(jìn)一步提升開(kāi)發(fā)速度。
除此之外針對(duì)目前流行的各大Java框架,該Flink腳手架工程也進(jìn)行了集成,加快開(kāi)發(fā)人員的編碼速度,比如:
集成Jbcp-template對(duì)Mysql,Oracle,SQLServer等關(guān)系型數(shù)據(jù)庫(kù)的快速訪問(wèn)。
集成Hibernate Validator框架進(jìn)行參數(shù)校驗(yàn)。
集成Spring Retry框架進(jìn)行重試標(biāo)志。
集成Mybatis框架,提高對(duì)關(guān)系型數(shù)據(jù)庫(kù)增,刪,改,查的開(kāi)發(fā)速度。
集成Spring Cache框架,實(shí)現(xiàn)注解式定義方法緩存。
......
1. 組織結(jié)構(gòu)
Flink-Boot
├── Flink-Base -- Flink-Boot工程基礎(chǔ)模塊
├── Flink-Client -- Flink-Boot 客戶端模塊
├── flink-annotation -- 注解生效模塊
├── flink-mybatis -- mybatis orm模塊
├── flink-retry -- 注解重試機(jī)制模式
├── flink-validate -- 校驗(yàn)?zāi)K
├── flink-sql -- Flink SQL解耦至XML配置模塊
├── flink-cache-annotation -- 接口緩沖模塊
├── flink-junit -- 單元測(cè)試模塊
├── flink-apollo -- 阿波羅配置客戶端模塊
2. 技術(shù)選項(xiàng)和集成情況
技術(shù) | 名稱 | 狀態(tài) |
----|------|----
Spring Framework | 容器 | 已集成
Spring 基于XML方式配置Bean | 裝配Bean | 已集成
Spring 基于注解方式配置Bean | 裝配Bean | 已集成
Spring 基于注解聲明方法重試機(jī)制 | Retry注解 | 已集成
Spring 基于注解聲明方法緩存 | Cache注解 | 已集成
Hibernate Validator | 校驗(yàn)框架 | 已集成
Druid | 數(shù)據(jù)庫(kù)連接池 | 已集成
MyBatis | ORM框架 | 已集成
Kafka | 消息隊(duì)列 | 已集成
HDFS | 分布式文件系統(tǒng) | 已集成
Log4J | 日志組件 | 已集成
Junit | 單元測(cè)試 | 已集成
Mybatis-Plus | MyBatis擴(kuò)展包 | 進(jìn)行中
PageHelper | MyBatis物理分頁(yè)插件 | 進(jìn)行中
ZooKeeper | 分布式協(xié)調(diào)服務(wù) | 進(jìn)行中
Dubbo | 分布式服務(wù)框架 | 進(jìn)行中
Redis | 分布式緩存數(shù)據(jù)庫(kù) | 進(jìn)行中
Solr & Elasticsearch | 分布式全文搜索引擎 | 進(jìn)行中
Ehcache | 進(jìn)程內(nèi)緩存框架 | 進(jìn)行中
sequence | 分布式高效ID生產(chǎn) | 進(jìn)行中
Dubbole消費(fèi)者 | 服務(wù)消費(fèi)者 | 進(jìn)行中
Spring eurake消費(fèi)者 | 服務(wù)消費(fèi)者 | 進(jìn)行中
Apollo配置中心 | 攜程阿波羅配置中心 | 進(jìn)行中
Spring Config配置中心 | Spring Cloud Config配置中心 | 進(jìn)行中
3. 快速開(kāi)始
下面是集成Spring生態(tài)的基礎(chǔ)手冊(cè).
3.1 核心基礎(chǔ)工程
-
flink-base :基礎(chǔ)工程,封裝了開(kāi)發(fā)Flink工程的必須參數(shù),同時(shí)集成Spring容器,為后續(xù)集成Spring各類框架提供了支撐。
可以在本地開(kāi)發(fā)環(huán)境和Flink集群運(yùn)行環(huán)境中隨意切換。
可以在增量檢查點(diǎn)和全量檢查點(diǎn)之間隨意切換。
內(nèi)置使用HDFS作為檢查點(diǎn)的持久存儲(chǔ)介質(zhì)。
默認(rèn)使用Kafka作為數(shù)據(jù)源
內(nèi)置實(shí)現(xiàn)了任務(wù)的暫停機(jī)制-達(dá)到任務(wù)仍在運(yùn)行但不再接收Kafka數(shù)據(jù)源中的數(shù)據(jù),代替了停止任務(wù)后再重新部署任務(wù)這一繁瑣流程。
flink-client:業(yè)務(wù)工程,該工程依賴flink-base工程,開(kāi)發(fā)任務(wù)在該工程中進(jìn)行業(yè)務(wù)邏輯的開(kāi)發(fā)。
3.2 Spring容器
該容器模式配置了JdbcTemplate實(shí)例,數(shù)據(jù)庫(kù)連接池采用Druid,在業(yè)務(wù)方法中只需要獲取容器中的JdbcTemplate實(shí)例便可以快速與關(guān)系型數(shù)據(jù)庫(kù)進(jìn)行交互,dataService實(shí)例封裝了一些訪問(wèn)數(shù)據(jù)庫(kù)表的方法。
topology-base.xml
<beans ......
default-lazy-init="true" default-init-method="init">
<context:property-placeholder location="classpath:config.properties"/>
<bean id="druidDataSource" class="com.alibaba.druid.pool.DruidDataSource">
<property name="driverClassName" value="com.mysql.jdbc.Driver"></property>
<property name="url"
value="${jdbc.url}"></property>
<property name="username" value="${jdbc.user}"></property>
<property name="password" value="${jdbc.password}"></property>
</bean>
<bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
<constructor-arg ref="druidDataSource"></constructor-arg>
</bean>
<bean id="dataService" class="com.intsmaze.flink.base.service.DataService">
<property name="jdbcTemplate" ref="jdbcTemplate"></property>
</bean>
</beans>
config.properties
jdbc.user = intsmaze
jdbc.password = intsmaze
jdbc.url = jdbc:mysql://127.0.0.1:3306/flink-boot?useUnicode=true&characterEncoding=UTF-8
3.3 啟動(dòng)類示例
如下是SimpleClient(com.intsmaze.flink.client.SimpleClient)類的示例代碼,該類繼承了BaseFlink,可以看到對(duì)應(yīng)實(shí)現(xiàn)的方法中分別設(shè)置如下:
public String getTopoName():定義本作業(yè)的名稱。
public String getConfigName():定義本作業(yè)需要讀取的spring配置文件的名稱
public String getPropertiesName():定義本作業(yè)需要讀取的properties配置文件的名稱。
public void createTopology(StreamExecutionEnvironment builder):構(gòu)造本作業(yè)的拓?fù)浣Y(jié)構(gòu)。
/**
* github地址: https://github.com/intsmaze
* 博客地址:https://www.cnblogs.com/intsmaze/
* 出版書(shū)籍《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》 隨書(shū)代碼
*
* @auther: intsmaze(劉洋)
* @date: 2020/10/15 18:33
*/
public class SimpleClient extends BaseFlink {
public static void main(String[] args) throws Exception {
SimpleClient topo = new SimpleClient();
topo.run(ParameterTool.fromArgs(args));
}
@Override
public String getTopoName() {
return "SimpleClient";
}
@Override
public String getConfigName() {
return "topology-base.xml";
}
@Override
public String getPropertiesName() {
return "config.properties";
}
@Override
public void createTopology(StreamExecutionEnvironment builder) {
DataStream<String> inputDataStrem = env.addSource(new SimpleDataSource());
DataStream<String> processDataStream = inputDataStrem.flatMap(new SimpleFunction());
processDataStream.print("輸出結(jié)果");
}
}
3.4 數(shù)據(jù)源
采用自定義數(shù)據(jù)源,用戶需要編寫(xiě)自定義DataSource類,該類需要繼承XXX抽象類,實(shí)現(xiàn)如下方法。
public abstract void open(StormBeanFactory beanFactory):獲取本作業(yè)在Spring配置文件中配置的bean對(duì)象。
public abstract String sendMessage():本作業(yè)spout生成數(shù)據(jù)的方法,在該方法內(nèi)編寫(xiě)業(yè)務(wù)邏輯產(chǎn)生源數(shù)據(jù),產(chǎn)生的數(shù)據(jù)以String類型進(jìn)行返回。
public class SimpleDataSource extends CommonDataSource {
private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
......
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
...//構(gòu)造讀取各類外部系統(tǒng)數(shù)據(jù)的連接實(shí)例
}
@Override
public String sendMess() throws InterruptedException {
Thread.sleep(1000);
......
MainData mainData = new MainData();
......//通過(guò)外部系統(tǒng)數(shù)據(jù)的連接實(shí)例讀取外部系統(tǒng)數(shù)據(jù),封裝進(jìn)MainData對(duì)象中,然后返回即可。
return gson.toJson(mainData);
}
}
3.5 業(yè)務(wù)邏輯實(shí)現(xiàn)
本作業(yè)計(jì)算的業(yè)務(wù)邏輯在Flink轉(zhuǎn)換操作符中進(jìn)行實(shí)現(xiàn),一般來(lái)說(shuō)開(kāi)發(fā)者只需要實(shí)現(xiàn)flatMap算子即可以滿足大部分算子的使用。
用戶編寫(xiě)的自定義類需要繼承com.intsmaze.flink.base.transform.CommonFunction抽象類,均需實(shí)現(xiàn)如下方法。
- public abstract String execute(String message):本作業(yè)業(yè)務(wù)邏輯計(jì)算的方法,參數(shù)message為Kafka主題中讀取過(guò)來(lái)的參數(shù),默認(rèn)參數(shù)為String類型,如果需要將處理的數(shù)據(jù)發(fā)送給Kakfa主題中,則要通過(guò)return將處理的數(shù)據(jù)返回即可。
public class SimpleFunction extends CommonFunction {
private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();
@Override
public String execute(String message) throws Exception {
FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() {
}.getType());
String flowUUID = dataService.findUUID(flowData);
if (StringUtils.isBlank(flowUUID)) {
flowUUID = UUID.randomUUID().toString();
flowData.setUuid(flowUUID);
dataService.insertFlow(flowData);
}
return gson.toJson(flowData);
}
}
CommonFunction
CommonFunction抽象類中默認(rèn)在open方法中通過(guò)BeanFactory對(duì)象獲取到了Spring容器中對(duì)于的dataService實(shí)例,對(duì)于Spring中的其他實(shí)例同理在SimpleFunction類中的open方法中獲取即可。
public abstract class CommonFunction extends RichFlatMapFunction<String, String> {
private IntCounter numLines = new IntCounter();
protected DataService dataService;
protected ApplicationContext beanFactory;
@Override
public void open(Configuration parameters) {
getRuntimeContext().addAccumulator("num-FlatMap", this.numLines);
ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()
.getExecutionConfig().getGlobalJobParameters();
beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters);
dataService = beanFactory.getBean(DataService.class);
}
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
this.numLines.add(1);
String execute = execute(value);
if (StringUtils.isNotBlank(execute)) {
out.collect(execute);
}
}
public abstract String execute(String message) throws Exception;
}
可以根據(jù)情況選擇重寫(xiě)open(Configuration parameters)方法,同時(shí)重寫(xiě)的open(Configuration parameters)方法的第一行要調(diào)用父類的open(Configuration parameters)方法。
public void open(Configuration parameters){
super.open(parameters);
......
//獲取在Spring配置文件中配置的實(shí)例
XXX xxx=beanFactory.getBean(XXX.class);
}
3.6 集群/本地運(yùn)行
在自定義的Topology類編寫(xiě)Main方法,創(chuàng)建自定義的Topology對(duì)象后,調(diào)用對(duì)象的run(...)方法。
public class SimpleClient extends BaseFlink {
/**
* 本地啟動(dòng)參數(shù) -isLocal local
* 集群?jiǎn)?dòng)參數(shù) -isIncremental isIncremental
*/
public static void main(String[] args) throws Exception {
SimpleClient topo = new SimpleClient();
topo.run(ParameterTool.fromArgs(args));
}
.......