原作者: Sijie Guo、Markos Sfikas
翻譯:StreamNative-Sijia
在之前的博客中,我們介紹了 Apache Pulsar 及其與其他消息系統(tǒng)的不同之處,并講解了如何融合 Pulsar 和 Flink 協(xié)同工作,為大規(guī)模彈性數(shù)據(jù)處理提供無縫的開發(fā)人員體驗。本文將介紹 Apache Pulsar 和 Apache Flink 的集成和最新研發(fā)進展,并詳細說明如何利用 Pulsar 內(nèi)置 schema,使用 Apache Flink 實時查詢 Pulsar 流。
Apache Pulsar 簡介
Apache Pulsar 是一個靈活的發(fā)布/訂閱消息系統(tǒng),支持持久日志存儲。Pulsar 的架構(gòu)優(yōu)勢包括多租戶、統(tǒng)一消息模型、結(jié)構(gòu)化事件流、云原生架構(gòu)等,這些優(yōu)勢讓 Pulsar 能夠完美適用于多種用戶場景,從計費、支付、交易服務到融合組織中不同的消息架構(gòu)。更多關(guān)于 Pulsar 的信息,點擊 Apache Pulsar documentation 或通過 Slack 與 Pulsar 社區(qū)聯(lián)系。
現(xiàn)有 Pulsar & Flink 集成(Apache Flink 1.6+)
在現(xiàn)有的 Pulsar 和 Flink 集成中,Pulsar 作為 Flink 應用程序中的消息隊列來使用。Flink 開發(fā)人員可以選擇特定 Pulsar source,并連接到所需的 Puslar 集群和 topic,將 Pulsar 用作 Flink 的流 source 和流 sink:
// create and configure Pulsar consumer
PulsarSourceBuilder<String>builder = PulsarSourceBuilder
.builder(new SimpleStringSchema())
.serviceUrl(serviceUrl)
.topic(inputTopic)
.subsciptionName(subscription);
SourceFunction<String> src = builder.build();
// ingest DataStream with Pulsar consumer
DataStream<String> words = env.addSource(src);
然后,Pulsar 流可以連接到 Flink 的處理邏輯。
// perform computation on DataStream (here a simple WordCount)
DataStream<WordWithCount> wc = words
.flatmap((FlatMapFunction<String, WordWithCount>) (word, collector) -> {
collector.collect(new WordWithCount(word, 1));
})
.returns(WordWithCount.class)
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));
然后通過 sink 將數(shù)據(jù)寫出到 Pulsar。
// emit result via Pulsar producer
wc.addSink(new FlinkPulsarProducer<>(
serviceUrl,
outputTopic,
new AuthentificationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
wordWithCount -> wordWithCount.word)
);
對于集成而言,這是重要的第一步,但現(xiàn)有設計還不足以充分利用 Pulsar 的全部功能。Pulsar 與 Flink 1.6.0 的集成中有一些不足,包括:既沒有作為持久存儲來使用,也沒有與 Flink 進行 schema 集成,導致在為應用程序 schema 注冊添加描述時,需要手動輸入。
Pulsar 與 Flink 1.9 的集成:將 Pulsar 用作 Flink catalog
Flink 1.9.0 與 Pulsar 的最新集成解決了前面提到的問題。阿里巴巴 Blink 對 Flink 倉庫的貢獻不僅強化了處理架構(gòu),還增加了新功能,使得 Flink 與 Pulsar 的集成更強大有效。在新 connector 的實現(xiàn)中引入了 Pulsar schema 集成,增加了對 Table API 的支持,同時提供了 exactly-once 語義的 Pulsar 讀與 at-least-once 語義的 Pulsar 寫。并且,通過 schema 集成,Pulsar 可以注冊為 Flink catalog,只需幾個命令就可以在 Pulsar 流上運行 Flink 查詢。下面我們將詳細介紹新的集成,并舉例說明如何使用 Flink SQL 查詢 Pulsar 流。
利用 Flink <> Pulsar Schema 集成
在展開集成細節(jié)與具體的使用方法之前,我們先來看一下 Pulsar schema 是怎么工作的。Apache Pulsar 內(nèi)置對 Schema 的支持,無須額外管理 schema。Pulsar 的數(shù)據(jù) schema 與每個 topic 相關(guān)聯(lián),因此,producer 和 consumer 都可以使用預定義 schema 信息發(fā)送數(shù)據(jù),而 broker 可以驗證 schema ,并在兼容性檢查中管理 schema 多版本化和 schema 演化。
下面分別是 Pulsar schema 用于 producer 和 consumer 的示例。在 producer 端,可以指定使用 schema,并且 Pulsar 無需執(zhí)行序列化/反序列化,就可以發(fā)送一個 POJO 類。類似地,在 consumer 端,也可以指定數(shù)據(jù) schema,并且在接收到數(shù)據(jù)后,Pulsar 會立即自動驗證 schema 信息,獲取給定版本的 schema,然后將數(shù)據(jù)反序列化到 POJO 結(jié)構(gòu)。Pulsar 在 topic 的元數(shù)據(jù)中存儲 schema 信息。
// Create producer with Struct schema and send messages
Producer<User> producer = client.newProducer(Schema.AVRO(User.class)).create();
producer.newMessage()
.value(User.builder()
.userName(“pulsar-user”)
.userId(1L)
.build())
.send();
// Create consumer with Struct schema and receive messages
Consumer<User> consumer = client.newCOnsumer(Schema.AVRO(User.class)).create();
consumer.receive();
假設一個應用程序?qū)?producer 和/或 consumer 指定 schema。在接收到 schema 信息時,連接到 broker 的 producer(或 consumer)傳輸此類信息,以便 broker 在返回或拒絕該 schema 前注冊 schema、驗證 schema,并檢查 schema 兼容性,如下圖所示:
Pulsar 不僅可以處理并存儲 schema 信息,還可以在必要時處理 schema 演化(schema evolution)。Pulsar 能夠有效管理 broker 中的 schema 演化,在必要的兼容性檢查中,追蹤 schema 的所有版本。
另外,當消息發(fā)布在 producer 端時,Pulsar 會在消息的元數(shù)據(jù)中標記 schema 版本;當 consumer 接收到消息,并完成反序列化元數(shù)據(jù)時,Pulsar 將會檢查與此消息相關(guān)聯(lián)的 schema 版本,并從 broker 中獲取 schema 信息。因此,當 Pulsar 與 Flink 應用集成時,Pulsar 使用預先存在的 schema 信息,并將帶有 schema 信息的單個消息映射到 Flink 類型系統(tǒng)的不同行中。
當 Flink 用戶不直接與 schema 交互或不使用原始 schema(primitive schema)時(例如,用 topic 來存儲字符串或長數(shù)值),Pulsar 會轉(zhuǎn)換消息到 Flink 行,即“值”;或者在結(jié)構(gòu)化的 schema 類型(例如,JSON 和 AVRO)中,Pulsar 從 schema 信息中提取單個字段信息,并將字段映射到 Flink 的類型系統(tǒng)。最后,所有與消息相關(guān)的元數(shù)據(jù)信息(例如,消息密鑰、topic、發(fā)布時間、事件時間等)都會轉(zhuǎn)換到 Flink 行中的元數(shù)據(jù)字段。以下是使用原始 schema 和結(jié)構(gòu)化 schema 的兩個示例,解釋了如何將數(shù)據(jù)從 Pulsar topic 轉(zhuǎn)換到 Flink 類型系統(tǒng)。
原始 schema(Primitive Schema):
root
|-- value: DOUBLE
|-- __key: BYTES
|-- __topic: STRING
|-- __messageId: BYTES
|-- __publishTime: TIMESTAMP(3)
|-- __eventTime: TIMESTAMP(3)
結(jié)構(gòu)化 schema(Avor Schema):
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Foo {
public int i;
public float f;
public Bar bar;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class Bar {
public boolean b;
public String s;
}
Schema s = Schema.AVRO(Foo.getClass());
root
|-- i: INT
|-- f: FLOAT
|-- bar: ROW<`b` BOOLEAN, `s` STRING>
|-- __key: BYTES
|-- __topic: STRING
|-- __messageId: BYTES
|-- __publishTime: TIMESTAMP(3)
|-- __eventTime: TIMESTAMP(3)
當所有 schema 信息都映射到 Flink 類型系統(tǒng)時,就可以在 Flink 中根據(jù)指定 schema 信息構(gòu)建 Pulsar source、sink 或 catalog,如下所示:
Flink & Pulsar: 從 Pulsar 讀取數(shù)據(jù)
- 創(chuàng)建用于流查詢的 Pulsar source
val env = StreamExecutionEnvironment.getExecutionEnvironment
val props = new Properties()
props.setProperty("service.url", "pulsar://...")
props.setProperty("admin.url", "http://...")
props.setProperty("partitionDiscoveryIntervalMillis", "5000")
props.setProperty("startingOffsets", "earliest")
props.setProperty("topic", "test-source-topic")
val source = new FlinkPulsarSource(props)
// you don't need to provide a type information to addSource since FlinkPulsarSource is ResultTypeQueryable
val dataStream = env.addSource(source)(null)
// chain operations on dataStream of Row and sink the output
// end method chaining
env.execute()
- 將 Pusar 中的 topic 注冊為 streaming tables
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
.connect(new Pulsar().properties(props))
.inAppendMode()
.registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()
Flink & Pulsar:向 Pulsar 寫入數(shù)據(jù)
- 創(chuàng)建用于流查詢的 Pulsar sink
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream = .....
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
stream.addSink(new FlinkPulsarSink(prop, DummyTopicKeyExtractor))
env.execute()
- 向 Pulsar 寫入 streaming table
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)
val prop = new Properties()
prop.setProperty("service.url", serviceUrl)
prop.setProperty("admin.url", adminUrl)
prop.setProperty("flushOnCheckpoint", "true")
prop.setProperty("failOnWrite", "true")
props.setProperty("topic", "test-sink-topic")
tEnv
.connect(new Pulsar().properties(props))
.inAppendMode()
.registerTableSource("sink-table")
val sql = "INSERT INTO sink-table ....."
tEnv.sqlUpdate(sql)
env.execute()
在以上示例中,F(xiàn)link 開發(fā)人員都無需擔心 schema 注冊、序列化/反序列化,并將 Pulsar 集群注冊為 Flink 中的 source、sink 或 streaming table。當這三個要素同時存在時,Pulsar 會被注冊為 Flink 中的 catalog,這可以極大簡化數(shù)據(jù)處理與查詢,例如,編寫程序從 Pulsar 查詢數(shù)據(jù),使用 Table API 和 SQL 查詢 Pulsar 數(shù)據(jù)流等。
未來計劃
Pulsar 與 Flink 集成的目標在于簡化如何同時使用這兩個框架來構(gòu)建統(tǒng)一的數(shù)據(jù)處理堆棧,為開發(fā)人員提供便利。相比經(jīng)典的 Lamda 架構(gòu)(在線的高速層與離線的批處理層相結(jié)合,共同運行數(shù)據(jù)計算),F(xiàn)link 和 Pulsar 的組合提供了真正統(tǒng)一的數(shù)據(jù)處理堆棧。Flink 作為統(tǒng)一的計算引擎,處理在線(流)和離線(批)工作負載,而 Pulsar 作為統(tǒng)一數(shù)據(jù)處理堆棧的統(tǒng)一數(shù)據(jù)存儲層,簡化了開發(fā)人員的工作。
在改進集成的道路上仍有很多工作要做,例如,能夠利用 Pulsar connector 對 Flink 社區(qū)的貢獻的新 source API(FLIP-27),Pulsar 中允許有效 source 并行擴展的 Key_Shared 訂閱類型等。除此之外,可以改進的方向還包括:端到端的 exactly-once 保證(目前只能在 Pulsar source 中使用,不能在 Pulsar sink 中使用),以及將 Pulsar/BookKeeper 用作 Flink 狀態(tài)后端等。
點擊此處觀看關(guān)于 Flink 和 Pulsar 集成發(fā)展的詳細介紹,此視頻來自 Flink Forward Europe 2019,也可以訂閱 Flink 開發(fā)郵件列表,獲取關(guān)于 Flink 和 Pulsar 貢獻與集成工作的最新消息。
想要隨時掌握 Pulsar 的研發(fā)進展、用戶案例和熱點話題嗎?快來關(guān)注 Apache Pulsar 和 StreamNative 微信公眾號,我們會在第一時間和您分享 Pulsar 的一切。
原文鏈接:
https://flink.apache.org/news/2019/11/25/query-pulsar-streams-using-apache-flink.html