Redis cluster使用pipeline

一般解決思路

redis集群有16384個(gè)slot,例如有3個(gè)節(jié)點(diǎn),那么每個(gè)節(jié)點(diǎn)可能分配的slot為Node A是0-5500,Node B是5501-11000,Node C是11001-16383。pipeline是要基于某個(gè)節(jié)點(diǎn)的,所以如果要用pipeline查詢某些key的值,那么就需要通過JedisClusterCRC16.getSlot(key)計(jì)算key的slot值,通過上面每個(gè)節(jié)點(diǎn)的slot分布,就知道了哪些key應(yīng)該在哪些節(jié)點(diǎn)上。再獲取這個(gè)節(jié)點(diǎn)的JedisPool就可以使用pipeline進(jìn)行讀寫了。實(shí)現(xiàn)上面的過程可以有很多種方式,本文將介紹一種也許是代碼量最少的一種解決方法。本文基于redis 3.2.9(如何安裝redis集群請(qǐng)參考http://www.itdecent.cn/p/64d05c4e0ae2)以及

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>2.9.0</version>
</dependency>

解決方案

上一節(jié)提到的過程,其實(shí)在JedisClusterInfoCache對(duì)象中都已經(jīng)幫助開發(fā)人員實(shí)現(xiàn)了,但是這個(gè)對(duì)象在JedisClusterConnectionHandler中為protected并沒有對(duì)外開放,而且通過JedisCluster的API也無法拿到JedisClusterConnectionHandler對(duì)象。所以通過下面兩個(gè)類將這些對(duì)象暴露出來,這樣使用getJedisPoolFromSlot就可以知道每個(gè)key對(duì)應(yīng)的JedisPool了。

class JedisClusterPlus extends JedisCluster {

    public JedisClusterPlus(Set<HostAndPort> jedisClusterNode, int connectionTimeout, int soTimeout, final GenericObjectPoolConfig poolConfig) {
        super(jedisClusterNode);
        super.connectionHandler = new JedisSlotAdvancedConnectionHandler(jedisClusterNode, poolConfig,
                connectionTimeout, soTimeout);
    }

    public JedisSlotAdvancedConnectionHandler getConnectionHandler() {
        return (JedisSlotAdvancedConnectionHandler)this.connectionHandler;
    }
}
public class JedisSlotAdvancedConnectionHandler extends JedisSlotBasedConnectionHandler{

    public JedisSlotAdvancedConnectionHandler(Set<HostAndPort> nodes, GenericObjectPoolConfig poolConfig, int connectionTimeout, int soTimeout) {
        super(nodes, poolConfig, connectionTimeout, soTimeout);
    }

    public JedisPool getJedisPoolFromSlot(int slot) {
        JedisPool connectionPool = cache.getSlotPool(slot);
        if (connectionPool != null) {
            // It can't guaranteed to get valid connection because of node
            // assignment
            return connectionPool;
        } else {
            renewSlotCache(); //It's abnormal situation for cluster mode, that we have just nothing for slot, try to rediscover state
            connectionPool = cache.getSlotPool(slot);
            if (connectionPool != null) {
                return connectionPool;
            } else {
                throw new JedisNoReachableClusterNodeException("No reachable node in cluster for slot " + slot);
            }
        }
    }
}

Demo

public class Tester {
    public static void main(String[] args) {
        Set<HostAndPort> jedisClusterNode = new HashSet<>();
        HostAndPort hostAndPort1 = new HostAndPort("hostA",7000);
        HostAndPort hostAndPort2 = new HostAndPort("hostB",7001);
        HostAndPort hostAndPort3 = new HostAndPort("hostC",7002);
        jedisClusterNode.add(hostAndPort1);
        jedisClusterNode.add(hostAndPort2);
        jedisClusterNode.add(hostAndPort3);

        JedisClusterPlus jedisClusterPlus = new JedisClusterPlus(jedisClusterNode, 2000, 2000, new JedisPoolConfig());
        JedisSlotAdvancedConnectionHandler jedisSlotAdvancedConnectionHandler = jedisClusterPlus.getConnectionHandler();

        String[] testKeys = {"foo","bar","xyz"};

        Map<JedisPool, List<String>> poolKeys = new HashMap<>();

        for (String key : testKeys) {
            int slot = JedisClusterCRC16.getSlot(key);
            JedisPool jedisPool = jedisSlotAdvancedConnectionHandler.getJedisPoolFromSlot(slot);
            if (poolKeys.keySet().contains(jedisPool)){
                List<String> keys = poolKeys.get(jedisPool);
                keys.add(key);
            }else {
                List<String> keys = new ArrayList<>();
                keys.add(key);
                poolKeys.put(jedisPool, keys);
            }
        }

        for (JedisPool jedisPool : poolKeys.keySet()) {
            Jedis jedis = jedisPool.getResource();
            Pipeline pipeline = jedis.pipelined();

            List<String> keys = poolKeys.get(jedisPool);

            keys.forEach(key ->pipeline.get(key));

            List result = pipeline.syncAndReturnAll();

            System.out.println(result);

            jedis.close();
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • redis集群分為服務(wù)端集群和客戶端分片,redis3.0以上版本實(shí)現(xiàn)了集群機(jī)制,即服務(wù)端集群,3.0以下使用客戶...
    hadoop_null閱讀 1,682評(píng)論 0 6
  • 轉(zhuǎn)發(fā):Redis Cluster探索與思考 Redis Cluster的基本原理和架構(gòu) Redis Cluster...
    meng_philip123閱讀 3,711評(píng)論 0 14
  • NOSQL類型簡介鍵值對(duì):會(huì)使用到一個(gè)哈希表,表中有一個(gè)特定的鍵和一個(gè)指針指向特定的數(shù)據(jù),如redis,volde...
    MicoCube閱讀 4,166評(píng)論 2 27
  • 請(qǐng)求路由 目前我們已經(jīng)搭建好Redis集群并且理解了通信和伸縮細(xì)節(jié),但還沒有使用客戶端去操作集群。Redis集群對(duì)...
    達(dá)微閱讀 1,243評(píng)論 0 1
  • 1 Redis介紹1.1 什么是NoSql為了解決高并發(fā)、高可擴(kuò)展、高可用、大數(shù)據(jù)存儲(chǔ)問題而產(chǎn)生的數(shù)據(jù)庫解決方...
    克魯?shù)吕?/span>閱讀 5,741評(píng)論 0 36

友情鏈接更多精彩內(nèi)容