SpringBoot 使用(四): 消息隊列使用

一. 概述

在整套的微服務(wù)架構(gòu)中, 消息隊列是不可或缺的部分, 它能夠起到線程內(nèi)同步或者異步調(diào)用無法達(dá)到的作用,優(yōu)缺點分別是:
優(yōu)點:

  1. 解耦
    i. 只依賴消息的格式, 而不依賴發(fā)送者的ip和端口
    ii. 多消費者的情況下, 發(fā)送者不需要關(guān)注消費者的任何信息
  2. 路由
    不能互相訪問的網(wǎng)絡(luò)之間可以消息隊列實現(xiàn)訪問, 可以減少對現(xiàn)有網(wǎng)絡(luò)的修改。
  3. 消息可靠性
    當(dāng)消費者發(fā)生故障時, 消息可以被有效保存下來, 等待恢復(fù)后繼續(xù)訪問.
  4. 異步調(diào)用
    發(fā)送者異步發(fā)送消息, 不等待消息ack,不會對發(fā)送者本身產(chǎn)生響應(yīng)速度的影響, 當(dāng)然異步調(diào)用也是可以實現(xiàn)的。
  5. 方便擴(kuò)展
    集群部署消息隊列, 當(dāng)流量增大和減小是可以通過調(diào)整部署來實現(xiàn)和發(fā)送方, 消費方無關(guān)。

缺點:
多出一個環(huán)節(jié),需要保證消息隊列的可用性。

二. 常用消息隊列

目前常用的消息隊列大概有三種類型,RabbitMQ等AMQP系列, Kafka, Redis等kev value系列,它們的使用場景分別是:
1.RabbitMQ: 相對重量級高并發(fā)的情況,比如數(shù)據(jù)的異步處理 任務(wù)的串行執(zhí)行等.
2.Kafka: 基于Pull的模式來處理,具體很高的吞吐量,一般用來進(jìn)行 日志的存儲和收集.
3.Redis: 輕量級高并發(fā),實時性要求高的情況,比如緩存,秒殺,及時的數(shù)據(jù)分析(ELK日志分析框架,使用的就是Redis).

三. SpingBoot 集成消息隊列

在SpingBoot中對這個三種都有支持

  1. Redis
    請參照 其他作者的 文章 http://www.itdecent.cn/p/a2ab17707eff

  2. RabbitMQ
    i. 配置

     <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
    

    ii.實現(xiàn)方式 RabbitMQDemoConfigration.java

import org.springframework.amqp.core.;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/
*

  • RabbitMQDemo

  • @author: sunjie

  • @date: 16/01/10
    */
    @Configuration
    @SuppressWarnings("SpringJavaAutowiringInspection")
    public class RabbitMQDemoConfigration {
    @Bean
    public CachingConnectionFactory myConnectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setUsername("guest");
    connectionFactory.setPassword("guest");
    connectionFactory.setHost("192.168.1.1");
    connectionFactory.setPort(5672);
    connectionFactory.setVirtualHost("/myHost");
    return connectionFactory;
    }

    // 生成CachingConnectionFactory 也可以使用下面的方式,在application.properties
    // 中定義好屬性即可

    // @Autowired
    // ConnectionFactory connectionFactory;
    @Bean
    public DirectExchange myExchange() {
    return new DirectExchange("myExchangeDemo", true, false);
    }

    @Bean
    public Queue myQueue() {
    return new Queue("myQueueDemo", true);
    }

    @Bean
    public Binding myExchangeBinding(@Qualifier("myExchange") DirectExchange directExchange,
    @Qualifier("myQueue") Queue queue) {
    return BindingBuilder.bind(queue).to(directExchange).with("routeDemo");
    }

    @Bean
    public RabbitTemplate myExchangeTemlate() {
    RabbitTemplate r = new RabbitTemplate(myConnectionFactory());
    r.setExchange("myExchangeDemo");
    r.setRoutingKey("routeDemo");
    return r;
    }

    /**

    • 發(fā)送消息,工業(yè)使用需要自己做個性化實現(xiàn)
      */
      @Bean
      public void sendMessage(RabbitTemplate myExchangeTemlate) {
      String string = "Hello RabbitmQ";
      myExchangeTemlate.convertAndSend(string);
      }

    /**

    • 接受消息,工業(yè)使用時需要在監(jiān)聽類中實現(xiàn)process邏輯
      */
      @RabbitListener(queues = "myQueueDemo")
      public void process(Message message) {
      System.out.println("__________" + message.getBody().toString() + "__________");
      try {
      this.wait(1000);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }
    
    
  1. Kafka 使用
    i. pom.xml 配置
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.10</artifactId>
     </dependency>
     <dependency>
       <groupId>org.springframework.integration</groupId>
       <artifactId>spring-integration-kafka</artifactId>
     </dependency>
    

ii. spring-integration-kafka.xml 也可以研究通過bean方式來實現(xiàn)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
    http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:outbound-channel-adapter
            id="kafkaOutboundChannelAdapter"
            kafka-producer-context-ref="kafkaProducerContext"
            channel="inputToKafka">
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>

    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="50000"/>

    <int-kafka:producer-context id="kafkaProducerContext">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="192.168.2.2:9092"
                                              key-class-type="java.lang.String"
                                              value-class-type="java.lang.String"
                                              topic="1_service"
                                              value-encoder="kafkaEncoder"
                                              key-encoder="kafkaEncoder"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>

    <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>
</beans>

iii. 客戶端實現(xiàn)代碼:

  @Autowired
    MessageChannel             inputToKafka;

    String value = "Hello Kafka";
    Message<String> mess = MessageBuilder.withPayload(value)
            .setHeader(KafkaHeaders.TOPIC, appSetting.getStrValue("topic")).build();
    inputToKafka.send(mess);

四. 后記

消息中間件有很多,實際使用中往往是根據(jù)架構(gòu)師的技術(shù)棧相關(guān),做到了解使用場景和基本原理,在項目中提升自己細(xì)節(jié)能力,做到個人和公司共同成長,才是好的方式.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容