RabbitMQ: Spring AMQP integration components
RabbitAdmin is the administration utility for RabbitMQ. It handles operations such as declaring exchanges, declaring queues, and deleting them.
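The operations listed above can be sketched as follows. This is a minimal example, assuming a RabbitAdmin bean is already configured and a broker is reachable. The class name and the names `demo.direct`, `demo.queue`, and `demo.key` are made up for illustration.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitAdmin;

public class RabbitAdminUsageSketch {

    private final RabbitAdmin rabbitAdmin;

    public RabbitAdminUsageSketch(RabbitAdmin rabbitAdmin) {
        this.rabbitAdmin = rabbitAdmin;
    }

    public void declareAndCleanUp() {
        // Hypothetical names; non-durable so nothing survives a broker restart.
        DirectExchange exchange = new DirectExchange("demo.direct", false, false);
        Queue queue = new Queue("demo.queue", false);

        // Declare the exchange and the queue, then bind them with a routing key.
        rabbitAdmin.declareExchange(exchange);
        rabbitAdmin.declareQueue(queue);
        Binding binding = BindingBuilder.bind(queue).to(exchange).with("demo.key");
        rabbitAdmin.declareBinding(binding);

        // Remove all messages from the queue, then delete the queue and the exchange.
        rabbitAdmin.purgeQueue("demo.queue", false);
        rabbitAdmin.deleteQueue("demo.queue");
        rabbitAdmin.deleteExchange("demo.direct");
    }
}
```

Declarations made this way are sent to the broker immediately, unlike the bean-based auto-declaration, which runs when a connection is opened.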
The core configuration looks like this:

```java
// These @Bean methods belong in a @Configuration class.
@Bean
public ConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    // Alternatively, set host and port in one call:
    // connectionFactory.setAddresses("127.0.0.1:5672");
    connectionFactory.setHost("127.0.0.1");
    connectionFactory.setPort(5672);
    connectionFactory.setUsername("admin");
    connectionFactory.setPassword("admin");
    connectionFactory.setVirtualHost("/");
    return connectionFactory;
}

@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
    return new RabbitAdmin(connectionFactory);
}
```

After that, a RabbitAdmin object can be injected directly. The RabbitAdmin source looks like this:
```java
@ManagedResource(description = "Admin Tasks")
public class RabbitAdmin implements AmqpAdmin, ApplicationContextAware, ApplicationEventPublisherAware,
        BeanNameAware, InitializingBean {
    // .......
}
```

The class implements several interfaces. Start with InitializingBean: it means that afterPropertiesSet() is called once the bean has been initialized.
```java
@Override
public void afterPropertiesSet() {

    synchronized (this.lifecycleMonitor) {

        if (this.running || !this.autoStartup) {
            return;
        }

        if (this.retryTemplate == null && !this.retryDisabled) {
            this.retryTemplate = new RetryTemplate();
            this.retryTemplate.setRetryPolicy(new SimpleRetryPolicy(DECLARE_MAX_ATTEMPTS));
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(DECLARE_INITIAL_RETRY_INTERVAL);
            backOffPolicy.setMultiplier(DECLARE_RETRY_MULTIPLIER);
            backOffPolicy.setMaxInterval(DECLARE_MAX_RETRY_INTERVAL);
            this.retryTemplate.setBackOffPolicy(backOffPolicy);
        }
        if (this.connectionFactory instanceof CachingConnectionFactory &&
                ((CachingConnectionFactory) this.connectionFactory).getCacheMode() == CacheMode.CONNECTION) {
            this.logger.warn("RabbitAdmin auto declaration is not supported with CacheMode.CONNECTION");
            return;
        }

        // Prevent stack overflow...
        final AtomicBoolean initializing = new AtomicBoolean(false);

        this.connectionFactory.addConnectionListener(connection -> {

            if (!initializing.compareAndSet(false, true)) {
                // If we are already initializing, we don't need to do it again...
                return;
            }
            try {
                /*
                 * ...but it is possible for this to happen twice in the same ConnectionFactory (if more than
                 * one concurrent Connection is allowed). It's idempotent, so no big deal (a bit of network
                 * chatter). In fact it might even be a good thing: exclusive queues only make sense if they are
                 * declared for every connection. If anyone has a problem with it: use auto-startup="false".
                 */
                if (this.retryTemplate != null) {
                    this.retryTemplate.execute(c -> {
                        initialize();
                        return null;
                    });
                }
                else {
                    initialize();
                }
            }
            finally {
                initializing.compareAndSet(true, false);
            }

        });

        this.running = true;

    }
}
```

The core of it is:
```java
if (this.retryTemplate != null) {
    this.retryTemplate.execute(c -> {
        initialize();
        return null;
    });
}
else {
    initialize();
}
```

Next, look at the implementation of the initialize() method:
```java
@Override // NOSONAR complexity
public void initialize() {

    if (this.applicationContext == null) {
        this.logger.debug("no ApplicationContext has been set, cannot auto-declare Exchanges, Queues, and Bindings");
        return;
    }

    this.logger.debug("Initializing declarations");
    Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
            this.applicationContext.getBeansOfType(Exchange.class).values());
    Collection<Queue> contextQueues = new LinkedList<Queue>(
            this.applicationContext.getBeansOfType(Queue.class).values());
    Collection<Binding> contextBindings = new LinkedList<Binding>(
            this.applicationContext.getBeansOfType(Binding.class).values());
    Collection<DeclarableCustomizer> customizers =
            this.applicationContext.getBeansOfType(DeclarableCustomizer.class).values();

    processDeclarables(contextExchanges, contextQueues, contextBindings);

    final Collection<Exchange> exchanges = filterDeclarables(contextExchanges, customizers);
    final Collection<Queue> queues = filterDeclarables(contextQueues, customizers);
    final Collection<Binding> bindings = filterDeclarables(contextBindings, customizers);

    for (Exchange exchange : exchanges) {
        if ((!exchange.isDurable() || exchange.isAutoDelete()) && this.logger.isInfoEnabled()) {
            this.logger.info("Auto-declaring a non-durable or auto-delete Exchange ("
                    + exchange.getName()
                    + ") durable:" + exchange.isDurable() + ", auto-delete:" + exchange.isAutoDelete() + ". "
                    + "It will be deleted by the broker if it shuts down, and can be redeclared by closing and "
                    + "reopening the connection.");
        }
    }

    for (Queue queue : queues) {
        if ((!queue.isDurable() || queue.isAutoDelete() || queue.isExclusive()) && this.logger.isInfoEnabled()) {
            this.logger.info("Auto-declaring a non-durable, auto-delete, or exclusive Queue ("
                    + queue.getName()
                    + ") durable:" + queue.isDurable() + ", auto-delete:" + queue.isAutoDelete() + ", exclusive:"
                    + queue.isExclusive() + ". "
                    + "It will be redeclared if the broker stops and is restarted while the connection factory is "
                    + "alive, but all messages will be lost.");
        }
    }

    if (exchanges.size() == 0 && queues.size() == 0 && bindings.size() == 0) {
        this.logger.debug("Nothing to declare");
        return;
    }
    this.rabbitTemplate.execute(channel -> {
        declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
        declareQueues(channel, queues.toArray(new Queue[queues.size()]));
        declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
        return null;
    });
    this.logger.debug("Declarations finished");

}

private void processDeclarables(Collection<Exchange> contextExchanges, Collection<Queue> contextQueues,
        Collection<Binding> contextBindings) {
    Collection<Declarables> declarables = this.applicationContext.getBeansOfType(Declarables.class, false, true)
            .values();
    declarables.forEach(d -> {
        d.getDeclarables().forEach(declarable -> {
            if (declarable instanceof Exchange) {
                contextExchanges.add((Exchange) declarable);
            }
            else if (declarable instanceof Queue) {
                contextQueues.add((Queue) declarable);
            }
            else if (declarable instanceof Binding) {
                contextBindings.add((Binding) declarable);
            }
        });
    });
}
```

The method creates linked lists that hold the Exchange, Queue, and Binding beans found in the application context. It then adds the contents of any Declarables beans, filters the collections, and declares the remaining objects on the broker through the RabbitMQ client channel. Because RabbitAdmin implements InitializingBean, all of this is wired up after the bean has been initialized.
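Two mechanisms in afterPropertiesSet() are easy to miss: the AtomicBoolean guard that stops the connection listener from re-entering itself, and the exponential back-off between declaration attempts. The sketch below models both with plain JDK classes. `FakeConnectionFactory` is a hypothetical stand-in, not a Spring AMQP type, and the numeric values passed to `backoffDelays` are illustrative assumptions, not values read from the RabbitAdmin constants.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class AutoDeclareSketch {

    /** Hypothetical stand-in for a connection factory that notifies listeners on every new connection. */
    static class FakeConnectionFactory {
        private final List<Consumer<String>> listeners = new ArrayList<>();

        void addConnectionListener(Consumer<String> listener) {
            listeners.add(listener);
        }

        void openConnection(String name) {
            for (Consumer<String> listener : listeners) {
                listener.accept(name);
            }
        }
    }

    /** Returns how many times the guarded "initialize" body actually ran. */
    static int runGuardedInitialization() {
        FakeConnectionFactory factory = new FakeConnectionFactory();
        AtomicBoolean initializing = new AtomicBoolean(false);
        int[] initializeCalls = {0};

        factory.addConnectionListener(connection -> {
            if (!initializing.compareAndSet(false, true)) {
                return; // already initializing: skip the nested notification
            }
            try {
                initializeCalls[0]++;
                // Declaring needs a connection, which notifies the listener again.
                // Without the guard this would recurse until the stack overflows.
                factory.openConnection("nested");
            } finally {
                initializing.compareAndSet(true, false);
            }
        });

        factory.openConnection("first");
        return initializeCalls[0];
    }

    /** Waits between attempts: the interval grows by the multiplier and is capped at max. */
    static List<Long> backoffDelays(int maxAttempts, long initial, double multiplier, long max) {
        List<Long> delays = new ArrayList<>();
        long delay = initial;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            delays.add(Math.min(delay, max));
            delay = (long) (delay * multiplier);
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println("initialize() calls: " + runGuardedInitialization());
        System.out.println("back-off delays (ms): " + backoffDelays(5, 1000L, 2.0, 5000L));
    }
}
```

The guard only blocks re-entry while an initialization is in progress. A later connection triggers a fresh declaration run, which matches the "it's idempotent" remark in the source comment.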
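Declaring an exchange, a binding, and a queue through the API.
With the native RabbitMQ client, these declarations are made through a channel. With Spring AMQP, they can be made by registering beans, for example an Exchange bean: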
```java
@Bean
public TopicExchange exchange001() {
    // name, durable, autoDelete
    return new TopicExchange("topic001", true, false);
}

@Bean
public Queue queue001() {
    return new Queue("queue001", true); // durable queue
}
```
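The text mentions bindings, but the snippet only declares an exchange and a queue. A binding can be declared as a bean in the same way. This is a minimal sketch: the class name, the bean name `binding001`, and the routing pattern `spring.*` are assumptions for illustration.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BindingConfigSketch {

    @Bean
    public TopicExchange exchange001() {
        return new TopicExchange("topic001", true, false);
    }

    @Bean
    public Queue queue001() {
        return new Queue("queue001", true);
    }

    @Bean
    public Binding binding001(Queue queue001, TopicExchange exchange001) {
        // "spring.*" is an assumed routing pattern: it matches keys such as "spring.demo".
        return BindingBuilder.bind(queue001).to(exchange001).with("spring.*");
    }
}
```

RabbitAdmin picks up the Binding bean together with the Exchange and Queue beans in initialize() and declares all three when a connection is opened.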
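RabbitTemplate: the template class that Spring AMQP provides for sending messages. It also supports reliable delivery through callback interfaces such as ConfirmCallback and ReturnCallback.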
```java
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    return rabbitTemplate;
}
```
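The confirm and return callbacks mentioned above could be wired up roughly as follows. This sketch assumes Spring AMQP 2.2.x, to match the RabbitAdmin source shown earlier. Later versions are understood to replace `setReturnCallback` with `setReturnsCallback`, so check the API of the version in use. The class name, the routing key `spring.demo`, and the correlation id `msg-1` are made up for illustration.

```java
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;

public class ReliablePublishSketch {

    public static RabbitTemplate reliableTemplate(CachingConnectionFactory connectionFactory) {
        // Publisher confirms and returns must be enabled on the connection factory.
        connectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
        connectionFactory.setPublisherReturns(true);

        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Mandatory publishing is required for unroutable messages to be returned.
        template.setMandatory(true);

        // Called when the broker acknowledges (ack) or rejects (nack) a published message.
        template.setConfirmCallback((correlationData, ack, cause) -> {
            String id = correlationData != null ? correlationData.getId() : "n/a";
            System.out.println("confirm id=" + id + " ack=" + ack + " cause=" + cause);
        });

        // Called when a message could not be routed to any queue.
        template.setReturnCallback((message, replyCode, replyText, exchange, routingKey) ->
                System.out.println("returned: " + replyCode + " " + replyText
                        + " exchange=" + exchange + " routingKey=" + routingKey));
        return template;
    }

    public static void send(RabbitTemplate template) {
        // Hypothetical routing key and correlation id.
        template.convertAndSend("topic001", "spring.demo", "hello", new CorrelationData("msg-1"));
    }
}
```

The correlation id passed on send comes back in the confirm callback, which is what lets the sender match a confirmation to a specific message.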
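SimpleMessageListenerContainer: a simple message listener container. It can listen on multiple queues and supports auto-startup and auto-declaration. It can configure transactions (transaction manager, transaction attributes, transaction size, whether transactions are enabled, and message rollback) as well as concurrency. It sets the acknowledgement mode and the handler that catches listener exceptions. It also sets the consumer tag strategy, exclusive mode, and consumer arguments, along with the actual listener and message converter. Many of these settings can be changed dynamically at run time.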
```java
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queue001());
    container.setConcurrentConsumers(1);
    container.setMaxConcurrentConsumers(5);
    container.setDefaultRequeueRejected(false);
    container.setAcknowledgeMode(AcknowledgeMode.AUTO);
    container.setExposeListenerChannel(true);
    // Consumer tag: queue name plus a random UUID.
    container.setConsumerTagStrategy(queue -> queue + "_" + UUID.randomUUID().toString());
    // A message listener still has to be set before the container can consume messages.
    return container;
}
```
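The acknowledgement mode and the listener work together. The sketch below shows one way to combine manual acknowledgement with a channel-aware listener. It assumes a Spring AMQP 2.x version in which `ChannelAwareMessageListener` lives in the `listener.api` package. The class name is made up, and the processing step is only a placeholder.

```java
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;

public class ManualAckContainerSketch {

    public static SimpleMessageListenerContainer manualAckContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("queue001");
        // With MANUAL mode the listener must ack or nack every delivery itself.
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                // Placeholder processing step.
                System.out.println("received: " + new String(message.getBody(), StandardCharsets.UTF_8));
            } catch (RuntimeException e) {
                // Reject without requeueing so a poison message is not redelivered forever.
                channel.basicNack(deliveryTag, false, false);
                return;
            }
            channel.basicAck(deliveryTag, false);
        });
        return container;
    }
}
```

With AcknowledgeMode.AUTO, as in the configuration above, the container acknowledges on behalf of the listener, so the explicit basicAck and basicNack calls would not be needed.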
MessageListenerAdapter: the message listener adapter. Its job is to adapt a plain object to the listener interface, for example by specifying the message converter to use. Its Javadoc describes it as follows:
Message listener adapter that delegates the handling of messages to target listener methods via reflection, with flexible message type conversion. Allows listener methods to operate on message content types, completely independent from the Rabbit API.
Its main properties are:
- defaultListenerMethod: the default listener method. It sets the name of the method that handles messages.
- delegate: the actual delegate object that processes the messages.
- queueOrTagToMethodName: a map from queue name (or consumer tag) to method name. It binds a queue to a method, so that messages from a given queue are received and handled by the method bound to it.
```java
@Bean
public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    // Queue and consumer settings from the previous example are omitted here.
    MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
    adapter.setDefaultListenerMethod("consumeMessage");
    // Global converter: delegates by content type.
    ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
    TextMessageConverter textConvert = new TextMessageConverter();
    convert.addDelegate("text", textConvert);
    convert.addDelegate("html/text", textConvert);
    convert.addDelegate("xml/text", textConvert);
    convert.addDelegate("text/plain", textConvert);
    adapter.setMessageConverter(convert);
    container.setMessageListener(adapter);
    return container;
}
```
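The reflective dispatch behind defaultListenerMethod and queueOrTagToMethodName can be modelled with plain JDK reflection. The sketch below is not the MessageListenerAdapter implementation. `SimpleAdapter`, `mapQueueToMethod`, and the `handleOrders` method are hypothetical names, and the source does not show the real MessageDelegate class, so the one here is an assumption.

```java
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class ReflectiveDispatchSketch {

    /** Hypothetical delegate: a plain object with no dependency on any messaging API. */
    public static class MessageDelegate {
        public String consumeMessage(String body) {
            return "default:" + body;
        }

        public String handleOrders(String body) {
            return "orders:" + body;
        }
    }

    /** Simplified model of an adapter that picks a delegate method by queue name. */
    static class SimpleAdapter {
        private final Object delegate;
        private final String defaultListenerMethod;
        private final Map<String, String> queueToMethodName = new HashMap<>();

        SimpleAdapter(Object delegate, String defaultListenerMethod) {
            this.delegate = delegate;
            this.defaultListenerMethod = defaultListenerMethod;
        }

        void mapQueueToMethod(String queue, String methodName) {
            queueToMethodName.put(queue, methodName);
        }

        Object dispatch(String queue, String body) {
            // Use the method bound to the queue, or fall back to the default method.
            String methodName = queueToMethodName.getOrDefault(queue, defaultListenerMethod);
            try {
                Method method = delegate.getClass().getMethod(methodName, String.class);
                return method.invoke(delegate, body);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot invoke " + methodName, e);
            }
        }
    }

    public static void main(String[] args) {
        SimpleAdapter adapter = new SimpleAdapter(new MessageDelegate(), "consumeMessage");
        adapter.mapQueueToMethod("queue002", "handleOrders");
        System.out.println(adapter.dispatch("queue001", "hi"));
        System.out.println(adapter.dispatch("queue002", "hi"));
    }
}
```

In the real adapter, the parameter type of the delegate method has to match what the message converter produces, which is why the converter configuration and the delegate signature are usually chosen together.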
MessageConverter: the message converter interface. A custom converter implements this interface and overrides the toMessage and fromMessage methods.
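```java
import java.nio.charset.StandardCharsets;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;

public class TextMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        // Encode explicitly as UTF-8 instead of relying on the platform default charset.
        return new Message(object.toString().getBytes(StandardCharsets.UTF_8), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contentType = message.getMessageProperties().getContentType();
        // Text content types are decoded to a String; everything else stays as raw bytes.
        if (null != contentType && contentType.contains("text")) {
            return new String(message.getBody(), StandardCharsets.UTF_8);
        }
        return message.getBody();
    }
}
```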
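The idea of choosing a converter by content type, as the ContentTypeDelegatingMessageConverter configuration does, can be illustrated with a small JDK-only model. This is a sketch of the lookup-with-fallback idea only, not the Spring class. The class and method names are hypothetical, and exact-match lookup is an assumption of the sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

class ContentTypeDelegationSketch {

    private final Map<String, Function<byte[], Object>> delegates = new HashMap<>();
    private final Function<byte[], Object> fallback;

    ContentTypeDelegationSketch(Function<byte[], Object> fallback) {
        this.fallback = fallback;
    }

    void addDelegate(String contentType, Function<byte[], Object> delegate) {
        delegates.put(contentType, delegate);
    }

    /** Converts the body with the delegate registered for the content type, else with the fallback. */
    Object convert(String contentType, byte[] body) {
        Function<byte[], Object> delegate = contentType == null ? null : delegates.get(contentType);
        return (delegate != null ? delegate : fallback).apply(body);
    }

    /** A text delegate that decodes the body as UTF-8. */
    static Function<byte[], Object> text() {
        return body -> new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ContentTypeDelegationSketch converter = new ContentTypeDelegationSketch(body -> body);
        converter.addDelegate("text/plain", text());
        byte[] bytes = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(converter.convert("text/plain", bytes));
        System.out.println(converter.convert("application/octet-stream", bytes) instanceof byte[]);
    }
}
```

Registering the same text converter under several content types, as the configuration above does, corresponds to calling `addDelegate` once per type with the same delegate.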