Flink入門 - Source

source 類型

  • 從集合類獲取
  • 從文本讀取
  • 讀取消息隊(duì)列(Kafka)的數(shù)據(jù)
  • 自定義source

從集合獲取或文本讀取

不做過多介紹,直接看代碼即可

package com.lxs.flink.realtime;

import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.lxs.dto.OrderInfo;

/**
 * @author lixinsong
 * @version version
 * @desc source
 * @date 2020/11/25
 */

public class SourceTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 從集合類獲取dataSource
        DataStreamSource<OrderInfo> stream = env.fromCollection(Arrays.asList(
                OrderInfo.builder().orderNumber(111111L).price(23121L).timeStamp(
                        System.currentTimeMillis() - 300).userId(111L).build(),
                OrderInfo.builder().orderNumber(111112L).price(33112L).timeStamp(
                        System.currentTimeMillis() - 200).userId(111L).build(),
                OrderInfo.builder().orderNumber(111113L).price(1234L).timeStamp(
                        System.currentTimeMillis() - 100).userId(112L).build()));

        stream.print("stream");

        // 從文本獲取dataSource
        DataStreamSource<String> stream2 = env.readTextFile(
                "/Users/study/java/frauddetection/src/main/resources/wordcount.txt");
        stream2.print("stream2");

        env.execute();
    }
}

從kafka 獲取

引入 pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.12</artifactId>
    <version>1.11.2</version>
</dependency>
package com.lxs.flink.realtime;


import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;

/**
 * @author lixinsong
 * @version version
 * @desc
 * @date 2020/11/25
 */

public class KafkaDataSourceTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9902");
        properties.setProperty("group.id", "lxs-group");
        properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("auto.offset.reset", "latest");

        DataStreamSource<String> stream = env.addSource(
                new FlinkKafkaConsumer010<>("lxs-topic", new SimpleStringSchema(), properties));
        stream.print("kafka-stream");
        env.execute("kafka-stream-test");
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 流處理基本步驟: Source 1. 從集合讀取數(shù)據(jù) 運(yùn)行代碼,打印結(jié)果: SLF4J: Failed to lo...
    勇于自信閱讀 751評(píng)論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評(píng)論 19 139
  • 5月以來,哪怕對(duì)市場風(fēng)向再不敏感的人,也感覺到陣陣涼意。二級(jí)市場連續(xù)下挫,一級(jí)市場融資環(huán)境惡化,不論企業(yè)融資數(shù)量還...
    錢皓頻道閱讀 6,457評(píng)論 1 6
  • 推薦指數(shù): 6.0 書籍主旨關(guān)鍵詞:特權(quán)、焦點(diǎn)、注意力、語言聯(lián)想、情景聯(lián)想 觀點(diǎn): 1.統(tǒng)計(jì)學(xué)現(xiàn)在叫數(shù)據(jù)分析,社會(huì)...
    Jenaral閱讀 5,978評(píng)論 0 5
  • 昨天,在回家的路上,坐在車?yán)镉圃沼圃盏乜粗摹度龉衬墓适隆罚冶焕锩娴膬?nèi)容深深吸引住了,盡管上學(xué)時(shí)...
    夜闌曉語閱讀 3,938評(píng)論 2 9

友情鏈接更多精彩內(nèi)容