主要引用官方文檔 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sourcessinks/
動態(tài)表(Dynamic table)是 Flink Table 和 SQL API 的核心概念,用于以統(tǒng)一的方式處理有界和無界數(shù)據(jù)。
動態(tài)表只是一個邏輯概念,F(xiàn)link 并不擁有數(shù)據(jù)本身,動態(tài)表的內容存儲在外部系統(tǒng)(如數(shù)據(jù)庫、KV存儲、消息隊列)或文件中。
動態(tài)源(Dynamic Source)和動態(tài)接收器(Dynamic Sink)可用于在外部系統(tǒng)中讀寫數(shù)據(jù)。在 Flink 文檔中,Source 和 Sink 經常歸類到術語 Connector。
Overview(概覽)
大部分情況下,不需要從頭開始創(chuàng)建一個全新的 Connector,而是希望稍微修改現(xiàn)有的 Connector。在其他情況下,實現(xiàn)者希望創(chuàng)建專門的連接器。下圖顯示了對象如何從元數(shù)據(jù)轉換為運行時對象的處理過程。

Metadata
Table API 和 SQL 都是聲明式 API,包括表的聲明。執(zhí)行 CREATE TABLE 語句會在目標 Catalog 中更新元數(shù)據(jù)。
動態(tài)表的元數(shù)據(jù)(通過 DDL 創(chuàng)建或由 Catalog 提供)表現(xiàn)為 CatalogTable 的實例。
對于大多數(shù) Catalog 實現(xiàn),不會為此類操作修改外部系統(tǒng)中的物理數(shù)據(jù)。Connector 特定的依賴也不必存在于 classpath 中。WITH 子句中聲明的選項既不進行驗證,也不進行其他解釋。
Planning
當涉及到處理執(zhí)行計劃和優(yōu)化時,CatalogTable 需要解析為 DynamicTableSource(用于在 SELECT 查詢中讀取)和 DynamicTableSink(用于在 INSERT 語句中寫入)。
DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供邏輯將 CatalogTable 的元數(shù)據(jù)轉換為 DynamicTableSource 和 DynamicTableSink 的實例。在大多數(shù)情況下,工廠的目的是驗證 options(例如上圖示例中的 'port'='5022')、配置編碼/解碼格式(如果需要)、創(chuàng)建 Table connector 的參數(shù)化實例。
默認情況下,DynamicTableSourceFactory 和 DynamicTableSinkFactory 的實例是使用 Java 的 SPI(Service Provider Interface)發(fā)現(xiàn)的。connector 選項(例如示例中的 'connector'='custom')必須對應有效的 factory 標識符。
盡管在類命名上可能不明顯,但 DynamicTableSource 和 DynamicTableSink 也可以被視為有狀態(tài)工廠,最終生成具體的運行時實現(xiàn)來讀取/寫入實際數(shù)據(jù)。
Planner 使用 Source 和 Sink 實例找到最佳邏輯計劃。根據(jù)可選聲明的接口(例如 SupportsProjectionPushDown 或 SupportsOverwrite),Planner 可能會對實例應用一些更改,從而改變生成的運行時實現(xiàn)。
Runtime
邏輯計劃完成后,Planner 將獲得運行時實現(xiàn)。運行時邏輯在 Flink 的 core connector 接口(如 InputFormat 或 SourceFunction )中實現(xiàn)。
這些接口按另一抽象級別劃分為 ScanRuntimeProvider、LookupRuntimeProvider 和 SinkRuntimeProvider 的子類。
例如,OutputFormatProvider 和 SinkFunctionProvider 都是 Planner 可以處理的 SinkRuntimeProvider 的具體實例。
Extension Points(擴展點)
本節(jié)介紹擴展 Flink Table Connector 的可用接口。
Dynamic Table Factories
Dynamic table factory 用于為外部存儲系統(tǒng)配置 Dynamic table connector(根據(jù) Catalog 和 Session 信息)。
org.apache.flink.table.factories.DynamicTableSourceFactory 可以被實現(xiàn)用來構造 DynamicTableSource。
org.apache.flink.table.factories.DynamicTableSinkFactory 可以被實現(xiàn)用來構造 DynamicTableSink。
默認情況,工廠是使用 connector 選項的值作為工廠標識符。
在 JAR 文件中,可以將對新實現(xiàn)的引用添加到文件中:META-INF/services/org.apache.flink.table.factors.Factory。Flink 框架將檢查工廠唯一標識和請求的基類來確定匹配的工廠。
如果需要,Catalog 實現(xiàn)可以繞過工廠發(fā)現(xiàn)過程。為此,Catalog 需要返回一個實例,該實例實現(xiàn) org.apache.flink.table.catalog.catalog#getFactory 中請求的基類。
Dynamic Table Source
根據(jù)定義,動態(tài)表可隨時間變化的。讀取動態(tài)表時,可以將內容視為以下情況:
- 一種 changelog(有限的或無限的),在 changelog 耗盡之前,所有的更改都被連續(xù)地處理。這由 ScanTableSource 接口表示。
- 一種連續(xù)變化的或非常大的外部表,其內容通常不會被完全讀取,而是在必要時查詢單個值。這由 LookupTableSource 接口表示。
可以同時實現(xiàn)這兩個接口。Planner 根據(jù)指定的查詢決定如何使用。
Scan Table Source
ScanTableSource 在運行時掃描外部存儲系統(tǒng)中的所有行。
- 對于常規(guī)批處理場景,Source 可以發(fā)出只 Insert 的有界流。
- 對于常規(guī)流式處理場景,Source 可以發(fā)出只有 Insert 的無界流。
- 對于更改數(shù)據(jù)捕獲(CDC)場景,Source 可以發(fā)出有界或無界流,其中包含插入、更新和刪除行。
Table Source 可以實現(xiàn)更多的功能接口,如 SupportsProjectionPushDown,這些接口可能會在 Planning 期間改變實例。所有功能接口都可以在 org.apache.flink.table.connector.source.abilities 包中找到。
ScanTableSource 的運行時實現(xiàn)必須生成內部數(shù)據(jù)結構。因此,記錄必須作為 org.apache.flink.table.data.RowData 發(fā)出??蚣芴峁┝诉\行時轉換器,使得 Source 可以在通用數(shù)據(jù)結構上工作,并在最后執(zhí)行轉換。
Lookup Table Source
LookupTableSource 在運行時通過一個或多個鍵查找外部存儲系統(tǒng)的行。
與 ScanTableSource 相比,Source 不必讀取整個表,并且可以在需要時從(表數(shù)據(jù)可能是不斷變化的)外部表中惰性地獲取單個值。
與 ScanTableSource 相比,LookupTableSource 當前只支持發(fā)出只有 Insert 的數(shù)據(jù)。
LookupTableSource 的運行時實現(xiàn)是 TableFunction 或 AsyncTableFunction。在運行時,使用指定的查找鍵的值調用該函數(shù)。
Source Abilities
以下接口只適用于 ScanTableSource,不適用于 LookupTableSource
| 接口 | 描述 |
|---|---|
| SupportsFilterPushDown | Enables to push down the filter into the DynamicTableSource. For efficiency, a source can push filters further down in order to be close to the actual data generation. |
| SupportsLimitPushDown | Enables to push down a limit (the expected maximum number of produced records) into a DynamicTableSource. |
| SupportsPartitionPushDown | Enables to pass available partitions to the planner and push down partitions into a DynamicTableSource. During the runtime, the source will only read data from the passed partition list for efficiency. |
| SupportsProjectionPushDown | Enables to push down a (possibly nested) projection into a DynamicTableSource. For efficiency, a source can push a projection further down in order to be close to the actual data generation. If the source also implements SupportsReadingMetadata, the source will also read the required metadata only. |
| SupportsReadingMetadata | Enables to read metadata columns from a DynamicTableSource. The source is responsible to add the required metadata at the end of the produced rows. This includes potentially forwarding metadata column from contained formats. |
| SupportsWatermarkPushDown | Enables to push down a watermark strategy into a DynamicTableSource. The watermark strategy is a builder/factory for timestamp extraction and watermark generation. During the runtime, the watermark generator is located inside the source and is able to generate per-partition watermarks. |
| SupportsSourceWatermark | Enables to fully rely on the watermark strategy provided by the ScanTableSource itself. Thus, a CREATE TABLE DDL is able to use SOURCE_WATERMARK() which is a built-in marker function that will be detected by the planner and translated into a call to this interface if available. |
Dynamic Table Sink
根據(jù)定義,動態(tài)表可隨時間變化的。寫入動態(tài)表時,可以將內容視為以下情況:
- 可以始終將內容視為一個 changelog(有限或無限),對于該 changelog,所有更改都會連續(xù)地寫出,直到 changelog 處理完為止。
對于常規(guī)的批處理場景,Sink 僅可以接受只有 insert 的行并生成有界流。
對于常規(guī)的流處理場景,Sink 僅可以接受只有 insert 的行并生成無界流。
對于更改數(shù)據(jù)捕獲(CDC)場景,Sink 可以通過插入行、更新行和刪除行生成有界或無界流。
Table Sink 可以實現(xiàn)更多的功能接口,如 SupportsOverwrite,這些接口可能會在 Planning 期間改變實例。所有功能接口都可以 在org.apache.flink.table.connector.sink.abilities 包中找到。
DynamicTableSink 的運行時實現(xiàn)必須使用內部數(shù)據(jù)結構。因此,記錄必須作為 org.apache.flink.table.data.RowData 被接收??蚣芴峁┻\行時轉換器,使得 Sink 可以在通用數(shù)據(jù)結構上工作,并在開始時執(zhí)行轉換。
Sink Abilities
| 接口 | 描述 |
|---|---|
| SupportsOverwrite | Enables to overwrite existing data in a DynamicTableSink. By default, if this interface is not implemented, existing tables or partitions cannot be overwritten using e.g. the SQL INSERT OVERWRITE clause. |
| SupportsPartitioning | Enables to write partitioned data in a DynamicTableSink. |
| SupportsWritingMetadata | Enables to write metadata columns into a DynamicTableSource. A table sink is responsible for accepting requested metadata columns at the end of consumed rows and persist them. This includes potentially forwarding metadata columns to contained formats. |
Encoding/Decoding Formats
一些 Table Connector 接受不同格式和編碼的 key 和 value。
Format 的工作方式類似于 DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider 的處理模式,工廠負責轉換選項,Source 負責創(chuàng)建運行時邏輯。
Format 使用 Java 的 SPI(Service Provider Interface)發(fā)現(xiàn)。例如 Kafka Table Source 需要 DeserializationSchema 作為運行時接口實現(xiàn)解碼。Kafka Table Source 工廠使用 value.format 選項值找到 DeserializationFormatFactory 工廠。
當前支持以下 Format 工廠:
- org.apache.flink.table.factories.DeserializationFormatFactory
- org.apache.flink.table.factories.SerializationFormatFactory
Format 工廠轉換選項,生成 EncodingFormat 或 DecodingFormat,這些是另一種工廠,為給定的數(shù)據(jù)類型生成專門的 Format 運行時邏輯。
例如,對于 Kafka table source 工廠,DeserializationFormatFactory 會返回 EncodingFormat<DeserializationSchema> 傳入 Kafka table source。
Full Stack Example(完整示例)
本節(jié)介紹,如果使用 changelog 語義,實現(xiàn)一個具有 Decoding format 的 ScanTableSource。例子解釋了上述組件是如何一起工作的:
- 創(chuàng)建工廠并解析和驗證選項
- 實現(xiàn) Table Connector
- 實現(xiàn)和發(fā)現(xiàn)自定義 Format
- 使用提供的工具類,例如數(shù)據(jù)結構轉換器 FactoryUtil
Table Source 使用一個簡單的單線程 SourceFunction 來創(chuàng)建一個 Socket 監(jiān)聽傳入字節(jié)
CREATE TABLE UserScores (name STRING, score INT)
WITH (
'connector' = 'socket',
'hostname' = 'localhost',
'port' = '9999',
'byte-delimiter' = '10',
'format' = 'changelog-csv',
'changelog-csv.column-delimiter' = '|'
);
因為支持 changelog 語義,所以可以在運行時接收更新,并創(chuàng)建一個更新視圖,該視圖可以連續(xù)評估不斷變化的數(shù)據(jù):
SELECT name, SUM(score) FROM UserScores GROUP BY name;
在終端發(fā)送以下格式的數(shù)據(jù)
> nc -lk 9999
INSERT|Alice|12
INSERT|Bob|5
DELETE|Alice|12
INSERT|Alice|18
Factories
本節(jié)說明如何將元數(shù)據(jù)轉換為具體的 Connector 實例。這兩個工廠(SocketDynamicTableFactory 和 ChangelogCsvFormatFactory)都已添加到 META-INF/services 目錄。
SocketDynamicTableFactory
SocketDynamicTableFactory 將 CatalogTable 轉換為 TableSource。因為 Table Source 需要 decoding format,為了方便起見,使用框架提供的 FactoryUtil 來發(fā)現(xiàn)該格式。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;
// 實現(xiàn) DynamicTableSourceFactory 接口
public class SocketDynamicTableFactory implements DynamicTableSourceFactory {
// 定義 options
public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
.stringType()
.noDefaultValue();
public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
.intType()
.noDefaultValue();
public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
.intType()
.defaultValue(10); // '\n'
// 工廠標識符,用來匹配 'connector'='socket'
@Override
public String factoryIdentifier() {
return "socket";
}
// 定義必填 options
@Override
public Set<ConfigOption<?>> requiredOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(HOSTNAME);
options.add(PORT);
options.add(FactoryUtil.FORMAT); // 預定義 option,即 'format'='...'
return options;
}
// 定義可選 options
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(BYTE_DELIMITER);
return options;
}
@Override
public DynamicTableSource createDynamicTableSource(Context context) {
// 實現(xiàn)自定義驗證邏輯 或 使用提供的幫助工具類 FactoryUtil.TableFactoryHelper
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// 發(fā)現(xiàn)合適的 DecodingFormat
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);
// 驗證 options
helper.validate();
final ReadableConfig options = helper.getOptions();
final String hostname = options.get(HOSTNAME);
final int port = options.get(PORT);
final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);
// 從 CatalogTable 派生生成的數(shù)據(jù)類型(不包括計算列)
final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();
// 創(chuàng)建并返回 DynamicTableSource
return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}
}
ChangelogCsvFormatFactory
ChangelogCsvFormatFactory 將 format 選項轉換為一種 Format 實例。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;
// 實現(xiàn) DeserializationFormatFactory 接口
public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {
// 定義 options
public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
.stringType()
.defaultValue("|");
// 工廠標識符,用來匹配 'format'='changelog-csv'
@Override
public String factoryIdentifier() {
return "changelog-csv";
}
@Override
public Set<ConfigOption<?>> requiredOptions() {
return Collections.emptySet();
}
@Override
public Set<ConfigOption<?>> optionalOptions() {
final Set<ConfigOption<?>> options = new HashSet<>();
options.add(COLUMN_DELIMITER);
return options;
}
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
ReadableConfig formatOptions) {
// 這里實現(xiàn)自定義驗證邏輯 或 使用提供的幫助工具類 FactoryUtil
FactoryUtil.validateFactoryOptions(this, formatOptions);
final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);
// 創(chuàng)建并返回 DecodingFormat
return new ChangelogCsvFormat(columnDelimiter);
}
}
Table Source and Decoding Format
本節(jié)說明如何將計劃層的實例轉換為要提交到集群的運行時實例。
SocketDynamicTableSource
SocketDynamicTableSource 在 Planning 期間使用。主邏輯可以在 getScanRuntimeProvider() 方法中找到。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
// 實現(xiàn) ScanTableSource 接口
public class SocketDynamicTableSource implements ScanTableSource {
private final String hostname;
private final int port;
private final byte byteDelimiter;
private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
private final DataType producedDataType;
// 構造函數(shù),傳入必要的參數(shù)
public SocketDynamicTableSource(
String hostname,
int port,
byte byteDelimiter,
DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
DataType producedDataType) {
this.hostname = hostname;
this.port = port;
this.byteDelimiter = byteDelimiter;
this.decodingFormat = decodingFormat;
this.producedDataType = producedDataType;
}
@Override
public ChangelogMode getChangelogMode() {
// 這里由 Format 決定 ChangelogMode,也可以由 Source 自己決定
return decodingFormat.getChangelogMode();
}
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
runtimeProviderContext,
producedDataType);
final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(
hostname,
port,
byteDelimiter,
deserializer);
return SourceFunctionProvider.of(sourceFunction, false);
}
@Override
public DynamicTableSource copy() {
return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
}
@Override
public String asSummaryString() {
return "Socket Table Source";
}
}
ChangelogCsvFormat
ChangelogCsvFormat 支持發(fā)出 INSERT 和 DELETE 更改。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
// 實現(xiàn) DecodingFormat<DeserializationSchema<RowData>> 接口
public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {
// 字段分隔符
private final String columnDelimiter;
public ChangelogCsvFormat(String columnDelimiter) {
this.columnDelimiter = columnDelimiter;
}
@Override
@SuppressWarnings("unchecked")
public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context,
DataType producedDataType) {
// 創(chuàng)建 TypeInformation
final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(
producedDataType);
// DeserializationSchema 中的大多數(shù)代碼將無法在內部數(shù)據(jù)結構上工作,創(chuàng)建一個數(shù)據(jù)結構轉換器用于轉換
final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);
// 在運行時使用 LogicalType
final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();
// 創(chuàng)建運行時類
return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
}
@Override
public ChangelogMode getChangelogMode() {
// 定義此 Format 可以產生插入和刪除行
return ChangelogMode.newBuilder()
.addContainedKind(RowKind.INSERT)
.addContainedKind(RowKind.DELETE)
.build();
}
}
Runtime
本節(jié)說明了 SourceFunction 和 DeserializationSchema 的運行時邏輯
ChangelogCsvDeserializer
ChangelogCsvDeserializer 包含一個簡單的解析邏輯,用于將 Bytes 轉換為具有 Integer 和 String 的 Row 類型,并帶有 RowKind 信息。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.RuntimeConverter.Context;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
// 實現(xiàn) DeserializationSchema<RowData> 接口
public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {
private final List<LogicalType> parsingTypes;
private final DataStructureConverter converter;
private final TypeInformation<RowData> producedTypeInfo;
private final String columnDelimiter;
public ChangelogCsvDeserializer(
List<LogicalType> parsingTypes,
DataStructureConverter converter,
TypeInformation<RowData> producedTypeInfo,
String columnDelimiter) {
this.parsingTypes = parsingTypes;
this.converter = converter;
this.producedTypeInfo = producedTypeInfo;
this.columnDelimiter = columnDelimiter;
}
@Override
public TypeInformation<RowData> getProducedType() {
return producedTypeInfo;
}
@Override
public void open(InitializationContext context) {
// converters must be open
converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
}
@Override
public RowData deserialize(byte[] message) {
// 解析 Row
final String[] columns = new String(message).split(Pattern.quote(columnDelimiter));
// 解析 RowKind
final RowKind kind = RowKind.valueOf(columns[0]);
final Row row = new Row(kind, parsingTypes.size());
for (int i = 0; i < parsingTypes.size(); i++) {
row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1]));
}
// 類型轉換生成 RowData
return (RowData) converter.toInternal(row);
}
private static Object parse(LogicalTypeRoot root, String value) {
switch (root) {
case INTEGER:
return Integer.parseInt(value);
case VARCHAR:
return value;
default:
throw new IllegalArgumentException();
}
}
@Override
public boolean isEndOfStream(RowData nextElement) {
return false;
}
}
SocketSourceFunction
SocketSourceFunction 開啟一個 Socket,并監(jiān)聽字節(jié)。按照給定的字節(jié)分隔符('\n')劃分記錄,并將解碼委托給 DeserializationSchema。SourceFunction 只能在并行度為1的情況下工作。
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;
// 繼承 RichSourceFunction 類,實現(xiàn) ResultTypeQueryable<RowData> 接口
public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
private final String hostname;
private final int port;
private final byte byteDelimiter;
private final DeserializationSchema<RowData> deserializer;
private volatile boolean isRunning = true;
private Socket currentSocket;
public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
this.hostname = hostname;
this.port = port;
this.byteDelimiter = byteDelimiter;
this.deserializer = deserializer;
}
@Override
public TypeInformation<RowData> getProducedType() {
return deserializer.getProducedType();
}
@Override
public void open(Configuration parameters) throws Exception {
deserializer.open(() -> getRuntimeContext().getMetricGroup());
}
@Override
public void run(SourceContext<RowData> ctx) throws Exception {
while (isRunning) {
// 創(chuàng)建 Socket 并監(jiān)聽
try (final Socket socket = new Socket()) {
currentSocket = socket;
socket.connect(new InetSocketAddress(hostname, port), 0);
try (InputStream stream = socket.getInputStream()) {
ByteArrayOutputStream buffer = new ByteArrayOutputStream();
int b;
while ((b = stream.read()) >= 0) {
if (b != byteDelimiter) {
buffer.write(b);
}
// decode 并發(fā)送數(shù)據(jù),重置 buffer
else {
ctx.collect(deserializer.deserialize(buffer.toByteArray()));
buffer.reset();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
try {
currentSocket.close();
} catch (Throwable t) {
// ignore
}
}
}