作者:Sijie Guo
編輯:Irene
Apache Flink 和 Apache Pulsar 的開源數(shù)據(jù)技術框架可以以不同的方式融合,來提供大規(guī)模彈性數(shù)據(jù)處理。4 月 2 日,我司 CEO 郭斯杰受邀在 Flink Forward San Francisco 2019 大會上發(fā)表演講,介紹了 Flink 和 Pulsar 在批流應用程序的融合情況。這篇文章會簡要介紹 Apache Pulsar 及其與其他消息系統(tǒng)的不同之處,并講解如何融合 Pulsar 和 Flink 協(xié)同工作,為大規(guī)模彈性數(shù)據(jù)處理提供無縫的開發(fā)人員體驗。
本文轉載自“StreamNative”,更多干貨請關注微信公眾號“StreamNative”。
Apache Pulsar 簡介
Apache Pulsar 是一個開源的分布式發(fā)布-訂閱消息系統(tǒng), 由 Apache 軟件基金會管理,并于 2018 年 9 月成為 Apache 頂級開源項目。 Pulsar 是一種多租戶、高性能解決方案,用于服務器到服務器消息傳遞,包括多個功能,例如,在一個 Pulsar 實例中對多個集群提供原生支持、集群間消息跨地域的無縫復制、發(fā)布和端到端的低延遲、超過一百萬個主題的無縫擴展以及由 Apache BookKeeper 提供的持久消息存儲保證消息傳遞?,F(xiàn)在我們來討論 Pulsar 和其他發(fā)布-訂閱消息傳遞框架之間的主要區(qū)別:
區(qū)別一
雖然 Pulsar 提供了靈活的發(fā)布-訂閱消息傳遞系統(tǒng),但它也由持久的日志存儲支持——因此需在一個框架下集成消息傳遞和存儲功能。由于 Pulsar 采用了分層架構,它可以即時故障恢復、支持獨立可擴展性和無需均衡的集群擴展。
Pulsar 的架構與其他發(fā)布-訂閱系統(tǒng)類似,框架由主題組成,而主題是主要數(shù)據(jù)實體。如下圖所示,生產(chǎn)者向主題發(fā)送數(shù)據(jù),消費者從主題接收數(shù)據(jù)。
區(qū)別二
第二個區(qū)別是,Pulsar 的框架構建從一開始就考慮到了多租戶。這意味著每個 Pulsar 主題都有一個分層的管理結構,使得資源分配、資源管理和團隊協(xié)作變得高效而容易。由于 Pulsar 提供屬性(租戶)級、命名空間級和主題級的資源隔離,Pulsar 的多租戶特性不僅能使數(shù)據(jù)平臺管理人員輕松擴展新的團隊,還能跨集群共享數(shù)據(jù),簡化團隊協(xié)作。
區(qū)別三
Pulsar 靈活的消息傳遞框架統(tǒng)一了流式和隊列數(shù)據(jù)消費模型,并提供了更大的靈活性。如下圖所示,Pulsar 保存主題中的數(shù)據(jù),而多個團隊可以根據(jù)其工作負載和數(shù)據(jù)消費模式獨立地消費數(shù)據(jù)。
Pulsar 數(shù)據(jù)視圖:****分片數(shù)據(jù)流
Apache Flink 是一個流式優(yōu)先計算框架,它將批處理視為流處理的特殊情況。在對數(shù)據(jù)流的看法上,F(xiàn)link 區(qū)分了有界和無界數(shù)據(jù)流之間的批處理和流處理,并假設對于批處理工作負載數(shù)據(jù)流是有限的,具有開始和結束。
在數(shù)據(jù)層上,Apache Pulsar 與 Apache Flink 的觀點相似。該框架也使用流作為所有數(shù)據(jù)的統(tǒng)一視圖,分層架構允許傳統(tǒng)發(fā)布-訂閱消息傳遞,用于流式工作負載和連續(xù)數(shù)據(jù)處理;并支持分片流(Segmented Streams)和有界數(shù)據(jù)流的使用,用于批處理和靜態(tài)工作負載。
如下圖所示,為了并行處理數(shù)據(jù),生產(chǎn)者向主題發(fā)送數(shù)據(jù)后,Pulsar 根據(jù)數(shù)據(jù)流量對主題進行分區(qū),再在每個分區(qū)中進行分片,并使用 Apache BookKeeper 進行分片存儲。這一模式允許在同一個框架中集成傳統(tǒng)的發(fā)布-訂閱消息系統(tǒng)和分布式并行計算。
Flink + Pulsar 的融合
Apache Flink 和 Apache Pulsar 已經(jīng)以多種方式融合。在以下內(nèi)容中,我會介紹兩個框架間未來一些可行的融合方式,并分享一些融合使用兩個框架的示例。
未來融合方式:
Pulsar 能以不同的方式與 Apache Flink 融合,一些可行的融合包括,使用流式連接器(Streaming Connectors)支持流式工作負載,或使用批式源連接器(Batch Source Connectors)支持批式工作負載。Pulsar 還提供了對 Schema 的原生支持,可以與 Flink 集成并提供對數(shù)據(jù)的結構化訪問,例如,使用 Flink SQL 在 Pulsar 中查詢數(shù)據(jù)。另外,還能將 Pulsar 作為 Flink 的狀態(tài)后端。由于 Pulsar 具有分層架構(Apache Bookkeeper 支持下的 Streams 和 Segmented Streams),因此可以將 Pulsar 作為存儲層并存儲 Flink 狀態(tài)。
從架構的角度來看,我們可以想象兩個框架之間的融合,使用 Apache Pulsar 作為統(tǒng)一的數(shù)據(jù)層視圖,使用 Apache Flink 作為統(tǒng)一的計算、數(shù)據(jù)處理框架和 API。
現(xiàn)有融合方式
大量技術愛好者參與了此次干貨滿滿的分享請看以下內(nèi)容。
兩個框架之間的融合正在進行中,開發(fā)人員已經(jīng)可以通過多種方式融合使用 Pulsar 和 Flink。例如,在 Flink DataStream 應用程序中,Pulsar 可以作為流數(shù)據(jù)源和流接收器。開發(fā)人員能使 Flink 作業(yè)從 Pulsar 中獲取數(shù)據(jù),再進行計算并處理實時數(shù)據(jù),最后將數(shù)據(jù)作為流接收器發(fā)送回 Pulsar 主題。示例如下:
PulsarSourceBuilder<String>builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
.serviceUrl(serviceUrl)
.topic(inputTopic)
.subscriptionName(subscription);
SourceFunction<String> src = builder.build();
DataStream<String> input = env.addSource(src);
DataStream<WordWithCount> wc = input
.flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
for (String word : line.split("\\s")) {
collector.collect(new WordWithCount(word, 1));
}
})
.returns(WordWithCount.class)
.keyBy("word")
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));
if (null != outputTopic) {
wc.addSink(new FlinkPulsarProducer<>(
serviceUrl,
outputTopic,
new AuthenticationDisabled(),
wordWithCount -> wordWithCount.toString().getBytes(UTF_8),
wordWithCount -> wordWithCount.word
)).setParallelism(parallelism);
} else {
// print the results with a single thread, rather than in parallel
wc.print().setParallelism(1);
}
另一個開發(fā)人員可利用的框架間的融合,已經(jīng)包括將 Pulsar 用作 Flink 應用程序的流式源和流式表接收器,代碼示例如下:
PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())
.serviceUrl(serviceUrl)
.topic(inputTopic)
.subscriptionName(subscription);
SourceFunction<String> src = builder.build();
DataStream<String> input = env.addSource(src);
DataStream<WordWithCount> wc = input
.flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {
for (String word : line.split("\\s")) {
collector.collect(
new WordWithCount(word, 1)
);
}
})
.returns(WordWithCount.class)
.keyBy(ROUTING_KEY)
.timeWindow(Time.seconds(5))
.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->
new WordWithCount(c1.word, c1.count + c2.count));
tableEnvironment.registerDataStream("wc",wc);
Table table = tableEnvironment.sqlQuery("select word, count from wc");
table.printSchema();
TableSink sink = null;
if (null != outputTopic) {
sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY);
} else {
// print the results with a csv file
sink = new CsvTableSink("./examples/file", "|");
}
table.writeToSink(sink);
最后,F(xiàn)link 融合 Pulsar 作為批處理接收器,負責完成批處理工作負載。Flink 在靜態(tài)數(shù)據(jù)集完成計算之后,批處理接收器將結果發(fā)送至 Pulsar。示例如下:
// create PulsarOutputFormat instance
final OutputFormat pulsarOutputFormat =
new PulsarOutputFormat(serviceUrl, topic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes());
// create DataSet
DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);
// convert sentences to words
textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {
@Override
public void flatMap(String value, Collector<WordWithCount> out) throws Exception {
String[] words = value.toLowerCase().split(" ");
for(String word: words) {
out.collect(new WordWithCount(word.replace(".", ""), 1));
}
}
})
// filter words which length is bigger than 4
.filter(wordWithCount -> wordWithCount.word.length() > 4)
// group the words
.groupBy(new KeySelector<WordWithCount, String>() {
@Override
public String getKey(WordWithCount wordWithCount) throws Exception {
return wordWithCount.word;
}
})
// sum the word counts
.reduce(new ReduceFunction<WordWithCount>() {
@Override
public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception {
return new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count);
}
})
// write batch data to Pulsar
.output(pulsarOutputFormat);
總結
Pulsar 和 Flink 對應用程序在數(shù)據(jù)和計算級別如何處理數(shù)據(jù)的視圖基本一致,將“批”作為“流”的特殊情況進行“流式優(yōu)先”處理。通過 Pulsar 的 Segmented Streams 方法和 Flink 在一個框架下統(tǒng)一批處理和流處理工作負載的幾個步驟,可以應用多種方法融合兩種技術,提供大規(guī)模的彈性數(shù)據(jù)處理。
歡迎訂閱 Apache Flink 和 Apache Pulsar 郵件,及時了解領域最新發(fā)展,或在社區(qū)分享您的想法和建議。更多 Pulsar 干貨和動態(tài)分享,請關注微信公眾號,我們將在后續(xù)文章中推送更多優(yōu)質內(nèi)容。