zookeeper的分布式隊列應用

zookeeper可實現(xiàn)簡單的分布式隊列。

curator實現(xiàn)了先入先出的分布式消息隊列,它采用的是zookeeper的持久化有序節(jié)點。

DistributedQueue是最普通的一種隊列(FIFO)

廢話不多說,直接上代碼

 //序列化數(shù)據(jù)
        QueueSerializer queueSerializer = new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes(StandardCharsets.UTF_8);
            }
            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes,StandardCharsets.UTF_8);
            }
        };
        //消費者
        QueueConsumer<String> consumer = new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
                System.out.println("state changed");
            }
            @Override
            public void consumeMessage(String s) {
                //do someThing
                System.out.println("消費數(shù)據(jù):" + s);
            }
        };
        /**
         * client:客戶端
         * consumer:消費者,它可以接收隊列的數(shù)據(jù)。
         * queueSerializer:對隊列中的對象的序列化和反序列化
         * queuePath:PERSISTENT-SEQUENTIAL:持久化有序節(jié)點
         * lockPath:TRANSIENT-SEQUENTIAL:消費者加鎖, 當消費者消費數(shù)據(jù)時持有鎖,這樣其它消費者不能消費此消息,性能損失。
         */
        DistributedQueue queue = QueueBuilder.builder(client, consumer, queueSerializer, "/example/customize-queue")
                .lockPath("/lockPath-")
                .buildQueue();

        queue.start();

        for (int i = 0; i < 10; i++) {
            queue.put("test-" + i);//生產(chǎn)數(shù)據(jù)
        }

        Thread.sleep(60000);
        CloseableUtils.closeQuietly(queue);
        CloseableUtils.closeQuietly(client);

其他分布式隊列

  • DistributedQueue:是最普通的一種隊列;queue.put(" test-" + i);
  • DistributedIdQueue:可以為隊列中的每一個元素設置一個ID。 可以通過ID把隊列中 任意的元素移除,queue.put(" test-" + i, "Id" + i);
  • DistributedPriorityQueue:優(yōu)先級隊列對隊列中的元素按照優(yōu)先級進行排序。 Priority 越小, 元素月靠前, 越先被消費掉,queue.put("test-" + i, priority)
  • DistributedDelayQueue:延遲隊列,消費者隔一段時間才能收到元素, queue.put(aMessage, delayUntilEpoch);

最后,我想說的是,zookeeper雖然可最分布式隊列,但是官方不推薦。主要原因在于在高并發(fā)常見下,效果不好。具體情況要看自己的業(yè)務常見。
官方解釋:http://curator.apache.org/curator-recipes/distributed-queue.html

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容