感興趣的朋友,可以關(guān)注微信服務(wù)號(hào)“猿學(xué)堂社區(qū)”,或加入“猿學(xué)堂社區(qū)”微信交流群
版權(quán)聲明:本文由作者自行翻譯,未經(jīng)作者授權(quán),不得隨意轉(zhuǎn)發(fā)。
通過(guò)第一次迭代,我們得到了一個(gè)可工作的Wiki應(yīng)用。然而它的實(shí)現(xiàn)存在以下問(wèn)題:
- HTTP請(qǐng)求處理和數(shù)據(jù)庫(kù)訪問(wèn)代碼交織在相同的方法中。
- 大量配置數(shù)據(jù)(如端口號(hào)、JDBC驅(qū)動(dòng)等)是代碼中的硬編碼字符串。
3.1 架構(gòu)和技術(shù)選擇
第二次迭代是關(guān)于重構(gòu)代碼為獨(dú)立可重用Verticle的:
我們將部署兩個(gè)Verticle來(lái)處理HTTP請(qǐng)求,一個(gè)Verticle封裝數(shù)據(jù)庫(kù)持久化。由此產(chǎn)生的Verticle將沒(méi)有相互的直接引用,它們將只商定事件總線中的目的地名稱以及消息格式。這種方式提供了一個(gè)簡(jiǎn)單但有效的解耦。
發(fā)送到事件總線的消息將解碼為JSON。雖然Vert.x的事件總線支持靈活的串行化方案用于高要求或者高度定制的上下文,但是使用JSON數(shù)據(jù)通常是明智的選擇。使用JSON的另一個(gè)優(yōu)勢(shì)是它是一種語(yǔ)言無(wú)關(guān)的格式。由于Vert.x是支持多語(yǔ)言的,對(duì)于使用不同語(yǔ)言編寫(xiě)的Verticle之間的通訊,JSON是非常理想的。
3.2 HTTP Server Verticle
Verticle類開(kāi)端及start方法看起來(lái)如下:
public class HttpServerVerticle extends AbstractVerticle {
private static final Logger LOGGER = LoggerFactory.getLogger(HttpServerVerticle.class);
public static final String CONFIG_HTTP_SERVER_PORT = "http.server.port"; ①
public static final String CONFIG_WIKIDB_QUEUE = "wikidb.queue";
private String wikiDbQueue = "wikidb.queue";
@Override
public void start(Future<Void> startFuture) throws Exception {
wikiDbQueue = config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue"); ②
HttpServer server = vertx.createHttpServer();
Router router = Router.router(vertx);
router.get("/").handler(this::indexHandler);
router.get("/wiki/:page").handler(this::pageRenderingHandler);
router.post().handler(BodyHandler.create());
router.post("/save").handler(this::pageUpdateHandler);
router.post("/create").handler(this::pageCreateHandler);
router.post("/delete").handler(this::pageDeletionHandler);
int portNumber = config().getInteger(CONFIG_HTTP_SERVER_PORT, 8080); ③
server
.requestHandler(router::accept)
.listen(portNumber, ar -> {
if (ar.succeeded()) {
LOGGER.info("HTTP server running on port " + portNumber);
startFuture.complete();
} else {
LOGGER.error("Could not start a HTTP server", ar.cause());
startFuture.fail(ar.cause());
}
});
}
// (...)
① 我們暴露了公開(kāi)的常量用于Verticle配置參數(shù):HTTP端口號(hào)以及發(fā)送消息到數(shù)據(jù)庫(kù)Verticle的事件總線目的地名稱。
② AbstractVerticle#config()方法允許訪問(wèn)已提供的Verticle配置。對(duì)于沒(méi)有指定值的情況,第二個(gè)參數(shù)是默認(rèn)值。
③ 配置值不只可以是字符串,也可以是整數(shù)、布爾值以及復(fù)雜的JSON數(shù)據(jù)等。
該類剩余部分主要是提取HTTP部分的代碼,以前的數(shù)據(jù)庫(kù)代碼通過(guò)事件總線消息替換。這是indexHandler方法的代碼:
private final FreeMarkerTemplateEngine templateEngine = FreeMarkerTemplateEngine.create();
private void indexHandler(RoutingContext context) {
DeliveryOptions options = new DeliveryOptions().addHeader("action", "all-pages"); ②
vertx.eventBus().send(wikiDbQueue, new JsonObject(), options, reply -> { ①
if (reply.succeeded()) {
JsonObject body = (JsonObject) reply.result().body(); ③
context.put("title", "Wiki home");
context.put("pages", body.getJsonArray("pages").getList());
templateEngine.render(context, "templates", "/index.ftl", ar -> {
if (ar.succeeded()) {
context.response().putHeader("Content-Type", "text/html");
context.response().end(ar.result());
} else {
context.fail(ar.cause());
}
});
} else {
context.fail(reply.cause());
}
});
}
① vertx對(duì)象提供了對(duì)事件總線的訪問(wèn),我們發(fā)送一個(gè)消息到數(shù)據(jù)庫(kù)Verticle的隊(duì)列。
② 傳遞選項(xiàng)(DeliveryOptions)允許我們指定頭、有效載荷(payload)編解碼器和超時(shí)時(shí)間。
③ 一旦成功,回復(fù)包含有效載荷。
正如我們所看到的,事件總線消息由一個(gè)消息體和選項(xiàng)組成,它可以選擇性地期待一個(gè)答復(fù)。對(duì)于沒(méi)有預(yù)期答復(fù)的情況,有一個(gè)send方法的變體,它沒(méi)有Handler參數(shù)。
我們將有效載荷編碼為JSON對(duì)象,并通過(guò)一個(gè)稱為action的消息頭指定數(shù)據(jù)庫(kù)Verticle應(yīng)該執(zhí)行哪個(gè)操作。
Verticle的剩余代碼就是路由器處理器,同樣使用事件總線獲取和存儲(chǔ)數(shù)據(jù):
private static final String EMPTY_PAGE_MARKDOWN = "# A new page\n" + "\n"
+ "Feel-free to write in Markdown!\n";
private void pageRenderingHandler(RoutingContext context) {
String requestedPage = context.request().getParam("page");
JsonObject request = new JsonObject().put("page", requestedPage);
DeliveryOptions options = new DeliveryOptions().addHeader("action",
"get-page");
vertx.eventBus().send(
wikiDbQueue,
request,
options,
reply -> {
if (reply.succeeded()) {
JsonObject body = (JsonObject) reply.result().body();
boolean found = body.getBoolean("found");
String rawContent = body.getString("rawContent",
EMPTY_PAGE_MARKDOWN);
context.put("title", requestedPage);
context.put("id", body.getInteger("id", -1));
context.put("newPage", found ? "no" : "yes");
context.put("rawContent", rawContent);
context.put("content", Processor.process(rawContent));
context.put("timestamp", new Date().toString());
templateEngine.render(
context,
"templates",
"/page.ftl",
ar -> {
if (ar.succeeded()) {
context.response().putHeader(
"Content-Type", "text/html");
context.response().end(ar.result());
} else {
context.fail(ar.cause());
}
});
} else {
context.fail(reply.cause());
}
});
}
private void pageUpdateHandler(RoutingContext context) {
String title = context.request().getParam("title");
JsonObject request = new JsonObject()
.put("id", context.request().getParam("id"))
.put("title", title)
.put("markdown", context.request().getParam("markdown"));
DeliveryOptions options = new DeliveryOptions();
if ("yes".equals(context.request().getParam("newPage"))) {
options.addHeader("action", "create-page");
} else {
options.addHeader("action", "save-page");
}
vertx.eventBus().send(wikiDbQueue, request, options, reply -> {
if (reply.succeeded()) {
context.response().setStatusCode(303);
context.response().putHeader("Location", "/wiki/" + title);
context.response().end();
} else {
context.fail(reply.cause());
}
});
}
private void pageCreateHandler(RoutingContext context) {
String pageName = context.request().getParam("name");
String location = "/wiki/" + pageName;
if (pageName == null || pageName.isEmpty()) {
location = "/";
}
context.response().setStatusCode(303);
context.response().putHeader("Location", location);
context.response().end();
}
private void pageDeletionHandler(RoutingContext context) {
String id = context.request().getParam("id");
JsonObject request = new JsonObject().put("id", id);
DeliveryOptions options = new DeliveryOptions().addHeader("action",
"delete-page");
vertx.eventBus().send(wikiDbQueue, request, options, reply -> {
if (reply.succeeded()) {
context.response().setStatusCode(303);
context.response().putHeader("Location", "/");
context.response().end();
} else {
context.fail(reply.cause());
}
});
}
3.3 數(shù)據(jù)庫(kù)Verticle
使用JDBC鏈接到一個(gè)數(shù)據(jù)庫(kù)當(dāng)然需要數(shù)據(jù)庫(kù)驅(qū)動(dòng)以及配置,這些我們?cè)诘谝淮蔚胁捎糜簿幋a的方式實(shí)現(xiàn)。
3.3.1 配置SQL查詢
在將前面Verticle的硬編碼值轉(zhuǎn)換為配置參數(shù)的同時(shí),我們還可以進(jìn)一步從一個(gè)配置文件中加載SQL查詢。
查詢語(yǔ)句將從一個(gè)文件中加載,這個(gè)文件名作為一個(gè)配置參數(shù)傳遞,如果沒(méi)有提供則從一個(gè)默認(rèn)資源加載。這個(gè)方法的優(yōu)點(diǎn)是Verticle可以適配不同的JDBC驅(qū)動(dòng)和SQL方言。
Verticle類的開(kāi)端包含了主要的配置鍵的定義:
public class WikiDatabaseVerticle extends AbstractVerticle {
public static final String CONFIG_WIKIDB_JDBC_URL = "wikidb.jdbc.url";
public static final String CONFIG_WIKIDB_JDBC_DRIVER_CLASS = "wikidb.jdbc.driver_class";
public static final String CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE = "wikidb.jdbc.max_pool_size";
public static final String CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE = "wikidb.sqlqueries.resource.file";
public static final String CONFIG_WIKIDB_QUEUE = "wikidb.queue";
private static final Logger LOGGER = LoggerFactory.getLogger(WikiDatabaseVerticle.class);
// (...)
SQL查詢存儲(chǔ)在一個(gè)Properties文件中,使用HSQLDB的默認(rèn)文件位于src/main/resources/db-queries.properties:
create-pages-table=create table if not exists Pages (Id integer identity primary key, Name varchar(255) unique, Content
clob)
get-page=select Id, Content from Pages where Name = ?
create-page=insert into Pages values (NULL, ?, ?)
save-page=update Pages set Content = ? where Id = ?
all-pages=select Name from Pages
delete-page=delete from Pages where Id = ?
WikiDdatabaseVerticle中的以下代碼用于從文件中加載SQL查詢,并將它們放到一個(gè)map中:
private enum SqlQuery {
CREATE_PAGES_TABLE,
ALL_PAGES,
GET_PAGE,
CREATE_PAGE,
SAVE_PAGE,
DELETE_PAGE
}
private final HashMap<SqlQuery, String> sqlQueries = new HashMap<>();
private void loadSqlQueries() throws IOException {
String queriesFile = config().getString(CONFIG_WIKIDB_SQL_QUERIES_RESOURCE_FILE);
InputStream queriesInputStream;
if (queriesFile != null) {
queriesInputStream = new FileInputStream(queriesFile);
} else {
queriesInputStream = getClass().getResourceAsStream("/db-queries.properties");
}
Properties queriesProps = new Properties();
queriesProps.load(queriesInputStream);
queriesInputStream.close();
sqlQueries.put(SqlQuery.CREATE_PAGES_TABLE, queriesProps.getProperty("create-pages-table"));
sqlQueries.put(SqlQuery.ALL_PAGES, queriesProps.getProperty("all-pages"));
sqlQueries.put(SqlQuery.GET_PAGE, queriesProps.getProperty("get-page"));
sqlQueries.put(SqlQuery.CREATE_PAGE, queriesProps.getProperty("create-page"));
sqlQueries.put(SqlQuery.SAVE_PAGE, queriesProps.getProperty("save-page"));
sqlQueries.put(SqlQuery.DELETE_PAGE, queriesProps.getProperty("delete-page"));
}
在接下來(lái)的代碼中,我們使用SqlQuery枚舉類型以避免字符串常量。Verticle的start方法代碼如下:
private JDBCClient dbClient;
@Override
public void start(Future<Void> startFuture) throws Exception {
/*
* Note: this uses blocking APIs, but data is small...
*/
loadSqlQueries(); ①
dbClient = JDBCClient.createShared(vertx, new JsonObject()
.put("url", config().getString(CONFIG_WIKIDB_JDBC_URL, "jdbc:hsqldb:file:db/wiki"))
.put("driver_class", config().getString(CONFIG_WIKIDB_JDBC_DRIVER_CLASS, "org.hsqldb.jdbcDriver"))
.put("max_pool_size", config().getInteger(CONFIG_WIKIDB_JDBC_MAX_POOL_SIZE, 30)));
dbClient.getConnection(ar -> {
if (ar.failed()) {
LOGGER.error("Could not open a database connection", ar.cause());
startFuture.fail(ar.cause());
} else {
SQLConnection connection = ar.result();
connection.execute(sqlQueries.get(SqlQuery.CREATE_PAGES_TABLE), create -> { ②
connection.close();
if (create.failed()) {
LOGGER.error("Database preparation error", create.cause());
startFuture.fail(create.cause());
} else {
vertx.eventBus().consumer(config().getString(CONFIG_WIKIDB_QUEUE, "wikidb.queue"), this::onMessage); ③
startFuture.complete();
}
});
}
});
}
① 有趣的是,我們打破了Vert.x中的一個(gè)重要的原則——就是避免阻塞API,但是因?yàn)闆](méi)有異步API來(lái)訪問(wèn)類路徑上的資源,所以我們的選擇是受限的。我們可以使用Vert.x的executeBlocking方法將阻塞的I/O操作從事件循環(huán)轉(zhuǎn)移到工作者線程,但是由于數(shù)據(jù)非常小,這么做沒(méi)有明顯的效益。
② 這兒是使用SQL查詢的一個(gè)示例。
③ consumer方法注冊(cè)了一個(gè)事件總線目的地Handler。
3.3.2 分發(fā)請(qǐng)求
事件總線消息的Handler是onMessage方法:
public enum ErrorCodes {
NO_ACTION_SPECIFIED,
BAD_ACTION,
DB_ERROR
}
public void onMessage(Message<JsonObject> message) {
if (!message.headers().contains("action")) {
LOGGER.error("No action header specified for message with headers {} and body {}",
message.headers(), message.body().encodePrettily());
message.fail(ErrorCodes.NO_ACTION_SPECIFIED.ordinal(), "No action header specified");
return;
}
String action = message.headers().get("action");
switch (action) {
case "all-pages":
fetchAllPages(message);
break;
case "get-page":
fetchPage(message);
break;
case "create-page":
createPage(message);
break;
case "save-page":
savePage(message);
break;
case "delete-page":
deletePage(message);
break;
default:
message.fail(ErrorCodes.BAD_ACTION.ordinal(), "Bad action: " + action);
}
}
我們?yōu)殄e(cuò)誤定義了一個(gè)ErrorCodes枚舉,用來(lái)報(bào)回消息發(fā)送者。為此,Message類的fail方法提供了一個(gè)便捷方式答復(fù)錯(cuò)誤,原消息發(fā)送者得到一個(gè)失敗的AsyncResult.
3.3.3 減少JDBC客戶端樣板文件(譯者注:原文使用的是boilerplate,應(yīng)該指的是那些重復(fù)的代碼)
截止目前,我們可以看到執(zhí)行一個(gè)SQL查詢的完整交互:
- 獲取一個(gè)鏈接
- 執(zhí)行請(qǐng)求
- 釋放鏈接
這導(dǎo)致對(duì)每個(gè)異步操作需要異常處理的地方都需要編碼,如下:
dbClient.getConnection(car -> {
if (car.succeeded()) {
SQLConnection connection = car.result();
connection.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
connection.close();
if (res.succeeded()) {
List<String> pages = res.result()
.getResults()
.stream()
.map(json -> json.getString(0))
.sorted()
.collect(Collectors.toList());
message.reply(new JsonObject().put("pages", new JsonArray(pages)));
} else {
reportQueryError(message, res.cause());
}
});
} else {
reportQueryError(message, car.cause());
}
});
自從Vert.x 3.5.0開(kāi)始,JDBC客戶端現(xiàn)在支持一次性(one-shot)操作,獲取一個(gè)鏈接執(zhí)行一個(gè)SQL操作,并且在內(nèi)部釋放。與前面相同的代碼現(xiàn)在簡(jiǎn)化如下:
dbClient.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
if (res.succeeded()) {
List<String> pages = res.result()
.getResults()
.stream()
.map(json -> json.getString(0))
.sorted()
.collect(Collectors.toList());
message.reply(new JsonObject().put("pages", new JsonArray(pages)));
} else {
reportQueryError(message, res.cause());
}
});
這對(duì)于獲取數(shù)據(jù)庫(kù)鏈接執(zhí)行一個(gè)單獨(dú)操作的情況非常有用。但就性能而言,需要注意的是,對(duì)于鏈?zhǔn)降腟QL操作,重用數(shù)據(jù)庫(kù)鏈接會(huì)更好。
該類的剩余部分包含了onMessage分發(fā)接收消息時(shí)的私有方法調(diào)用:
private void fetchAllPages(Message<JsonObject> message) {
dbClient.query(sqlQueries.get(SqlQuery.ALL_PAGES), res -> {
if (res.succeeded()) {
List<String> pages = res.result()
.getResults()
.stream()
.map(json -> json.getString(0))
.sorted()
.collect(Collectors.toList());
message.reply(new JsonObject().put("pages", new JsonArray(pages)));
} else {
reportQueryError(message, res.cause());
}
});
}
private void fetchPage(Message<JsonObject> message) {
String requestedPage = message.body().getString("page");
JsonArray params = new JsonArray().add(requestedPage);
dbClient.queryWithParams(sqlQueries.get(SqlQuery.GET_PAGE), params, fetch -> {
if (fetch.succeeded()) {
JsonObject response = new JsonObject();
ResultSet resultSet = fetch.result();
if (resultSet.getNumRows() == 0) {
response.put("found", false);
} else {
response.put("found", true);
JsonArray row = resultSet.getResults().get(0);
response.put("id", row.getInteger(0));
response.put("rawContent", row.getString(1));
}
message.reply(response);
} else {
reportQueryError(message, fetch.cause());
}
});
}
private void createPage(Message<JsonObject> message) {
JsonObject request = message.body();
JsonArray data = new JsonArray()
.add(request.getString("title"))
.add(request.getString("markdown"));
dbClient.updateWithParams(sqlQueries.get(SqlQuery.CREATE_PAGE), data, res -> {
if (res.succeeded()) {
message.reply("ok");
} else {
reportQueryError(message, res.cause());
}
});
}
private void savePage(Message<JsonObject> message) {
JsonObject request = message.body();
JsonArray data = new JsonArray()
.add(request.getString("markdown"))
.add(request.getString("id"));
dbClient.updateWithParams(sqlQueries.get(SqlQuery.SAVE_PAGE), data, res -> {
if (res.succeeded()) {
message.reply("ok");
} else {
reportQueryError(message, res.cause());
}
});
}
private void deletePage(Message<JsonObject> message) {
JsonArray data = new JsonArray().add(message.body().getString("id"));
dbClient.updateWithParams(sqlQueries.get(SqlQuery.DELETE_PAGE), data, res -> {
if (res.succeeded()) {
message.reply("ok");
} else {
reportQueryError(message, res.cause());
}
});
}
private void reportQueryError(Message<JsonObject> message, Throwable cause) {
LOGGER.error("Database query error", cause);
message.fail(ErrorCodes.DB_ERROR.ordinal(), cause.getMessage());
}
3.4 從主Verticle部署Verticle
我們依舊有一個(gè)MainVerticle類,但它不是像首次迭代一樣包含所有邏輯,它的唯一目的是啟動(dòng)應(yīng)用并且部署其它Verticle。
這些代碼包括部署一個(gè)WikiDatabaseVerticle實(shí)例和兩個(gè)HttpServerVerticle實(shí)例:
public class MainVerticle extends AbstractVerticle {
@Override
public void start(Future<Void> startFuture) throws Exception {
Future<String> dbVerticleDeployment = Future.future(); ①
vertx.deployVerticle(new WikiDatabaseVerticle(), dbVerticleDeployment.completer()); ②
dbVerticleDeployment.compose(id -> { ③
Future<String> httpVerticleDeployment = Future.future();
vertx.deployVerticle("io.vertx.guides.wiki.HttpServerVerticle", ④
new DeploymentOptions().setInstances(2), ⑤
httpVerticleDeployment.completer());
return httpVerticleDeployment; ⑥
}).setHandler(ar -> { ⑦
if (ar.succeeded()) {
startFuture.complete();
} else {
startFuture.fail(ar.cause());
}
});
}
}
① 部署Verticle是一個(gè)異步操作,因此我們需要一個(gè)Future。String參數(shù)類型是因?yàn)橐粋€(gè)Verticle部署成功時(shí)會(huì)返回一個(gè)標(biāo)識(shí)。
② 一種選擇是使用new創(chuàng)建一個(gè)Verticle實(shí)例,傳遞對(duì)象引用給deploy方法。completer返回值是一個(gè)處理器,簡(jiǎn)單的完成future。
③ 使用compose的順序組合允許在一個(gè)操作之后運(yùn)行另一個(gè)異步操作。當(dāng)初始化future成功完成之后,調(diào)用組合方法。
④ 指定一個(gè)類名字符串也是部署Verticle的一種選項(xiàng)。對(duì)于其它JVM語(yǔ)言來(lái)說(shuō),基于字符串的慣例(conventions)允許指定一個(gè)模塊/腳本。
⑤ DeploymentOption類允許指定一些參數(shù),尤其部署的實(shí)例個(gè)數(shù)。
⑥ 組合方法返回下一個(gè)future。它的完成將會(huì)觸發(fā)組合操作的完成。
⑦ 我們定義了一個(gè)Handler,以便完成MainVerticle的啟動(dòng)future。
精明的讀者可能會(huì)感到驚奇,我們?cè)趺纯梢栽谕粋€(gè)端口部署HTTP Server代碼兩次,并且對(duì)于每個(gè)實(shí)例都不期望出現(xiàn)由于TCP端口已經(jīng)被占用而導(dǎo)致的任何錯(cuò)誤。對(duì)于許多Web框架,我們需要選擇不同的TCP端口,并且有一個(gè)前端HTTP代理來(lái)執(zhí)行端口之間的負(fù)載平衡。
對(duì)于Vert.x則不需要這么做,多個(gè)Verticle可以共享相同的TCP端口號(hào)。傳入的連接只是簡(jiǎn)單的通過(guò)接收線程以輪轉(zhuǎn)的方式分發(fā)。