[spring-amqp-support] SpringBoot-AMQP 擴展組件,便于快速發(fā)送、接收 MQ 事件消息

spring-amqp-support

一個 Spring-AMQP 擴展組件,便于快速發(fā)送、接收 MQ 事件消息,并將消息自動序列化為事件對象。

信息

基于 spring-boot-starter-amqp:2.6.3 構(gòu)建

by:林同學(xué)(765371578@qq.com

Getting Started

引入依賴:

<dependency>
    <groupId>com.github.LinYuanBaoBao</groupId>
    <artifactId>spring-amqp-support</artifactId>
    <version>1.0.0-RELEASE</version>
</dependency>

定義 AMQP 配置類:

@Configuration
public class AmqpConfiguration {
    @Bean
    public MessageConverter messageConverter(EventMessageClassMapper classMapper) {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        converter.setClassMapper(classMapper);      // specify class mapper
        return converter;
    }

    @Bean
    public EventMessageClassMapper eventMessageClassMapper(EventMessageTypeMapping eventMessageTypeMapping) {
        return new EventMessageClassMapper(eventMessageTypeMapping);
    }

    @Bean
    public EventMessageTypeMapping eventMessageTypeMapping() {
        return new EventMessageTypeMapping.Builder()
                .pkg("my.package")
                .build();
    }
}

使用 @EventMessage 注解定義事件:

// my.package.UserRegisterEvent
@EventMessage(exchange = "user", code = "user-register")
public class UserRegisterEvent {
    private Integer userId;
    private Map<String, Object> userInfo;
    // ...
}

發(fā)送事件消息:

@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMessage() {
    UserRegisterEvent event = new UserRegisterEvent();
    event.setUserId(1001);
    Map<String, Object> userInfo = new HashMap<>();
    userInfo.put("username", "linyuan");
    event.setUserInfo(userInfo);

    rabbitTemplate.convertAndSend(EventMessageUtils.getEventExchange(event.getClass()), "default", event, message -> {
        MessageProperties properties = message.getMessageProperties();
        properties.setMessageId(UUID.randomUUID().toString());
        properties.setTimestamp(new Date());
        return message;
    });
}

消息格式如下:

Properties  
    timestamp:  1644401093
    message_id: dc7d0646-a6e6-4183-b755-79ffeefc1159
    headers: _EVENT_CODE_: user-register
    content_encoding:   UTF-8
    content_type:   application/json
Payload
    {"userId":1001,"userInfo":{"username":"linyuan"}}

接收事件消息:

@RabbitHandler
public void listen(UserRegisterEvent message, @Headers Map headers) {
    // do something here...
}

或采用下面方式,但該方式消息不會經(jīng)過 InvocableHandlerMethodDecorator 處理鏈

@Autowired
private RabbitTemplate rabbitTemplate;

public void recMessage(){
    UserRegisterEvent msg = (UserRegisterEvent) rabbitTemplate.receiveAndConvert("user.event");
}

InvocableHandlerMethodDecorator

可以注入多個 InvocableHandlerMethodDecorator Bean,形成消息處理鏈,對消息做其它操作:

@Bean
public InvocableHandlerMethodDecorator invocableHandlerMethodDecorator() {
    return invocableHandlerMethod -> new InvocableHandlerMethodDecoration(invocableHandlerMethod) {
        @Override
        public Object invoke(Message<?> message, Object... providedArgs) throws Exception {
            // do something here...
            return super.invoke(message, providedArgs);
        }
    };
}

IdempotentValidator

注入 IdempotentValidator Bean 對消息做冪等校驗:

@Bean
public IdempotentValidator idempotentValidator() {
    return new IdempotentValidator() {
        @Override
        public void valid(Message<?> message) {
            // do valid here
        }
    };
}

Authenticator

注入 Authenticator Bean 在消費信息之前進行登錄,在消費信息之后進行注銷:

@Bean
public Authenticator authenticator() {
    return new Authenticator() {
        @Override
        public void login(Message<?> message) {
            // do login here
        }

        @Override
        public void logout(Message<?> message) {
            // do logout here
        }
    };
}

記錄消息信息 - MessageRecorder

注入 MessageRecorder Bean 來記錄消息信息:

@Bean
public MessageRecorder messageRecorder() {
    return new MessageRecorder() {
        @Override
        public void record(Message<?> message, boolean success, Object result, Exception error) {
            // do record here
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容