Flink User-defined Source & Sink

主要引用官方文檔 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/sourcessinks/

動態(tài)表(Dynamic table)是 Flink Table 和 SQL API 的核心概念,用于以統(tǒng)一的方式處理有界和無界數(shù)據(jù)。

動態(tài)表只是一個邏輯概念,F(xiàn)link 并不擁有數(shù)據(jù)本身,動態(tài)表的內容存儲在外部系統(tǒng)(如數(shù)據(jù)庫、KV存儲、消息隊列)或文件中。

動態(tài)源(Dynamic Source)和動態(tài)接收器(Dynamic Sink)可用于在外部系統(tǒng)中讀寫數(shù)據(jù)。在 Flink 文檔中,Source 和 Sink 經常歸類到術語 Connector。

Overview(概覽)

大部分情況下,不需要從頭開始創(chuàng)建一個全新的 Connector,而是希望稍微修改現(xiàn)有的 Connector。在其他情況下,實現(xiàn)者希望創(chuàng)建專門的連接器。下圖顯示了對象如何從元數(shù)據(jù)轉換為運行時對象的處理過程。

Overview

Metadata

Table API 和 SQL 都是聲明式 API,包括表的聲明。執(zhí)行 CREATE TABLE 語句會在目標 Catalog 中更新元數(shù)據(jù)。

動態(tài)表的元數(shù)據(jù)(通過 DDL 創(chuàng)建或由 Catalog 提供)表現(xiàn)為 CatalogTable 的實例。

對于大多數(shù) Catalog 實現(xiàn),不會為此類操作修改外部系統(tǒng)中的物理數(shù)據(jù)。Connector 特定的依賴也不必存在于 classpath 中。WITH 子句中聲明的選項既不進行驗證,也不進行其他解釋。

Planning

當涉及到處理執(zhí)行計劃和優(yōu)化時,CatalogTable 需要解析為 DynamicTableSource(用于在 SELECT 查詢中讀取)和 DynamicTableSink(用于在 INSERT 語句中寫入)。

DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供邏輯將 CatalogTable 的元數(shù)據(jù)轉換為 DynamicTableSource 和 DynamicTableSink 的實例。在大多數(shù)情況下,工廠的目的是驗證 options(例如上圖示例中的 'port'='5022')、配置編碼/解碼格式(如果需要)、創(chuàng)建 Table connector 的參數(shù)化實例。

默認情況下,DynamicTableSourceFactory 和 DynamicTableSinkFactory 的實例是使用 Java 的 SPI(Service Provider Interface)發(fā)現(xiàn)的。connector 選項(例如示例中的 'connector'='custom')必須對應有效的 factory 標識符。

盡管在類命名上可能不明顯,但 DynamicTableSource 和 DynamicTableSink 也可以被視為有狀態(tài)工廠,最終生成具體的運行時實現(xiàn)來讀取/寫入實際數(shù)據(jù)。

Planner 使用 Source 和 Sink 實例找到最佳邏輯計劃。根據(jù)可選聲明的接口(例如 SupportsProjectionPushDown 或 SupportsOverwrite),Planner 可能會對實例應用一些更改,從而改變生成的運行時實現(xiàn)。

Runtime

邏輯計劃完成后,Planner 將獲得運行時實現(xiàn)。運行時邏輯在 Flink 的 core connector 接口(如 InputFormat 或 SourceFunction )中實現(xiàn)。

這些接口按另一抽象級別劃分為 ScanRuntimeProvider、LookupRuntimeProvider 和 SinkRuntimeProvider 的子類。

例如,OutputFormatProvider 和 SinkFunctionProvider 都是 Planner 可以處理的 SinkRuntimeProvider 的具體實例。

Extension Points(擴展點)

本節(jié)介紹擴展 Flink Table Connector 的可用接口。

Dynamic Table Factories

Dynamic table factory 用于為外部存儲系統(tǒng)配置 Dynamic table connector(根據(jù) Catalog 和 Session 信息)。

org.apache.flink.table.factories.DynamicTableSourceFactory 可以被實現(xiàn)用來構造 DynamicTableSource。

org.apache.flink.table.factories.DynamicTableSinkFactory 可以被實現(xiàn)用來構造 DynamicTableSink。

默認情況,工廠是使用 connector 選項的值作為工廠標識符。

在 JAR 文件中,可以將對新實現(xiàn)的引用添加到文件中:META-INF/services/org.apache.flink.table.factors.Factory。Flink 框架將檢查工廠唯一標識和請求的基類來確定匹配的工廠。

如果需要,Catalog 實現(xiàn)可以繞過工廠發(fā)現(xiàn)過程。為此,Catalog 需要返回一個實例,該實例實現(xiàn) org.apache.flink.table.catalog.catalog#getFactory 中請求的基類。

Dynamic Table Source

根據(jù)定義,動態(tài)表可隨時間變化的。讀取動態(tài)表時,可以將內容視為以下情況:

  • 一種 changelog(有限的或無限的),在 changelog 耗盡之前,所有的更改都被連續(xù)地處理。這由 ScanTableSource 接口表示。
  • 一種連續(xù)變化的或非常大的外部表,其內容通常不會被完全讀取,而是在必要時查詢單個值。這由 LookupTableSource 接口表示。

可以同時實現(xiàn)這兩個接口。Planner 根據(jù)指定的查詢決定如何使用。

Scan Table Source

ScanTableSource 在運行時掃描外部存儲系統(tǒng)中的所有行。

  • 對于常規(guī)批處理場景,Source 可以發(fā)出只 Insert 的有界流。
  • 對于常規(guī)流式處理場景,Source 可以發(fā)出只有 Insert 的無界流。
  • 對于更改數(shù)據(jù)捕獲(CDC)場景,Source 可以發(fā)出有界或無界流,其中包含插入、更新和刪除行。

Table Source 可以實現(xiàn)更多的功能接口,如 SupportsProjectionPushDown,這些接口可能會在 Planning 期間改變實例。所有功能接口都可以在 org.apache.flink.table.connector.source.abilities 包中找到。

ScanTableSource 的運行時實現(xiàn)必須生成內部數(shù)據(jù)結構。因此,記錄必須作為 org.apache.flink.table.data.RowData 發(fā)出??蚣芴峁┝诉\行時轉換器,使得 Source 可以在通用數(shù)據(jù)結構上工作,并在最后執(zhí)行轉換。

Lookup Table Source

LookupTableSource 在運行時通過一個或多個鍵查找外部存儲系統(tǒng)的行。

與 ScanTableSource 相比,Source 不必讀取整個表,并且可以在需要時從(表數(shù)據(jù)可能是不斷變化的)外部表中惰性地獲取單個值。

與 ScanTableSource 相比,LookupTableSource 當前只支持發(fā)出只有 Insert 的數(shù)據(jù)。

LookupTableSource 的運行時實現(xiàn)是 TableFunction 或 AsyncTableFunction。在運行時,使用指定的查找鍵的值調用該函數(shù)。

Source Abilities

以下接口只適用于 ScanTableSource,不適用于 LookupTableSource

接口 描述
SupportsFilterPushDown Enables to push down the filter into the DynamicTableSource. For efficiency, a source can push filters further down in order to be close to the actual data generation.
SupportsLimitPushDown Enables to push down a limit (the expected maximum number of produced records) into a DynamicTableSource.
SupportsPartitionPushDown Enables to pass available partitions to the planner and push down partitions into a DynamicTableSource. During the runtime, the source will only read data from the passed partition list for efficiency.
SupportsProjectionPushDown Enables to push down a (possibly nested) projection into a DynamicTableSource. For efficiency, a source can push a projection further down in order to be close to the actual data generation. If the source also implements SupportsReadingMetadata, the source will also read the required metadata only.
SupportsReadingMetadata Enables to read metadata columns from a DynamicTableSource. The source is responsible to add the required metadata at the end of the produced rows. This includes potentially forwarding metadata column from contained formats.
SupportsWatermarkPushDown Enables to push down a watermark strategy into a DynamicTableSource. The watermark strategy is a builder/factory for timestamp extraction and watermark generation. During the runtime, the watermark generator is located inside the source and is able to generate per-partition watermarks.
SupportsSourceWatermark Enables to fully rely on the watermark strategy provided by the ScanTableSource itself. Thus, a CREATE TABLE DDL is able to use SOURCE_WATERMARK() which is a built-in marker function that will be detected by the planner and translated into a call to this interface if available.

Dynamic Table Sink

根據(jù)定義,動態(tài)表可隨時間變化的。寫入動態(tài)表時,可以將內容視為以下情況:

  • 可以始終將內容視為一個 changelog(有限或無限),對于該 changelog,所有更改都會連續(xù)地寫出,直到 changelog 處理完為止。

對于常規(guī)的批處理場景,Sink 僅可以接受只有 insert 的行并生成有界流。

對于常規(guī)的流處理場景,Sink 僅可以接受只有 insert 的行并生成無界流。

對于更改數(shù)據(jù)捕獲(CDC)場景,Sink 可以通過插入行、更新行和刪除行生成有界或無界流。

Table Sink 可以實現(xiàn)更多的功能接口,如 SupportsOverwrite,這些接口可能會在 Planning 期間改變實例。所有功能接口都可以 在org.apache.flink.table.connector.sink.abilities 包中找到。

DynamicTableSink 的運行時實現(xiàn)必須使用內部數(shù)據(jù)結構。因此,記錄必須作為 org.apache.flink.table.data.RowData 被接收??蚣芴峁┻\行時轉換器,使得 Sink 可以在通用數(shù)據(jù)結構上工作,并在開始時執(zhí)行轉換。

Sink Abilities

接口 描述
SupportsOverwrite Enables to overwrite existing data in a DynamicTableSink. By default, if this interface is not implemented, existing tables or partitions cannot be overwritten using e.g. the SQL INSERT OVERWRITE clause.
SupportsPartitioning Enables to write partitioned data in a DynamicTableSink.
SupportsWritingMetadata Enables to write metadata columns into a DynamicTableSource. A table sink is responsible for accepting requested metadata columns at the end of consumed rows and persist them. This includes potentially forwarding metadata columns to contained formats.

Encoding/Decoding Formats

一些 Table Connector 接受不同格式和編碼的 key 和 value。

Format 的工作方式類似于 DynamicTableSourceFactory -> DynamicTableSource -> ScanRuntimeProvider 的處理模式,工廠負責轉換選項,Source 負責創(chuàng)建運行時邏輯。

Format 使用 Java 的 SPI(Service Provider Interface)發(fā)現(xiàn)。例如 Kafka Table Source 需要 DeserializationSchema 作為運行時接口實現(xiàn)解碼。Kafka Table Source 工廠使用 value.format 選項值找到 DeserializationFormatFactory 工廠。

當前支持以下 Format 工廠:

  • org.apache.flink.table.factories.DeserializationFormatFactory
  • org.apache.flink.table.factories.SerializationFormatFactory

Format 工廠轉換選項,生成 EncodingFormat 或 DecodingFormat,這些是另一種工廠,為給定的數(shù)據(jù)類型生成專門的 Format 運行時邏輯。

例如,對于 Kafka table source 工廠,DeserializationFormatFactory 會返回 EncodingFormat<DeserializationSchema> 傳入 Kafka table source。

Full Stack Example(完整示例)

本節(jié)介紹,如果使用 changelog 語義,實現(xiàn)一個具有 Decoding format 的 ScanTableSource。例子解釋了上述組件是如何一起工作的:

  • 創(chuàng)建工廠并解析和驗證選項
  • 實現(xiàn) Table Connector
  • 實現(xiàn)和發(fā)現(xiàn)自定義 Format
  • 使用提供的工具類,例如數(shù)據(jù)結構轉換器 FactoryUtil

Table Source 使用一個簡單的單線程 SourceFunction 來創(chuàng)建一個 Socket 監(jiān)聽傳入字節(jié)

CREATE TABLE UserScores (name STRING, score INT)
WITH (
  'connector' = 'socket',
  'hostname' = 'localhost',
  'port' = '9999',
  'byte-delimiter' = '10',
  'format' = 'changelog-csv',
  'changelog-csv.column-delimiter' = '|'
);

因為支持 changelog 語義,所以可以在運行時接收更新,并創(chuàng)建一個更新視圖,該視圖可以連續(xù)評估不斷變化的數(shù)據(jù):

SELECT name, SUM(score) FROM UserScores GROUP BY name;

在終端發(fā)送以下格式的數(shù)據(jù)

> nc -lk 9999
INSERT|Alice|12
INSERT|Bob|5
DELETE|Alice|12
INSERT|Alice|18

Factories

本節(jié)說明如何將元數(shù)據(jù)轉換為具體的 Connector 實例。這兩個工廠(SocketDynamicTableFactory 和 ChangelogCsvFormatFactory)都已添加到 META-INF/services 目錄。

SocketDynamicTableFactory

SocketDynamicTableFactory 將 CatalogTable 轉換為 TableSource。因為 Table Source 需要 decoding format,為了方便起見,使用框架提供的 FactoryUtil 來發(fā)現(xiàn)該格式。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

// 實現(xiàn) DynamicTableSourceFactory 接口
public class SocketDynamicTableFactory implements DynamicTableSourceFactory {

  // 定義 options 
  public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
    .stringType()
    .noDefaultValue();

  public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
    .intType()
    .noDefaultValue();

  public static final ConfigOption<Integer> BYTE_DELIMITER = ConfigOptions.key("byte-delimiter")
    .intType()
    .defaultValue(10); // '\n'

  // 工廠標識符,用來匹配 'connector'='socket'
  @Override
  public String factoryIdentifier() {
    return "socket"; 
  }

  // 定義必填 options
  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(HOSTNAME);
    options.add(PORT);
    options.add(FactoryUtil.FORMAT); // 預定義 option,即 'format'='...'
    return options;
  }

  // 定義可選 options
  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(BYTE_DELIMITER);
    return options;
  }

  @Override
  public DynamicTableSource createDynamicTableSource(Context context) {
    // 實現(xiàn)自定義驗證邏輯 或 使用提供的幫助工具類 FactoryUtil.TableFactoryHelper
    final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

    // 發(fā)現(xiàn)合適的 DecodingFormat 
    final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
      DeserializationFormatFactory.class,
      FactoryUtil.FORMAT);

    // 驗證 options
    helper.validate();

    final ReadableConfig options = helper.getOptions();
    final String hostname = options.get(HOSTNAME);
    final int port = options.get(PORT);
    final byte byteDelimiter = (byte) (int) options.get(BYTE_DELIMITER);

    // 從 CatalogTable 派生生成的數(shù)據(jù)類型(不包括計算列)
    final DataType producedDataType = context.getCatalogTable().getSchema().toPhysicalRowDataType();

    // 創(chuàng)建并返回 DynamicTableSource
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }
}

ChangelogCsvFormatFactory

ChangelogCsvFormatFactory 將 format 選項轉換為一種 Format 實例。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;

// 實現(xiàn) DeserializationFormatFactory 接口
public class ChangelogCsvFormatFactory implements DeserializationFormatFactory {

  // 定義 options 
  public static final ConfigOption<String> COLUMN_DELIMITER = ConfigOptions.key("column-delimiter")
    .stringType()
    .defaultValue("|");

  // 工廠標識符,用來匹配 'format'='changelog-csv'
  @Override
  public String factoryIdentifier() {
    return "changelog-csv";
  }

  @Override
  public Set<ConfigOption<?>> requiredOptions() {
    return Collections.emptySet();
  }

  @Override
  public Set<ConfigOption<?>> optionalOptions() {
    final Set<ConfigOption<?>> options = new HashSet<>();
    options.add(COLUMN_DELIMITER);
    return options;
  }

  @Override
  public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
      DynamicTableFactory.Context context,
      ReadableConfig formatOptions) {
    // 這里實現(xiàn)自定義驗證邏輯 或 使用提供的幫助工具類 FactoryUtil
    FactoryUtil.validateFactoryOptions(this, formatOptions);

    final String columnDelimiter = formatOptions.get(COLUMN_DELIMITER);

    // 創(chuàng)建并返回 DecodingFormat
    return new ChangelogCsvFormat(columnDelimiter);
  }
}

Table Source and Decoding Format

本節(jié)說明如何將計劃層的實例轉換為要提交到集群的運行時實例。

SocketDynamicTableSource

SocketDynamicTableSource 在 Planning 期間使用。主邏輯可以在 getScanRuntimeProvider() 方法中找到。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

// 實現(xiàn) ScanTableSource 接口
public class SocketDynamicTableSource implements ScanTableSource {

  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
  private final DataType producedDataType;
  
  // 構造函數(shù),傳入必要的參數(shù)
  public SocketDynamicTableSource(
      String hostname,
      int port,
      byte byteDelimiter,
      DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
      DataType producedDataType) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.decodingFormat = decodingFormat;
    this.producedDataType = producedDataType;
  }

  @Override
  public ChangelogMode getChangelogMode() {
    // 這里由 Format 決定 ChangelogMode,也可以由 Source 自己決定
    return decodingFormat.getChangelogMode();
  }

  @Override
  public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
    final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
      runtimeProviderContext,
      producedDataType);
      
    final SourceFunction<RowData> sourceFunction = new SocketSourceFunction(
      hostname,
      port,
      byteDelimiter,
      deserializer);

    return SourceFunctionProvider.of(sourceFunction, false);
  }

  @Override
  public DynamicTableSource copy() {
    return new SocketDynamicTableSource(hostname, port, byteDelimiter, decodingFormat, producedDataType);
  }

  @Override
  public String asSummaryString() {
    return "Socket Table Source";
  }
}

ChangelogCsvFormat

ChangelogCsvFormat 支持發(fā)出 INSERT 和 DELETE 更改。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;

// 實現(xiàn) DecodingFormat<DeserializationSchema<RowData>> 接口
public class ChangelogCsvFormat implements DecodingFormat<DeserializationSchema<RowData>> {
  // 字段分隔符
  private final String columnDelimiter;

  public ChangelogCsvFormat(String columnDelimiter) {
    this.columnDelimiter = columnDelimiter;
  }

  @Override
  @SuppressWarnings("unchecked")
  public DeserializationSchema<RowData> createRuntimeDecoder(
      DynamicTableSource.Context context,
      DataType producedDataType) {
    // 創(chuàng)建 TypeInformation
    final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(
      producedDataType);

    // DeserializationSchema 中的大多數(shù)代碼將無法在內部數(shù)據(jù)結構上工作,創(chuàng)建一個數(shù)據(jù)結構轉換器用于轉換
    final DataStructureConverter converter = context.createDataStructureConverter(producedDataType);

    // 在運行時使用 LogicalType
    final List<LogicalType> parsingTypes = producedDataType.getLogicalType().getChildren();

    // 創(chuàng)建運行時類
    return new ChangelogCsvDeserializer(parsingTypes, converter, producedTypeInfo, columnDelimiter);
  }

  @Override
  public ChangelogMode getChangelogMode() {
    // 定義此 Format 可以產生插入和刪除行
    return ChangelogMode.newBuilder()
      .addContainedKind(RowKind.INSERT)
      .addContainedKind(RowKind.DELETE)
      .build();
  }
}

Runtime

本節(jié)說明了 SourceFunction 和 DeserializationSchema 的運行時邏輯

ChangelogCsvDeserializer

ChangelogCsvDeserializer 包含一個簡單的解析邏輯,用于將 Bytes 轉換為具有 Integer 和 String 的 Row 類型,并帶有 RowKind 信息。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.RuntimeConverter.Context;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

// 實現(xiàn) DeserializationSchema<RowData> 接口
public class ChangelogCsvDeserializer implements DeserializationSchema<RowData> {

  private final List<LogicalType> parsingTypes;
  private final DataStructureConverter converter;
  private final TypeInformation<RowData> producedTypeInfo;
  private final String columnDelimiter;

  public ChangelogCsvDeserializer(
      List<LogicalType> parsingTypes,
      DataStructureConverter converter,
      TypeInformation<RowData> producedTypeInfo,
      String columnDelimiter) {
    this.parsingTypes = parsingTypes;
    this.converter = converter;
    this.producedTypeInfo = producedTypeInfo;
    this.columnDelimiter = columnDelimiter;
  }

  @Override
  public TypeInformation<RowData> getProducedType() {
    return producedTypeInfo;
  }

  @Override
  public void open(InitializationContext context) {
    // converters must be open
    converter.open(Context.create(ChangelogCsvDeserializer.class.getClassLoader()));
  }

  @Override
  public RowData deserialize(byte[] message) {
    // 解析 Row
    final String[] columns = new String(message).split(Pattern.quote(columnDelimiter));
    // 解析 RowKind
    final RowKind kind = RowKind.valueOf(columns[0]);
    final Row row = new Row(kind, parsingTypes.size());
    for (int i = 0; i < parsingTypes.size(); i++) {
      row.setField(i, parse(parsingTypes.get(i).getTypeRoot(), columns[i + 1]));
    }
    
    // 類型轉換生成 RowData
    return (RowData) converter.toInternal(row);
  }

  private static Object parse(LogicalTypeRoot root, String value) {
    switch (root) {
      case INTEGER:
        return Integer.parseInt(value);
      case VARCHAR:
        return value;
      default:
        throw new IllegalArgumentException();
    }
  }

  @Override
  public boolean isEndOfStream(RowData nextElement) {
    return false;
  }
}

SocketSourceFunction

SocketSourceFunction 開啟一個 Socket,并監(jiān)聽字節(jié)。按照給定的字節(jié)分隔符('\n')劃分記錄,并將解碼委托給 DeserializationSchema。SourceFunction 只能在并行度為1的情況下工作。

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

// 繼承 RichSourceFunction 類,實現(xiàn) ResultTypeQueryable<RowData> 接口
public class SocketSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

  private final String hostname;
  private final int port;
  private final byte byteDelimiter;
  private final DeserializationSchema<RowData> deserializer;

  private volatile boolean isRunning = true;
  private Socket currentSocket;

  public SocketSourceFunction(String hostname, int port, byte byteDelimiter, DeserializationSchema<RowData> deserializer) {
    this.hostname = hostname;
    this.port = port;
    this.byteDelimiter = byteDelimiter;
    this.deserializer = deserializer;
  }

  @Override
  public TypeInformation<RowData> getProducedType() {
    return deserializer.getProducedType();
  }

  @Override
  public void open(Configuration parameters) throws Exception {
    deserializer.open(() -> getRuntimeContext().getMetricGroup());
  }

  @Override
  public void run(SourceContext<RowData> ctx) throws Exception {
    while (isRunning) {
      // 創(chuàng)建 Socket 并監(jiān)聽
      try (final Socket socket = new Socket()) {
        currentSocket = socket;
        socket.connect(new InetSocketAddress(hostname, port), 0);
        try (InputStream stream = socket.getInputStream()) {
          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
          int b;
          while ((b = stream.read()) >= 0) {
            if (b != byteDelimiter) {
              buffer.write(b);
            }
            // decode 并發(fā)送數(shù)據(jù),重置 buffer
            else {
              ctx.collect(deserializer.deserialize(buffer.toByteArray()));
              buffer.reset();
            }
          }
        }
      } catch (Throwable t) {
        t.printStackTrace(); 
      }
      Thread.sleep(1000);
    }
  }

  @Override
  public void cancel() {
    isRunning = false;
    try {
      currentSocket.close();
    } catch (Throwable t) {
      // ignore
    }
  }
}
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容