Vert.x 3 核心手冊之開頭篇

Vert.x的核心是一組我們稱之為Vert.x Core的Java API。

知識(shí)庫。

Vert.x核心庫提供了以下功能:

  • TCP客戶端和服務(wù)器
  • HTTP客戶端和服務(wù)器同時(shí)支持WebSocket
  • 事件總線
  • 共享數(shù)據(jù)-局部的map和集群下的分布式map
  • 定時(shí)和延遲的任務(wù)
  • 部署卸載Verticles
  • 數(shù)據(jù)報(bào)套接字
  • DNS客戶端
  • 訪問文件系統(tǒng)
  • 高可用
  • 本地傳輸
  • 集群

核心API的功能功能相對(duì)較底層以至于你可能找不到你想要的組件,例如數(shù)據(jù)庫訪問、權(quán)限授權(quán)或者高級(jí)Web,雖然我們核心包中不提供,但是不代表我們擴(kuò)展包沒有提供,
所有有此需要的開發(fā)人員可以在Vert.x ext(擴(kuò)展)包中找到你所想要的組件。

Vert.x核心包很小而且很輕量級(jí)。你可以只使用你想要的部分即可。它能夠很輕松的集成到你現(xiàn)有的項(xiàng)目中-我們不會(huì)強(qiáng)制要求你的應(yīng)用結(jié)構(gòu)以我們指定的方式使用。

你可以使用Vert.x核心支持的任何語言。但是這里有一個(gè)很酷的地方-我們并不強(qiáng)迫你直接從JavaScript或Ruby中使用Java API,畢竟,不同的語言有不同的約定和習(xí)慣用法,在Ruby開發(fā)者(例如)上強(qiáng)制Java習(xí)慣用法是很奇怪的。 與之不同的是,我們會(huì)自動(dòng)為每一種語言生成和Java API相等的約定和習(xí)慣用法。(idiomatic)

從現(xiàn)在開始,我們將使用 核心(core) 來指代 Vert.x core。

如果你使用Maven或者Gradle,在你的項(xiàng)目描述文件中添加以下依賴便可以輕松訪問Vert.x 核心API了。

  • Maven (在你的pom.xml文件中添加以下內(nèi)容)
<dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-core</artifactId>
      <version>3.5.0</version>
</dependency>
  • Gradle (在你的build.gradle文件添加以下內(nèi)容):
dependencies {
      compile 'io.vertx:vertx-core:3.5.0'
}

廢話不多說,開車?yán)瞺

沒時(shí)間解釋啦,快上車

Vert.x 之千里之行始于足下

在Vert.x-land大陸你能做的不多,除非你用Vertx對(duì)象進(jìn)行操作!(ps:沒有Vertx對(duì)象意味著你啥也干不了,哈哈)

我差點(diǎn)沒忍住笑出聲來

Vertx對(duì)象是Vert.x的控制中心,你幾乎可以用它來做任何事。(感覺有點(diǎn)像上帝(God))

包括創(chuàng)建客戶端和服務(wù)器,從事件總線中獲取一個(gè)引用,設(shè)置定時(shí)器,以及許多其他的事情。

你要如何獲取一個(gè)Vert.x的實(shí)例呢?

如果你想要集成Vert.x并獲取一個(gè)簡單的實(shí)例,那么你可以像下面這么做:

Vertx vertx = Vertx.vertx();

注意 :大多數(shù)應(yīng)用可能只需要一個(gè)Vert.x實(shí)例,但是如果你有需要它也可以創(chuàng)建多個(gè)Vert.x實(shí)例,舉個(gè)栗子,服務(wù)器和客戶端可以隔離成不同的事件總線或者不同的組。

當(dāng)創(chuàng)建Vertx對(duì)象時(shí)指定配置選項(xiàng)

如果默認(rèn)的配置選項(xiàng)你覺得不適用,那你可以在創(chuàng)建Vertx對(duì)象的時(shí)候指定配置選項(xiàng):

Vertx vertx = Vertx.vertx(new VertxOptions().setWorkerPoolSize(40));

VertxOptions 對(duì)象有很多設(shè)置選項(xiàng)供你選擇,允許你配置集群、高可用、池大小和其它各種設(shè)置。了解更多配置細(xì)節(jié)請參考Java文檔。

創(chuàng)建Vert.x集群

如果你想創(chuàng)建 Vert.x集群 (獲取更多信息請參考 事件總線 集群章節(jié)),然后通常你會(huì)使用異步變量創(chuàng)建 Vert.x 對(duì)象。

這是因?yàn)樵谝粋€(gè)集群組里面創(chuàng)建不同的 Vert.x 實(shí)例對(duì)象通常會(huì)花費(fèi)一些時(shí)間(也許幾秒鐘)。在此期間,我們不想要阻塞調(diào)用的線程,因此我們會(huì)給你一個(gè)異步的結(jié)果。

你是鏈?zhǔn)降膯幔?/h2>

眼尖的朋友可能已經(jīng)注意到了我們上面的栗子中使用了鏈?zhǔn)降腁PI。

鏈?zhǔn)紸PI能夠在多個(gè)方法中以鏈的方式互相調(diào)用 。舉個(gè)栗子:

request().response().putHeader("Content-Type", "text/plain").write("some text").end();

在 Vert.x 的API中這是很常見的模式,因此要習(xí)慣使用這種模式。

通常使用鏈?zhǔn)斤L(fēng)格來寫代碼能讓你的代碼更加簡潔,當(dāng)然我們又不是流氓,光天化日之下哪敢調(diào)戲程序員啊,如果你不喜歡鏈?zhǔn)椒椒▉頃鴮懘a我們也不會(huì)強(qiáng)制要求你使用這種方法,你完全可以無視它并以自己喜歡的風(fēng)格來書寫代碼,舉下面的一個(gè)栗子:

HttpServerResponse response = request.response();
response.putHeader("Content-Type", "text/plain");
response.write("some text");
response.end();

不要調(diào)用我們,我們會(huì)調(diào)用你

Vert.x 的API主要是 事件驅(qū)動(dòng) 的。這意味著當(dāng) Vert.x 中發(fā)生您感興趣的事情時(shí),Vert.x 會(huì)通過發(fā)送事件調(diào)用你。

下面舉了一些事件的栗子:

  • 定時(shí)器觸發(fā)
  • 數(shù)據(jù)到達(dá)套接字
  • 從磁盤讀取數(shù)據(jù)完成
  • 發(fā)生異常
  • HTTP服務(wù)器接收了一個(gè)請求

你可以通過提供一個(gè)處理器給 Vert.x 的API來處理事件。下面舉個(gè)栗子來說明接收一個(gè)定時(shí)器事件每秒打印輸出:

vertx.setPeriodic(1000, id -> {
    // This handler will get called every second'
    System.out.println("timer fired!");
});

或者接收一個(gè)HTTP的請求:

server.requestHandler(request -> {
    // This handler will be called every time an HTTP request is received at the server
    request.response().end("Hello,World!");
});

當(dāng) Vert.x 傳遞一個(gè)事件給你的處理程序,一段時(shí)間后,Vert.x 并會(huì)異步的調(diào)用它。

所以在 Vert.x 中我們有一些重要的概念需要注意:

不要阻塞我

除去少有的情況(例如:以"Sync"結(jié)尾的文件系統(tǒng)操作),Vert.x 的API都是不會(huì)阻塞調(diào)用線程的。

如果結(jié)果可以立即提供,它將會(huì)立即返回,通常情況下就需要提供一個(gè)處理器在一段時(shí)間后來接收事件。

因?yàn)?Vert.x 的API是沒有阻塞的,這意味著你可以用 Vert.x 處理大量并發(fā)但僅使用少量的線程。

當(dāng)下面的情況發(fā)生時(shí)調(diào)用的線程可能會(huì)被阻塞:

  • 從套接字上讀取數(shù)據(jù)

  • 將數(shù)據(jù)寫入磁盤

  • 發(fā)送一個(gè)消息給收信者然后等待回信

  • …? 其它多數(shù)情況

在上述所有情況下, 當(dāng)您的線程等待結(jié)果時(shí), 它不能做任何其他事情-它實(shí)際上是無用的。

這意味著, 如果您希望使用阻塞 API 進(jìn)行大量的并發(fā), 那么您需要大量的線程來防止應(yīng)用程序被停止。

線程在所需內(nèi)存和其它方面都有開銷(例如:它們的棧內(nèi)存數(shù)據(jù))和上下文切換。

對(duì)于許多現(xiàn)代應(yīng)用程序所需的并發(fā)級(jí)別, 阻塞方法是不可擴(kuò)展的。


反應(yīng)堆和多反應(yīng)堆

在此之前我們已經(jīng)注意到 Vert.x 的API都是事件驅(qū)動(dòng)的 - 當(dāng)它們可用時(shí),Vert.x 會(huì)傳遞事件給處理器處理。

在大多數(shù)情況下 Vert.x 使用一個(gè)事件循環(huán)線程調(diào)用你的處理器。

因?yàn)閂ert.x和你的應(yīng)用中不存在阻塞,所以事件循環(huán)線程可以在事件到達(dá)時(shí)持續(xù)不斷地將其分發(fā)給不同的處理器。

因?yàn)闆]有阻塞代碼,事件循環(huán)線程能夠在短時(shí)間內(nèi)分發(fā)大量的事件。例如,單個(gè)事件循環(huán)線程能夠快速地處理成上千個(gè)HTTP請求。

我們將這個(gè)模式稱之為 Reactor Pattern (反應(yīng)堆模式)。

在此之前你可能聽說過這個(gè) - 例如,Node.js(實(shí)現(xiàn)了反應(yīng)堆模式)

在一個(gè)標(biāo)準(zhǔn)的反應(yīng)堆模式實(shí)現(xiàn)中,有一個(gè)單獨(dú)的事件循環(huán)線程 (single event loop),它在一個(gè)循環(huán)中運(yùn)行,處理事件到達(dá)時(shí)傳遞所有的事件到所有的處理器。

問題是單線程任何時(shí)候都只能運(yùn)行在單核心上面,因此如果你想要單線程反應(yīng)堆應(yīng)用(例如:你的Node.js應(yīng)用)在您的多核服務(wù)器上擴(kuò)展,則必須啟動(dòng)并管理許多不同的進(jìn)程。

Vert.x 不同之處并在這里。每個(gè) Vartx 實(shí)例不是一個(gè)事件循環(huán)而是維護(hù)好多個(gè)事件循環(huán)。默認(rèn)情況下我們選擇機(jī)器可用的內(nèi)核數(shù)量作為默認(rèn)數(shù)量,但是這是可以覆寫的。

這意味著與Node.js不同,單個(gè) Vertx 進(jìn)程可以跨服務(wù)器進(jìn)行擴(kuò)展。

我們將這種模式稱為多反應(yīng)堆模式 (Multi-Reactor Pattern),將其與單線程反應(yīng)堆模式區(qū)分開來。

注意 :雖然Vertx實(shí)例維護(hù)了多個(gè)事件循環(huán),任何特定的處理程序也不會(huì)同時(shí)執(zhí)行,并且在大多數(shù)情況下(除了wokrer verticle)將始終使用完全相同的事件循環(huán)來調(diào)用。


黃金法則-不要阻塞事件循環(huán)線程

我們已經(jīng)知道 Vert.x API是非阻塞同時(shí)也不會(huì)阻塞事件循環(huán),但是如果你在你自己的處理器中阻塞了事件循環(huán)這將毫無用處。

如果你要這樣做,事件循環(huán)被阻塞時(shí)它將不能處理任何事情,如果在 Vertx 實(shí)例中阻塞了所有的事件循環(huán),那么你的應(yīng)用將會(huì)完全停止!

因此不要這樣搞,小伙子!在此已經(jīng)警告過你啦。

詠春拳警告

舉幾個(gè)阻塞的栗子:

  • Thread.sleep()

  • 等待一個(gè)鎖

  • 正在等待互斥或監(jiān)視器(例如 synchronized 部分)

  • 做一個(gè)長時(shí)間的數(shù)據(jù)庫操作,并等待一個(gè)結(jié)果

  • 需要長時(shí)間來做一個(gè)復(fù)雜的計(jì)算

  • 死循環(huán)

以上任何的一種情況都會(huì)在很長一段時(shí)間內(nèi)阻塞事件循環(huán)線程(event loop),你應(yīng)經(jīng)立即去下一步,并等待進(jìn)一步的指示。

那么……什么是相當(dāng)長的時(shí)間呢?

這段時(shí)間是多久呢?這實(shí)際上取決于您的應(yīng)用程序和您需要的并發(fā)量。

如果你有一個(gè)事件循環(huán),并且你想每秒處理10000個(gè)HTTP請求,那很明顯每個(gè)請求的處理時(shí)間不能超過0.1ms,所以阻塞時(shí)間不能超過這個(gè)時(shí)間。

這道數(shù)學(xué)題目又不難,就留給讀者來計(jì)算吧。

如果您的應(yīng)用程序沒有響應(yīng),則可能是您在某處阻塞了事件循環(huán)而導(dǎo)致的。為了幫助您診斷這些問題,如果檢測到事件循環(huán)在一段時(shí)間未返回,Vert.x 將會(huì)自動(dòng)記錄警告。如果您在日志中看到這樣的警告,那么您應(yīng)該進(jìn)行代碼檢查了。

  線程 vertx-eventloop-thread-3 已被阻塞20458毫秒

Vert.x 也會(huì)提供堆棧跟蹤來準(zhǔn)確確定阻塞發(fā)生的位置。

如果想要關(guān)閉這些警告或設(shè)置,可以在創(chuàng)建Vert.x對(duì)象之前通過設(shè)置 VertxOptions 對(duì)象來更改。

運(yùn)行阻塞代碼

在一個(gè)完美的世界里面,不會(huì)有戰(zhàn)爭或饑餓,所有的操作都是異步的,小兔子會(huì)在陽光明媚的綠色草地上和小羊羔手拉手。

但是現(xiàn)實(shí)不是這樣的。(你最近有看新聞嗎?(ps:發(fā)生了什么?))

鬼知道我經(jīng)歷了什么

事實(shí)上,大多數(shù)的庫都是同步操作,尤其是JVM生態(tài)系統(tǒng)中有許多同步API,并且許多方法都可能阻塞。一個(gè)很好的栗子就是JDBC API-它本質(zhì)上是同步的,不管多么努力,Vert.x 都不可能使用魔法在上面撒鹽使其變?yōu)楫惒健?/p>

我們不會(huì)把所有的東西都改為異步,因此我們需要為您提供一種在 Vert.x 應(yīng)用程序中安全的使用 “傳統(tǒng)” 阻塞API的方法。

正如前面所討論的,你不能直接從事件循環(huán)中調(diào)用阻塞操作,因?yàn)檫@將阻塞它做其它有用的工作。那么你怎么做到這一點(diǎn)呢?

這是通過調(diào)用 executeBlocking 指定要執(zhí)行阻塞代碼以及在阻塞代碼執(zhí)行返回異步結(jié)果處理程序來完成的:

vertx.executeBlocking(future -> {
  // Call some blocking API that takes a significant amount of time to return
  String result = someAPI.blockingMethod("hello");
  future.complete(result);
}, res -> {
  System.out.println("The result is: " + res.result());
});

默認(rèn)情況下,如果從相同的上下文中多次調(diào)用 executeBlocking(例如:相同的Verticle實(shí)例),則不同的 executeBlocking 被會(huì)被串行執(zhí)行(即一個(gè)接一個(gè)地執(zhí)行 one by one)。

如果你不關(guān)心執(zhí)行順序,你可以調(diào)用 executeBlocking 時(shí)指定 false 作為 ordered 的參數(shù) 。在這種設(shè)置下,工作池上就可以并發(fā)執(zhí)行任何 executeBlocking 了。

運(yùn)行阻塞代碼的另一種方法是使用 Worker Verticle 。

Worker Verticle 始終使用來自工作池的線程來執(zhí)行的。

默認(rèn)情況下,阻塞代碼都在 Vert.x 工作池上執(zhí)行,需要配置 setWorkerPoolSize。

可以為不同的業(yè)務(wù)創(chuàng)建額外的線程池:

WorkerExecutor executot = vertx.createSharedWorkerExecutor("my-worker-pool");
executot.executeBlocking(future -> {
    // Call some blocking API that takes a significant amount of time to return
   String result = someAPI.blockingMethod("hello");
   future.complete(result);
}, res -> {
   System.out.println("The result is: " + res.result());
});

當(dāng)worker executor不再需要的時(shí)候必須要將其關(guān)閉掉。

executor.close();

當(dāng)幾個(gè)工作者使用相同的名稱創(chuàng)建線程池時(shí),它們將會(huì)共享同一個(gè)線程池。工作者線程池關(guān)閉的時(shí)候所有工作者線程也會(huì)被關(guān)閉掉。

當(dāng)在 Verticle 中創(chuàng)建一個(gè)執(zhí)行器時(shí),Verticle 卸載的時(shí)候它也會(huì)自動(dòng)將執(zhí)行器關(guān)閉掉。

Worker executor 可以在被創(chuàng)建時(shí)候進(jìn)行配置:

int poolSize = 10;

// 2 minutes
long maxExecuteTime = 120000;

WorkerExecutor executor = vertx.createSharedWorkerExecutor("my-worker-pool", poolSize, maxExecuteTime);

注意: 這個(gè)需要在創(chuàng)建工作池的時(shí)候進(jìn)行配置。

異步協(xié)調(diào)

Vert.x 可以實(shí)現(xiàn)多個(gè)異步結(jié)果future的協(xié)調(diào)。它還支持并發(fā)組合(并發(fā)運(yùn)行好幾個(gè)異步操作)和順序組合(異步鏈操作)。

并發(fā)組合

CompositeFuture.all 接受多個(gè)future參數(shù)(最多6個(gè));當(dāng)所有的future都成功了,就返回成功的future,否則只要有一個(gè)失敗就會(huì)返回失敗(failed)的future:

Future<HttpServer> httpServerFuture = Future.future();
httpServerFuture.listen(httpServerFuture.completer());

Future<NetServer> netServerFuture = Future.future();
netServer.listen(netServerFuture.complete());

CompositeFuture.all(httpServerFuture, netServerFuture).setHandler(ar -> {
  if (ar.succeeded()) {
    // All servers started
  } else {
    // At least one server failed
  }
});

這個(gè)操作時(shí)同時(shí)運(yùn)行的,在組合完成后,處理器 (Handler) 將會(huì)追加到返回的future上。

handler:處理器

invoke:調(diào)用

upon:在...之上

當(dāng)其中的一個(gè)操作失?。ū粋鬟f的future會(huì)標(biāo)記為失?。┩瑫r(shí)結(jié)果future也會(huì)標(biāo)記為失敗。當(dāng)所有的操作都成功了,結(jié)果future也會(huì)是成功完成的。

同時(shí),你還可以傳遞一個(gè)List集合future(可能我空):

CompositeFuture.all(Arrays.asList(future1, future2, future3));

CompositeFuture.all 是當(dāng)所有的future都成功了才返回成功,其中的一個(gè)future失敗就代表著失敗。(要俺們都成功了那才是成功,不然都算是失敗)。CompositeFuture.any 與此不同的是只要有一個(gè)成功了,并返回成功的future,只有當(dāng)所有的future都失敗那就表示失敗了。(也就是說只要有一個(gè)成功,那咱們就是成功的)。

CompositeFuture.any(future1, future2).setHandler(ar -> {
  if (ar.succeeded()) {
    // At least one is succeeded
  } else {
    // All failed
  }
});

同樣你也可以使用一個(gè)list列表的方式:

CompositeFuture.any(Arrays.asList(f1, f2, f2));

CompositeFuture.join 是等待所有的future完成,不論成功失?。ú灰猿晒κ≌撚⑿郏?,可以支持多個(gè)參數(shù)(至多6個(gè))。當(dāng)所有的future都成功了才返回一個(gè)成功的future,

CompositeFuture.join 需要等待所有的future完成,無論是成功還是失?。ú灰猿晒κ≌撚⑿郏?。 CompositeFuture.join 可以有好幾個(gè)future參數(shù)(最多6個(gè)),當(dāng)所有future成功時(shí)并返回成功的future,當(dāng)所有future都完成并且但是其中一個(gè)失敗,那就代表著失?。?/p>

CompositeFuture.join(future1, future2, future3).setHandler(ar -> {
  if (ar.succeeded()) {
    // All succeeded
  } else {
    // All Complete and at least one failed
  }
});

同樣你也可以使用list集合的方式:

CompositeFuture.join(Arrays.asList(future1, future2, future3));

順序組合

allany 都實(shí)現(xiàn)了并發(fā)組合, compose 可以使用鏈的方式設(shè)置組合future(因此這種方式叫順序組合)。

FileSystem fs = vertx.fileSystem();
Future<Void> startFuture = Future.future();
 
Future<Void fut1 = Future.future();
fs.createFile("/foo", fut1.completer());

fut1.compose(v -> {
   // What the file is created (fut1), execute this:
   Future<Void> fut2 = Future.future();
   fs.writeFile("/foo", Buffer.buffer(), fut2.complter());
   return fut2;
}).compose(v -> {
    // When the file is written (fut2), execute this:
    fs.remove("/foo", "/bar", startFuture.completer());
}, 
// mark startFuture it as failed if any step fails.
startFuture);

在上面的例子中,這3個(gè)操作都是鏈?zhǔn)降模?/p>

  1. 創(chuàng)建文件(fut1 )
  2. 寫入數(shù)據(jù)(fut2 )
  3. 刪除文件(startFuture )

當(dāng)這3個(gè)步驟都成功了,最終的future (startFuture)就成功了。然而,如果其中的一個(gè)步驟失敗了,最終的future并也是失敗的。

這個(gè)例子使用:

  • compose:當(dāng)現(xiàn)有的future完成時(shí),運(yùn)行給定的方法會(huì)返回一個(gè)future。當(dāng)返回的future完成時(shí),它并完成了這個(gè)組合操作。

  • compose:當(dāng)現(xiàn)有的future完成時(shí),運(yùn)行給定的處理器完成給定的 future (下一個(gè))。

在第二種情況下, 處理程序應(yīng)完成下一個(gè)future, 以報(bào)告其成功或失敗。

你可以使用 completer 來完成一個(gè)future操作結(jié)果成功還是失敗。它避免了傳統(tǒng)的不得不的寫操作:如果成功了則返回完成的future否則返回失敗的future。

THE END
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容