springboot繼承rabbitMQ做一個(gè)簡(jiǎn)單的商品秒殺業(yè)務(wù)

私聊我做畢設(shè)或者實(shí)驗(yàn)課題。

1.設(shè)計(jì)數(shù)據(jù)庫(kù)

設(shè)計(jì)product表,用來(lái)記錄商品的總數(shù)量
image.png
設(shè)計(jì)record表,用來(lái)記錄消費(fèi)者的id
image.png

2.業(yè)務(wù)的實(shí)現(xiàn)

1.導(dǎo)入相關(guān)依賴(lài)

  <!--整合rabbitmq-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

2.相關(guān)文件的配置

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://112.124.17.134:3306/rabbitMQ?serverTimezone=GMT%2B8
    username: root
    password: 123456
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
  rabbitmq:
    host: localhost
    username: guest
    password: guest
    publisher-confirms: true  # 開(kāi)啟Rabbitmq發(fā)送消息確認(rèn)機(jī)制,發(fā)送消息到隊(duì)列并觸發(fā)回調(diào)方法
    publisher-returns: true
    listener:
      simple:
        concurrency: 10 #消費(fèi)者數(shù)量
        max-concurrency: 10 #最大消費(fèi)者數(shù)量
        prefetch: 1 #限流(消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量)
        auto-startup: true  #啟動(dòng)時(shí)自動(dòng)啟動(dòng)容器
        acknowledge-mode: manual #開(kāi)啟ACK手動(dòng)確認(rèn)模式

mybatis-plus:
  mapper-locations: classpath:xz/mapper/xml/*.xml

3.代碼的實(shí)現(xiàn)

1.RabbitConfig類(lèi)的實(shí)現(xiàn)

主要用于生成隊(duì)列和交換機(jī)并進(jìn)行綁定,這里將消息的轉(zhuǎn)化為json輸出,我注釋掉了,因?yàn)楹竺嫖野l(fā)送消息并不是以string的形式發(fā)送,而是自定義一個(gè)消息實(shí)體類(lèi)messageHandler類(lèi)用來(lái)發(fā)送消息

@Component
public class RabbitConfig {
    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfig.class);
    public static final String DIRECT_QUEUE ="DIRECT_QUEUE" ;
    public static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";
    public static final String DIRECT_KEY = "DIRECT_ROUTING_KEY";
    @Resource
    private RabbitTemplate rabbitTemplate;

    @Bean
    public AmqpTemplate amqpTemplate() {
        /**
         * 定義消息轉(zhuǎn)換實(shí)例 ,轉(zhuǎn)化成 JSON傳輸
         *
         * @return Jackson2JsonMessageConverter
         */
        //rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        //rabbitTemplate.setEncoding("UTF-8");
        // 消息發(fā)送失敗返回到隊(duì)列中,yml需要配置 publisher-returns: true
        rabbitTemplate.setMandatory(true);
        /**
         * 消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)。
         * 使用該功能需要開(kāi)啟確認(rèn),spring-boot中配置如下:
         * spring.rabbitmq.publisher-confirms = true
         */
        rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean b, String s)-> {
                if (b) {
                    LOGGER.info("消息已確認(rèn) cause:{}",correlationData.getId());
                } else {
                    LOGGER.info("消息未確認(rèn) cause:{}", s);
                }
        });
        /**
         * 通過(guò)實(shí)現(xiàn)ReturnCallback接口,
         * 如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
         * 比如根據(jù)發(fā)送消息時(shí)指定的routingKey找不到隊(duì)列時(shí)會(huì)觸發(fā)
         * 使用該功能需要開(kāi)啟確認(rèn),spring-boot中配置如下:
         * spring.rabbitmq.publisher-returns = true
         */
        rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)-> {
                LOGGER.error("消息被退回:{}", message);
                LOGGER.error("消息使用的交換機(jī):{}", exchange);
                LOGGER.error("消息使用的路由鍵:{}", routingKey);
                LOGGER.error("描述:{}", replyText);
        });
        return rabbitTemplate;
    }



    /**
     * 聲明Direct交換機(jī) 支持持久化.
     *
     * @return the exchange
     */
    @Bean("directExchange")
    public Exchange directExchange() {
        return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build();
    }

    /**
     * 聲明一個(gè)隊(duì)列 支持持久化.
     *
     * @return the queue
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable(DIRECT_QUEUE).build();
    }

    /**
     * 通過(guò)綁定鍵 將指定隊(duì)列綁定到一個(gè)指定的交換機(jī) .
     *
     * @param queue    the queue
     * @param exchange the exchange
     * @return the binding
     */
    @Bean
    public Binding directBinding(@Qualifier("directQueue") Queue queue,
                                 @Qualifier("directExchange") Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(DIRECT_KEY).noargs();
    }
}
2.發(fā)送消息

寫(xiě)一個(gè)controller類(lèi)用來(lái)實(shí)現(xiàn)商品的搶購(gòu)

 private int userId=0;
    //開(kāi)始搶單
    @RequestMapping("/begin")
    @ResponseBody
    public void begin(){
        userId++;
        this.send(new MessageHander(true,userId));
    }


    public String send(MessageHander message){
        //第一個(gè)參數(shù):交換機(jī)名字  第二個(gè)參數(shù):Routing Key的值  第三個(gè)參數(shù):傳遞的消息對(duì)象
        CorrelationData correlationData=new CorrelationData(Integer.toString(message.getUserId()));
        rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_KEY, message,correlationData);
        return "發(fā)送消息成功";
    }
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageHander implements Serializable {

   private boolean flag;

   private int userId;
}

3.接受消息
@Component
public class Receiver {
    private static final Logger log = LoggerFactory.getLogger(Receiver.class);
    @Autowired
    RabbitController controller;

    /**
     * @RabbitListener 可以標(biāo)注在類(lèi)上面,需配合 @RabbitHandler 注解一起使用
     * @RabbitListener 標(biāo)注在類(lèi)上面表示當(dāng)有收到消息的時(shí)候,就交給 @RabbitHandler 的方法處理,具體使用哪個(gè)方法處理,
     * 根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類(lèi)型
     *
     *
     * 通過(guò) ACK 確認(rèn)是否被正確接收,每個(gè) Message 都要被確認(rèn)(acknowledged),可以手動(dòng)去 ACK 或自動(dòng) ACK
     */
    @RabbitListener(queues = {RabbitConfig.DIRECT_QUEUE}) //指定監(jiān)聽(tīng)的隊(duì)列名
    public void receiver(MessageHander messageHander, @Headers Channel channel, Message message) throws IOException {
        log.info("用戶(hù){}開(kāi)始搶單", messageHander.getUserId());
        try {
            //處理消息
            controller.robbingProduct(messageHander.getUserId());
           //  確認(rèn)消息已經(jīng)消費(fèi)成功
         channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            // 拒絕當(dāng)前消息,并把消息返回原隊(duì)列
          channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }
}
public void robbingProduct(Integer userId) {
        QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
        queryWrapper.eq("productNO","123321NO");
        Product product = productService.getOne(queryWrapper);
        if (product != null && product.getTotal() > 0) {
            int i = productService.updateProduct("123321NO");
           if(i>0){
                //插入記錄
                productService.insertProductRecord(new Record(null,"123321NO", userId));
                //發(fā)送短信
                LOGGER.info("用戶(hù){}搶單成功", userId);
            }else {
                LOGGER.error("用戶(hù){}搶單失敗", userId);
            }
        } else {
            LOGGER.error("用戶(hù){}搶單失敗", userId);
        }
  }

3.Jmeter測(cè)試

現(xiàn)在我們可以進(jìn)行測(cè)試,一次發(fā)送1000個(gè)請(qǐng)求代表1000個(gè)用戶(hù),但是商品數(shù)量有限只有200個(gè)商品,用來(lái)模擬商品秒殺業(yè)務(wù)
image.png

image.png

image.png

測(cè)試

image.png

image.png
image.png

原博主文章:https://blog.csdn.net/weixin_44001965/article/details/105557610?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.channel_param&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-2.channel_param

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容