基于默認的direct(即路由)模式
yml的rabbitmq配置

image.png
引入maven依賴

image.png
1.自定義消息類
package com.zkmeiling.biz.rabbitmq;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.Date;
import java.util.List;
/**
 * Box task business object: the message payload handed to the RabbitMQ
 * publisher and serialized by the configured message converter.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are
 * generated by Lombok's {@code @Data}; the no-argument constructor comes from
 * {@code @NoArgsConstructor}.
 *
 * @author WeiWei
 * @date 2022/10/17
 */
@Data
@NoArgsConstructor
public class BoxTaskBo {

    /** Request content carried by the task. */
    private String request;

    /** Timestamp attached to the task. */
    private Date time;

    /** Second date value; NOTE(review): the name may be a typo for "date" - confirm with the author. */
    private Date data;
}
2.開啟手動ack模式并準備相關的處理回調函數
2.1---ConfirmCallback 回調函數類( 發布端----send消息--> broker[由交換機exchange和隊列queue組成 {只要進入exchange就觸發ConfirmCallback }] )
package com.zkmeiling.biz.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
* Confirmation callback.
*
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (!ack) {
log.error("bbk樣本庫消息發(fā)送異常!");
} else {
log.info("bbk樣本庫發(fā)送者爸爸已經(jīng)收到確認(rèn),correlationData={} ,ack={}, cause={}", correlationData, ack, cause);
}
}
}
2.2---ReturnsCallback 回調函數類( 發布端----send消息--> broker[由交換機exchange和隊列queue組成 {進入exchange 但未投遞到queue就觸發ReturnsCallback }] )
package com.zkmeiling.biz.rabbitmq;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
/**
 * Returns callback: logs the metadata of a message that was handed back to
 * the publisher.
 */
@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {

    /**
     * Returned message callback.
     *
     * @param returned the returned message and metadata.
     */
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        final int replyCode = returned.getReplyCode();
        final String replyText = returned.getReplyText();
        final String exchange = returned.getExchange();
        final String routingKey = returned.getRoutingKey();

        log.info("bbk樣本庫returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}",
                replyCode, replyText, exchange, routingKey);
    }
}
3.發布端配置
下圖是發布端服務(簡單)相關目錄結構

image.png
3.1---發布端(生產端)配置代碼
package com.zkmeiling.biz.rabbitmq;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the exchange, the queue and the binding between them.
 * (This part could also be split out into a separate RabbitMQ configuration module.)
 */
@Configuration
public class RabbitmqQueueConfig {

    private static final String BBK_TO_IPC_QUEUE_NAME = "zkmeiling.bbk.to.ipc";
    private static final String DIRECT_EXCHANGE_NAME = "zkmeiling.direct";
    private static final String BBK_TO_IPC_ROUTING_KEY = "bbk2ipc_routing_key";

    /**
     * Declares the durable queue that carries messages from the sample
     * library management system to the industrial control system.
     *
     * @return the durable queue definition
     */
    @Bean(name = "bbkToIpcQueue")
    public Queue BbkToIpcQueue() {
        Queue queue = QueueBuilder.durable(BBK_TO_IPC_QUEUE_NAME).build();
        return queue;
    }

    /**
     * Declares the durable direct exchange.
     *
     * @return the direct exchange definition
     */
    @Bean(name = "directExchange")
    public DirectExchange directExchange() {
        DirectExchange exchange = ExchangeBuilder
                .directExchange(DIRECT_EXCHANGE_NAME)
                .durable(true)
                .build();
        return exchange;
    }

    /**
     * Binds the queue to the direct exchange under the routing key.
     *
     * @param directExchange the exchange bean named "directExchange"
     * @param bbkToIpcQueue  the queue bean named "bbkToIpcQueue"
     * @return the binding definition
     */
    @Bean
    public Binding bbkToIpcBinding(
            @Qualifier("directExchange") DirectExchange directExchange,
            @Qualifier("bbkToIpcQueue") Queue bbkToIpcQueue) {
        return BindingBuilder
                .bind(bbkToIpcQueue)
                .to(directExchange)
                .with(BBK_TO_IPC_ROUTING_KEY);
    }
}
3.2---發布端具體發送消息代碼
package com.zkmeiling.biz.rabbitmq;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.ClassMapper;
import org.springframework.amqp.support.converter.DefaultJackson2JavaTypeMapper;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.UUID;
@RestController
public class PublishController {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Autowired
private ReturnCallbackService returnCallbackService;
@PostMapping("frozenBoxPutInStore")
public void frozenBoxPutInStore(@RequestBody BoxTaskBo boxs){
sendMessage("zkmeiling.direct","bbk2ipc_routing_key",boxs);
}
public void sendMessage(String exchange, String routingKey, BoxTaskBo msg) {
// /**
// * 確保消息發(fā)送失敗后可以重新返回到隊列中
// * 注意:yml需要配置 publisher-returns: true
// */
// rabbitTemplate.setMandatory(true);
/**
* 消費者確認(rèn)收到消息后,手動ack回執(zhí)回調(diào)處理
*/
rabbitTemplate.setConfirmCallback(confirmCallbackService);
/**
* 消息投遞到隊列失敗回調(diào)處理
*/
rabbitTemplate.setReturnsCallback(returnCallbackService);
/**
* 開啟Jackson2JsonMessageConverter的消息轉(zhuǎn)換器(進行msg編碼操作)
*/
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
/**
* 發(fā)送消息
*/
rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> {
//這里是對msg的aop做一些前置處理操作
//下面是為了解決
<----原理說明--->
實體類轉(zhuǎn)換異常
failed to resolve class name.
解決方案
1.依賴系統(tǒng)1的jar包,直接使用類A來接收
2.不依賴系統(tǒng)1的jar包,自己建一個和A一模一樣的類,連名稱,包路徑都一樣
3.負(fù)責(zé)監(jiān)聽 queue 的類實現(xiàn) MessageListener 接口,直接接收 Message 類,再自己轉(zhuǎn)
顯然都不夠好也不是自己想要的
在 JsonMessageConverter 的 fromMessage 方法中有這么一段:
f (getClassMapper() == null) {
JavaType targetJavaType = getJavaTypeMapper()
.toJavaType(message.getMessageProperties());
content = convertBytesToObject(message.getBody(), encoding, targetJavaType);
} else {
Class<?> targetClass = getClassMapper().toClass(
message.getMessageProperties());
content = convertBytesToObject(message.getBody(), encoding, targetClass);
}
就是說默認(rèn)情況下,JsonMessageConverter 使用的 ClassMapper 是 DefaultJackson2JavaTypeMapper,在轉(zhuǎn)換時通過 Message 的 Properties 來獲取要轉(zhuǎn)換的目標(biāo)類的類型。通過 Debug 可以發(fā)現(xiàn),目標(biāo)類的類型是存儲在 Message 的 Proterties 的 一個 headers 的 Map 中,Key 叫“__TypeId__”。所以只要想辦法在傳輸消息時更改__TypeId__的值即可。
<----原理說明--->
message.getMessageProperties().getHeaders().put("__TypeId__","com.example.rabbitmqdemo.BoxTaskBo");
return message;
} ,new CorrelationData(UUID.randomUUID().toString()));
System.out.println("bbk樣本庫exchange = " + exchange);
}
}
4.消費端配置
下圖是消費端服務(簡單)相關目錄結構

image.png
4.1---消費端配置代碼
package com.example.rabbitmqdemo;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Consumer-side RabbitMQ configuration: provides the listener container
 * factory bean.
 */
@Configuration
public class RabbitMQConfig {

    /**
     * Builds a listener container factory that converts message bodies with
     * Jackson and uses manual acknowledgement.
     *
     * @param connectionFactory the broker connection factory to listen on
     * @return the configured listener container factory
     */
    @Bean
    public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        final SimpleRabbitListenerContainerFactory containerFactory = new SimpleRabbitListenerContainerFactory();
        // Manual ack mode.
        containerFactory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        // Use Jackson2JsonMessageConverter for the message payloads.
        containerFactory.setMessageConverter(new Jackson2JsonMessageConverter());
        containerFactory.setConnectionFactory(connectionFactory);
        return containerFactory;
    }
}
4.2---消費端具體接收消息消費業務代碼
package com.example.rabbitmqdemo;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@RabbitListener(queues = "zkmeiling.bbk.to.ipc")
public class ReceiverMessage1 {
@RabbitHandler
public void processHandler(@Payload BoxTaskBo msg, Channel channel, Message message) throws IOException {
try {
log.info("ipc工業(yè)端小富收到消息:{}", msg);
//TODO 具體業(yè)務(wù)
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
if (message.getMessageProperties().getRedelivered()) {
log.error("ipc工業(yè)端消息已重復(fù)處理失敗,拒絕再次接收...");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
} else {
log.error("ipc工業(yè)端消息即將再次返回隊列處理...");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
}
驗證下
1.Postman 模擬發送請求

image.png
2.發布端:發送成功日志

image.png
3.消費端: 接收成功日志

image.png