轉自:https://github.com/angryz/my-blog/issues/6
上篇Redisson 分布式鎖實現分析中提到了RedissonLock中的redis命令都是通過CommandExecutor來發(fā)送到redis服務執(zhí)行的,本篇就來了解一下它的實現方式。
先來看其源碼
public interface CommandExecutor extends CommandSyncExecutor, CommandAsyncExecutor {
}
可以看到它同時繼承了 同步和異步(sync/async) 兩種調用方式。
Note:
- 在分布式鎖的實現中是用了同步的 CommandExecutor,是因為鎖的獲取和釋放是有強一致性要求的,需要實時知道結果方可進行下一步操作。
- 上篇分布式鎖分析中我提到 Redisson 的同步實現實際上是基于異步實現的,這在下文中也會得到解釋。
在Redisson中,除了提供同步和異步的方式執(zhí)行命令之外,還通過 Reactive Streams 實現了 Reactive 方式的命令執(zhí)行器。見下圖
預備知識
Redisson 大量使用了 Redis 的 EVAL 命令來執(zhí)行 Lua 腳本,所以先要對 EVAL 有所了解。
EVAL命令格式和示例
EVAL script numkeys key [key ...] arg [arg ...]
> eval "return redis.call('set',KEYS[1],ARGV[1])" 1 foo bar
OK
從 Redis 2.6.0 版本開始,通過內置的 Lua 解釋器,可以使用 EVAL 命令對 Lua 腳本進行求值。
參數的說明本文不再詳述,可查閱 Redis命令參考。
重點是這個:Redis 使用單個 Lua 解釋器去運行所有腳本,并且 Redis 也保證腳本會以原子性(atomic)的方式執(zhí)行,當某個腳本正在運行的時候,不會有其他腳本或 Redis 命令被執(zhí)行。所以 Redisson 中使用了 EVAL 來保證執(zhí)行命令操作數據時的安全性。
例子
這里就使用 Redisson 參考文檔中的一個 RAtomicLong 對象的例子吧。
RedissonClient client = Redisson.create(config);
RAtomicLong longObject = client.getAtomicLong('myLong');
// 同步方式
longObject.compareAndSet(3, 401);
// 異步方式
longObject.compareAndSetAsync(3, 401);
RedissonReactiveClient client = Redisson.createReactive(config);
RAtomicLongReactive longObject = client.getAtomicLong('myLong');
// reactive方式
longObject.compareAndSet(3, 401);
根據此例,我們分別來看 compareAndSet/compareAndSetAsync 的實現,其他命令原理都一樣。
異步
既然同步和Reactive的實現都繼承了異步的實現,那我們就先來看看CommandAsyncService吧。
例子中的 longObject.compareAndSetAsync(3, 401); 實際執(zhí)行的是 RedissonAtomicLong 實現類的 compareAndSetAsync 方法,如下
public Future<Boolean> compareAndSetAsync(long expect, long update) {
return commandExecutor.evalWriteAsync(getName(),
StringCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"...此處省略...",
Collections.<Object>singletonList(getName()),
expect, update);
}
此處的 evalWriteAsync 就是在 CommandAsyncService 中實現的,如下
public <T, R> Future<R> evalWriteAsync(String key,
Codec codec,
RedisCommand<T> evalCommandType,
String script,
List<Object> keys,
Object ... params) {
NodeSource source = getNodeSource(key);
return evalAsync(source, false, codec, evalCommandType, script, keys, params);
}
private <T, R> Future<R> evalAsync(NodeSource nodeSource,
boolean readOnlyMode,
Codec codec,
RedisCommand<T> evalCommandType,
String script,
List<Object> keys,
Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0);
return mainPromise;
}
追根溯源,最后來看看 async 方法的實現,
protected <V, R> void async(final boolean readOnlyMode,
final NodeSource source,
final Codec codec,
final RedisCommand<V> command,
final Object[] params,
final Promise<R> mainPromise,
final int attempt) {
// ....省略部分代碼....
// AsyncDetails 是一個包裝對象,用來將異步調用過程中的對象引用包裝起來方便使用
final AsyncDetails<V, R> details = AsyncDetails.acquire();
details.init(connectionFuture, attemptPromise,
readOnlyMode, source, codec, command, params, mainPromise, attempt);
// retryTimerTask 用來實現 Redisson 提供的重試機制
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout t) throws Exception {
// ....省略部分代碼....
int count = details.getAttempt() + 1;
// ....省略部分代碼....
async(details.isReadOnlyMode(), details.getSource(),
details.getCodec(), details.getCommand(),
details.getParams(), details.getMainPromise(), count);
AsyncDetails.release(details);
}
};
// 啟用重試機制
Timeout timeout = connectionManager.newTimeout(retryTimerTask,
connectionManager.getConfig().getRetryInterval(),
TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
// checkConnectionFuture 用于檢查客戶端是否與服務端集群建立連接,如果連接建立
// 則可發(fā)送命令到服務端執(zhí)行
if (connectionFuture.isDone()) {
checkConnectionFuture(source, details);
} else {
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> connFuture) throws Exception {
checkConnectionFuture(source, details);
}
});
}
// ....省略部分代碼....
}
private <R, V> void checkConnectionFuture(final NodeSource source,
final AsyncDetails<V, R> details) {
// ....省略部分代碼....
// 獲取客戶端與服務端集群建立的連接
final RedisConnection connection = details.getConnectionFuture().getNow();
if (details.getSource().getRedirect() == Redirect.ASK) {
// 客戶端接收到 ASK 轉向, 先發(fā)送一個 ASKING 命令,然后再發(fā)送真正的命令請求
// ....省略部分代碼....
} else {
// ....省略部分代碼....
// 客戶端發(fā)送命令到服務端
ChannelFuture future = connection.send(new CommandData<V, R>(details.getAttemptPromise(),
details.getCodec(), details.getCommand(), details.getParams()));
details.setWriteFuture(future);
}
// ....省略部分代碼....
// 釋放本次連接
releaseConnection(source, details.getConnectionFuture(), details.isReadOnlyMode(),
details.getAttemptPromise(), details);
}
由于代碼太長,我只貼出了和執(zhí)行命令有關的部分代碼,我們可以從上面代碼中看到
- Redisson 對每次操作都提供了重試機制,可配置
retryAttempts來控制重試次數(缺省為3次),可配置retryInterval來控制重試間隔(缺省為 1000 ms)。Redisson 中使用了 Netty 的TimerTask和Timeout工具來實現其重試機制。 - Redisson 中也大量使用了 Netty 實現的異步工具
Future和FutureListener,使得異步調用執(zhí)行完成后能夠立刻做出對應的操作。 - RedissonConnection 是基于 Netty 實現的,發(fā)送命令的
send方法實現是使用 Netty 的Channel.writeAndFlush方法。
以上便是 Redisson 的異步實現。
同步
Redisson 里的同步都是基于異步來實現的,為什么這么說,來看看 RedissonAtomicLong 的 compareAndSet方法,
public boolean compareAndSet(long expect, long update) {
return get(compareAndSetAsync(expect, update));
}
可見是在之前的異步方法外套了一個 get 方法,而這個 get 方法實際上也是在異步實現類 CommandAsyncService 中實現的,至于同步的實現類 CommandSyncService 有興趣大家可以去看看,基本上都是在異步實現返回的 Future 外套了一個 get 方法。那么我們就看看 get 的實現,
public <V> V get(Future<V> future) {
final CountDownLatch l = new CountDownLatch(1);
future.addListener(new FutureListener<V>() {
@Override
public void operationComplete(Future<V> future) throws Exception {
l.countDown();
}
});
try {
// 阻塞當前線程
l.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (future.isSuccess()) {
return future.getNow();
}
throw convertException(future);
}
原來是利用了 CountDownLatch 在異步調用結果返回前將當前線程阻塞,然后通過 Netty 的 FutureListener在異步調用完成后解除阻塞,并返回調用結果。
Reactive
從例子中可以看到,Reactive 的客戶端和對象實現都是獨立的,先來看看 RedissonAtomicLongReactive 的 compareAndSet 方法,
public Publisher<Boolean> compareAndSet(long expect, long update) {
return commandExecutor.evalWriteReactive(getName(), StringCodec.INSTANCE,
RedisCommands.EVAL_BOOLEAN,
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return 1 "
+ "else "
+ "return 0 end",
Collections.<Object>singletonList(getName()), expect, update);
}
它調用的是 CommandReactiveService 中實現的 evalWriteReactive 方法,
public <T, R> Publisher<R> evalWriteReactive(String key, Codec codec,
RedisCommand<T> evalCommandType, String script, List<Object> keys,
Object... params) {
Future<R> f = evalWriteAsync(key, codec, evalCommandType, script, keys, params);
return new NettyFuturePublisher<R>(f);
}
我們看到這里還是基于異步調用實現的,只是將異步調用返回的 Future 封裝在了一個 NettyFuturePublisher 對象中返回,這個對象繼承了 Reactive Streams 中的 Stream,所以我的解讀也只能到此為止了,Reactive Streams 的相關知識目前我還不具備。
總結
- Redisson 提供了 同步、異步 和 Reactive 三種命令執(zhí)行方式。
- 同步 和 Reactive 的實現是基于 異步 的實現的。
- Redisson 使用 Netty 連接 Redis 服務,并依賴 Netty 異步工具類來實現異步通信、重試機制、阻塞等特性。
- Redisson 使用 Reactive Streams 來實現 Reactive 特性。
