Flink的map/flatMap/filter的開發(fā)示例

Flink算子的開發(fā),需要創(chuàng)建Maven項目,構(gòu)建jar包,在Flink任務啟動時,加載Jar包進行運行算子。

一、創(chuàng)建spring boot工程

pom.xml文件如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.xxx</groupId>
    <artifactId>flatdemo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>flatdemo</name>
    <description>FlatMap Demo</description>
    <properties>
        <java.version>1.8</java.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
<dependencies>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.83</version>
    </dependency>
    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.13.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.13.5</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.13.5</version>
    </dependency>
</dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

二、map算子

map算子的功能:將一個數(shù)據(jù)項,通過map中的函數(shù)映射變?yōu)橐粋€新的元素。
創(chuàng)建java類:

package com.xxx.flatdemo;

import org.apache.flink.api.common.functions.RichMapFunction;

public class MyMap extends RichMapFunction<String, String> {

    @Override
    public String map(String value)  {
        // 字符串轉(zhuǎn)換為大寫
        return value.toUpperCase();
    }
}

三、flatMap算子

flatMap算子的功能:將一個數(shù)據(jù)項,通過map中的函數(shù)映射生成零個、一個或者多個元素。
創(chuàng)建java類:

package com.xxx.flatdemo;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class MyFlatMap extends RichFlatMapFunction<String,String> {


    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {

        if(value!=null && value.startsWith("{") && value.endsWith("}")){
            JSONObject obj = JSON.parseObject(value);
            // 獲取json的name字段
            String name = obj.getString("name");
            //按空格把名字拆散為name1和name2兩個字段,如果名字只有一個單詞,則name2為空字符串。
            String[] szName = name.split(" ");
            String name1;
            String name2;
            if (szName.length > 1) {
                name1 = szName[0];
                name2 = szName[1];
            }
            else {
                name1 = szName[0];
                name2 = "";
            }
            int age = obj.getIntValue("age");
            String tel = obj.getString("tel");
            String address = obj.getString("address");
            //拼接字段,字段之間用\u0001分隔,存儲為Hive格式數(shù)據(jù)
            String line = name1+"\u0001" + name2 + "\u0001" + age + "\u0001" + tel + "\u0001" + address;
            out.collect(line);
        }
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }
}

四、filter算子

filter算子的功能:對每個數(shù)據(jù)流中的每個元素執(zhí)行一個布爾函數(shù),只保留返回值為 True 的元素。
創(chuàng)建java類:

package com.xxx.flatdemo;

import org.apache.flink.api.common.functions.RichFilterFunction;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class MyFilter extends RichFilterFunction<String> {

    @Override
    public boolean filter(String value) throws Exception {
        if(value!=null && value.startsWith("{") && value.endsWith("}")){
            JSONObject obj = JSON.parseObject(value);
            int age = obj.getIntValue("age");
            //過濾年齡大于等于100的記錄
            if(age >= 100){
                return true;
            }else{
                return false;
            }
        } else {
            return false;
        }
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容