Springboot RabbitMQ 發(fā)送端/接受端消息處理、序列化、循環(huán)依賴問題

引入依賴
<pre class="custom" data-tool="mdnice編輯器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px; text-align: left;"><dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> </pre>
配置類
<pre class="custom" data-tool="mdnice編輯器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px; text-align: left;">`import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
@Configuration
public class RabbitMQConfig {
@Bean
Jackson2JsonMessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}` </pre>
當(dāng)發(fā)送端和接收端都使用 Jackson2JsonMessageConverter 進(jìn)行消息轉(zhuǎn)換時,它們之間的數(shù)據(jù)交換是基于 JSON 的。JSON 是一種與語言無關(guān)的數(shù)據(jù)格式,它只包含數(shù)據(jù)的結(jié)構(gòu)和內(nèi)容,而不包含任何與特定編程語言相關(guān)的類信息。因此,當(dāng)使用 JSON 作為數(shù)據(jù)傳輸格式時,發(fā)送方和接收方不需要使用相同的類定義或包結(jié)構(gòu)。只要 JSON 數(shù)據(jù)能夠正確映射到接收方的類字段,消息就可以被成功接收和處理,即使發(fā)送方和接收方使用的是不同的類定義。這就是為什么通過使用 Jackson2JsonMessageConverter,即使包名不一樣,也能夠?qū)崿F(xiàn)數(shù)據(jù)的有效接收。
如果不使用像 Jackson2JsonMessageConverter 這樣的基于 JSON 的消息轉(zhuǎn)換器,那么發(fā)送端和接收端通常需要依賴于相同或兼容的 Java 類序列化機(jī)制。這意味著發(fā)送和接收的對象需要是相同的類(包括包名和類名),因為 Java 的標(biāo)準(zhǔn)序列化機(jī)制不僅包含數(shù)據(jù)本身,還包含了類的全限定名。如果類定義(包括包名和類名)在發(fā)送端和接收端不一致,那么在反序列化時會出現(xiàn)類找不到的錯誤,從而導(dǎo)致失敗。
在 Spring 框架和相關(guān)技術(shù)棧中,常見的基于 JSON 的消息轉(zhuǎn)換器包括:
Jackson2JsonMessageConverter:使用 Jackson 庫來序列化和反序列化 JSON 數(shù)據(jù)。它是最常用的轉(zhuǎn)換器之一,特別適用于與 Spring Boot 結(jié)合的場景。
GsonMessageConverter:使用 Google 的 Gson 庫來處理 JSON 數(shù)據(jù)。Gson 是另一個流行的 JSON 處理庫,這個轉(zhuǎn)換器適用于喜歡 Gson API 的開發(fā)者。
MarshallingMessageConverter:結(jié)合了 Spring 的 Marshaller 和 Unmarshaller,可以用于處理 XML 和 JSON 數(shù)據(jù)。使用這個轉(zhuǎn)換器時,需要配置相應(yīng)的 Marshaller,如 Jaxb2Marshaller。
這些轉(zhuǎn)換器使得在不同系統(tǒng)或應(yīng)用之間進(jìn)行基于 JSON 的數(shù)據(jù)交換變得更加靈活和方便。根據(jù)具體需求和項目情況,可以選擇最適合的轉(zhuǎn)換器。
工具類(發(fā)送端)
<pre class="custom" data-tool="mdnice編輯器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px; text-align: left;">`import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMessage(String exchange, String routingKey, Object message) {
rabbitTemplate.convertAndSend(exchange, routingKey, message);
}
}` </pre>
將工具類在需要的地方注入調(diào)用即可。
為什么不在工具類中直接添加messageConverter的bean?這樣做會使 Spring 嘗試注入 RabbitTemplate 到 RabbitMQSender 中,同時也需要注入由 RabbitMQSender 創(chuàng)建的 messageConverter 到 RabbitTemplate 中,從而導(dǎo)致循環(huán)依賴。為解決這個問題,應(yīng)該將 messageConverter() 方法移到一個配置類中,而不是將其放在 RabbitMQSender 類中。這樣可以防止循環(huán)依賴的產(chǎn)生,因為 messageConverter 將在 RabbitTemplate 被創(chuàng)建之前單獨(dú)創(chuàng)建和管理。
循環(huán)依賴問題:在Spring框架中,循環(huán)依賴通常是由于多個Bean(即由@Component注解的類)相互依賴而產(chǎn)生的。上述情況,RabbitMQSender類中定義了一個messageConverter()方法作為Bean,這可能導(dǎo)致了與RabbitTemplate的循環(huán)依賴。
解決這個問題的一個方法是將messageConverter()方法移動到單獨(dú)的配置類中。配置類通常用@Configuration注解標(biāo)記,其目的是專門用于定義和管理Bean。通過這種方式,可以避免RabbitMQSender和RabbitTemplate之間的直接依賴關(guān)系,從而打破循環(huán)依賴。
在單獨(dú)的配置類中,可以定義Jackson2JsonMessageConverter Bean,Spring將自動將其注入到RabbitTemplate中,而不需要RabbitMQSender直接參與。這樣,RabbitMQSender只需要專注于發(fā)送消息的邏輯,而不必?fù)?dān)心如何配置消息轉(zhuǎn)換器。
消息監(jiān)聽(接收端)
<pre class="custom" data-tool="mdnice編輯器" style="margin-top: 10px; margin-bottom: 10px; border-radius: 5px; box-shadow: rgba(0, 0, 0, 0.55) 0px 2px 10px; text-align: left;">`import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RabbitmqReceive {
/**
* rabbitmq隊列監(jiān)聽 消費(fèi)消息
*/
@RabbitListener(queues = "{spring.rabbitmq.receive.priority}")
public void listenMessage(Object(發(fā)送實體類型) obj, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
log.info("receiveQueue:" + JSONObject.toJSONString(obj));
// 處理操作
} catch (Exception e) {
log.error("rabbitmq接收插入隊列異常", e);
try {
// 消息處理出現(xiàn)異常,將消息拒絕并重新發(fā)送到隊列
// channel.basicNack(deliveryTag, false, true);
} catch (Exception ex) {
log.error("將消息拒絕并重新發(fā)送到隊列異常:", ex);
}
}
}
}` </pre>
在 @RabbitListener 注解中,除了指定隊列名(queues = "${spring.rabbitmq.receive.queueName}")外,還有兩個參數(shù) Channel 和 @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag:(如果不需要可以刪除這兩個參數(shù))
Channel:這是一個 AMQP
Channel對象,代表當(dāng)前消息通道。通過這個對象,可以對消息進(jìn)行更低層次的操作,比如手動發(fā)送ACK(確認(rèn))或NACK(不確認(rèn))信號。@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag:這里的
deliveryTag是一個特定于 AMQP 的消息頭部信息,用于唯一標(biāo)識一個通道上的消息傳遞。在消息處理邏輯中,它通常用于手動消息確認(rèn)。使用deliveryTag,可以對接收到的消息進(jìn)行更精確的控制,如確認(rèn)或拒絕消息。