Flink Sql教程(2)

從kafka到mysql

新建Java項(xiàng)目

  • 最簡(jiǎn)單的方式是按照官網(wǎng)的方法,命令行執(zhí)行curl https://flink.apache.org/q/quickstart.sh | bash -s 1.10.0,不過這種方法有些包還得自行添加,大家可以復(fù)制我的pom.xml,我已經(jīng)將常用的包都放進(jìn)去了,并且排除了沖突的包。注意的是,本地測(cè)試的時(shí)候,記得將scope注掉,不然會(huì)出現(xiàn)少包的情況。也可以在Run -> Edit Configurations中,勾選Include dependencies with "Provided" scope。最好在resources目錄下丟個(gè)log4j的配置文件,這樣有時(shí)候方便我們看日志找問題。

  • 新建完項(xiàng)目之后,我們要做的第一件事,自然是寫個(gè)Flink 版本的Hello World。所以,新建測(cè)試類,然后輸入代碼

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        DataStream dataStream = env.fromElements("Hello World");
    
        dataStream.print();
      
        env.execute("test");
    

    看一下控制臺(tái)

        4> Hello World
    

    如愿以償?shù)牡玫搅讼胍慕Y(jié)果,不過這個(gè)4>是什么玩應(yīng)?其實(shí)這個(gè)4代表是第四個(gè)分區(qū)輸出的結(jié)果。很多人可能會(huì)問,我也妹指定并發(fā)啊,數(shù)據(jù)怎么會(huì)跑到第四個(gè)分區(qū)呢?其實(shí)是因?yàn)楸镜啬J降臅r(shí)候,會(huì)以匹配CPU的核數(shù),啟動(dòng)對(duì)應(yīng)數(shù)量的分區(qū)。只要我們?cè)诿總€(gè)算子之后加上setParallelism(1),就會(huì)只以一個(gè)分區(qū)來(lái)執(zhí)行了。至此,我們的DataStream 版的Hellow World試驗(yàn)完畢,這里主要是為了驗(yàn)證一下環(huán)境是否正確,接下來(lái)才是我們今天的主題從kafka到mysql。另外,如果更想了解DataStream的內(nèi)容,歡迎大家關(guān)注另一個(gè)系列Flink DataStream(不過目前還沒開始寫)

新建kafka數(shù)據(jù)源表

接下來(lái)咱們廢話不多說(shuō),直接貼代碼

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;


public class FlinkSql02 {
    public static final String  KAFKA_TABLE_SOURCE_DDL = "" +
            "CREATE TABLE user_behavior (\n" +
            "    user_id BIGINT,\n" +
            "    item_id BIGINT,\n" +
            "    category_id BIGINT,\n" +
            "    behavior STRING,\n" +
            "    ts TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "    'connector.type' = 'kafka',  -- 指定連接類型是kafka\n" +
            "    'connector.version' = '0.11',  -- 與我們之前Docker安裝的kafka版本要一致\n" +
            "    'connector.topic' = 'mykafka', -- 之前創(chuàng)建的topic \n" +
            "    'connector.properties.group.id' = 'flink-test-0', -- 消費(fèi)者組,相關(guān)概念可自行百度\n" +
            "    'connector.startup-mode' = 'earliest-offset',  --指定從最早消費(fèi)\n" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
            "    'format.type' = 'json'  -- json格式,和topic中的消息格式保持一致\n" +
            ")";
    public static void main(String[] args) throws Exception {
        //構(gòu)建StreamExecutionEnvironment 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //構(gòu)建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        
        //構(gòu)建StreamTableEnvironment 
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        
        //通過DDL,注冊(cè)kafka數(shù)據(jù)源表
        tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);
        
        //執(zhí)行查詢
        Table table = tEnv.sqlQuery("select * from user_behavior");
        
        //轉(zhuǎn)回DataStream并輸出
        tEnv.toAppendStream(table, Row.class).print().setParallelism(1);

        //任務(wù)啟動(dòng),這行必不可少!
        env.execute("test");

    }
}

接下來(lái)就是激動(dòng)人性的測(cè)試了,右擊,run!查看控制臺(tái)

    543462,1715,1464116,pv,2017-11-26T01:00
    543462,1715,1464116,pv,2017-11-26T01:00
    543462,1715,1464116,pv,2017-11-26T01:00
    543462,1715,1464116,pv,2017-11-26T01:00

嗯,跟我之前往kafka中丟的數(shù)據(jù)一樣,沒毛?。?/p>

如果大家在使用過程中遇到Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in這種異常,請(qǐng)仔細(xì)查看你的DDL語(yǔ)句,是否缺少或者用錯(cuò)了配置,這里大家可以參考一下Flink官網(wǎng)的手冊(cè),查看一下對(duì)應(yīng)的配置。也可以在下方留言,一起交流。

新建mysql數(shù)據(jù)結(jié)果表

  • 現(xiàn)在mysql中把表創(chuàng)建,畢竟flink現(xiàn)在還沒法幫你自動(dòng)建表,只能自己動(dòng)手豐衣足食咯。
CREATE TABLE `user_behavior` (
  `user_id` bigint(20) DEFAULT NULL,
  `item_id` bigint(20) DEFAULT NULL,
  `behavior` varchar(255) DEFAULT NULL,
  `category_id` bigint(20) DEFAULT NULL,
  `ts` timestamp(6) NULL DEFAULT NULL
)

在mysql端創(chuàng)建完成后,回到我們的代碼,注冊(cè)mysql數(shù)據(jù)結(jié)果表,并將從kafka中讀取到的數(shù)據(jù),插入mysql結(jié)果表中。下面是完整代碼,包含kafka數(shù)據(jù)源表的構(gòu)建。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;


public class FlinkSql02 {
    public static final String  KAFKA_TABLE_SOURCE_DDL = "" +
            "CREATE TABLE user_behavior (\n" +
            "    user_id BIGINT,\n" +
            "    item_id BIGINT,\n" +
            "    category_id BIGINT,\n" +
            "    behavior STRING,\n" +
            "    ts TIMESTAMP(3)\n" +
            ") WITH (\n" +
            "    'connector.type' = 'kafka',  -- 指定連接類型是kafka\n" +
            "    'connector.version' = '0.11',  -- 與我們之前Docker安裝的kafka版本要一致\n" +
            "    'connector.topic' = 'mykafka', -- 之前創(chuàng)建的topic \n" +
            "    'connector.properties.group.id' = 'flink-test-0', -- 消費(fèi)者組,相關(guān)概念可自行百度\n" +
            "    'connector.startup-mode' = 'earliest-offset',  --指定從最早消費(fèi)\n" +
            "    'connector.properties.zookeeper.connect' = 'localhost:2181',  -- zk地址\n" +
            "    'connector.properties.bootstrap.servers' = 'localhost:9092',  -- broker地址\n" +
            "    'format.type' = 'json'  -- json格式,和topic中的消息格式保持一致\n" +
            ")";

    public static final String MYSQL_TABLE_SINK_DDL=""+
            "CREATE TABLE `user_behavior_mysql` (\n" +
            "  `user_id` bigint  ,\n" +
            "  `item_id` bigint  ,\n" +
            "  `behavior` varchar  ,\n" +
            "  `category_id` bigint  ,\n" +
            "  `ts` timestamp(3)   \n" +
            ")WITH (\n" +
            "  'connector.type' = 'jdbc', -- 連接方式\n" +
            "  'connector.url' = 'jdbc:mysql://localhost:3306/mysql', -- jdbc的url\n" +
            "  'connector.table' = 'user_behavior',  -- 表名\n" +
            "  'connector.driver' = 'com.mysql.jdbc.Driver', -- 驅(qū)動(dòng)名字,可以不填,會(huì)自動(dòng)從上面的jdbc url解析 \n" +
            "  'connector.username' = 'root', -- 顧名思義 用戶名\n" +
            "  'connector.password' = '123456' , -- 密碼\n" +
            "  'connector.write.flush.max-rows' = '5000', -- 意思是攢滿多少條才觸發(fā)寫入 \n" +
            "  'connector.write.flush.interval' = '2s' -- 意思是攢滿多少秒才觸發(fā)寫入;這2個(gè)參數(shù),無(wú)論數(shù)據(jù)滿足哪個(gè)條件,就會(huì)觸發(fā)寫入\n"+
            ")"



            ;
    public static void main(String[] args) throws Exception {
        //構(gòu)建StreamExecutionEnvironment 
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        
        //構(gòu)建EnvironmentSettings 并指定Blink Planner
        EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        
        //構(gòu)建StreamTableEnvironment 
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);
        
        //通過DDL,注冊(cè)kafka數(shù)據(jù)源表
        tEnv.sqlUpdate(KAFKA_TABLE_SOURCE_DDL);

        //通過DDL,注冊(cè)mysql數(shù)據(jù)結(jié)果表
        tEnv.sqlUpdate(MYSQL_TABLE_SINK_DDL);
        
        //將從kafka中查到的數(shù)據(jù),插入mysql中
        tEnv.sqlUpdate("insert into user_behavior_mysql select user_id,item_id,behavior,category_id,ts from user_behavior");
        
        //任務(wù)啟動(dòng),這行必不可少!
        env.execute("test");

    }
}

打開我們的Navicat,看看我們的數(shù)據(jù)是否正確輸入到mysql中。

user_id item_id behavior category_id ts
543462 1715 pv 1464116 2017-11-26 01:00:00.000
543462 1715 pv 1464116 2017-11-26 01:00:00.000
543462 1715 pv 1464116 2017-11-26 01:00:00.000
543462 1715 pv 1464116 2017-11-26 01:00:00.000

成功!并且數(shù)據(jù)和我們kafka中的數(shù)據(jù)也是一致,大家也可以通過上一章講過的Java連接kafka來(lái)對(duì)比驗(yàn)證數(shù)據(jù)的一致性,此處就不再贅述。那么好了,本次的Flink Sql之旅就結(jié)束,下一章我們將帶大家,在這次課程的基礎(chǔ)上,完成常用聚合查詢以及目前Flink Sql原生支持的維表Join。另外,有同學(xué)反映有些地方不知道為什么要這樣做,不想只知其然而不知所以然,我們之后同樣會(huì)有另外的專題講述Flink 原理。

附錄

pom.xml

    
    <properties>
        <flink.version>1.10.0</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
    </properties>

    <dependencies>
        <!-- Flink modules -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>

            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>scala-library</artifactId>
                    <groupId>org.scala-lang</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- CLI dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
            <exclusions>
                <exclusion>
                    <artifactId>javassist</artifactId>
                    <groupId>org.javassist</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>scala-parser-combinators_2.11</artifactId>
                    <groupId>org.scala-lang.modules</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>snappy-java</artifactId>
                    <groupId>org.xerial.snappy</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.3</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.37</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-redis_2.11</artifactId>
            <version>1.1.5</version>
            <exclusions>
                <exclusion>
                    <artifactId>force-shading</artifactId>
                    <groupId>org.apache.flink</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.5</version>
        </dependency>

        <dependency>
            <groupId>io.lettuce</groupId>
            <artifactId>lettuce-core</artifactId>
            <version>5.0.5.RELEASE</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.46</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.11</artifactId>
            <version>1.10.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.4.Final</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-jdbc -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-jdbc_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <encoding>UTF-8</encoding>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <artifactSet>
                                <excludes>
                                    <exclude>junit:junit</exclude>
                                </excludes>
                            </artifactSet>

                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

有點(diǎn)亂,懶得整理了,大家直接復(fù)制過去用就行。

log4j.xml

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">

<log4j:configuration xmlns:log4j='http://jakarta.apache.org/log4j/' >

    <appender name="myConsole" class="org.apache.log4j.ConsoleAppender">
        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern"
                   value="[%d{dd HH:mm:ss,SSS\} %-5p] [%t] %c{2\} - %m%n" />
        </layout>
        <!--過濾器設(shè)置輸出的級(jí)別-->
        <filter class="org.apache.log4j.varia.LevelRangeFilter">
            <param name="levelMin" value="info" />
            <param name="levelMax" value="error" />
            <param name="AcceptOnMatch" value="true" />
        </filter>
    </appender>

    <!-- 指定logger的設(shè)置,additivity指示是否遵循缺省的繼承機(jī)制-->
    <logger name="com.runway.bssp.activeXdemo" additivity="false">
        <appender-ref ref="myConsole" />
    </logger>

    <!-- 根logger的設(shè)置-->
    <root>
        <priority value ="debug"/>
        <appender-ref ref="myConsole"/>
    </root>
</log4j:configuration>

記得要放在resource目錄下,別放錯(cuò)了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容