SpringBoot入門建站全系列(十七)整合ActiveMq(JMS類消息隊(duì)列)

SpringBoot入門建站全系列(十七)整合ActiveMq(JMS類消息隊(duì)列)

一、概述

消息中間件利用高效可靠的消息傳遞機(jī)制進(jìn)行平臺(tái)無關(guān)的數(shù)據(jù)交流,并基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成。通過提供消息傳遞和消息排隊(duì)模型,它可以在分布式環(huán)境下擴(kuò)展進(jìn)程間的通信。對(duì)于消息中間件,常見的角色大致也就有Producer(生產(chǎn)者)、Consumer(消費(fèi)者)

常見的消息中間件產(chǎn)品:

(1)ActiveMQ

ActiveMQ 是Apache出品,最流行的,能力強(qiáng)勁的開源消息總線。ActiveMQ 是一個(gè)完全支持JMS1.1和J2EE 1.4規(guī)范的 JMS Provider實(shí)現(xiàn)。這里介紹的是ActiveMQ的使用。

(2)RabbitMQ

AMQP協(xié)議的領(lǐng)導(dǎo)實(shí)現(xiàn),支持多種場景。淘寶的MySQL集群內(nèi)部有使用它進(jìn)行通訊,OpenStack開源云平臺(tái)的通信組件,最先在金融行業(yè)得到運(yùn)用。

(3)ZeroMQ

史上最快的消息隊(duì)列系統(tǒng)

(4)Kafka

Apache下的一個(gè)子項(xiàng)目 。特點(diǎn):高吞吐,在一臺(tái)普通的服務(wù)器上既可以達(dá)到10W/s的吞吐速率;完全的分布式系統(tǒng)。適合處理海量數(shù)據(jù)。

Jms

JMS即Java消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java消息服務(wù)是一個(gè)與具體平臺(tái)無關(guān)的API,絕大多數(shù)MOM提供商都對(duì)JMS提供支持。

AMQP

AMQP是一種協(xié)議,更準(zhǔn)確的說是一種binary wire-level protocol(鏈接協(xié)議)。這是其和JMS的本質(zhì)差別,AMQP不從API層進(jìn)行限定,而是直接定義網(wǎng)絡(luò)交換的數(shù)據(jù)格式。這使得實(shí)現(xiàn)了AMQP的provider天然性就是跨平臺(tái)的。意味著我們可以使用Java的AMQP provider,同時(shí)使用一個(gè)python的producer加一個(gè)rubby的consumer。從這一點(diǎn)看,AQMP可以用http來進(jìn)行類比,不關(guān)心實(shí)現(xiàn)的語言,只要大家都按照相應(yīng)的數(shù)據(jù)格式去發(fā)送報(bào)文請求,不同語言的client均可以和不同語言的server鏈接。

首發(fā)地址:
品茗IT-同步發(fā)布

品茗IT 提供在線支持:

一鍵快速構(gòu)建Spring項(xiàng)目工具

一鍵快速構(gòu)建SpringBoot項(xiàng)目工具

一鍵快速構(gòu)建SpringCloud項(xiàng)目工具

一站式Springboot項(xiàng)目生成

Mysql一鍵生成Mybatis注解Mapper

代碼可以在SpringBoot組件化構(gòu)建https://www.pomit.cn/java/spring/springboot.html中的ActiveMQ組件中查看,并下載。

如果大家正在尋找一個(gè)java的學(xué)習(xí)環(huán)境,或者在開發(fā)中遇到困難,可以加入我們的java學(xué)習(xí)圈,點(diǎn)擊即可加入,共同學(xué)習(xí),節(jié)約學(xué)習(xí)時(shí)間,減少很多在學(xué)習(xí)中遇到的難題。

二、配置

本文假設(shè)你已經(jīng)引入spring-boot-starter-web。已經(jīng)是個(gè)SpringBoot項(xiàng)目了,如果不會(huì)搭建,可以打開這篇文章看一看《SpringBoot入門建站全系列(一)項(xiàng)目建立》

2.1 Maven依賴

使用activemq可以使用spring-boot-starter-activemq,方便快捷,一般springboot對(duì)大多數(shù)開源項(xiàng)目都做了整合,提供了專用的stater。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2.2 配置文件

在application.properties 中需要配置activemq的信息,也可以配置自定義的配置,如:

spring.activemq.broker-url=tcp://localhost:61616
#spring.activemq.user=admin
#spring.activemq.password=secret

jms.destQueueName=destQueue

這里面,

  • spring.activemq.broker-url是springboot自動(dòng)裝配的配置,activemq的地址。
  • spring.activemq.user是springboot自動(dòng)裝配的配置,activemq的用戶名,一般自己測試都不會(huì)去設(shè)置用戶名密碼的。
  • spring.activemq.password是springboot自動(dòng)裝配的配置,,activemq的密碼,一般自己測試都不會(huì)去設(shè)置用戶名密碼的。
  • jms.destQueueName,這是我自己定義的一個(gè)目的隊(duì)列名。

三、ActiveMQ的使用

3.1 配置數(shù)據(jù)轉(zhuǎn)換

ActiveMQ是分為生產(chǎn)者和消費(fèi)者的,生產(chǎn)者生產(chǎn)的消息,如何能夠被消費(fèi)者正常解析,需要開發(fā)者自己對(duì)數(shù)據(jù)轉(zhuǎn)換做定義,如果你非要說,我用字符串也可以啊,那當(dāng)這個(gè)不存在就行了。

下面這個(gè)配置是指明了ActiveMQ的數(shù)據(jù)轉(zhuǎn)換是用MappingJackson2MessageConverter,將json數(shù)據(jù)轉(zhuǎn)換為對(duì)象,或者將對(duì)象轉(zhuǎn)換為json。

ActiveMQConfig :

package com.cff.springbootwork.activemq;

import javax.jms.ConnectionFactory;

import org.springframework.boot.autoconfigure.jms.DefaultJmsListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;

@Configuration
public class ActiveMQConfig {

    @Bean
    public JmsListenerContainerFactory<?> myFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }
}

3.2 生產(chǎn)者

ActiveMQ當(dāng)然需要生產(chǎn)者來生產(chǎn)信息,然后才發(fā)送到消息隊(duì)列的。

JmsProducer:

package com.cff.springbootwork.activemq.handler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

import com.cff.springbootwork.activemq.model.DefaultMqModel;

@Component
public class JmsProducer {
    @Autowired
    JmsTemplate jmsTemplate;

    @Value("${jms.destQueueName}")
    String destQueueName;

    public void send(DefaultMqModel defaultMqModel) {
        jmsTemplate.convertAndSend(destQueueName, defaultMqModel);
    }
}

這里的JmsTemplate 是Spring自動(dòng)生成的bean。destQueueName注入的是配置文件中定義的目的隊(duì)列。然后發(fā)送數(shù)據(jù)。

測試發(fā)送數(shù)據(jù),ActiveMQRest :

package com.cff.springbootwork.activemq.web;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.cff.springbootwork.activemq.handler.JmsProducer;
import com.cff.springbootwork.activemq.model.DefaultMqModel;
import com.cff.springbootwork.activemq.model.SeccondMqModel;

@RestController
@RequestMapping("/activemq")
public class ActiveMQRest {
    @Autowired
    JmsProducer jmsProducer;

    @RequestMapping(value = "/test", method = { RequestMethod.GET })
    public DefaultMqModel test() {
        DefaultMqModel defaultMqModel = new DefaultMqModel();
        defaultMqModel.setContent("hahahahahahhahaha哈啊哈哈");
        defaultMqModel.setTitle("測試");
        defaultMqModel.setType(1);
        jmsProducer.send(defaultMqModel);
        return defaultMqModel;
    }

    @RequestMapping(value = "/test2", method = { RequestMethod.GET })
    public SeccondMqModel test2() {
        SeccondMqModel seccondMqModel = new SeccondMqModel();
        seccondMqModel.setContent("asdasdasd哈啊哈哈");
        seccondMqModel.setTitle("測試2");
        seccondMqModel.setRemark("第二個(gè)");
        seccondMqModel.setType(2);
        jmsProducer.send(seccondMqModel);
        return seccondMqModel;
    }
}

3.3 消費(fèi)者

消費(fèi)者監(jiān)聽指定的隊(duì)列,需要用@JmsListener注解標(biāo)明它是一個(gè)消費(fèi)者。

JmsConsumer :

package com.cff.springbootwork.activemq.handler;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

import com.cff.springbootwork.activemq.model.DefaultMqModel;
import com.cff.springbootwork.activemq.service.BusinessSerivce;

@Component
public class JmsConsumer {

    @Autowired
    BusinessSerivce businessSerivce;

    @JmsListener(destination = "${jms.destQueueName}")
    public void processMessage(DefaultMqModel defaultMqModel) {
        businessSerivce.doBusiness(defaultMqModel);
    }
}

這里,@JmsListener注解表明它在監(jiān)聽我們配置文件中配置的隊(duì)列,方法的參數(shù)可以直接傳入對(duì)象,因?yàn)槲覀冊?.1中配置了統(tǒng)一的MessageConverter,它可以自動(dòng)解析成對(duì)象。

BusinessSerivce是我自定義的一個(gè)業(yè)務(wù)邏輯處理service.

BusinessSerivce:

package com.cff.springbootwork.activemq.service;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import com.cff.springbootwork.activemq.model.DefaultMqModel;
import com.cff.springbootwork.activemq.model.SeccondMqModel;

@Service
public class BusinessSerivce {
    private final Logger log = LoggerFactory.getLogger(this.getClass());

    public void doBusiness(DefaultMqModel defaultMqModel) {
        log.info(defaultMqModel.toString());
        
        if(defaultMqModel.getType() == 2 && defaultMqModel instanceof SeccondMqModel){
            SeccondMqModel seccondMqModel = (SeccondMqModel) defaultMqModel;
            log.info(seccondMqModel.remark);
        }
    }
}

四、過程中用到的實(shí)體

DefaultMqModel:


SeccondMqModel:


詳細(xì)完整的實(shí)體,可以訪問品茗IT-博客《SpringBoot入門建站全系列(十七)整合ActiveMq(JMS類消息隊(duì)列)》進(jìn)行查看

快速構(gòu)建項(xiàng)目

Spring組件化構(gòu)建

SpringBoot組件化構(gòu)建

SpringCloud服務(wù)化構(gòu)建

喜歡這篇文章么,喜歡就加入我們的Java學(xué)習(xí)圈(點(diǎn)擊加入或下方掃碼)一起討論SpringBoot技術(shù)吧!

品茗IT交流群

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 簡介 ActiveMQ 特點(diǎn) ActiveMQ 是由 Apache 出品的一款開源消息中間件,旨在為應(yīng)用程序提供高...
    預(yù)流閱讀 6,016評(píng)論 4 21
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,211評(píng)論 2 11
  • 原文鏈接:https://docs.spring.io/spring-boot/docs/1.4.x/refere...
    pseudo_niaonao閱讀 4,900評(píng)論 0 9
  • 聽吳伯凡老師的專欄,談到本能相關(guān)的話題?;蛸x予我們的本能,就像預(yù)裝在計(jì)算機(jī)里面的操作系統(tǒng),這個(gè)系統(tǒng)經(jīng)過億萬年與大...
    小貓藏魚閱讀 240評(píng)論 0 1
  • 蓉蓉出生的時(shí)候,是她爸爸接生的,她媽生得太快,連接生婆都來不及叫,最后,接生婆只是來走了個(gè)過場。也是,蓉蓉已經(jīng)是他...
    思緒云騫閱讀 859評(píng)論 9 13

友情鏈接更多精彩內(nèi)容