感謝您的關(guān)注 + 點贊 + 再看,對博主的肯定,會督促博主持續(xù)的輸出更多的優(yōu)質(zhì)實戰(zhàn)內(nèi)容?。?!
1.序篇-本文結(jié)構(gòu)
protobuf 作為目前各大公司中最廣泛使用的高效的協(xié)議數(shù)據(jù)交換格式工具庫,會大量作為流式數(shù)據(jù)傳輸?shù)男蛄谢绞?,所以?flink sql 中如果能實現(xiàn) protobuf 的 format 會非常有用(目前社區(qū)已經(jīng)有對應(yīng)的實現(xiàn),不過目前還沒有 merge,預(yù)計在 1.14 系列版本中能 release)。
pr 見:https://github.com/apache/flink/pull/14376
這一節(jié)主要介紹 flink sql 中怎么自定義實現(xiàn) format,其中以最常使用的 protobuf 作為案例來介紹。
- 背景篇-為啥需要 protobuf format
- 目標(biāo)篇-protobuf format 預(yù)期效果
- 難點剖析篇-此框架建設(shè)的難點、目前有哪些實現(xiàn)
- 維表實現(xiàn)篇-實現(xiàn)的過程
- 總結(jié)與展望篇
如果想在本地直接測試下:
- 在后臺回復(fù)
- flink sql 知其所以然(五)| sql 自定義 protobuf format獲取源碼(源碼基于 1.13.1 實現(xiàn))
- flink sql 知其所以然(五)| sql 自定義 protobuf format獲取源碼(源碼基于 1.13.1 實現(xiàn))
- flink sql 知其所以然(五)| sql 自定義 protobuf format獲取源碼(源碼基于 1.13.1 實現(xiàn))
- 執(zhí)行源碼包中的
flink.examples.sql._05.format.formats.SocketWriteTest測試類來制造 protobuf 數(shù)據(jù) - 然后執(zhí)行源碼包中的
flink.examples.sql._05.format.formats.ProtobufFormatTest測試類來消費 protobuf 數(shù)據(jù),并且打印在 console 中,然后就可以在 console 中看到結(jié)果。
2.背景篇-為啥需要 protobuf format
關(guān)于為什么選擇 protobuf 可以看這篇文章,寫的很詳細(xì):
http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral
在實時計算的領(lǐng)域中,為了可讀性會選擇 json,為了效率以及一些已經(jīng)依賴了 grpc 的公司會選擇 protobuf 來做數(shù)據(jù)序列化,那么自然而然,日志的序列化方式也會選擇 protobuf。
而官方目前已經(jīng) release 的版本中是沒有提供 flink sql api 的 protobuf format 的。如下圖,基于 1.13 版本。
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

因此本文在介紹怎樣自定義一個 format 的同時,實現(xiàn)一個 protobuf format 來給大家使用。
3.目標(biāo)篇-protobuf format 預(yù)期效果
預(yù)期效果是先實現(xiàn)幾種最基本的數(shù)據(jù)類型,包括 protobuf 中的 message(自定義 model)、map(映射)、repeated(列表)、其他基本數(shù)據(jù)類型等,這些都是我們最常使用的類型。
預(yù)期 protobuf message 定義如下:

測試數(shù)據(jù)源數(shù)據(jù)如下,博主把 protobuf 的數(shù)據(jù)轉(zhuǎn)換為 json,以方便展示,如下圖:

預(yù)期 flink sql:
數(shù)據(jù)源表 DDL:
CREATE TABLE protobuf_source (
name STRING
, names ARRAY<STRING>
, si_map MAP<STRING, INT>
)
WITH (
'connector' = 'socket',
'hostname' = 'localhost',
'port' = '9999',
'format' = 'protobuf',
'protobuf.class-name' = 'flink.examples.sql._04.format.formats.protobuf.Test'
)
數(shù)據(jù)匯表 DDL:
CREATE TABLE print_sink (
name STRING
, names ARRAY<STRING>
, si_map MAP<STRING, INT>
) WITH (
'connector' = 'print'
)
Transform 執(zhí)行邏輯:
INSERT INTO print_sink
SELECT *
FROM protobuf_source
下面是我在本地跑的結(jié)果:


可以看到打印的結(jié)果,數(shù)據(jù)是正確的被反序列化讀入,并且最終輸出到 console。
4.難點剖析篇-目前有哪些實現(xiàn)
目前業(yè)界可以參考的實現(xiàn)如下:https://github.com/maosuhan/flink-pb, 也就是這位哥們負(fù)責(zé)目前 flink protobuf 的 format。
這種實現(xiàn)的具體使用方式如下:

其實現(xiàn)有幾個特點:
- 復(fù)雜性:用戶需要在 flink sql 程序運行時,將對應(yīng)的 protobuf java 文件引入 classpath,這個特點是復(fù)合 flink 這樣的通用框架的特點的。但是如果需要在各個公司場景要做一個流式處理平臺的場景下,各個 protobuf sdk 可能都位于不同的 jar 包中,那么其 jar 包管理可能是一個比較大的問題。
- 高效 serde:一般很多場景下為了通用化 serde protobuf message,可能會選擇 DynamicMessage 來處理 protobuf message,但是其 serde 性能相比原生 java code 的性能比較差。因為特點 1 引入了 protobuf 的 java class,所以其 serde function 可以基于 codegen 實現(xiàn),而這將極大提高 serde 效率,效率提高就代表著省錢啊,可以吹逼的。
[圖片上傳失敗...(image-66c35b-1644940704671)]
Notes:
當(dāng)然博主針對第一點也有一些想法,比如怎樣做到不依賴 protobuf java 文件,只依賴 protobuf 的 message 定義即可或者只依賴其 descriptor。
目前博主的想法如下:
- flink 程序在客戶端獲取到對應(yīng)的 protobuf message 定義
- 然后根據(jù)這個定義恢復(fù)出 proto 文件
- 客戶端本地執(zhí)行 protoc 將此文件編譯為 java 文件
- 客戶端本地動態(tài)將此 java 文件編譯并 load 到 jvm 中
- 使用 codegen 然后動態(tài)生成執(zhí)行代碼
一氣呵成?。?!
具體實現(xiàn)其實可以參考:https://stackoverflow.com/questions/28381659/how-to-compile-protocol-buffers-schema-at-runtime
5.實現(xiàn)篇-實現(xiàn)的過程
5.1.flink format 工作原理
其實上節(jié)已經(jīng)詳細(xì)描述了 flink sql 對于 source\sink\format 的加載機制。
- 通過 SPI 機制加載所有的 source\sink\format 工廠
Factory - 過濾出 DeserializationFormatFactory\SerializationFormatFactory + format 標(biāo)識的 format 工廠類
- 通過 format 工廠類創(chuàng)建出對應(yīng)的 format


如圖 serde format 是通過 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 創(chuàng)建的
// either implement your custom validation logic here ...
// or use the provided helper utility
final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
// discover a suitable decoding format
final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
FactoryUtil.FORMAT);

所有通過 SPI 的 source\sink\formt 插件都繼承自 Factory。
整體創(chuàng)建 format 方法的調(diào)用鏈如下圖。

5.2.flink protobuf format 實現(xiàn)
最終實現(xiàn)如下,涉及到了幾個實現(xiàn)類:
ProtobufFormatFactoryProtobufOptionsProtobufRowDataDeserializationSchemaProtobufToRowDataConverters

具體流程:
- 定義 SPI 的工廠類
ProtobufFormatFactory implements DeserializationFormatFactory,并且在 resource\META-INF 下創(chuàng)建 SPI 的插件文件 - 實現(xiàn)
ProtobufFormatFactory#factoryIdentifier標(biāo)識protobuf - 實現(xiàn)
ProtobufFormatFactory#createDecodingFormat來創(chuàng)建對應(yīng)的DecodingFormat<DeserializationSchema<RowData>>,DecodingFormat是用來封裝具體的反序列化器的,實現(xiàn)DecodingFormat<DeserializationSchema<RowData>>#createRuntimeDecoder,返回ProtobufRowDataDeserializationSchema - 定義
ProtobufRowDataDeserializationSchema implements DeserializationSchema<RowData>,這個就是具體的反序列化器,其實與 datastream api 相同 - 實現(xiàn)
ProtobufRowDataDeserializationSchema#deserialize方法,與 datastream 相同,這個方法就是將byte[]序列化為RowData的具體邏輯 - 注意這里還實現(xiàn)了一個類
ProtobufToRowDataConverters,其作用就是在客戶端創(chuàng)建出具體的將byte[]序列化為RowData的具體工具類,其會根據(jù)用戶定義的表字段類型動態(tài)生成數(shù)據(jù)轉(zhuǎn)換的 converter 類(策略模式:https://www.runoob.com/design-pattern/strategy-pattern.html),相當(dāng)于表的 schema 確定之后,其 converter 也會確定
上述實現(xiàn)類的具體關(guān)系如下:

介紹完流程,進入具體實現(xiàn)方案細(xì)節(jié):
ProtobufFormatFactory 主要創(chuàng)建 format 的邏輯:
public class ProtobufFormatFactory implements DeserializationFormatFactory {
public static final String IDENTIFIER = "protobuf";
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(Context context,
ReadableConfig formatOptions) {
FactoryUtil.validateFactoryOptions(this, formatOptions);
// 1.獲取到 protobuf 的 class 全路徑
final String className = formatOptions.get(PROTOBUF_CLASS_NAME);
try {
// 2.load class
Class<GeneratedMessageV3> protobufV3 =
(Class<GeneratedMessageV3>) this.getClass().getClassLoader().loadClass(className);
// 3.創(chuàng)建 DecodingFormat
return new DecodingFormat<DeserializationSchema<RowData>>() {
@Override
public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context,
DataType physicalDataType) {
// 4.獲取到 table schema rowtype
final RowType rowType = (RowType) physicalDataType.getLogicalType();
// 5.創(chuàng)建對應(yīng)的 DeserializationSchema 作為反序列化器
return new ProtobufRowDataDeserializationSchema(
protobufV3
, true
, rowType);
}
@Override
public ChangelogMode getChangelogMode() {
return ChangelogMode.insertOnly();
}
};
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
@Override
public String factoryIdentifier() {
return IDENTIFIER;
}
...
}
resources\META-INF 文件:

ProtobufRowDataDeserializationSchema 主要實現(xiàn)反序列化的邏輯:
public class ProtobufRowDataDeserializationSchema extends AbstractDeserializationSchema<RowData> {
...
private ProtobufToRowDataConverters.ProtobufToRowDataConverter runtimeConverter;
public ProtobufRowDataDeserializationSchema(
Class<? extends GeneratedMessageV3> messageClazz
, boolean ignoreParseErrors
, RowType expectedResultType) {
this.ignoreParseErrors = ignoreParseErrors;
Preconditions.checkNotNull(messageClazz, "Protobuf message class must not be null.");
this.messageClazz = messageClazz;
this.descriptorBytes = null;
this.descriptor = ProtobufUtils.getDescriptor(messageClazz);
this.defaultInstance = ProtobufUtils.getDefaultInstance(messageClazz);
// protobuf 本身的 schema
this.protobufOriginalRowType = (RowType) ProtobufSchemaConverter.convertToRowDataTypeInfo(messageClazz);
this.expectedResultType = expectedResultType;
// 1.根據(jù) table schema 動態(tài)創(chuàng)建出對應(yīng)的反序列化器
this.runtimeConverter = new ProtobufToRowDataConverters(false)
.createRowDataConverterByLogicalType(this.descriptor, this.expectedResultType);
}
@Override
public RowData deserialize(byte[] bytes) throws IOException {
if (bytes == null) {
return null;
}
try {
// 2.將 bytes 反序列化為 protobuf message
Message message = this.defaultInstance
.newBuilderForType()
.mergeFrom(bytes)
.build();
// 3.反序列化邏輯,從 protobuf message 中獲取字段轉(zhuǎn)換為 RowData
return (RowData) runtimeConverter.convert(message);
} catch (Throwable t) {
if (ignoreParseErrors) {
return null;
}
throw new IOException(
format("Failed to deserialize Protobuf '%s'.", new String(bytes)), t);
}
}
...
可以注意到上述反序列化的主要邏輯就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定義的。
ProtobufToRowDataConverters.ProtobufToRowDataConverter 其實就是一個 convertor 接口:
@FunctionalInterface
public interface ProtobufToRowDataConverter extends Serializable {
Object convert(Object object);
}
其作用就是將 protobuf message 中的每一個字段轉(zhuǎn)換成為 RowData 中的每一個字段。
ProtobufToRowDataConverters 中就定義了具體轉(zhuǎn)換邏輯,如截圖所示,每一個 LogicalType 都定義了 protobuf message 字段轉(zhuǎn)換為 flink 數(shù)據(jù)類型的邏輯:

源碼后臺回復(fù)flink sql 知其所以然(五)| 自定義 protobuf format獲取。
6.總結(jié)與展望篇
6.1.總結(jié)
本文主要是針對 flink sql protobuf format 進行了原理解釋以及對應(yīng)的實現(xiàn)。
如果你正好需要這么一個 format,直接后臺回復(fù)flink sql 知其所以然(五)| 自定義 protobuf format獲取源碼吧。
6.2.展望
當(dāng)然上述只是 protobuf format 一個基礎(chǔ)的實現(xiàn),用于生產(chǎn)環(huán)境還有很多方面可以去擴展的。
- 性能優(yōu)化、通用化:protobuf java class 本地 codegen 來提高任務(wù)性能
- 數(shù)據(jù)質(zhì)量:異常 AOP,alert 等