私聊我做畢設(shè)或者實(shí)驗(yàn)課題。
1.設(shè)計(jì)數(shù)據(jù)庫(kù)
設(shè)計(jì)product表,用來(lái)記錄商品的總數(shù)量

image.png
設(shè)計(jì)record表,用來(lái)記錄消費(fèi)者的id

image.png
2.業(yè)務(wù)的實(shí)現(xiàn)
1.導(dǎo)入相關(guān)依賴(lài)
<!--整合rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.相關(guān)文件的配置
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://112.124.17.134:3306/rabbitMQ?serverTimezone=GMT%2B8
username: root
password: 123456
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
rabbitmq:
host: localhost
username: guest
password: guest
publisher-confirms: true # 開(kāi)啟Rabbitmq發(fā)送消息確認(rèn)機(jī)制,發(fā)送消息到隊(duì)列并觸發(fā)回調(diào)方法
publisher-returns: true
listener:
simple:
concurrency: 10 #消費(fèi)者數(shù)量
max-concurrency: 10 #最大消費(fèi)者數(shù)量
prefetch: 1 #限流(消費(fèi)者每次從隊(duì)列獲取的消息數(shù)量)
auto-startup: true #啟動(dòng)時(shí)自動(dòng)啟動(dòng)容器
acknowledge-mode: manual #開(kāi)啟ACK手動(dòng)確認(rèn)模式
mybatis-plus:
mapper-locations: classpath:xz/mapper/xml/*.xml
3.代碼的實(shí)現(xiàn)
1.RabbitConfig類(lèi)的實(shí)現(xiàn)
主要用于生成隊(duì)列和交換機(jī)并進(jìn)行綁定,這里將消息的轉(zhuǎn)化為json輸出,我注釋掉了,因?yàn)楹竺嫖野l(fā)送消息并不是以string的形式發(fā)送,而是自定義一個(gè)消息實(shí)體類(lèi)messageHandler類(lèi)用來(lái)發(fā)送消息
@Component
public class RabbitConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfig.class);
public static final String DIRECT_QUEUE ="DIRECT_QUEUE" ;
public static final String DIRECT_EXCHANGE = "DIRECT_EXCHANGE";
public static final String DIRECT_KEY = "DIRECT_ROUTING_KEY";
@Resource
private RabbitTemplate rabbitTemplate;
@Bean
public AmqpTemplate amqpTemplate() {
/**
* 定義消息轉(zhuǎn)換實(shí)例 ,轉(zhuǎn)化成 JSON傳輸
*
* @return Jackson2JsonMessageConverter
*/
//rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//rabbitTemplate.setEncoding("UTF-8");
// 消息發(fā)送失敗返回到隊(duì)列中,yml需要配置 publisher-returns: true
rabbitTemplate.setMandatory(true);
/**
* 消息發(fā)送到交換器Exchange后觸發(fā)回調(diào)。
* 使用該功能需要開(kāi)啟確認(rèn),spring-boot中配置如下:
* spring.rabbitmq.publisher-confirms = true
*/
rabbitTemplate.setConfirmCallback((CorrelationData correlationData, boolean b, String s)-> {
if (b) {
LOGGER.info("消息已確認(rèn) cause:{}",correlationData.getId());
} else {
LOGGER.info("消息未確認(rèn) cause:{}", s);
}
});
/**
* 通過(guò)實(shí)現(xiàn)ReturnCallback接口,
* 如果消息從交換器發(fā)送到對(duì)應(yīng)隊(duì)列失敗時(shí)觸發(fā)
* 比如根據(jù)發(fā)送消息時(shí)指定的routingKey找不到隊(duì)列時(shí)會(huì)觸發(fā)
* 使用該功能需要開(kāi)啟確認(rèn),spring-boot中配置如下:
* spring.rabbitmq.publisher-returns = true
*/
rabbitTemplate.setReturnCallback((Message message, int replyCode, String replyText, String exchange, String routingKey)-> {
LOGGER.error("消息被退回:{}", message);
LOGGER.error("消息使用的交換機(jī):{}", exchange);
LOGGER.error("消息使用的路由鍵:{}", routingKey);
LOGGER.error("描述:{}", replyText);
});
return rabbitTemplate;
}
/**
* 聲明Direct交換機(jī) 支持持久化.
*
* @return the exchange
*/
@Bean("directExchange")
public Exchange directExchange() {
return ExchangeBuilder.directExchange(DIRECT_EXCHANGE).durable(true).build();
}
/**
* 聲明一個(gè)隊(duì)列 支持持久化.
*
* @return the queue
*/
@Bean("directQueue")
public Queue directQueue() {
return QueueBuilder.durable(DIRECT_QUEUE).build();
}
/**
* 通過(guò)綁定鍵 將指定隊(duì)列綁定到一個(gè)指定的交換機(jī) .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding directBinding(@Qualifier("directQueue") Queue queue,
@Qualifier("directExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(DIRECT_KEY).noargs();
}
}
2.發(fā)送消息
寫(xiě)一個(gè)controller類(lèi)用來(lái)實(shí)現(xiàn)商品的搶購(gòu)
private int userId=0;
//開(kāi)始搶單
@RequestMapping("/begin")
@ResponseBody
public void begin(){
userId++;
this.send(new MessageHander(true,userId));
}
public String send(MessageHander message){
//第一個(gè)參數(shù):交換機(jī)名字 第二個(gè)參數(shù):Routing Key的值 第三個(gè)參數(shù):傳遞的消息對(duì)象
CorrelationData correlationData=new CorrelationData(Integer.toString(message.getUserId()));
rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE, RabbitConfig.DIRECT_KEY, message,correlationData);
return "發(fā)送消息成功";
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class MessageHander implements Serializable {
private boolean flag;
private int userId;
}
3.接受消息
@Component
public class Receiver {
private static final Logger log = LoggerFactory.getLogger(Receiver.class);
@Autowired
RabbitController controller;
/**
* @RabbitListener 可以標(biāo)注在類(lèi)上面,需配合 @RabbitHandler 注解一起使用
* @RabbitListener 標(biāo)注在類(lèi)上面表示當(dāng)有收到消息的時(shí)候,就交給 @RabbitHandler 的方法處理,具體使用哪個(gè)方法處理,
* 根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類(lèi)型
*
*
* 通過(guò) ACK 確認(rèn)是否被正確接收,每個(gè) Message 都要被確認(rèn)(acknowledged),可以手動(dòng)去 ACK 或自動(dòng) ACK
*/
@RabbitListener(queues = {RabbitConfig.DIRECT_QUEUE}) //指定監(jiān)聽(tīng)的隊(duì)列名
public void receiver(MessageHander messageHander, @Headers Channel channel, Message message) throws IOException {
log.info("用戶(hù){}開(kāi)始搶單", messageHander.getUserId());
try {
//處理消息
controller.robbingProduct(messageHander.getUserId());
// 確認(rèn)消息已經(jīng)消費(fèi)成功
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒絕當(dāng)前消息,并把消息返回原隊(duì)列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
}
}
}
public void robbingProduct(Integer userId) {
QueryWrapper<Product> queryWrapper=new QueryWrapper<>();
queryWrapper.eq("productNO","123321NO");
Product product = productService.getOne(queryWrapper);
if (product != null && product.getTotal() > 0) {
int i = productService.updateProduct("123321NO");
if(i>0){
//插入記錄
productService.insertProductRecord(new Record(null,"123321NO", userId));
//發(fā)送短信
LOGGER.info("用戶(hù){}搶單成功", userId);
}else {
LOGGER.error("用戶(hù){}搶單失敗", userId);
}
} else {
LOGGER.error("用戶(hù){}搶單失敗", userId);
}
}
3.Jmeter測(cè)試
現(xiàn)在我們可以進(jìn)行測(cè)試,一次發(fā)送1000個(gè)請(qǐng)求代表1000個(gè)用戶(hù),但是商品數(shù)量有限只有200個(gè)商品,用來(lái)模擬商品秒殺業(yè)務(wù)

image.png

image.png

image.png
測(cè)試

image.png

image.png

image.png