zookeeper可實現(xiàn)簡單的分布式隊列。
curator實現(xiàn)了先入先出的分布式消息隊列,它采用的是zookeeper的持久化有序節(jié)點。
DistributedQueue是最普通的一種隊列(FIFO)
廢話不多說,直接上代碼
//序列化數(shù)據(jù)
QueueSerializer queueSerializer = new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes(StandardCharsets.UTF_8);
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes,StandardCharsets.UTF_8);
}
};
//消費者
QueueConsumer<String> consumer = new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework curatorFramework, ConnectionState connectionState) {
System.out.println("state changed");
}
@Override
public void consumeMessage(String s) {
//do someThing
System.out.println("消費數(shù)據(jù):" + s);
}
};
/**
* client:客戶端
* consumer:消費者,它可以接收隊列的數(shù)據(jù)。
* queueSerializer:對隊列中的對象的序列化和反序列化
* queuePath:PERSISTENT-SEQUENTIAL:持久化有序節(jié)點
* lockPath:TRANSIENT-SEQUENTIAL:消費者加鎖, 當消費者消費數(shù)據(jù)時持有鎖,這樣其它消費者不能消費此消息,性能損失。
*/
DistributedQueue queue = QueueBuilder.builder(client, consumer, queueSerializer, "/example/customize-queue")
.lockPath("/lockPath-")
.buildQueue();
queue.start();
for (int i = 0; i < 10; i++) {
queue.put("test-" + i);//生產(chǎn)數(shù)據(jù)
}
Thread.sleep(60000);
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
其他分布式隊列
- DistributedQueue:是最普通的一種隊列;queue.put(" test-" + i);
- DistributedIdQueue:可以為隊列中的每一個元素設置一個ID。 可以通過ID把隊列中 任意的元素移除,queue.put(" test-" + i, "Id" + i);
- DistributedPriorityQueue:優(yōu)先級隊列對隊列中的元素按照優(yōu)先級進行排序。 Priority 越小, 元素月靠前, 越先被消費掉,queue.put("test-" + i, priority)
- DistributedDelayQueue:延遲隊列,消費者隔一段時間才能收到元素, queue.put(aMessage, delayUntilEpoch);
最后,我想說的是,zookeeper雖然可最分布式隊列,但是官方不推薦。主要原因在于在高并發(fā)常見下,效果不好。具體情況要看自己的業(yè)務常見。
官方解釋:http://curator.apache.org/curator-recipes/distributed-queue.html
