引子:被譽為“中國大數(shù)據(jù)第一人”的涂子沛先生在其成名作《數(shù)據(jù)之巔》里提到,摩爾定律、社交媒體、數(shù)據(jù)挖掘是大數(shù)據(jù)的三大成因。IBM的研究稱,整個人類文明所獲得的全部數(shù)據(jù)中,有90%是過去兩年內產(chǎn)生的。在此背景下,包括NoSQL,Hadoop, Spark, Storm, Kylin在內的大批新技術應運而生。其中以RxJava和Reactor為代表的響應式(Reactive)編程技術針對的就是經(jīng)典的大數(shù)據(jù)4V定義(Volume,Variety,Velocity,Value)中的Velocity,即高并發(fā)問題,而在即將發(fā)布的Spring 5中,也引入了響應式編程的支持。在接下來的幾周,我會圍繞響應式編程分三期與你分享我的一些學習心得。本篇是第二篇,以Reactor框架為例介紹響應式編程的幾個關鍵特性。
前情概要:
1 響應式編程總覽
In computing, reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. - Reactive programming - Wikipedia
在上述響應式編程(后面簡稱RP)的定義中,除了異步編程,還包含兩個重要的關鍵詞:
- Data streams: 即數(shù)據(jù)流,分為靜態(tài)數(shù)據(jù)流(比如數(shù)組,文件)和動態(tài)數(shù)據(jù)流(比如事件流,日志流)兩種。基于數(shù)據(jù)流模型,RP得以提供一套統(tǒng)一的Stream風格的數(shù)據(jù)處理接口。和Java 8中的Stream API相比,RP API除了支持靜態(tài)數(shù)據(jù)流,還支持動態(tài)數(shù)據(jù)流,并且允許復用和同時接入多個訂閱者。
- The propagation of change: 變化傳播,簡單來說就是以一個數(shù)據(jù)流為輸入,經(jīng)過一連串操作轉化為另一個數(shù)據(jù)流,然后分發(fā)給各個訂閱者的過程。這就有點像函數(shù)式編程中的組合函數(shù),將多個函數(shù)串聯(lián)起來,把一組輸入數(shù)據(jù)轉化為格式迥異的輸出數(shù)據(jù)。
一個容易混淆的概念是響應式設計,雖然它的名字中也包含了“響應式”三個字,但其實和RP完全是兩碼事。響應式設計是指網(wǎng)頁能夠自動調整布局和樣式以適配不同尺寸的屏幕,屬于網(wǎng)站設計的范疇,而RP是一種關注系統(tǒng)可響應性,面向數(shù)據(jù)流的編程思想或者說編程框架。
特性
從本質上說,RP是一種異步編程框架,和其他框架相比,RP至少包含了以下三個特性:
- 描述而非執(zhí)行:在你最終調用
subscribe()方法之前,從發(fā)布端到訂閱端,沒有任何事會發(fā)生。就好比無論多長的水管,只要水龍頭不打開,水管里的水就不會流動。為了提高描述能力,RP提供了比Stream豐富的多的多的API,比如buffer(),merge(),onErrorMap()等。 - 提高吞吐量: 類似于HTTP/2中的連接復用,RP通過線程復用來提高吞吐量。在傳統(tǒng)的Servlet容器中,每來一個請求就會發(fā)起一個線程進行處理。受限于機器硬件資源,單臺服務器所能支撐的線程數(shù)是存在一個上限的,假設為T,那么應用同時能處理的請求數(shù)(吞吐量)必然也不會超過T。但對于一個使用Spring 5開發(fā)的RP應用,如果運行在像Netty這樣的異步容器中,無論有多少個請求,用于處理請求的線程數(shù)是相對固定的,因此最大吞吐量就有可能超過T。
- 背壓(Backpressure)支持:簡單來說,背壓就是一種反饋機制。在一般的Push模型中,發(fā)布者既不知道也不關心訂閱者的處理速度,當數(shù)據(jù)的發(fā)布速度超過處理速度時,需要訂閱者自己決定是緩存還是丟棄。如果使用RP,決定權就交回給發(fā)布者,訂閱者只需要根據(jù)自己的處理能力問發(fā)布者請求相應數(shù)量的數(shù)據(jù)。你可能會問這不就是Pull模型嗎?其實是不同的。在Pull模型中,訂閱者每次處理完數(shù)據(jù),都要重新發(fā)起一次請求拉取新的數(shù)據(jù),而使用背壓,訂閱者只需要發(fā)起一次請求,就能連續(xù)不斷的重復請求數(shù)據(jù)。
適用場景
了解了RP的這些特性,你可能已經(jīng)猜想到RP有哪些適用場景了。一般來說,RP適用于高并發(fā)、帶延遲操作的場景,比如以下這些情況(的組合):
- 一次請求涉及多次外部服務調用
- 非可靠的網(wǎng)絡傳輸
- 高并發(fā)下的消息處理
- 彈性計算網(wǎng)絡
代價
Every coin has two sides.
和任何框架一樣,有優(yōu)勢必然就有劣勢。RP的兩個比較大的問題是:
- 雖然復用線程有助于提高吞吐量,但一旦在某個回調函數(shù)中線程被卡住,那么這個線程上所有的請求都會被阻塞,最嚴重的情況,整個應用會被拖垮。
- 難以調試。由于RP強大的描述能力,在一個典型的RP應用中,大部分代碼都是以鏈式表達式的形式出現(xiàn),比如
flux.map(String::toUpperCase).doOnNext(s -> LOG.info("UC String {}", s)).next().subscribe(),一旦出錯,你將很難定位到具體是哪個環(huán)節(jié)出了問題。所幸的是,RP框架一般都會提供一些工具方法來輔助進行調試。
2 Reactor實戰(zhàn)
為了幫助你理解上面說的一些概念,下面我就通過幾個測試用例,演示RP的兩個關鍵特性:提高吞吐量和背壓。完整的代碼可參見我GitHub上的示例工程。
提高吞吐量
@Test
public void testImperative() throws InterruptedException {
_runInParallel(CONCURRENT_SIZE, () -> {
ImperativeRestaurantRepository.INSTANCE.insert(load);
});
}
private void _runInParallel(int nThreads, Runnable task) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
for (int i = 0; i < nThreads; i++) {
executorService.submit(task);
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
}
@Test
public void testReactive() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(CONCURRENT_SIZE);
for (int i = 0; i < CONCURRENT_SIZE; i++) {
ReactiveRestaurantRepository.INSTANCE.insert(load).subscribe(s -> {
}, e -> latch.countDown(), latch::countDown);
}
latch.await();
}
用例解讀:
- 第一個測試用例使用的是多線程+MongoDB Driver,同時起100個線程,每個線程往MongoDB中插入10000條數(shù)據(jù),總共100萬條數(shù)據(jù),平均用時15秒左右。
- 第二個測試用例使用的是Reactor+MongoDB Reactive Streams Driver,同樣是插入100萬條數(shù)據(jù),平均用時不到10秒,吞吐量提高了50%!
背壓
在演示測試用例之前,先看兩張圖,幫助你更形象的理解什么是背壓。


圖片出處:Dataflow and simplified reactive programming
兩張圖乍一看沒啥區(qū)別,但其實是完全兩種不同的背壓策略。第一張圖,發(fā)布速度(100/s)遠大于訂閱速度(1/s),但由于背壓的關系,發(fā)布者嚴格按照訂閱者的請求數(shù)量發(fā)送數(shù)據(jù)。第二張圖,發(fā)布速度(1/s)小于訂閱速度(100/s),當訂閱者請求100個數(shù)據(jù)時,發(fā)布者會積滿所需個數(shù)的數(shù)據(jù)再開始發(fā)送??梢钥吹?,通過背壓機制,發(fā)布者可以根據(jù)各個訂閱者的能力動態(tài)調整發(fā)布速度。
@BeforeEach
public void beforeEach() {
// initialize publisher
AtomicInteger count = new AtomicInteger();
timerPublisher = Flux.create(s ->
new Timer().schedule(new TimerTask() {
@Override
public void run() {
s.next(count.getAndIncrement());
if (count.get() == 10) {
s.complete();
}
}
}, 100, 100)
);
}
@Test
public void testNormal() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
timerPublisher
.subscribe(r -> System.out.println("Continuous consuming " + r),
e -> latch.countDown(),
latch::countDown);
latch.await();
}
@Test
public void testBackpressure() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Subscription> timerSubscription = new AtomicReference<>();
Subscriber<Integer> subscriber = new BaseSubscriber<Integer>() {
@Override
protected void hookOnSubscribe(Subscription subscription) {
timerSubscription.set(subscription);
}
@Override
protected void hookOnNext(Integer value) {
System.out.println("consuming " + value);
}
@Override
protected void hookOnComplete() {
latch.countDown();
}
@Override
protected void hookOnError(Throwable throwable) {
latch.countDown();
}
};
timerPublisher.onBackpressureDrop().subscribe(subscriber);
new Timer().schedule(new TimerTask() {
@Override
public void run() {
timerSubscription.get().request(1);
}
}, 100, 200);
latch.await();
}
用例解讀:
- 第一個測試用例演示了在理想情況下,即訂閱者的處理速度能夠跟上發(fā)布者的發(fā)布速度(以100ms為間隔產(chǎn)生10個數(shù)字),控制臺從0打印到9,一共10個數(shù)字,和發(fā)布端一致。
- 第二個測試用例故意調慢了訂閱者的處理速度(每200ms處理一個數(shù)字),同時發(fā)布者采用了Drop的背壓策略,結果控制臺只打印了一半的數(shù)字(0,2,4,6,8),另外一半的數(shù)字由于背壓的原因被發(fā)布者Drop掉了,并沒有發(fā)給訂閱者。
3 小結
通過上面的介紹,不難看出RP實際上是一種內置了發(fā)布者訂閱者模型的異步編程框架,包含了線程復用,背壓等高級特性,特別適用于高并發(fā)、有延遲的場景。
以上就是我對響應式編程的一些簡單介紹,歡迎你到我的留言板分享,和大家一起過過招。下一篇我將綜合前兩篇的內容,詳解一個完整的Spring 5示例應用,敬請期待。