SpringBoot2.0源碼分析(三):整合RabbitMQ分析

SpringBoot具體整合rabbitMQ可參考:SpringBoot2.0應(yīng)用(三):SpringBoot2.0整合RabbitMQ

RabbitMQ自動注入

當(dāng)項目中存在org.springframework.amqp.rabbit.core.RabbitTemplatecom.rabbitmq.client.Channel著兩個類時,SpringBoot將RabbitMQ需要使用到的對象注冊為Bean,供項目注入使用。一起看一下RabbitAutoConfiguration類。

@Configuration
@ConditionalOnClass({ RabbitTemplate.class, Channel.class })
@EnableConfigurationProperties(RabbitProperties.class)
@Import(RabbitAnnotationDrivenConfiguration.class)
public class RabbitAutoConfiguration {

    @Configuration
    @ConditionalOnMissingBean(ConnectionFactory.class)
    protected static class RabbitConnectionFactoryCreator {
        @Bean
        public CachingConnectionFactory rabbitConnectionFactory(
                RabbitProperties properties,
                ObjectProvider<ConnectionNameStrategy> connectionNameStrategy)
                throws Exception {
            PropertyMapper map = PropertyMapper.get();
            CachingConnectionFactory factory = new CachingConnectionFactory(
                    getRabbitConnectionFactoryBean(properties).getObject());
            ......
            return factory;
        }
    }

    @Configuration
    @Import(RabbitConnectionFactoryCreator.class)
    protected static class RabbitTemplateConfiguration {

        private final ObjectProvider<MessageConverter> messageConverter;

        private final RabbitProperties properties;

        ......

        @Bean
        @ConditionalOnSingleCandidate(ConnectionFactory.class)
        @ConditionalOnMissingBean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            PropertyMapper map = PropertyMapper.get();
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            ......
            return template;
        }
        ......

RabbitAutoConfiguration使和RabbitMQ相關(guān)的配置生效,并根據(jù)相關(guān)屬性創(chuàng)建和RabbitMQ的連接,并依賴連接工廠創(chuàng)建rabbitTemplate。
RabbitAutoConfiguration同時導(dǎo)入了RabbitAnnotationDrivenConfiguration,注入了rabbitListenerContainerFactory。

RabbitMQ的Queue,Exchange和Routing Key關(guān)系創(chuàng)建

先看一個例子:

    @Bean
    Binding bindingExchangeMessage(Queue queue, TopicExchange exchange) {
        // 將隊列1綁定到名為topicKey.A的routingKey
        return BindingBuilder.bind(queue).to(exchange).with("routingKey");
    }

將queue通過"routingKey"綁定到exchange。
RabbitAdmin類的initialize方法會拿出注入的Exchange,Queue和Binding進行聲明。

    public void initialize() {
        ......
        Collection<Exchange> contextExchanges = new LinkedList<Exchange>(
                this.applicationContext.getBeansOfType(Exchange.class).values());
        Collection<Queue> contextQueues = new LinkedList<Queue>(
                this.applicationContext.getBeansOfType(Queue.class).values());
        Collection<Binding> contextBindings = new LinkedList<Binding>(
                this.applicationContext.getBeansOfType(Binding.class).values());
        ......
        final Collection<Exchange> exchanges = filterDeclarables(contextExchanges);
        final Collection<Queue> queues = filterDeclarables(contextQueues);
        final Collection<Binding> bindings = filterDeclarables(contextBindings);
        ......
        this.rabbitTemplate.execute(channel -> {
            declareExchanges(channel, exchanges.toArray(new Exchange[exchanges.size()]));
            declareQueues(channel, queues.toArray(new Queue[queues.size()]));
            declareBindings(channel, bindings.toArray(new Binding[bindings.size()]));
            return null;
        });
    }

消息發(fā)送

以下面的發(fā)送為例:

    rabbitTemplate.convertAndSend(MQConst.TOPIC_EXCHANGE, MQConst.TOPIC_KEY1, "from key1");

這個方法會先對消息進行轉(zhuǎn)換,預(yù)處理,最終通過調(diào)用ChannelN的basicPublish方法提交一個AMQCommand,由AMQCommand完成最終的消息發(fā)送。

    public void transmit(AMQChannel channel) throws IOException {
        int channelNumber = channel.getChannelNumber();
        AMQConnection connection = channel.getConnection();

        synchronized (assembler) {
            Method m = this.assembler.getMethod();
            connection.writeFrame(m.toFrame(channelNumber));
            if (m.hasContent()) {
                byte[] body = this.assembler.getContentBody();
                connection.writeFrame(this.assembler.getContentHeader()
                        .toFrame(channelNumber, body.length));
                int frameMax = connection.getFrameMax();
                int bodyPayloadMax = (frameMax == 0) ? body.length : frameMax
                        - EMPTY_FRAME_SIZE;

                for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
                    int remaining = body.length - offset;
                    int fragmentLength = (remaining < bodyPayloadMax) ? remaining
                            : bodyPayloadMax;
                    Frame frame = Frame.fromBodyFragment(channelNumber, body,
                            offset, fragmentLength);
                    connection.writeFrame(frame);
                }
            }
        }
        connection.flush();
    }

消息接收

先看一個消費的事例:

@Component
public class TopicConsumer {

    @RabbitListener(queues = MQConst.TOPIC_QUEUENAME1)
    @RabbitHandler
    public void process1(String message) {
        System.out.println("queue:topic.message1,message:" + message);
    }
}

SpringBoot解析@RabbitListener@RabbitHandler兩個注解的是RabbitListenerAnnotationBeanPostProcessor中的buildMetadata方法:

    private TypeMetadata buildMetadata(Class<?> targetClass) {
        Collection<RabbitListener> classLevelListeners = findListenerAnnotations(targetClass);
        final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
        final List<ListenerMethod> methods = new ArrayList<>();
        final List<Method> multiMethods = new ArrayList<>();
        ReflectionUtils.doWithMethods(targetClass, method -> {
            Collection<RabbitListener> listenerAnnotations = findListenerAnnotations(method);
            if (listenerAnnotations.size() > 0) {
                methods.add(new ListenerMethod(method,
                        listenerAnnotations.toArray(new RabbitListener[listenerAnnotations.size()])));
            }
            if (hasClassLevelListeners) {
                RabbitHandler rabbitHandler = AnnotationUtils.findAnnotation(method, RabbitHandler.class);
                if (rabbitHandler != null) {
                    multiMethods.add(method);
                }
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        if (methods.isEmpty() && multiMethods.isEmpty()) {
            return TypeMetadata.EMPTY;
        }
        return new TypeMetadata(
                methods.toArray(new ListenerMethod[methods.size()]),
                multiMethods.toArray(new Method[multiMethods.size()]),
                classLevelListeners.toArray(new RabbitListener[classLevelListeners.size()]));
    }

@RabbitListener可以放在方法上,也可以放在類上。對于Class級別的Listener,會配合RabbitHandle進行綁定。對于method級別的Listener則不會。
解析完成后,由processListener方法對處理Listener。

    protected void processListener(MethodRabbitListenerEndpoint endpoint, RabbitListener rabbitListener, Object bean,
            Object adminTarget, String beanName) {
        endpoint.setBean(bean);
        endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
        endpoint.setId(getEndpointId(rabbitListener));
        endpoint.setQueueNames(resolveQueues(rabbitListener));
        endpoint.setConcurrency(resolveExpressionAsStringOrInteger(rabbitListener.concurrency(), "concurrency"));
        endpoint.setBeanFactory(this.beanFactory);
        endpoint.setReturnExceptions(resolveExpressionAsBoolean(rabbitListener.returnExceptions()));
        String errorHandlerBeanName = resolveExpressionAsString(rabbitListener.errorHandler(), "errorHandler");
        if (StringUtils.hasText(errorHandlerBeanName)) {
            endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, RabbitListenerErrorHandler.class));
        }
        String group = rabbitListener.group();
        if (StringUtils.hasText(group)) {
            Object resolvedGroup = resolveExpression(group);
            if (resolvedGroup instanceof String) {
                endpoint.setGroup((String) resolvedGroup);
            }
        }
        String autoStartup = rabbitListener.autoStartup();
        if (StringUtils.hasText(autoStartup)) {
            endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup));
        }

        endpoint.setExclusive(rabbitListener.exclusive());
        String priority = resolve(rabbitListener.priority());
        if (StringUtils.hasText(priority)) {
            try {
                endpoint.setPriority(Integer.valueOf(priority));
            }
            catch (NumberFormatException ex) {
                throw new BeanInitializationException("Invalid priority value for " +
                        rabbitListener + " (must be an integer)", ex);
            }
        }

        String rabbitAdmin = resolve(rabbitListener.admin());
        if (StringUtils.hasText(rabbitAdmin)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to resolve RabbitAdmin by bean name");
            try {
                endpoint.setAdmin(this.beanFactory.getBean(rabbitAdmin, RabbitAdmin.class));
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                        adminTarget + "], no " + RabbitAdmin.class.getSimpleName() + " with id '" +
                        rabbitAdmin + "' was found in the application context", ex);
            }
        }


        RabbitListenerContainerFactory<?> factory = null;
        String containerFactoryBeanName = resolve(rabbitListener.containerFactory());
        if (StringUtils.hasText(containerFactoryBeanName)) {
            Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
            try {
                factory = this.beanFactory.getBean(containerFactoryBeanName, RabbitListenerContainerFactory.class);
            }
            catch (NoSuchBeanDefinitionException ex) {
                throw new BeanInitializationException("Could not register rabbit listener endpoint on [" +
                        adminTarget + "] for bean " + beanName + ", no " + RabbitListenerContainerFactory.class.getSimpleName() + " with id '" +
                        containerFactoryBeanName + "' was found in the application context", ex);
            }
        }

        this.registrar.registerEndpoint(endpoint, factory);
    }

先設(shè)置endpoint的相關(guān)屬性,再獲取rabbitListenerContainerFactory,根據(jù)endpoint和rabbitListenerContainerFactory創(chuàng)建一個messageListenerContainer,并創(chuàng)建endpoint的id到messageListenerContainer的映射。
當(dāng)收到消息時,SpringBoot會調(diào)用綁定好的方法。


本篇到此結(jié)束,如果讀完覺得有收獲的話,歡迎點贊、關(guān)注、加公眾號【貳級天災(zāi)】,查閱更多精彩歷史?。?!


image
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容