參考dubbo實現(xiàn)異步轉(zhuǎn)同步方案

1. 需求背景

在支付系統(tǒng)中,因為上游通道的差異化,有時會出現(xiàn)這種情形:

  1. 上游提供的接口是異步的,即上游接收請求后,立即響應(yīng)處理中(中間狀態(tài)),最終處理結(jié)果以異步通知的形式告知;

  2. 平臺對接多家上游機構(gòu),有同步響應(yīng)也有異步響應(yīng),為了標準統(tǒng)一,平臺對下游統(tǒng)一封裝了同步響應(yīng)接口,方便下游處理業(yè)務(wù)。

此時,對于異步響應(yīng)的通道,平臺需要通過異步轉(zhuǎn)同步功能,讓下游無感知。

2. 方案簡介

請求與響應(yīng)流程

具體流程見上圖:

    1. 客戶端同步請求服務(wù)端(集群);
  • 2.1 將orderNo/uniqueId作為key/value 放入redis緩存,其中uniqueId用于異步轉(zhuǎn)同步識別請求線程;

  • 2.2 服務(wù)端請求第三方服務(wù);

(中間忽略了同步響應(yīng)的中間狀態(tài)響應(yīng))

    1. 第三方發(fā)起異步響應(yīng)到服務(wù)端集群;

(此時,不能保證異步響應(yīng)路由到源請求的節(jié)點上)

  • 4.1 按照orderNo查詢redis,獲取請求的uniqueId;

  • 4.2 按約定組裝結(jié)果放入MQ中(此處使用RabbitMq演示);

    1. MQ向消費者廣播;
  • 5.1 節(jié)點接收廣播后,識別是該節(jié)點發(fā)出的請求,則進行響應(yīng)處理;

  • 5.2 節(jié)點接收廣播后,識別不是該節(jié)點發(fā)出的請求,則丟棄該通知;

    1. 在未超時的情況下,同步響應(yīng)客戶端。

3. 技術(shù)要點

參考Dubbo 2.5.x com.alibaba.dubbo.remoting.exchange.support. DefaultFuture類

主要使用 ReentrantLock解決互斥問題,使用 Condition 實現(xiàn)超時等待功能。對DefaultFuture做適當?shù)暮喕?,示例如下?/p>

public class DefaultFuture {

    private static final Map<String, DefaultFuture> FUTURES  = new ConcurrentHashMap<>();

    private final String id;
    private final int timeout;
    private final Lock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();
    private final long start = System.currentTimeMillis();
    private volatile Response response;
    private final Request request;

    public DefaultFuture(Request request, int timeout){
        this.id = request.getUniqueId();
        this.timeout = timeout > 0 ? timeout : Constants.DEFAULT_TIMEOUT;
        this.request = request;
        // put into waiting map.
        FUTURES.put(id, this);
    }

    /**
     * 阻塞獲取響應(yīng)
     * @return
     * @throws TimeoutException
     */
    public Object get() throws Exception {
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    // 超時等待
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    // 如果有返回結(jié)果了,或者,超時了,就退出循環(huán)
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            // 如果是超時了,就拋出異常
            if (!isDone()) {
                throw new TimeoutException(request.getUniqueId());
            }
        }
        // 遠程服務(wù)正常返回結(jié)果,則返回給調(diào)用方
        return returnFromResponse();

    }

    private Object returnFromResponse() throws Exception {
        Response res = response;
        if (res.getStatus() == Response.OK) {
            return response.getResult();
        }
        if (res.getStatus() == Response.TIMEOUT) {
            throw new TimeoutException(res.getErrorMessage());
        }
        FUTURES.remove(id);
        throw new Exception(res.getErrorMessage());
    }

    /**
     * 是否響應(yīng)
     * @return
     */
    private boolean isDone(){
        return this.response != null;
    }

    public static void received(Response response) {
        try {
            // 根據(jù)請求id從FUTURES中獲取DefaultFuture,并刪除
            DefaultFuture future = FUTURES.remove(response.getUniqueId());
            if (future != null) {
                future.doReceived(response);
            } else {
                log.warn("The timeout response finally returned at "
                    + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                    + ", response " + response);
            }
        } finally {
        }
    }

    private void doReceived(Response response) {
        lock.lock();
        try {
            this.response = response;
            this.response.setStatus(Response.OK);
            if (done != null) {
                // 喚醒阻塞的線程
                done.signal();
            }
        } finally {
            lock.unlock();
        }
    }

    /**
     * 是否是正確的響應(yīng)
     * @param id
     * @return
     */
    public static boolean isCorrectResponse(String id){
        return FUTURES.containsKey(id);
    }
    private int getTimeout() {
        return timeout;
    }

    private long getStartTimestamp() {
        return start;
    }

    public String getId() {
        return id;
    }

    private static class RemotingInvocationTimeoutScan implements Runnable {
        @Override
        public void run() {
            while (true) {
                try {
                    for (DefaultFuture future : FUTURES.values()) {
                        if (future == null || future.isDone()) {
                            continue;
                        }
                        if (System.currentTimeMillis() - future.getStartTimestamp() > future.getTimeout()) {
                            // create exception response.
                            Response timeoutResponse = new Response(future.getId());
                            // set timeout status.
                            timeoutResponse.setErrorMessage("響應(yīng)超時");
                            // handle response.
                            DefaultFuture.received(timeoutResponse);
                        }
                    }
                    Thread.sleep(30);
                } catch (Throwable e) {
                    log.error("Exception when scan the timeout invocation of remoting.", e);
                }
            }
        }
    }

    static {
        // 自動清除超時任務(wù)
        Thread th = new Thread(new RemotingInvocationTimeoutScan(), "remoting-invocation-timeout-scan");
        th.setDaemon(true);
        th.start();
    }

}

4. 測試

  1. 測試類接收請求后,直接組裝響應(yīng),將響應(yīng)放入mq,然后調(diào)用DefaultFuture.get()阻塞等待響應(yīng);
  2. mq發(fā)送廣播,測試項目接收后,按流程通過DefaultFuture.received()處理響應(yīng);
  3. DefaultFuture.doReceived()方法中,喚醒等待的請求線程,測試完成。

測試環(huán)境: Macbook Pro 8G + Spring boot(單實例)+ jmeter(共100個線程)

線程組
TPS
RabbitMQ
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容