再也不擔(dān)心寫(xiě)出臃腫的Flink流處理程序啦,發(fā)現(xiàn)一款將Flink與Spring生態(tài)完美融合的腳手架工程-懶松鼠Flink-Boot

[TOC]

還在為開(kāi)發(fā)Flink流處理應(yīng)用程序時(shí)無(wú)法像開(kāi)發(fā)Spring Boot程序那么優(yōu)雅的分層以及裝配Bean而煩惱嗎?

你可能面臨如下苦惱:

  1. 開(kāi)發(fā)的Flink流處理應(yīng)用程序,業(yè)務(wù)邏輯全部寫(xiě)在Flink的操作符中,代碼無(wú)法服用,無(wú)法分層

  2. 要是有一天它可以像開(kāi)發(fā)Spring Boot程序那樣可以優(yōu)雅的分層,優(yōu)雅的裝配Bean,不需要自己new對(duì)象好了

  3. 可以使用各種Spring生態(tài)的框架,一些瑣碎的邏輯不再硬編碼到代碼中。

GitHub最近超火的一款開(kāi)源框架,懶松鼠Flink-Boot腳手架,該腳手架簡(jiǎn)直是Spring開(kāi)發(fā)工程師的福音,完美融合Spring生態(tài)體系,再也不需要手動(dòng)在Java類中創(chuàng)建臃腫的Java對(duì)象,簡(jiǎn)直是開(kāi)發(fā)大型流處理應(yīng)用程序的必不可少的工具。地址:懶松鼠Flink-Boot 腳手架由《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》作者開(kāi)發(fā)。

image

接口緩存

你的現(xiàn)狀


static Map<String,String> cache=new HashMap<String,String>();

public String findUUID(FlowData flowData) {

    String value=cache.get(flowData.getSubTestItem());

    if(value==null)

    {

        String uuid=userMapper.findUUID(flowData);

        cache.put(uuid,value);

        return uuid;

    }

    return value;

}

你想要的是這樣


@Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem")

public String findUUID(FlowData flowData) {

    return userMapper.findUUID(flowData);

}

重試機(jī)制

你的現(xiàn)狀


public void insertFlow(FlowData flowData) {

    try{

        userMapper.insertFlow(flowData);

      }Cache(Exception e)

      {

        Thread.sleep(10000);

        userMapper.insertFlow(flowData);

      }

}

你想要的是這樣


    @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))

    @Override

    public void insertFlow(FlowData flowData) {

        userMapper.insertFlow(flowData);

    }

Bean校驗(yàn)

你的現(xiàn)狀


if(flowData.getSubTestItem().length()<2&&flowData.getSubTestItem().length()>7)

{

    return null;

}

if(flowData.getBillNumber()==null)

{

    return null;

}

你想要的是這樣


Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData);

if (validate != null) {

    System.out.println(validate);

    return null;

}

public class FlowData {

    private String uuid;

    //聲明該參數(shù)的校驗(yàn)規(guī)則字符串長(zhǎng)度必須在7到20之間

    @Size(min = 7, max = 20, message = "長(zhǎng)度必須在{min}-{max}之間")

    private String subTestItem;

    //聲明該參數(shù)的校驗(yàn)規(guī)則字符串不能為空

    @NotBlank(message = "billNumber不能為空")

    private String billNumber;

}

等等......

GitHub最近超火的一款開(kāi)源框架,懶松鼠Flink-Boot腳手架,該腳手架簡(jiǎn)直是Spring開(kāi)發(fā)工程師的福音,完美融合Spring生態(tài)體系,再也不需要手動(dòng)在Java類中創(chuàng)建臃腫的Java對(duì)象,簡(jiǎn)直是開(kāi)發(fā)大型流處理應(yīng)用程序的必不可少的工具。懶松鼠Flink-Boot 腳手架由《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》作者開(kāi)發(fā)。

它為流計(jì)算開(kāi)發(fā)工程師解決了

  1. 將所有對(duì)象的創(chuàng)建和依賴關(guān)系的維護(hù)工作都交給Spring容器的管理,降低了對(duì)象之間的耦合性,使代碼變得更簡(jiǎn)潔,拒絕臃腫。

  2. 消除在工程中對(duì)單例的過(guò)多使用。

  3. 聲明式事務(wù)處理,通過(guò)配置就可以完成對(duì)事物的管理,而無(wú)須手動(dòng)編程。

  4. 聲明式注解,可以通過(guò)注解定義方法的緩沖功能,無(wú)序手動(dòng)編程。

  5. 注解式定義Bean對(duì)象的校驗(yàn)規(guī)則,通過(guò)注解即可完成對(duì)對(duì)象的參數(shù)校驗(yàn),無(wú)序手動(dòng)編程。

  6. 集成MyBatis ORM框架,注解式維護(hù)實(shí)例對(duì)象的依賴關(guān)系。

  7. 解耦Flink SQL,SQL語(yǔ)句剝離出JAVA文件,以簡(jiǎn)潔的模式表現(xiàn)在XML文件中。

  8. 封裝Flink API,僅提供業(yè)務(wù)方法去編寫(xiě),Spring生態(tài)融合全部搞定,無(wú)需操心。

有了它你的代碼就像這樣子:


/**

* github地址: https://github.com/intsmaze

* 博客地址:https://www.cnblogs.com/intsmaze/

* 出版書(shū)籍《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》 隨書(shū)代碼

* RichFlatMapFunction為Flink框架的一個(gè)通用型操作符(算子),開(kāi)發(fā)者一般在該算子的flatMap方法中編寫(xiě)業(yè)務(wù)邏輯

* @auther: intsmaze(劉洋)

* @date: 2020/10/15 18:33

*/

public class MybatisFlatMap extends RichFlatMapFunction<String, String> {

  private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();

    protected ApplicationContext beanFactory;

    //mybatis的Service對(duì)象,操作數(shù)據(jù)庫(kù)的user表

    private UserService userService;

    @Override

    public void open(Configuration parameters) {

        ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()

                .getExecutionConfig().getGlobalJobParameters();

        beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters);

        userService = beanFactory.getBean(UserServiceImpl.class);

    }

    @Override

    public void flatMap(String value, Collector<String> out){

        FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() {

        }.getType());

        Map<String, StringBuffer> validate = ValidatorUtil.validate(flowData);

        if (validate != null) {

            System.out.println(validate);

            return null;

        }

        //數(shù)據(jù)庫(kù)查詢,屏蔽掉獲取數(shù)據(jù)庫(kù)連接,是否數(shù)據(jù)庫(kù)連接,事務(wù)的聲明等

        String flowUUID = userService.findUUID(flowData);

        if (StringUtils.isBlank(flowUUID)) {

            flowUUID = UUID.randomUUID().toString();

            flowData.setUuid(flowUUID);

            //數(shù)據(jù)庫(kù)插入,屏蔽掉獲取數(shù)據(jù)庫(kù)連接,是否數(shù)據(jù)庫(kù)連接,事務(wù)的聲明等

            userService.insertFlow(flowData);

        }

        out.collect(gson.toJson(flowData));

    }

}

public interface UserService {

    String findUUID(FlowData flowData);

    void insertFlow(FlowData flowData);

}

//通過(guò)注解實(shí)例化Bean對(duì)象。

@Service

//通過(guò)注解聲明進(jìn)行事務(wù)管理

@Transactional

//通過(guò)注解聲明方法具有異常重試機(jī)制

@EnableRetry

public class UserServiceImpl implements UserService {

  //通過(guò)注解進(jìn)行依賴注入

    @Resource

    private UserMapper userMapper;

    @Cacheable(value = "FlowData.findUUID", key = "#flowData.subTestItem")

    @Override

    public String findUUID(FlowData flowData) {

        return userMapper.findUUID(flowData);

    }

  //通過(guò)注解聲明該方法異常后的重試機(jī)制,無(wú)需手動(dòng)編程

    @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(delay = 2000L, multiplier = 1.5))

    @Override

    public void insertFlow(FlowData flowData) {

        userMapper.insertFlow(flowData);

    }

}

public interface UserMapper {

    String findUUID(FlowData flowData);

    void insertFlow(FlowData flowData);

}

//注解式聲明參數(shù)校驗(yàn)規(guī)則

public class FlowData {

    private String uuid;

    //聲明該參數(shù)的校驗(yàn)規(guī)則字符串長(zhǎng)度必須在7到20之間

    @Size(min = 7, max = 20, message = "長(zhǎng)度必須在{min}-{max}之間")

    private String subTestItem;

    //聲明該參數(shù)的校驗(yàn)規(guī)則字符串不能為空

    @NotBlank(message = "billNumber不能為空")

    private String billNumber;

    @NotBlank(message = "barcode不能為空")

    private String barcode;

    private String flowName;

    private String flowStatus;

    ......

}

倉(cāng)庫(kù)地址:懶松鼠Flink-Boot

倉(cāng)庫(kù)地址:懶松鼠Flink-Boot腳手架由《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》作者開(kāi)發(fā)。

image
  1. 該腳手架屏蔽掉組裝Flink API細(xì)節(jié),讓跨界變得簡(jiǎn)單,使得開(kāi)發(fā)者能以傳統(tǒng)Java WEB模式的開(kāi)發(fā)方式開(kāi)發(fā)出具備分布式計(jì)算能力的流處理程序。

  2. 開(kāi)發(fā)者完全不需要理解分布式計(jì)算的理論知識(shí)和Flink框架的細(xì)節(jié),便可以快速編寫(xiě)業(yè)務(wù)代碼實(shí)現(xiàn)。

  3. 為了進(jìn)一步提升開(kāi)發(fā)者使用該腳手架開(kāi)發(fā)大型項(xiàng)目的敏捷的程度,該腳手架工程默認(rèn)集成Spring框架進(jìn)行Bean管理,同時(shí)將微服務(wù)以及WEB開(kāi)發(fā)領(lǐng)域中經(jīng)常用到的框架集成進(jìn)來(lái),進(jìn)一步提升開(kāi)發(fā)速度。

  4. 除此之外針對(duì)目前流行的各大Java框架,該Flink腳手架工程也進(jìn)行了集成,加快開(kāi)發(fā)人員的編碼速度,比如:

  • 集成Jbcp-template對(duì)Mysql,Oracle,SQLServer等關(guān)系型數(shù)據(jù)庫(kù)的快速訪問(wèn)。

  • 集成Hibernate Validator框架進(jìn)行參數(shù)校驗(yàn)。

  • 集成Spring Retry框架進(jìn)行重試標(biāo)志。

  • 集成Mybatis框架,提高對(duì)關(guān)系型數(shù)據(jù)庫(kù)增,刪,改,查的開(kāi)發(fā)速度。

  • 集成Spring Cache框架,實(shí)現(xiàn)注解式定義方法緩存。

  • ......

1. 組織結(jié)構(gòu)


Flink-Boot

├── Flink-Base -- Flink-Boot工程基礎(chǔ)模塊

├── Flink-Client -- Flink-Boot 客戶端模塊

├── flink-annotation -- 注解生效模塊

├── flink-mybatis -- mybatis orm模塊

├── flink-retry -- 注解重試機(jī)制模式

├── flink-validate -- 校驗(yàn)?zāi)K

├── flink-sql -- Flink SQL解耦至XML配置模塊

├── flink-cache-annotation -- 接口緩沖模塊

├── flink-junit -- 單元測(cè)試模塊

├── flink-apollo -- 阿波羅配置客戶端模塊

2. 技術(shù)選項(xiàng)和集成情況

技術(shù) | 名稱 | 狀態(tài) |

----|------|----

Spring Framework | 容器 | 已集成

Spring 基于XML方式配置Bean | 裝配Bean | 已集成

Spring 基于注解方式配置Bean | 裝配Bean | 已集成

Spring 基于注解聲明方法重試機(jī)制 | Retry注解 | 已集成

Spring 基于注解聲明方法緩存 | Cache注解 | 已集成

Hibernate Validator | 校驗(yàn)框架 | 已集成

Druid | 數(shù)據(jù)庫(kù)連接池 | 已集成

MyBatis | ORM框架 | 已集成

Kafka | 消息隊(duì)列 | 已集成

HDFS | 分布式文件系統(tǒng) | 已集成

Log4J | 日志組件 | 已集成

Junit | 單元測(cè)試 | 已集成

Mybatis-Plus | MyBatis擴(kuò)展包 | 進(jìn)行中

PageHelper | MyBatis物理分頁(yè)插件 | 進(jìn)行中

ZooKeeper | 分布式協(xié)調(diào)服務(wù) | 進(jìn)行中

Dubbo | 分布式服務(wù)框架 | 進(jìn)行中

Redis | 分布式緩存數(shù)據(jù)庫(kù) | 進(jìn)行中

Solr & Elasticsearch | 分布式全文搜索引擎 | 進(jìn)行中

Ehcache | 進(jìn)程內(nèi)緩存框架 | 進(jìn)行中

sequence | 分布式高效ID生產(chǎn) | 進(jìn)行中

Dubbole消費(fèi)者 | 服務(wù)消費(fèi)者 | 進(jìn)行中

Spring eurake消費(fèi)者 | 服務(wù)消費(fèi)者 | 進(jìn)行中

Apollo配置中心 | 攜程阿波羅配置中心 | 進(jìn)行中

Spring Config配置中心 | Spring Cloud Config配置中心 | 進(jìn)行中

3. 快速開(kāi)始

下面是集成Spring生態(tài)的基礎(chǔ)手冊(cè).

3.1 核心基礎(chǔ)工程

  • flink-base :基礎(chǔ)工程,封裝了開(kāi)發(fā)Flink工程的必須參數(shù),同時(shí)集成Spring容器,為后續(xù)集成Spring各類框架提供了支撐。

    1. 可以在本地開(kāi)發(fā)環(huán)境和Flink集群運(yùn)行環(huán)境中隨意切換。

    2. 可以在增量檢查點(diǎn)和全量檢查點(diǎn)之間隨意切換。

    3. 內(nèi)置使用HDFS作為檢查點(diǎn)的持久存儲(chǔ)介質(zhì)。

    4. 默認(rèn)使用Kafka作為數(shù)據(jù)源

    5. 內(nèi)置實(shí)現(xiàn)了任務(wù)的暫停機(jī)制-達(dá)到任務(wù)仍在運(yùn)行但不再接收Kafka數(shù)據(jù)源中的數(shù)據(jù),代替了停止任務(wù)后再重新部署任務(wù)這一繁瑣流程。

  • flink-client:業(yè)務(wù)工程,該工程依賴flink-base工程,開(kāi)發(fā)任務(wù)在該工程中進(jìn)行業(yè)務(wù)邏輯的開(kāi)發(fā)。

3.2 Spring容器

該容器模式配置了JdbcTemplate實(shí)例,數(shù)據(jù)庫(kù)連接池采用Druid,在業(yè)務(wù)方法中只需要獲取容器中的JdbcTemplate實(shí)例便可以快速與關(guān)系型數(shù)據(jù)庫(kù)進(jìn)行交互,dataService實(shí)例封裝了一些訪問(wèn)數(shù)據(jù)庫(kù)表的方法。

topology-base.xml

<beans ......

      default-lazy-init="true" default-init-method="init">

    <context:property-placeholder location="classpath:config.properties"/>

    <bean id="druidDataSource" class="com.alibaba.druid.pool.DruidDataSource">

        <property name="driverClassName" value="com.mysql.jdbc.Driver"></property>

        <property name="url"

                  value="${jdbc.url}"></property>

        <property name="username" value="${jdbc.user}"></property>

        <property name="password" value="${jdbc.password}"></property>

    </bean>

    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">

        <constructor-arg ref="druidDataSource"></constructor-arg>

    </bean>

    <bean id="dataService" class="com.intsmaze.flink.base.service.DataService">

        <property name="jdbcTemplate" ref="jdbcTemplate"></property>

    </bean>

</beans>

config.properties

jdbc.user = intsmaze

jdbc.password = intsmaze

jdbc.url = jdbc:mysql://127.0.0.1:3306/flink-boot?useUnicode=true&characterEncoding=UTF-8

3.3 啟動(dòng)類示例

如下是SimpleClient(com.intsmaze.flink.client.SimpleClient)類的示例代碼,該類繼承了BaseFlink,可以看到對(duì)應(yīng)實(shí)現(xiàn)的方法中分別設(shè)置如下:

  • public String getTopoName():定義本作業(yè)的名稱。

  • public String getConfigName():定義本作業(yè)需要讀取的spring配置文件的名稱

  • public String getPropertiesName():定義本作業(yè)需要讀取的properties配置文件的名稱。

  • public void createTopology(StreamExecutionEnvironment builder):構(gòu)造本作業(yè)的拓?fù)浣Y(jié)構(gòu)。


/**

* github地址: https://github.com/intsmaze

* 博客地址:https://www.cnblogs.com/intsmaze/

* 出版書(shū)籍《深入理解Flink核心設(shè)計(jì)與實(shí)踐原理》 隨書(shū)代碼

*

* @auther: intsmaze(劉洋)

* @date: 2020/10/15 18:33

*/

public class SimpleClient extends BaseFlink {

    public static void main(String[] args) throws Exception {

        SimpleClient topo = new SimpleClient();

        topo.run(ParameterTool.fromArgs(args));

    }

    @Override

    public String getTopoName() {

        return "SimpleClient";

    }

    @Override

    public String getConfigName() {

        return "topology-base.xml";

    }

    @Override

    public String getPropertiesName() {

        return "config.properties";

    }

    @Override

    public void createTopology(StreamExecutionEnvironment builder) {

        DataStream<String> inputDataStrem = env.addSource(new SimpleDataSource());

        DataStream<String> processDataStream = inputDataStrem.flatMap(new SimpleFunction());

        processDataStream.print("輸出結(jié)果");

    }

}

3.4 數(shù)據(jù)源

采用自定義數(shù)據(jù)源,用戶需要編寫(xiě)自定義DataSource類,該類需要繼承XXX抽象類,實(shí)現(xiàn)如下方法。

  • public abstract void open(StormBeanFactory beanFactory):獲取本作業(yè)在Spring配置文件中配置的bean對(duì)象。

  • public abstract String sendMessage():本作業(yè)spout生成數(shù)據(jù)的方法,在該方法內(nèi)編寫(xiě)業(yè)務(wù)邏輯產(chǎn)生源數(shù)據(jù),產(chǎn)生的數(shù)據(jù)以String類型進(jìn)行返回。


public class SimpleDataSource extends CommonDataSource {

    private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();

......

    @Override

    public void open(Configuration parameters) throws Exception {

        super.open(parameters);

        ...//構(gòu)造讀取各類外部系統(tǒng)數(shù)據(jù)的連接實(shí)例

    }

    @Override

    public String sendMess() throws InterruptedException {

        Thread.sleep(1000);

......

        MainData mainData = new MainData();

        ......//通過(guò)外部系統(tǒng)數(shù)據(jù)的連接實(shí)例讀取外部系統(tǒng)數(shù)據(jù),封裝進(jìn)MainData對(duì)象中,然后返回即可。

        return gson.toJson(mainData);

    }

}

3.5 業(yè)務(wù)邏輯實(shí)現(xiàn)

本作業(yè)計(jì)算的業(yè)務(wù)邏輯在Flink轉(zhuǎn)換操作符中進(jìn)行實(shí)現(xiàn),一般來(lái)說(shuō)開(kāi)發(fā)者只需要實(shí)現(xiàn)flatMap算子即可以滿足大部分算子的使用。

用戶編寫(xiě)的自定義類需要繼承com.intsmaze.flink.base.transform.CommonFunction抽象類,均需實(shí)現(xiàn)如下方法。

  • public abstract String execute(String message):本作業(yè)業(yè)務(wù)邏輯計(jì)算的方法,參數(shù)message為Kafka主題中讀取過(guò)來(lái)的參數(shù),默認(rèn)參數(shù)為String類型,如果需要將處理的數(shù)據(jù)發(fā)送給Kakfa主題中,則要通過(guò)return將處理的數(shù)據(jù)返回即可。

public class SimpleFunction extends CommonFunction {

    private static Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss").create();

    @Override

    public String execute(String message) throws Exception {

        FlowData flowData = gson.fromJson(message, new TypeToken<FlowData>() {

        }.getType());

        String flowUUID = dataService.findUUID(flowData);

        if (StringUtils.isBlank(flowUUID)) {

            flowUUID = UUID.randomUUID().toString();

            flowData.setUuid(flowUUID);

            dataService.insertFlow(flowData);

        }

        return gson.toJson(flowData);

    }

}

CommonFunction

CommonFunction抽象類中默認(rèn)在open方法中通過(guò)BeanFactory對(duì)象獲取到了Spring容器中對(duì)于的dataService實(shí)例,對(duì)于Spring中的其他實(shí)例同理在SimpleFunction類中的open方法中獲取即可。


public abstract class CommonFunction extends RichFlatMapFunction<String, String> {

    private IntCounter numLines = new IntCounter();

    protected DataService dataService;

    protected ApplicationContext beanFactory;

    @Override

    public void open(Configuration parameters) {

        getRuntimeContext().addAccumulator("num-FlatMap", this.numLines);

        ExecutionConfig.GlobalJobParameters globalJobParameters = getRuntimeContext()

                .getExecutionConfig().getGlobalJobParameters();

        beanFactory = BeanFactory.getBeanFactory((Configuration) globalJobParameters);

        dataService = beanFactory.getBean(DataService.class);

    }

    @Override

    public void flatMap(String value, Collector<String> out) throws Exception {

        this.numLines.add(1);

        String execute = execute(value);

        if (StringUtils.isNotBlank(execute)) {

            out.collect(execute);

        }

    }

    public abstract String execute(String message) throws Exception;

}

可以根據(jù)情況選擇重寫(xiě)open(Configuration parameters)方法,同時(shí)重寫(xiě)的open(Configuration parameters)方法的第一行要調(diào)用父類的open(Configuration parameters)方法。


public void open(Configuration parameters){

super.open(parameters);

......

//獲取在Spring配置文件中配置的實(shí)例

XXX xxx=beanFactory.getBean(XXX.class);

}

3.6 集群/本地運(yùn)行

在自定義的Topology類編寫(xiě)Main方法,創(chuàng)建自定義的Topology對(duì)象后,調(diào)用對(duì)象的run(...)方法。

public class SimpleClient extends BaseFlink {

/**

* 本地啟動(dòng)參數(shù)  -isLocal local

* 集群?jiǎn)?dòng)參數(shù)  -isIncremental isIncremental

*/

public static void main(String[] args) throws Exception {

    SimpleClient topo = new SimpleClient();

    topo.run(ParameterTool.fromArgs(args));

}

.......   
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容