
《Spring 響應式編程》
C1 為什么選擇響應式 Spring
響應式編程 vs 命令式編程
如何具備即時響應性?
- 彈性:scalability 可伸縮的擴容減容
- 可伸縮的分布式系統(tǒng)有難度
- resilience 回彈性:故障可響應
消息驅(qū)動通信
針對IO實現(xiàn)更高資源利用率,應該使用異步非阻塞交互模型。
- 接受的消息到了之后做出響應,否則休眠。
- 組件以非阻塞方式發(fā)送消息
實現(xiàn)方法:消息代理服務器
價值:即時響應
形式:彈性和回彈性(elasticity and resilience)
手段:消息驅(qū)動
背壓:工作負載管理,確保一個處理階段不會壓垮另一個
JVM 響應式框架:
- Akka
- Vert.x
- Spring Cloud
調(diào)用阻塞:阻塞式通信違背消息驅(qū)動原則。A調(diào)用B, B阻塞A。
以前舊有的一些不夠完美的解決方法:
- 方法1:可以用 回調(diào) (callback) 來實現(xiàn)跨組件通信。
interface ShoppingCardService{
void calculate(Input in, Consumer<Output> c);
}
class SyncServiceA implements ShoppingCardService{
public void calculate(Input value, Consumer<Output> c){
Output res = new Output(value);
c.accept(res); // no block action
}
}
class AsyncServiceB implements ShoppingCardService{
public void calculate(Input value, Consumer<Output> c){
new Thread(() -> {
// put this block action into a thread
Output res = otherService.getRes(value);
//...
c.accept(res);
}).start;
}
}
需要理解多線程,避免共享數(shù)據(jù)修改陷阱和回調(diào)地獄。
- 方法2:用
Future(java.util.concurrent.Future)
interface ShoppingCardService{
Future<Output> calculate(Input in);
}
class OrderService{
private final ShoppingCardService shoppingCardService;
void process(){
Input intput = getInput();
Future<Output> future = shoppingCardService.calculate(input);
//...
Output output = future.get();
//...
}
}
Future 類包裝器檢查是否有可用結(jié)果,能否以阻塞方式獲取。對結(jié)果延遲獲取,但最終還是要阻塞當前線程來和外部執(zhí)行進行同步。
- 方法3: 更好的
CompletionStage和CompletableFuture。
interface ShoppingCardService{
CompletionStage<Output> calculate(Input in);
}
class OrderService{
private final ComponentB comb;
Intput in = getInput();
comb.calculate(in)
.thenApply(out1 -> { })
.thenCombine(out2 -> { })
.thenAccept(out3 -> { })
}
CompletionStage 類包裝期提供了API進行鏈式調(diào)用,但 Spring4 MVC 不支持
- 方法4: Spring 4 里 的
ListenableFuture和AsyncRestTemplate
AsyncRestTemplate template = new AsyncRestTemplate();
// define the callback
SuccessCallback onSuccess = r -> {};
FailureCallback onFailure = e -> {};
String url = "http://xxxxx/api/xx";
ListenableFuture<?> res = template.getForEntity(url,SomeEntity.class);
res.addCallback(onSuccess,onFailure);
用一個單獨的線程包裝阻塞式網(wǎng)絡調(diào)用來實現(xiàn),依賴于 Servlet API,所以實現(xiàn)基于線程單次請求(thread-per-request)。Spring 5 的 WebClient 跨服務通信不阻塞,Servlet3.0之后也引入了新異步非阻塞功能。
JAVA的多線程共享CPU,內(nèi)存也有較多消耗,上下文切換有成本,多開線程去解決阻塞不劃算。
Netty響應式服務器解決上下文切換問題。
異步處理:
- 請求 --> 響應
- 無限元素數(shù)據(jù)流(響應式管道) 源<==>處理<==>轉(zhuǎn)換<==>過濾