Redisson 中的 CommandExecutor

轉自:https://github.com/angryz/my-blog/issues/6

上篇Redisson 分布式鎖實現分析中提到了RedissonLock中的redis命令都是通過CommandExecutor來發(fā)送到redis服務執(zhí)行的,本篇就來了解一下它的實現方式。

先來看其源碼

public interface CommandExecutor extends CommandSyncExecutor, CommandAsyncExecutor {
}

可以看到它同時繼承了 同步和異步(sync/async) 兩種調用方式。

Note:

  • 在分布式鎖的實現中是用了同步的 CommandExecutor,是因為鎖的獲取和釋放是有強一致性要求的,需要實時知道結果方可進行下一步操作。
  • 上篇分布式鎖分析中我提到 Redisson 的同步實現實際上是基于異步實現的,這在下文中也會得到解釋。

在Redisson中,除了提供同步和異步的方式執(zhí)行命令之外,還通過 Reactive Streams 實現了 Reactive 方式的命令執(zhí)行器。見下圖

預備知識

Redisson 大量使用了 Redis 的 EVAL 命令來執(zhí)行 Lua 腳本,所以先要對 EVAL 有所了解。

EVAL命令格式和示例

EVAL script numkeys key [key ...] arg [arg ...]

> eval "return redis.call('set',KEYS[1],ARGV[1])" 1 foo bar
OK

從 Redis 2.6.0 版本開始,通過內置的 Lua 解釋器,可以使用 EVAL 命令對 Lua 腳本進行求值。

參數的說明本文不再詳述,可查閱 Redis命令參考。

重點是這個:Redis 使用單個 Lua 解釋器去運行所有腳本,并且 Redis 也保證腳本會以原子性(atomic)的方式執(zhí)行,當某個腳本正在運行的時候,不會有其他腳本或 Redis 命令被執(zhí)行。所以 Redisson 中使用了 EVAL 來保證執(zhí)行命令操作數據時的安全性。

例子

這里就使用 Redisson 參考文檔中的一個 RAtomicLong 對象的例子吧。

RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// 同步方式
longObject.compareAndSet(3, 401);
// 異步方式
longObject.compareAndSetAsync(3, 401);

RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive方式
longObject.compareAndSet(3, 401);

根據此例,我們分別來看 compareAndSet/compareAndSetAsync 的實現,其他命令原理都一樣。

異步

既然同步和Reactive的實現都繼承了異步的實現,那我們就先來看看CommandAsyncService吧。

例子中的 longObject.compareAndSetAsync(3, 401); 實際執(zhí)行的是 RedissonAtomicLong 實現類的 compareAndSetAsync 方法,如下

public Future<Boolean> compareAndSetAsync(long expect, long update) {
    return commandExecutor.evalWriteAsync(getName(),
                                          StringCodec.INSTANCE,
                                          RedisCommands.EVAL_BOOLEAN,
                                          "...此處省略...",
                                          Collections.<Object>singletonList(getName()),
                                          expect, update);
}

此處的 evalWriteAsync 就是在 CommandAsyncService 中實現的,如下

public <T, R> Future<R> evalWriteAsync(String key,
                                       Codec codec,
                                       RedisCommand<T> evalCommandType,
                                       String script,
                                       List<Object> keys,
                                       Object ... params) {
    NodeSource source = getNodeSource(key);
    return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}

private <T, R> Future<R> evalAsync(NodeSource nodeSource,
                                   boolean readOnlyMode,
                                   Codec codec,
                                   RedisCommand<T> evalCommandType,
                                   String script,
                                   List<Object> keys,
                                   Object ... params) {
    Promise<R> mainPromise = connectionManager.newPromise();
    List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
    args.add(script);
    args.add(keys.size());
    args.addAll(keys);
    args.addAll(Arrays.asList(params));
    async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
    return mainPromise;
}

追根溯源,最后來看看 async 方法的實現,

protected <V, R> void async(final boolean readOnlyMode,
                            final NodeSource source,
                            final Codec codec,
                            final RedisCommand<V> command,
                            final Object[] params,
                            final Promise<R> mainPromise,
                            final int attempt) {
    // ....省略部分代碼....
    // AsyncDetails 是一個包裝對象,用來將異步調用過程中的對象引用包裝起來方便使用
    final AsyncDetails<V, R> details = AsyncDetails.acquire();
    details.init(connectionFuture, attemptPromise,
            readOnlyMode, source, codec, command, params, mainPromise, attempt);

    // retryTimerTask 用來實現 Redisson 提供的重試機制
    final TimerTask retryTimerTask = new TimerTask() {

        @Override
        public void run(Timeout t) throws Exception {
            // ....省略部分代碼....
            int count = details.getAttempt() + 1;
            // ....省略部分代碼....
            async(details.isReadOnlyMode(), details.getSource(),
                    details.getCodec(), details.getCommand(),
                    details.getParams(), details.getMainPromise(), count);
            AsyncDetails.release(details);
        }
    };
    // 啟用重試機制
    Timeout timeout = connectionManager.newTimeout(retryTimerTask,
            connectionManager.getConfig().getRetryInterval(),
            TimeUnit.MILLISECONDS);
    details.setTimeout(timeout);

    // checkConnectionFuture 用于檢查客戶端是否與服務端集群建立連接,如果連接建立
    // 則可發(fā)送命令到服務端執(zhí)行
    if (connectionFuture.isDone()) {
        checkConnectionFuture(source, details);
    } else {
        connectionFuture.addListener(new FutureListener<RedisConnection>() {
            @Override
            public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
                checkConnectionFuture(source, details);
            }
        });
    }

    // ....省略部分代碼....
}

private <R, V> void checkConnectionFuture(final NodeSource source,
        final AsyncDetails<V, R> details) {
    // ....省略部分代碼....
    // 獲取客戶端與服務端集群建立的連接
    final RedisConnection connection = details.getConnectionFuture().getNow();

    if (details.getSource().getRedirect() == Redirect.ASK) {
        // 客戶端接收到 ASK 轉向, 先發(fā)送一個 ASKING 命令,然后再發(fā)送真正的命令請求
        // ....省略部分代碼....
    } else {
        // ....省略部分代碼....
        // 客戶端發(fā)送命令到服務端
        ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(),
                details.getCodec(), details.getCommand(), details.getParams()));
        details.setWriteFuture(future);
    }
    // ....省略部分代碼....
    // 釋放本次連接
    releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(),
            details.getAttemptPromise(), details);
}

由于代碼太長,我只貼出了和執(zhí)行命令有關的部分代碼,我們可以從上面代碼中看到

  • Redisson 對每次操作都提供了重試機制,可配置 retryAttempts 來控制重試次數(缺省為3次),可配置 retryInterval 來控制重試間隔(缺省為 1000 ms)。Redisson 中使用了 Netty 的 TimerTaskTimeout 工具來實現其重試機制。
  • Redisson 中也大量使用了 Netty 實現的異步工具 FutureFutureListener,使得異步調用執(zhí)行完成后能夠立刻做出對應的操作。
  • RedissonConnection 是基于 Netty 實現的,發(fā)送命令的 send 方法實現是使用 Netty 的 Channel.writeAndFlush 方法。

以上便是 Redisson 的異步實現。

同步

Redisson 里的同步都是基于異步來實現的,為什么這么說,來看看 RedissonAtomicLongcompareAndSet方法,

public boolean compareAndSet(long expect, long update) {
    return get(compareAndSetAsync(expect, update));
}

可見是在之前的異步方法外套了一個 get 方法,而這個 get 方法實際上也是在異步實現類 CommandAsyncService 中實現的,至于同步的實現類 CommandSyncService 有興趣大家可以去看看,基本上都是在異步實現返回的 Future 外套了一個 get 方法。那么我們就看看 get 的實現,

public <V> V get(Future<V> future) {
    final CountDownLatch l = new CountDownLatch(1);
    future.addListener(new FutureListener<V>() {
        @Override
        public void operationComplete(Future<V> future) throws Exception {
            l.countDown();
        }
    });
    try {
        // 阻塞當前線程
        l.await();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    if (future.isSuccess()) {
        return future.getNow();
    }
    throw convertException(future);
}

原來是利用了 CountDownLatch 在異步調用結果返回前將當前線程阻塞,然后通過 Netty 的 FutureListener在異步調用完成后解除阻塞,并返回調用結果。

Reactive

從例子中可以看到,Reactive 的客戶端和對象實現都是獨立的,先來看看 RedissonAtomicLongReactivecompareAndSet 方法,

public Publisher<Boolean> compareAndSet(long expect, long update) {
    return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE,
            RedisCommands.EVAL_BOOLEAN,
            "if redis.call('get', KEYS[1]) == ARGV[1] then "
                 + "redis.call('set', KEYS[1], ARGV[2]); "
                 + "return 1 "
               + "else "
                 + "return 0 end",
            Collections.<Object>singletonList(getName()), expect, update);
}

它調用的是 CommandReactiveService 中實現的 evalWriteReactive 方法,

public <T, R> Publisher<R> evalWriteReactive(String key, Codec codec,
        RedisCommand<T> evalCommandType, String script, List<Object> keys,
        Object... params) {
  Future<R> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
  return new NettyFuturePublisher<R>(f);
}

我們看到這里還是基于異步調用實現的,只是將異步調用返回的 Future 封裝在了一個 NettyFuturePublisher 對象中返回,這個對象繼承了 Reactive Streams 中的 Stream,所以我的解讀也只能到此為止了,Reactive Streams 的相關知識目前我還不具備。

總結

  • Redisson 提供了 同步、異步 和 Reactive 三種命令執(zhí)行方式。
  • 同步 和 Reactive 的實現是基于 異步 的實現的。
  • Redisson 使用 Netty 連接 Redis 服務,并依賴 Netty 異步工具類來實現異步通信、重試機制、阻塞等特性。
  • Redisson 使用 Reactive Streams 來實現 Reactive 特性。
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容