1. 需求背景
在支付系統(tǒng)中,因為上游通道的差異化,有時會出現(xiàn)這種情形:
上游提供的接口是異步的,即上游接收請求后,立即響應(yīng)處理中(中間狀態(tài)),最終處理結(jié)果以異步通知的形式告知;
平臺對接多家上游機構(gòu),有同步響應(yīng)也有異步響應(yīng),為了標準統(tǒng)一,平臺對下游統(tǒng)一封裝了同步響應(yīng)接口,方便下游處理業(yè)務(wù)。
此時,對于異步響應(yīng)的通道,平臺需要通過異步轉(zhuǎn)同步功能,讓下游無感知。
2. 方案簡介

具體流程見上圖:
- 客戶端同步請求服務(wù)端(集群);
2.1 將orderNo/uniqueId作為key/value 放入redis緩存,其中uniqueId用于異步轉(zhuǎn)同步識別請求線程;
2.2 服務(wù)端請求第三方服務(wù);
(中間忽略了同步響應(yīng)的中間狀態(tài)響應(yīng))
- 第三方發(fā)起異步響應(yīng)到服務(wù)端集群;
(此時,不能保證異步響應(yīng)路由到源請求的節(jié)點上)
4.1 按照orderNo查詢redis,獲取請求的uniqueId;
4.2 按約定組裝結(jié)果放入MQ中(此處使用RabbitMq演示);
- MQ向消費者廣播;
5.1 節(jié)點接收廣播后,識別是該節(jié)點發(fā)出的請求,則進行響應(yīng)處理;
5.2 節(jié)點接收廣播后,識別不是該節(jié)點發(fā)出的請求,則丟棄該通知;
- 在未超時的情況下,同步響應(yīng)客戶端。
3. 技術(shù)要點
參考Dubbo 2.5.x com.alibaba.dubbo.remoting.exchange.support. DefaultFuture類
主要使用 ReentrantLock解決互斥問題,使用 Condition 實現(xiàn)超時等待功能。對DefaultFuture做適當?shù)暮喕?,示例如下?/p>
public class DefaultFuture {
private static final Map<String, DefaultFuture> FUTURES = new ConcurrentHashMap<>();
private final String id;
private final int timeout;
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
private final long start = System.currentTimeMillis();
private volatile Response response;
private final Request request;
public DefaultFuture(Request request, int timeout){
this.id = request.getUniqueId();
this.timeout = timeout > 0 ? timeout : Constants.DEFAULT_TIMEOUT;
this.request = request;
// put into waiting map.
FUTURES.put(id, this);
}
/**
* 阻塞獲取響應(yīng)
* @return
* @throws TimeoutException
*/
public Object get() throws Exception {
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// 超時等待
done.await(timeout, TimeUnit.MILLISECONDS);
// 如果有返回結(jié)果了,或者,超時了,就退出循環(huán)
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// 如果是超時了,就拋出異常
if (!isDone()) {
throw new TimeoutException(request.getUniqueId());
}
}
// 遠程服務(wù)正常返回結(jié)果,則返回給調(diào)用方
return returnFromResponse();
}
private Object returnFromResponse() throws Exception {
Response res = response;
if (res.getStatus() == Response.OK) {
return response.getResult();
}
if (res.getStatus() == Response.TIMEOUT) {
throw new TimeoutException(res.getErrorMessage());
}
FUTURES.remove(id);
throw new Exception(res.getErrorMessage());
}
/**
* 是否響應(yīng)
* @return
*/
private boolean isDone(){
return this.response != null;
}
public static void received(Response response) {
try {
// 根據(jù)請求id從FUTURES中獲取DefaultFuture,并刪除
DefaultFuture future = FUTURES.remove(response.getUniqueId());
if (future != null) {
future.doReceived(response);
} else {
log.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response);
}
} finally {
}
}
private void doReceived(Response response) {
lock.lock();
try {
this.response = response;
this.response.setStatus(Response.OK);
if (done != null) {
// 喚醒阻塞的線程
done.signal();
}
} finally {
lock.unlock();
}
}
/**
* 是否是正確的響應(yīng)
* @param id
* @return
*/
public static boolean isCorrectResponse(String id){
return FUTURES.containsKey(id);
}
private int getTimeout() {
return timeout;
}
private long getStartTimestamp() {
return start;
}
public String getId() {
return id;
}
private static class RemotingInvocationTimeoutScan implements Runnable {
@Override
public void run() {
while (true) {
try {
for (DefaultFuture future : FUTURES.values()) {
if (future == null || future.isDone()) {
continue;
}
if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
// create exception response.
Response timeoutResponse = new Response(future.getId());
// set timeout status.
timeoutResponse.setErrorMessage("響應(yīng)超時");
// handle response.
DefaultFuture.received(timeoutResponse);
}
}
Thread.sleep(30);
} catch (Throwable e) {
log.error("Exception when scan the timeout invocation of remoting.", e);
}
}
}
}
static {
// 自動清除超時任務(wù)
Thread th = new Thread(new RemotingInvocationTimeoutScan(), "remoting-invocation-timeout-scan");
th.setDaemon(true);
th.start();
}
}
4. 測試
- 測試類接收請求后,直接組裝響應(yīng),將響應(yīng)放入mq,然后調(diào)用DefaultFuture.get()阻塞等待響應(yīng);
- mq發(fā)送廣播,測試項目接收后,按流程通過DefaultFuture.received()處理響應(yīng);
- DefaultFuture.doReceived()方法中,喚醒等待的請求線程,測試完成。
測試環(huán)境: Macbook Pro 8G + Spring boot(單實例)+ jmeter(共100個線程)


