Background
Recently I needed to develop some custom Flink SQL connectors at work. The connectors shipped officially are limited, while in practice we may need to talk to all kinds of middleware, so anything not covered out of the box has to be developed ourselves.
For example:
CREATE TABLE XXX (
  A STRING,
  B BIGINT
) WITH (
  'connector.type' = 'kafka',
  ...
)
So what does it take to develop a connector of your own? This post summarizes the main development steps, and how I solved the problems I ran into along the way.
Development
1. Define a custom Factory, implementing StreamTableSourceFactory and/or StreamTableSinkFactory as needed.
2. Extend ConnectorDescriptorValidator as needed to define your own connector parameters (the keys that follow WITH).
3. Pay attention to the Factory's requiredContext and supportedProperties methods: the framework relies on them to filter and validate candidate factories.
4. Define a custom TableSink; depending on the middleware you are connecting to, choose AppendStreamTableSink, UpsertStreamTableSink, or RetractStreamTableSink, and override the consumeDataStream method.
5. Define a custom SinkFunction whose invoke method writes the data out to the external middleware.
These five steps are basically enough to build a simple SQL connector.
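The factory side of these steps (1 and 3) can be sketched in a single class. This is a minimal sketch against the Flink 1.10 legacy TableFactory API; the class name MyConnectorTableSinkFactory, the connector type my-connector, and the MyConnectorTableSink it creates are all hypothetical placeholders, and the property keys mirror the ones that show up in the exception later in this post:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.table.factories.StreamTableSinkFactory;
import org.apache.flink.table.sinks.StreamTableSink;
import org.apache.flink.types.Row;

import static org.apache.flink.table.descriptors.ConnectorDescriptorValidator.CONNECTOR_TYPE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_DATA_TYPE;
import static org.apache.flink.table.descriptors.Schema.SCHEMA_NAME;

public class MyConnectorTableSinkFactory implements StreamTableSinkFactory<Row> {

    // Only tables declared with 'connector.type' = 'my-connector' are routed here.
    @Override
    public Map<String, String> requiredContext() {
        Map<String, String> context = new HashMap<>();
        context.put(CONNECTOR_TYPE, "my-connector");
        return context;
    }

    // Every key that may appear in the WITH clause must be listed, including the
    // schema keys, or filterBySupportedProperties will silently drop this factory.
    @Override
    public List<String> supportedProperties() {
        List<String> properties = new ArrayList<>();
        properties.add("connector.address");
        properties.add("connector.job");
        properties.add("connector.metrics");
        // schema
        properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
        properties.add(SCHEMA + ".#." + SCHEMA_NAME);
        return properties;
    }

    @Override
    public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties) {
        // MyConnectorTableSink is the custom TableSink from step 4 (hypothetical).
        return new MyConnectorTableSink(properties);
    }
}
```

The same pattern applies on the source side with StreamTableSourceFactory and createStreamTableSource.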
Problems
1. NoMatchingTableFactoryException

Exception message:

org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector.address=localhost:9091
connector.job=testJob
connector.metrics=testMetrics
connector.type=xxxx
schema.0.data-type=ROW<`val` DOUBLE>
schema.0.name=value

Two different issues both produced this error:
- discoverFactories could not find my custom Factory at all.
  Solution: register the factory via Java SPI by adding the service file META-INF/services/org.apache.flink.table.factories.TableFactory under src/main/resources.
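TableFactoryService discovers factories through Java's ServiceLoader, which reads src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory from the jar. The file holds one fully qualified factory class name per line; the name below is a placeholder for your own factory class:

```
sql.connector.customized.MyConnectorTableSinkFactory
```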
- The property-based filtering dropped my Factory (the code is in TableFactoryService#filterBySupportedProperties).
  Solution: add the schema keys in the custom Factory#supportedProperties method:
// schema
properties.add(SCHEMA + ".#." + SCHEMA_DATA_TYPE);
properties.add(SCHEMA + ".#." + SCHEMA_NAME);
2. Serialization problem
Exception message: class xxx cannot be serialized.
Cause: my SinkFunction originally held a TableSchema field, which is not serializable. Since I only used it to obtain the field names, I replaced it with a String[] of field names taken from TableSchema.getFieldNames().
After that fix, a Util class I used was also reported as non-serializable; making the util implement Serializable solved it.
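The root cause generalizes: everything a SinkFunction holds in its fields must be serializable, because Flink ships the function instance to the workers via Java serialization. A minimal pure-JDK illustration of the fix (FieldHolder is a stand-in for the sink's state, not an actual Flink class):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Mirrors the fix: keep only serializable state, e.g. a String[] of field
    // names instead of a non-serializable TableSchema reference.
    static class FieldHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        final String[] fieldNames;

        FieldHolder(String[] fieldNames) {
            this.fieldNames = fieldNames;
        }
    }

    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(o);
        }
        return bos.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        // Round-trip the holder, as Flink would when distributing the sink.
        FieldHolder holder = new FieldHolder(new String[]{"A", "B"});
        FieldHolder copy = (FieldHolder) deserialize(serialize(holder));
        System.out.println(copy.fieldNames.length); // prints 2
    }
}
```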
3. Sink type mismatch

Exception:

org.apache.flink.table.api.TableException: The StreamTableSink#consumeDataStream(DataStream) must be implemented and return the sink transformation DataStreamSink. However, org.apache.flink.connector.prometheus.xxxTableSink doesn't implement this method.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:142)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:59)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:59)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:153)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:682)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:495)
at PrometheusConnectorTest.test(PrometheusConnectorTest.java:13)
	...
But my xxxTableSink implements AppendStreamTableSink, and AppendStreamTableSink extends StreamTableSink. The catch is that StreamTableSink's default consumeDataStream implementation merely delegates to the deprecated emitDataStream and returns null, which is exactly what the planner's check rejects.
Solution: leave emitDataStream as an empty method and override consumeDataStream:
@Override
public void emitDataStream(DataStream<Row> dataStream) {
    // deprecated entry point; intentionally left empty
}

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Row> dataStream) {
    return dataStream
            .addSink(new xxxSinkFunction())
            .setParallelism(dataStream.getParallelism())
            .name(TableConnectorUtils.generateRuntimeName(this.getClass(), getFieldNames()));
}
I haven't yet needed a custom TableFormat; if I get into that kind of development later I'll share it as well.
The full code is available in my GitHub project:
https://github.com/zhuxiaoshang/flink-be-god/tree/branch_1.10/flink-connector/flink-sql-connector-customized/src/main/java/sql/connector/customized
If this helped you, please give it a star.