Flink 和 Pulsar 的批流融合

作者:Sijie Guo

編輯:Irene

Apache Flink 和 Apache Pulsar 的開源數(shù)據(jù)技術框架可以以不同的方式融合,來提供大規(guī)模彈性數(shù)據(jù)處理。4 月 2 日,我司 CEO 郭斯杰受邀在 Flink Forward San Francisco 2019 大會上發(fā)表演講,介紹了 Flink 和 Pulsar 在批流應用程序的融合情況。這篇文章會簡要介紹 Apache Pulsar 及其與其他消息系統(tǒng)的不同之處,并講解如何融合 Pulsar 和 Flink 協(xié)同工作,為大規(guī)模彈性數(shù)據(jù)處理提供無縫的開發(fā)人員體驗。

本文轉載自“StreamNative”,更多干貨請關注微信公眾號“StreamNative”。

Apache Pulsar 簡介

Apache Pulsar 是一個開源的分布式發(fā)布-訂閱消息系統(tǒng), 由 Apache 軟件基金會管理,并于 2018 年 9 月成為 Apache 頂級開源項目。 Pulsar 是一種多租戶、高性能解決方案,用于服務器到服務器消息傳遞,包括多個功能,例如,在一個 Pulsar 實例中對多個集群提供原生支持、集群間消息跨地域的無縫復制、發(fā)布和端到端的低延遲、超過一百萬個主題的無縫擴展以及由 Apache BookKeeper 提供的持久消息存儲保證消息傳遞?,F(xiàn)在我們來討論 Pulsar 和其他發(fā)布-訂閱消息傳遞框架之間的主要區(qū)別:

區(qū)別一

雖然 Pulsar 提供了靈活的發(fā)布-訂閱消息傳遞系統(tǒng),但它也由持久的日志存儲支持——因此需在一個框架下集成消息傳遞和存儲功能。由于 Pulsar 采用了分層架構,它可以即時故障恢復、支持獨立可擴展性和無需均衡的集群擴展。

Pulsar 的架構與其他發(fā)布-訂閱系統(tǒng)類似,框架由主題組成,而主題是主要數(shù)據(jù)實體。如下圖所示,生產(chǎn)者向主題發(fā)送數(shù)據(jù),消費者從主題接收數(shù)據(jù)。

image

區(qū)別二

第二個區(qū)別是,Pulsar 的框架構建從一開始就考慮到了多租戶。這意味著每個 Pulsar 主題都有一個分層的管理結構,使得資源分配、資源管理和團隊協(xié)作變得高效而容易。由于 Pulsar 提供屬性(租戶)級、命名空間級和主題級的資源隔離,Pulsar 的多租戶特性不僅能使數(shù)據(jù)平臺管理人員輕松擴展新的團隊,還能跨集群共享數(shù)據(jù),簡化團隊協(xié)作。

image

區(qū)別三

Pulsar 靈活的消息傳遞框架統(tǒng)一了流式和隊列數(shù)據(jù)消費模型,并提供了更大的靈活性。如下圖所示,Pulsar 保存主題中的數(shù)據(jù),而多個團隊可以根據(jù)其工作負載和數(shù)據(jù)消費模式獨立地消費數(shù)據(jù)。

image

Pulsar 數(shù)據(jù)視圖:****分片數(shù)據(jù)流

Apache Flink 是一個流式優(yōu)先計算框架,它將批處理視為流處理的特殊情況。在對數(shù)據(jù)流的看法上,F(xiàn)link 區(qū)分了有界和無界數(shù)據(jù)流之間的批處理和流處理,并假設對于批處理工作負載數(shù)據(jù)流是有限的,具有開始和結束。

在數(shù)據(jù)層上,Apache Pulsar 與 Apache Flink 的觀點相似。該框架也使用流作為所有數(shù)據(jù)的統(tǒng)一視圖,分層架構允許傳統(tǒng)發(fā)布-訂閱消息傳遞,用于流式工作負載和連續(xù)數(shù)據(jù)處理;并支持分片流(Segmented Streams)和有界數(shù)據(jù)流的使用,用于批處理和靜態(tài)工作負載。

image

如下圖所示,為了并行處理數(shù)據(jù),生產(chǎn)者向主題發(fā)送數(shù)據(jù)后,Pulsar 根據(jù)數(shù)據(jù)流量對主題進行分區(qū),再在每個分區(qū)中進行分片,并使用 Apache BookKeeper 進行分片存儲。這一模式允許在同一個框架中集成傳統(tǒng)的發(fā)布-訂閱消息系統(tǒng)和分布式并行計算。

image

Flink + Pulsar 的融合

Apache Flink 和 Apache Pulsar 已經(jīng)以多種方式融合。在以下內(nèi)容中,我會介紹兩個框架間未來一些可行的融合方式,并分享一些融合使用兩個框架的示例。

未來融合方式:

Pulsar 能以不同的方式與 Apache Flink 融合,一些可行的融合包括,使用流式連接器(Streaming Connectors)支持流式工作負載,或使用批式源連接器(Batch Source Connectors)支持批式工作負載。Pulsar 還提供了對 Schema 的原生支持,可以與 Flink 集成并提供對數(shù)據(jù)的結構化訪問,例如,使用 Flink SQL 在 Pulsar 中查詢數(shù)據(jù)。另外,還能將 Pulsar 作為 Flink 的狀態(tài)后端。由于 Pulsar 具有分層架構(Apache Bookkeeper 支持下的 Streams 和 Segmented Streams),因此可以將 Pulsar 作為存儲層并存儲 Flink 狀態(tài)。

從架構的角度來看,我們可以想象兩個框架之間的融合,使用 Apache Pulsar 作為統(tǒng)一的數(shù)據(jù)層視圖,使用 Apache Flink 作為統(tǒng)一的計算、數(shù)據(jù)處理框架和 API。

現(xiàn)有融合方式

大量技術愛好者參與了此次干貨滿滿的分享請看以下內(nèi)容。

兩個框架之間的融合正在進行中,開發(fā)人員已經(jīng)可以通過多種方式融合使用 Pulsar 和 Flink。例如,在 Flink DataStream 應用程序中,Pulsar 可以作為流數(shù)據(jù)源和流接收器。開發(fā)人員能使 Flink 作業(yè)從 Pulsar 中獲取數(shù)據(jù),再進行計算并處理實時數(shù)據(jù),最后將數(shù)據(jù)作為流接收器發(fā)送回 Pulsar 主題。示例如下:

PulsarSourceBuilder<String>builder = PulsarSourceBuilder.builder(new SimpleStringSchema())

.serviceUrl(serviceUrl)

.topic(inputTopic)

.subscriptionName(subscription);

SourceFunction<String> src = builder.build();

DataStream<String> input = env.addSource(src);

DataStream<WordWithCount> wc = input

.flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {

   for (String word : line.split("\\s")) {

       collector.collect(new WordWithCount(word, 1));

   }

})

.returns(WordWithCount.class)

.keyBy("word")

.timeWindow(Time.seconds(5))

.reduce((ReduceFunction<WordWithCount>) (c1, c2) ->

   new WordWithCount(c1.word, c1.count + c2.count));

if (null != outputTopic) {

wc.addSink(new FlinkPulsarProducer<>(

   serviceUrl,

   outputTopic,

   new AuthenticationDisabled(),

   wordWithCount -> wordWithCount.toString().getBytes(UTF_8),

   wordWithCount -> wordWithCount.word

)).setParallelism(parallelism);

} else {

// print the results with a single thread, rather than in parallel

wc.print().setParallelism(1);

}

另一個開發(fā)人員可利用的框架間的融合,已經(jīng)包括將 Pulsar 用作 Flink 應用程序的流式源和流式表接收器,代碼示例如下:

PulsarSourceBuilder<String> builder = PulsarSourceBuilder.builder(new SimpleStringSchema())

   .serviceUrl(serviceUrl)

   .topic(inputTopic)

   .subscriptionName(subscription);

SourceFunction<String> src = builder.build();

DataStream<String> input = env.addSource(src);

DataStream<WordWithCount> wc = input

   .flatMap((FlatMapFunction<String, WordWithCount>) (line, collector) -> {

       for (String word : line.split("\\s")) {

           collector.collect(

               new WordWithCount(word, 1)

           );

       }

   })

   .returns(WordWithCount.class)

   .keyBy(ROUTING_KEY)

   .timeWindow(Time.seconds(5))

   .reduce((ReduceFunction<WordWithCount>) (c1, c2) ->

           new WordWithCount(c1.word, c1.count + c2.count));

tableEnvironment.registerDataStream("wc",wc);

Table table = tableEnvironment.sqlQuery("select word, count from wc");

table.printSchema();

TableSink sink = null;

if (null != outputTopic) {

sink = new PulsarJsonTableSink(serviceUrl, outputTopic, new AuthenticationDisabled(), ROUTING_KEY);

} else {

// print the results with a csv file

sink = new CsvTableSink("./examples/file", "|");

}

table.writeToSink(sink);

最后,F(xiàn)link 融合 Pulsar 作為批處理接收器,負責完成批處理工作負載。Flink 在靜態(tài)數(shù)據(jù)集完成計算之后,批處理接收器將結果發(fā)送至 Pulsar。示例如下:

// create PulsarOutputFormat instance

final OutputFormat pulsarOutputFormat =

   new PulsarOutputFormat(serviceUrl, topic, new AuthenticationDisabled(), wordWithCount -> wordWithCount.toString().getBytes());

// create DataSet

DataSet<String> textDS = env.fromElements(EINSTEIN_QUOTE);

// convert sentences to words

textDS.flatMap(new FlatMapFunction<String, WordWithCount>() {

@Override

public void flatMap(String value, Collector<WordWithCount> out) throws Exception {

   String[] words = value.toLowerCase().split(" ");

   for(String word: words) {

       out.collect(new WordWithCount(word.replace(".", ""), 1));

   }

}

})

// filter words which length is bigger than 4

.filter(wordWithCount -> wordWithCount.word.length() > 4)

// group the words

.groupBy(new KeySelector<WordWithCount, String>() {

@Override

public String getKey(WordWithCount wordWithCount) throws Exception {

   return wordWithCount.word;

}

})

// sum the word counts

.reduce(new ReduceFunction<WordWithCount>() {

@Override

public WordWithCount reduce(WordWithCount wordWithCount1, WordWithCount wordWithCount2) throws Exception {

   return  new WordWithCount(wordWithCount1.word, wordWithCount1.count + wordWithCount2.count);

}

})

// write batch data to Pulsar

.output(pulsarOutputFormat);

總結

Pulsar 和 Flink 對應用程序在數(shù)據(jù)和計算級別如何處理數(shù)據(jù)的視圖基本一致,將“批”作為“流”的特殊情況進行“流式優(yōu)先”處理。通過 Pulsar 的 Segmented Streams 方法和 Flink 在一個框架下統(tǒng)一批處理和流處理工作負載的幾個步驟,可以應用多種方法融合兩種技術,提供大規(guī)模的彈性數(shù)據(jù)處理。

歡迎訂閱 Apache Flink 和 Apache Pulsar 郵件,及時了解領域最新發(fā)展,或在社區(qū)分享您的想法和建議。更多 Pulsar 干貨和動態(tài)分享,請關注微信公眾號,我們將在后續(xù)文章中推送更多優(yōu)質內(nèi)容。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容