source 類型
- 從集合類獲取
- 從文本讀取
- 讀取消息隊(duì)列(Kafka)的數(shù)據(jù)
- 自定義source
從集合獲取或文本讀取
不做過多介紹,直接看代碼即可
package com.lxs.flink.realtime;
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import com.lxs.dto.OrderInfo;
/**
* @author lixinsong
* @version version
* @desc source
* @date 2020/11/25
*/
public class SourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 從集合類獲取dataSource
DataStreamSource<OrderInfo> stream = env.fromCollection(Arrays.asList(
OrderInfo.builder().orderNumber(111111L).price(23121L).timeStamp(
System.currentTimeMillis() - 300).userId(111L).build(),
OrderInfo.builder().orderNumber(111112L).price(33112L).timeStamp(
System.currentTimeMillis() - 200).userId(111L).build(),
OrderInfo.builder().orderNumber(111113L).price(1234L).timeStamp(
System.currentTimeMillis() - 100).userId(112L).build()));
stream.print("stream");
// 從文本獲取dataSource
DataStreamSource<String> stream2 = env.readTextFile(
"/Users/study/java/frauddetection/src/main/resources/wordcount.txt");
stream2.print("stream2");
env.execute();
}
}
從kafka 獲取
引入 pom
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.10_2.12</artifactId>
<version>1.11.2</version>
</dependency>
package com.lxs.flink.realtime;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
/**
* @author lixinsong
* @version version
* @desc
* @date 2020/11/25
*/
public class KafkaDataSourceTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9902");
properties.setProperty("group.id", "lxs-group");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("auto.offset.reset", "latest");
DataStreamSource<String> stream = env.addSource(
new FlinkKafkaConsumer010<>("lxs-topic", new SimpleStringSchema(), properties));
stream.print("kafka-stream");
env.execute("kafka-stream-test");
}
}