生產(chǎn)者
public class PulsarProducer {
private static String localClusterUrl = "pulsar://localhost:6650";
public static void main(String[] args) {
try {
Producer<byte[]> producer = getProducer();
String msg = "test send";
Long start = System.currentTimeMillis();
MessageId msgId = producer.send(msg.getBytes());
System.out.println("spend=" + (System.currentTimeMillis() - start) + ";send a message msgId = " + msgId.toString());
} catch (Exception e) {
System.err.println(e);
}
}
public static Producer<byte[]> getProducer() throws Exception {
PulsarClient client;
client = PulsarClient.builder().serviceUrl(localClusterUrl).build();
Producer<byte[]> producer = client.newProducer()
.topic("persistent://my-tenant/my-namespace/my-topic")
.producerName("test-producer")
.create();
return producer;
}
}
消費(fèi)者
public class PulsarConsumerDemo {
private static String localClusterUrl = "pulsar://localhost:6650";
public static void main(String[] args) {
try {
//將訂閱消費(fèi)者指定的主題和訂閱
Consumer<byte[]> consumer = getClient().newConsumer()
.topic("persistent://my-tenant/my-namespace/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.subscribe();
while (true) {
Message msg = consumer.receive();
System.out.printf("consumer-Message received: %s. \n", new String(msg.getData()));
// 確認(rèn)消息,以便broker刪除消息
consumer.acknowledge(msg);
}
} catch (Exception e) {
System.out.println(e);
}
}
public static PulsarClient getClient() throws Exception {
PulsarClient client;
client = PulsarClient.builder()
.serviceUrl(localClusterUrl).build();
return client;
}
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。