SpringBoot入門建站全系列(十七)整合ActiveMq(JMS類消息隊(duì)列)
一、概述
消息中間件利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊(duì)模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)程間的通信。對(duì)于消息中間件,常見的角色大致也就有Producer(生產(chǎn)者)、Consumer(消費(fèi)者)
常見的消息中間件產(chǎn)品:
(1)ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。這里介紹的是ActiveMQ的使用。
(2)RabbitMQ
AMQP協(xié)議的領(lǐng)導(dǎo)實(shí)現(xiàn),支持多種場景。淘寶的MySQL集群內(nèi)部有使用它進(jìn)行通訊,OpenStack開源云平臺(tái)的通信組件,最先在金融行業(yè)得到運(yùn)用。
(3)ZeroMQ
史上最快的消息隊(duì)列系統(tǒng)
(4)Kafka
Apache下的一個(gè)子項(xiàng)目 。特點(diǎn):高吞吐,在一臺(tái)普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率;完全的分布式系統(tǒng)。適合處理海量數(shù)據(jù)。
Jms
JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。
AMQP
AMQP是一種協(xié)議,更準(zhǔn)確的說是一種binary wire-level protocol(鏈接協(xié)議)。這是其和JMS的本質(zhì)差別,AMQP不從API層進(jìn)行限定,而是直接定義網(wǎng)絡(luò)交換的數(shù)據(jù)格式。這使得實(shí)現(xiàn)了AMQP的provider天然性就是跨平臺(tái)的。意味著我們可以使用Java的AMQP provider,同時(shí)使用一個(gè)python的producer加一個(gè)rubby的consumer。從這一點(diǎn)看,AQMP可以用http來進(jìn)行類比,不關(guān)心實(shí)現(xiàn)的語言,只要大家都按照相應(yīng)的數(shù)據(jù)格式去發(fā)送報(bào)文請求,不同語言的client均可以和不同語言的server鏈接。
首發(fā)地址:
品茗IT-同步發(fā)布
品茗IT 提供在線支持:
一鍵快速構(gòu)建SpringBoot項(xiàng)目工具
一鍵快速構(gòu)建SpringCloud項(xiàng)目工具
代碼可以在SpringBoot組件化構(gòu)建https://www.pomit.cn/java/spring/springboot.html中的ActiveMQ組件中查看,并下載。
如果大家正在尋找一個(gè)java的學(xué)習(xí)環(huán)境,或者在開發(fā)中遇到困難,可以加入我們的java學(xué)習(xí)圈,點(diǎn)擊即可加入,共同學(xué)習(xí),節(jié)約學(xué)習(xí)時(shí)間,減少很多在學(xué)習(xí)中遇到的難題。
二、配置
本文假設(shè)你已經(jīng)引入spring-boot-starter-web。已經(jīng)是個(gè)SpringBoot項(xiàng)目了,如果不會(huì)搭建,可以打開這篇文章看一看《SpringBoot入門建站全系列(一)項(xiàng)目建立》。
2.1 Maven依賴
使用activemq可以使用spring-boot-starter-activemq,方便快捷,一般springboot對(duì)大多數(shù)開源項(xiàng)目都做了整合,提供了專用的stater。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2.2 配置文件
在application.properties 中需要配置activemq的信息,也可以配置自定義的配置,如:
spring.activemq.broker-url=tcp://localhost:61616
#spring.activemq.user=admin
#spring.activemq.password=secret
jms.destQueueName=destQueue
這里面,
- spring.activemq.broker-url是springboot自動(dòng)裝配的配置,activemq的地址。
- spring.activemq.user是springboot自動(dòng)裝配的配置,activemq的用戶名,一般自己測試都不會(huì)去設(shè)置用戶名密碼的。
- spring.activemq.password是springboot自動(dòng)裝配的配置,,activemq的密碼,一般自己測試都不會(huì)去設(shè)置用戶名密碼的。
- jms.destQueueName,這是我自己定義的一個(gè)目的隊(duì)列名。
三、ActiveMQ的使用
3.1 配置數(shù)據(jù)轉(zhuǎn)換
ActiveMQ是分為生產(chǎn)者和消費(fèi)者的,生產(chǎn)者生產(chǎn)的消息,如何能夠被消費(fèi)者正常解析,需要開發(fā)者自己對(duì)數(shù)據(jù)轉(zhuǎn)換做定義,如果你非要說,我用字符串也可以啊,那當(dāng)這個(gè)不存在就行了。
下面這個(gè)配置是指明了ActiveMQ的數(shù)據(jù)轉(zhuǎn)換是用MappingJackson2MessageConverter,將json數(shù)據(jù)轉(zhuǎn)換為對(duì)象,或者將對(duì)象轉(zhuǎn)換為json。
ActiveMQConfig :
package com.cff.springbootwork.activemq;
import javax.jms.ConnectionFactory;
import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
@Configuration
public class ActiveMQConfig {
@Bean
public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(jacksonJmsMessageConverter());
return factory;
}
@Bean
public MessageConverter jacksonJmsMessageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("_type");
return converter;
}
}
3.2 生產(chǎn)者
ActiveMQ當(dāng)然需要生產(chǎn)者來生產(chǎn)信息,然后才發(fā)送到消息隊(duì)列的。
JmsProducer:
package com.cff.springbootwork.activemq.handler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import com.cff.springbootwork.activemq.model.DefaultMqModel;
@Component
public class JmsProducer {
@Autowired
JmsTemplate jmsTemplate;
@Value("${jms.destQueueName}")
String destQueueName;
public void send(DefaultMqModel defaultMqModel) {
jmsTemplate.convertAndSend(destQueueName, defaultMqModel);
}
}
這里的JmsTemplate 是Spring自動(dòng)生成的bean。destQueueName注入的是配置文件中定義的目的隊(duì)列。然后發(fā)送數(shù)據(jù)。
測試發(fā)送數(shù)據(jù),ActiveMQRest :
package com.cff.springbootwork.activemq.web;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import com.cff.springbootwork.activemq.handler.JmsProducer;
import com.cff.springbootwork.activemq.model.DefaultMqModel;
import com.cff.springbootwork.activemq.model.SeccondMqModel;
@RestController
@RequestMapping("/activemq")
public class ActiveMQRest {
@Autowired
JmsProducer jmsProducer;
@RequestMapping(value = "/test", method = { RequestMethod.GET })
public DefaultMqModel test() {
DefaultMqModel defaultMqModel = new DefaultMqModel();
defaultMqModel.setContent("hahahahahahhahaha哈啊哈哈");
defaultMqModel.setTitle("測試");
defaultMqModel.setType(1);
jmsProducer.send(defaultMqModel);
return defaultMqModel;
}
@RequestMapping(value = "/test2", method = { RequestMethod.GET })
public SeccondMqModel test2() {
SeccondMqModel seccondMqModel = new SeccondMqModel();
seccondMqModel.setContent("asdasdasd哈啊哈哈");
seccondMqModel.setTitle("測試2");
seccondMqModel.setRemark("第二個(gè)");
seccondMqModel.setType(2);
jmsProducer.send(seccondMqModel);
return seccondMqModel;
}
}
3.3 消費(fèi)者
消費(fèi)者監(jiān)聽指定的隊(duì)列,需要用@JmsListener注解標(biāo)明它是一個(gè)消費(fèi)者。
JmsConsumer :
package com.cff.springbootwork.activemq.handler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import com.cff.springbootwork.activemq.model.DefaultMqModel;
import com.cff.springbootwork.activemq.service.BusinessSerivce;
@Component
public class JmsConsumer {
@Autowired
BusinessSerivce businessSerivce;
@JmsListener(destination = "${jms.destQueueName}")
public void processMessage(DefaultMqModel defaultMqModel) {
businessSerivce.doBusiness(defaultMqModel);
}
}
這里,@JmsListener注解表明它在監(jiān)聽我們配置文件中配置的隊(duì)列,方法的參數(shù)可以直接傳入對(duì)象,因?yàn)槲覀冊?.1中配置了統(tǒng)一的MessageConverter,它可以自動(dòng)解析成對(duì)象。
BusinessSerivce是我自定義的一個(gè)業(yè)務(wù)邏輯處理service.
BusinessSerivce:
package com.cff.springbootwork.activemq.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.cff.springbootwork.activemq.model.DefaultMqModel;
import com.cff.springbootwork.activemq.model.SeccondMqModel;
@Service
public class BusinessSerivce {
private final Logger log = LoggerFactory.getLogger(this.getClass());
public void doBusiness(DefaultMqModel defaultMqModel) {
log.info(defaultMqModel.toString());
if(defaultMqModel.getType() == 2 && defaultMqModel instanceof SeccondMqModel){
SeccondMqModel seccondMqModel = (SeccondMqModel) defaultMqModel;
log.info(seccondMqModel.remark);
}
}
}
四、過程中用到的實(shí)體
DefaultMqModel:
SeccondMqModel:
詳細(xì)完整的實(shí)體,可以訪問品茗IT-博客《SpringBoot入門建站全系列(十七)整合ActiveMq(JMS類消息隊(duì)列)》進(jìn)行查看
快速構(gòu)建項(xiàng)目
喜歡這篇文章么,喜歡就加入我們的Java學(xué)習(xí)圈(點(diǎn)擊加入或下方掃碼)一起討論SpringBoot技術(shù)吧!