最近很火的消息中間件Pulsar,本想學(xué)習(xí)下,發(fā)現(xiàn)網(wǎng)上很多都是介紹性能和對(duì)比Kafka的文章,實(shí)踐的文章很少!于是對(duì)著官方文檔實(shí)踐了一波,寫下了這篇文章,估計(jì)是國(guó)內(nèi)第一篇Pulsar實(shí)戰(zhàn)文章了,希望對(duì)大家有所幫助!
Pulsar簡(jiǎn)介
Pulsar是一個(gè)用于服務(wù)端到服務(wù)端的消息中間件,具有多租戶、高性能等優(yōu)勢(shì)。Pulsar最初由Yahoo開發(fā),目前由Apache軟件基金會(huì)管理。Pulsar采用
發(fā)布-訂閱設(shè)計(jì)模式,Producer發(fā)布消息到Topic,Consumer訂閱Topic、處理Topic中的消息。
Pulsar具有如下特性:
- Pulsar的單個(gè)實(shí)例原生支持集群。
- 極低的發(fā)布延遲和端到端延遲。
- 可無(wú)縫擴(kuò)展到超過(guò)一百萬(wàn)個(gè)Topic。
- 簡(jiǎn)單易用的客戶端API,支持Java、Go、Python和C++。
- 支持多種Topic訂閱模式(獨(dú)占訂閱、共享訂閱、故障轉(zhuǎn)移訂閱)。
- 通過(guò)Apache BookKeeper提供的持久化消息存儲(chǔ)機(jī)制保證消息傳遞。
Pulsar安裝
使用Docker安裝Pulsar是最簡(jiǎn)單的,這次我們使用Docker來(lái)安裝。
- 首先下載Pulsar的Docker鏡像;
docker pull apachepulsar/pulsar:2.7.1
- 下載完成后運(yùn)行Pulsar容器,http協(xié)議訪問(wèn)使用
8080端口,pulsar協(xié)議(Java、Python等客戶端)訪問(wèn)使用6650端口。
docker run --name pulsar \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
-d apachepulsar/pulsar:2.7.1 \
bin/pulsar standalone
Pulsar可視化
Pulsar Manager是官方提供的可視化工具,可以對(duì)多個(gè)Pulsar進(jìn)行可視化管理,雖然功能不多,但也基本夠用了,支持Docker部署。
- 下載
pulsar-manager的Docker鏡像;
docker pull apachepulsar/pulsar-manager:v0.2.0
- 下載完成后運(yùn)行
pulsar-manager容器,從9527端口可以訪問(wèn)Web頁(yè)面;
docker run -it --name pulsar-manager\
-p 9527:9527 -p 7750:7750 \
-e SPRING_CONFIGURATION_FILE=/pulsar-manager/pulsar-manager/application.properties \
-d apachepulsar/pulsar-manager:v0.2.0
- 運(yùn)行成功后,我們剛開始無(wú)法訪問(wèn),需要?jiǎng)?chuàng)建管理員賬號(hào),這里創(chuàng)建賬號(hào)為
admin:apachepulsar:
CSRF_TOKEN=$(curl http://localhost:7750/pulsar-manager/csrf-token)
curl \
-H "X-XSRF-TOKEN: $CSRF_TOKEN" \
-H "Cookie: XSRF-TOKEN=$CSRF_TOKEN;" \
-H 'Content-Type: application/json' \
-X PUT http://localhost:7750/pulsar-manager/users/superuser \
-d '{"name": "admin", "password": "apachepulsar", "description": "test", "email": "username@test.org"}'
-
創(chuàng)建成功后,通過(guò)登錄頁(yè)面進(jìn)行登錄,訪問(wèn)地址:http://192.168.5.78:9527image.png
- 登錄成功后我們需要先配置一個(gè)環(huán)境,就是將需要管理的Pulsar服務(wù)配置上去,配置的
Service URL為:http://192.168.5.78:8080image.png - 可以查看Tenant列表;
- 可以查看Topic列表和管理Topic;

- 還可以查看Topic的詳細(xì)信息。

Pulsar結(jié)合SpringBoot使用
Pulsar結(jié)合SpringBoot使用也是非常簡(jiǎn)單的,我們可以使用Pulsar官方的Java SDK,也可以使用第三方的SpringBoot Starter。這里使用Starter,非常簡(jiǎn)單!
- 首先在
pom.xml中添加Pulsar相關(guān)依賴;
<!--SpringBoot整合Pulsar-->
<dependency>
<groupId>io.github.majusko</groupId>
<artifactId>pulsar-java-spring-boot-starter</artifactId>
<version>1.0.4</version>
</dependency>
- 然后在
application.yml中添加Pulsar的Service URL配置;
pulsar:
service-url: pulsar://192.168.5.78:6650
- 再添加Pulsar的Java配置,聲明兩個(gè)Topic,并確定好發(fā)送的消息類型;
/**
* Pulsar配置類
* Created by macro on 2021/5/21.
*/
@Configuration
public class PulsarConfig {
@Bean
public ProducerFactory producerFactory() {
return new ProducerFactory()
.addProducer("bootTopic", MessageDto.class)
.addProducer("stringTopic", String.class);
}
}
- 創(chuàng)建Pulsar生產(chǎn)者,往Topic中發(fā)送消息,這里可以發(fā)現(xiàn)Pulsar是支持直接發(fā)送消息對(duì)象的;
/**
* Pulsar消息生產(chǎn)者
* Created by macro on 2021/5/19.
*/
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate<MessageDto> template;
public void send(MessageDto message){
try {
template.send("bootTopic",message);
} catch (PulsarClientException e) {
e.printStackTrace();
}
}
}
- 創(chuàng)建Pulsar消費(fèi)者,從Topic中獲取并消費(fèi)消息,也是可以直接獲取到消息對(duì)象的;
/**
* Pulsar消息消費(fèi)者
* Created by macro on 2021/5/19.
*/
@Slf4j
@Component
public class PulsarRealConsumer {
@PulsarConsumer(topic="bootTopic", clazz= MessageDto.class)
public void consume(MessageDto message) {
log.info("PulsarRealConsumer consume id:{},content:{}",message.getId(),message.getContent());
}
}
- 添加測(cè)試接口,調(diào)用生產(chǎn)者發(fā)送消息;
/**
* Pulsar功能測(cè)試
* Created by macro on 2021/5/19.
*/
@Api(tags = "PulsarController", description = "Pulsar功能測(cè)試")
@Controller
@RequestMapping("/pulsar")
public class PulsarController {
@Autowired
private PulsarProducer pulsarProducer;
@ApiOperation("發(fā)送消息")
@RequestMapping(value = "/sendMessage", method = RequestMethod.POST)
@ResponseBody
public CommonResult sendMessage(@RequestBody MessageDto message) {
pulsarProducer.send(message);
return CommonResult.success(null);
}
}
- 在Swagger中調(diào)用接口進(jìn)行測(cè)試;
- 調(diào)用成功后,控制臺(tái)將輸入如下信息,表示消息已經(jīng)被成功接收并消費(fèi)了。
2021-05-21 16:25:07.756 INFO 11472 --- [al-listener-3-1] c.m.m.tiny.component.PulsarRealConsumer : PulsarRealConsumer consum
總結(jié)
上次寫了一篇《吊炸天的 Kafka 圖形化工具 Eagle,必須推薦給你!》介紹了Kafka的基本使用,這里和Pulsar做個(gè)對(duì)比。Pulsar對(duì)Docker支持無(wú)疑是更好的,官方文檔也更全。對(duì)比下圖形化工具Pulsar Manager和Kafka Eagle,Pulsar的圖形化工具感覺(jué)有點(diǎn)簡(jiǎn)陋。介于目前雅虎、騰訊、360等互聯(lián)網(wǎng)大廠都在使用Pulsar,Pulsar的性能和穩(wěn)定性應(yīng)該是很不錯(cuò)的!

