看了https://mp.weixin.qq.com/s/8DFA36YvDdRSaM4JwNBWpQ之后,也仿寫了一個(gè)秒殺系統(tǒng),稱為v1版。最近對openresty和lua有了一些技術(shù)儲備、就開發(fā)了一套新的v2版秒殺。
seckill-v2秒殺系統(tǒng)
一、功能介紹
秒殺系統(tǒng)v2,主要提供3個(gè)接口方法:
-
/seckill/rest/OutletAndStock GET
查詢接口,返回當(dāng)前開放的預(yù)約網(wǎng)點(diǎn)以及庫存信息。
-
/seckill/rest/appointment POST
下單預(yù)約接口,接收姓名、身份證、手機(jī)號、預(yù)約網(wǎng)點(diǎn)等信息,執(zhí)行預(yù)約邏輯。
-
/seckill/rest/refreshRation POST
更新了數(shù)據(jù)庫里的配給和庫存之后,同步到redis里。給管理端調(diào)用的。
二、軟件架構(gòu)設(shè)計(jì)

1、負(fù)責(zé)均衡層
openresty + lua 來做負(fù)載均衡層,可以集群部署,上面用F5或lvs來做接入。
這一層主要是解決限流(nginx限流),防刷邏輯(比如同一個(gè)ip+token每x秒只允許一次請求),還一個(gè)是做庫存感知、沒貨以后馬上阻擋(比如返回一個(gè)靜態(tài)頁面)接下來的無效請求到后端核心服務(wù)。
(1)庫存感知timer 每1s查詢r(jià)edis里網(wǎng)點(diǎn)的剩余庫存,更新到openresty本地share-dict,share-dict里邊存放剩余庫存給前端展示用,另外下單預(yù)約請求先查一下share-dict、如果庫存沒了則直接返回前端“活動(dòng)已結(jié)束”。
(2)網(wǎng)點(diǎn)和庫存查詢接口 返回本地share-dict里的剩余庫存給前端。
(3)預(yù)約購買次數(shù)限制校驗(yàn) 每人每5天只能預(yù)約購買一次。redis里邊維護(hù)一份預(yù)約購買列表,由timer加載到share_dict。(核心服務(wù)層還會做一次這個(gè)校驗(yàn))
(4)限流模塊
雖然前面的庫存感知和預(yù)約次數(shù)限制校驗(yàn)可以在這里限制大部分無效請求進(jìn)入,但是考慮到極端情況,比如1秒內(nèi)有10萬這種級別的流量涌入、庫存感知和次數(shù)校驗(yàn)還來不及與redis同步一致,所以這些請求流量會穿過openresty到達(dá)后面的核心服務(wù)層,對于核心服務(wù)層的java應(yīng)用來說這個(gè)級別的請求處理起來是比較吃力的,所以我們需要在負(fù)載均衡層這里做一下限流。這里采用nginx自帶的限流功能:
nginx.conf http:
limit_req_zone $binary_remote_addr zone=perip_rps:10m rate=5r/s; #單ip每秒限制5個(gè)請求
limit_req_zone $server_name zone=perserver_rps:10m rate=3000r/s; #每個(gè)server每秒限制處理3000個(gè)請求
server:
limit_req zone=perserver_rps burst=2000 nodelay; #server每秒請求限流
location:
limit_req zone=perip_rps burst=10 nodelay; #每個(gè)ip每秒請求如果超過limit_req_zone的配置,最多可以緩沖10個(gè)
這里我們利用limit_req做了兩個(gè)維度的限流,首先是單個(gè)ip限制每秒5個(gè)請求、突發(fā)最多允許10個(gè),這里配置了nodelay意思是正常情況下應(yīng)該是200ms漏桶通過一個(gè)請求,但如果一下來了10個(gè)請求的話也是可以給通過,只不過后續(xù)要過2s才可以通過下一個(gè)請求。
然后是整個(gè)server限制3000的rps,允許突發(fā)2000。
2、核心服務(wù)層
由springboot + redis組成,redis里邊存訂單和庫存信息。
(1)下單預(yù)約接口,由redis lua script保證扣庫存操作的原子性,然后將訂單生成請求提交給mq。
除了庫存判斷之外,這里還要做一次預(yù)約購買次數(shù)限制的校驗(yàn),原因在于nginx上面是定時(shí)(比如500ms一次)去redis讀取blacklist的。在扣庫存之前一定要校驗(yàn)一下預(yù)約次數(shù)的規(guī)則。
再一個(gè)可選的查詢商品信息和庫存的接口、之所以可選是因?yàn)槿绻翘禺惢拿霘⑾到y(tǒng),完全可以把商品信息靜態(tài)化到cdn上的頁面上去。另外庫存不顯示也沒太大問題,庫存沒了會由負(fù)載均衡層動(dòng)態(tài)判定并攔截掉、或者活動(dòng)結(jié)束以后直接掛一個(gè)活動(dòng)結(jié)束的頁面上去。
3、異步服務(wù)層
核心服務(wù)層我們盡量做到功能單一化,把可以異步處理的邏輯用RocketMQ從核心服務(wù)中剝離出來,只保留必要的邏輯供負(fù)載均衡層過來的流量進(jìn)行同步調(diào)用。RocketMQ這里起到的便是削峰緩沖的作用了,提高整體的吞吐能力。這樣異步邏輯由于不直接承載C端的流量,并且異步服務(wù)作為末端業(yè)務(wù)邏輯相比最前端的負(fù)載均衡層流量要下降幾個(gè)數(shù)量級(想象10萬人搶100個(gè)商品,最后走到異步服務(wù)層去生成訂單落庫),可以作為mq的消費(fèi)端以較少的算力資源進(jìn)行部署。這些異步邏輯可能包括訂單寫入數(shù)據(jù)庫等等。
(1)異步訂單入庫: 從RocketMQ中拿訂單,然后寫入MySQL。消費(fèi)和入庫都使用批量處理,以提高效率。
(2)每天去redis更新維護(hù)一下購買記錄,做每日購買次數(shù)限制用。
三、數(shù)據(jù)結(jié)構(gòu)
redis數(shù)據(jù)結(jié)構(gòu)
某網(wǎng)點(diǎn)庫存
key:outlet:{id}:date:{date}:stock
value:String類型,存放網(wǎng)點(diǎn)的庫存
例如,key: outlet:1:date:2021-10-01:stock , value: 3000
某日內(nèi)已預(yù)約的身份證名單
key:appointment:idNos:{date}
value:Set類型 idNo1 ... idNon
mysql數(shù)據(jù)結(jié)構(gòu)
網(wǎng)點(diǎn)表
CREATE TABLE `t_seckill_outlet` (
`outlet_id` bigint(20) NOT NULL AUTO_INCREMENT,
`address` varchar(150) DEFAULT NULL COMMENT '網(wǎng)點(diǎn)地址',
`outlet_name` varchar(50) DEFAULT NULL COMMENT '網(wǎng)點(diǎn)名稱',
PRIMARY KEY (`outlet_id`)
)
預(yù)約記錄明細(xì)表
CREATE TABLE `t_seckill_appointment` (
`appointment_id` bigint(20) NOT NULL AUTO_INCREMENT,
`date` varchar(10) DEFAULT NULL COMMENT '日期',
`id_card` varchar(18) DEFAULT NULL COMMENT '身份證號',
`mobile` varchar(11) DEFAULT NULL COMMENT '手機(jī)號',
`name` varchar(15) DEFAULT NULL COMMENT '姓名',
`outlet_id` mediumtext COMMENT '網(wǎng)點(diǎn)ID',
PRIMARY KEY (`appointment_id`)
)
四、核心代碼
openresty上的lua代碼和nginx配置文件:
定時(shí)timer我們放在nginx.conf的http位置
init_worker_by_lua_file lua/wangan/seckill/task_timer.lua;
task_timer.lua代碼如下:
--[[
定時(shí)從redis加載網(wǎng)點(diǎn)和庫存到本地內(nèi)存
]]
local redis = require "wangan.common.redis_iresty"
local cache = require "wangan.common.share_cache"
local red = redis:new({
ip = "122.51.114.183",
port = 6379,
password = "7474@628",
timeout = 2000,
db_index = 0,
max_idle_ms = 60000,
pool_size = 32
})
local delay = 1 --每delay秒跟redis同步一次數(shù)據(jù)
local handlerRepeat
handlerRepeat = function ( ... )
--ngx.log(ngx.INFO, "從redis加載網(wǎng)點(diǎn)和庫存到本地內(nèi)存...")
local len = red:llen("outlets:ids")
local outlets = red:lrange("outlets:ids", 0, len)
local today = ngx.today()
for _, v in pairs(outlets) do --遍歷outlet id
local stock_key = "outlet:" .. v .. ":date:" ..today .. ":stock"
local stock = red:get(stock_key) --從redis查到當(dāng)日這個(gè)網(wǎng)點(diǎn)的庫存
if stock then
--ngx.log(ngx.INFO, stock_key)
--ngx.log(ngx.INFO, stock)
local cache_v = cache.get_from_cache(stock_key)
local ok, err = cache.set_to_cache(stock_key, stock, 30) --緩存在本地內(nèi)存shared_dict
if not ok then
ngx.log(ngx.ERR , "寫入本地緩存失?。?, err)
end
--[[
if cache_v then
ngx.log(ngx.INFO, "shared_dict中" .. stock_key .. "的庫存是" .. cache_v)
end
if not cache_v then
ngx.log(ngx.INFO, cache_v)
end
]]
end
end
end
local ok, err = ngx.timer.every(delay, handlerRepeat)
if not ok then
ngx.log(ngx.ERR, "創(chuàng)建timer.every(delay, handlerRepeat)失?。?, err)
return
end
如上面這樣openresty的本地內(nèi)存里邊就有每個(gè)網(wǎng)點(diǎn)的剩余庫存了,且1秒跟redis同步一次,數(shù)據(jù)比較實(shí)時(shí)。當(dāng)庫存沒了可以直接從本地內(nèi)存查到并返回客戶端,不用再去redis或者去后邊的核心服務(wù)去查了。很大程度上提高了性能。
所以接下來就是在請求的access階段去做這個(gè)庫存校驗(yàn):
--[[
預(yù)約校驗(yàn)
]]
local cache = require "wangan.common.share_cache"
local json = require "cjson"
--先讀request body
ngx.req.read_body()
--從request body里獲取參數(shù)
--local args = ngx.req.get_post_args()
--獲取request body data
local request_body_data = ngx.req.get_body_data()
if not request_body_data then
ngx.say("request body is nil")
return
end
ngx.log(ngx.INFO, "request body string", request_body_data)
--將request body data解析為json
local request_body_json = json.decode(request_body_data)
--ngx.log(ngx.INFO, "request body json", request_body_json)
--請求參數(shù)校驗(yàn)
local outletId = request_body_json.outletId
if not outletId then
ngx.say("網(wǎng)點(diǎn)id不可為空")
return
end
--庫存剩余校驗(yàn)
local stock_key = "outlet:" .. outletId .. ":date:" .. ngx.today() .. ":stock"
ngx.log(ngx.INFO, "stock_key ", stock_key)
local stock = cache.get_from_cache(stock_key);
if not stock then
ngx.say("未查到庫存,該網(wǎng)點(diǎn)尚未開始預(yù)約")
return
end
ngx.log(ngx.INFO, stock_key .. " , 當(dāng)前庫存: ", stock)
if tonumber(stock)<=0 then
ngx.say("庫存已空,已預(yù)約完畢,感謝參與");
return
end
關(guān)于openresty的執(zhí)行階段,可以進(jìn)一步參考:https://blog.51cto.com/lisea/2425794
然后是nginx.conf配置:
lua_shared_dict my_cache 128m;
init_worker_by_lua_file lua/wangan/seckill/task_timer.lua;
upstream seckillcore {
server 127.0.0.1:8080;
}
server {
listen 80;
server_name localhost;
#開發(fā)調(diào)試模式、關(guān)閉lua代碼緩存,生產(chǎn)環(huán)境請勿關(guān)閉
lua_code_cache off;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
root html;
index index.html index.htm;
}
location /seckill/rest/appointment {
default_type text/html;
access_by_lua_file lua/wangan/seckill/appointment_check.lua;
proxy_pass http://seckillcore;
proxy_redirect default;
}
}
核心服務(wù)里的java代碼:
/**
* 核心業(yè)務(wù)邏輯
* */
@Slf4j
@Service
public class SeckillService {
@Autowired
private RedisDao redisDao;
@Autowired
private DefaultRedisScript<List> deductMyStock;
@Autowired
private GeneralMqProducer generalMqProducer;
/**
* 預(yù)約業(yè)務(wù)邏輯
*
* */
public String appointment(@RequestBody AppointmentDto dto) {
AppointmentDetail appointDetail = new AppointmentDetail();
appointDetail.setName(dto.getName());
appointDetail.setDate(LocalDate.now().toString());
appointDetail.setIdCard(dto.getIdCard());
appointDetail.setMobile(dto.getMobile());
appointDetail.setOutletId(dto.getOutletId());
log.info(JSON.toJSONString(appointDetail));
//使用redis script扣庫存, 如成功則添加此身份證號到redis里的已預(yù)約列表
List<String> keys = new ArrayList<>();
keys.add("outlet:"+dto.getOutletId()+":date:"+LocalDate.now().toString()+":stock");
Map<String, Object> args = new HashMap<>();
args.put("buyNum", 5);
args.put("idCard", dto.getIdCard());
args.put("dates", dateList5());
List result = redisDao.executeScript(deductMyStock, keys, args);
Long errCode = (Long)result.get(0);
String errMsg = (String)result.get(1);
//預(yù)約,扣庫存。
if (errCode.longValue()==0) {
//用rocketmq異步寫預(yù)約記錄
sendAppointmentToMq(appointDetail);
return "預(yù)約成功";
} else {
if(errCode.longValue()==1) {
return "您" + errMsg + "預(yù)約過";
}else if(errCode.longValue()==2) {
return "網(wǎng)點(diǎn)尚未開放預(yù)約,請耐心等待";
}else if(errCode.longValue()==3) {
return "庫存不足";
}
return "預(yù)約失敗";
}
}
/**
* 異步寫預(yù)約記錄
* */
private void sendAppointmentToMq(AppointmentDetail appointDetail) {
String appointJson = JSON.toJSONString(appointDetail);
EventMessage eventMsg = new EventMessage();
eventMsg.setTopic("order");
eventMsg.setTag("newOrder");
eventMsg.setMsgBody(appointJson);
generalMqProducer.asyncPublish(eventMsg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("預(yù)約訂單入庫消息寫入rocketmq成功,消息ID:{}", sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
//如果與mq通信故障了,那么可以從日志文件里找到預(yù)約記錄,手工執(zhí)行寫入mysql
log.error("預(yù)約訂單寫入rocketmq失敗:{}, exception detail:{}" , appointJson , e.getMessage());
}
});
}
/**
* 返回從今天還是算往前5天的日期列表
* */
private List<String> dateList5(){
List<String> dates = new ArrayList<>();
LocalDate today = LocalDate.now();
for(int i=0; i<5; i++) {
dates.add(today.minusDays(i).toString());
}
return dates;
}
}
扣庫存、進(jìn)行5日內(nèi)已預(yù)約校驗(yàn)的redis lua腳本:
--[[
扣減redis庫存lua script
KEYS[1] 庫存key名稱,例如outlet:1:date:2021-10-01:stock
ARGV[1] 參數(shù),json字符串
buyNum表示一次扣多少庫存
idCard表示預(yù)約人身份證號
dates:從當(dāng)日開始往前倒排5天的日期的一個(gè)列表{"2021-10-06","2021-10-05","2021-10-04","2021-10-03","2021-10-02"}
返回 {int, string}
0成功, 1已經(jīng)5天內(nèi)預(yù)約過, 2網(wǎng)點(diǎn)尚未開放, 3庫存不足
]]
local stock_key = KEYS[1]
local args = ARGV[1]
redis.log(redis.LOG_NOTICE, stock_key)
redis.log(redis.LOG_NOTICE, args)
local args_json = cjson.decode(args)
local buy_num = args_json.buyNum
local id_card = args_json.idCard
local dates = args_json.dates
--查詢該身份證是否已預(yù)約過, 5日內(nèi)
for _,v in pairs(dates) do
local is= redis.call("sismember", "appointment:idNos:" .. v, id_card)
if is==1 then
return {1, v} --返回在哪天預(yù)約過
end
end
--扣庫存
local current_stock = redis.call("get", stock_key)
--redis.log(redis.LOG_NOTICE, type(current_stock))
if not current_stock then
return {2, "該網(wǎng)點(diǎn)尚未正式開放預(yù)約"}
end
if tonumber(current_stock) >= buy_num then
redis.call("set", stock_key, tonumber(current_stock) - buy_num) --庫存減去buy_num
redis.call("sadd", "appointment:idNos:" .. dates[1], id_card) --把身份證號寫入當(dāng)日預(yù)約記錄
return {0, "ok"}
end
return {3, "庫存不足"} --庫存不足
異步服務(wù)批量處理一次從RocketMQ輪詢到的訂單、批量入庫:
//注冊consumer,并使其訂閱相應(yīng)的topic、tag
private void registConsumer(MQMsgHandler msgHandler, String consumerGroup, String topic, String tag) {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup, getAclRPCHook(), new AllocateMessageQueueAveragely());
try {
consumer.setNamesrvAddr(mqurl);
consumer.setConsumeThreadMin(consumeThreadCorePoolSize);
consumer.setConsumeThreadMax(consumeThreadCorePoolSize);
consumer.setPullBatchSize(32); //一次長輪詢最多從mq里拿多少個(gè)消息,默認(rèn)32
consumer.subscribe(topic, tag);
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
List<EventMessage> eventMsgs = new ArrayList<>();
String msgContent = null;
try {
for(MessageExt msg : msgs) {
msgContent = new String(msg.getBody(),"utf-8");
EventMessage eventMsg = JSON.parseObject(msgContent, EventMessage.class);
log.debug(JSON.toJSONString(eventMsg));
eventMsgs.add(eventMsg);
}
msgHandler.handleMsg(eventMsgs); //批量處理本次拉取的消息,執(zhí)行業(yè)務(wù)邏輯
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
log.error("消息編碼錯(cuò)誤:" + e.getMessage(), e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}catch(Exception e) {
log.error("注冊消費(fèi)者出錯(cuò)" + e.getMessage(), e);
}
}
批量入庫:
@Slf4j
@Component
@MsgConsumer(consumerGroup = "newOrder-consumer-group", tag = "newOrder", topic = "order")
public class NewOrderMsgHandler implements MQMsgHandler{
@Autowired
private AppointmentDetailRepository appointmentDetailRepository;
@Override
public void handleMsg(List<EventMessage> eventMessages) {
log.debug("收到mq消息: {}", JSON.toJSONString(eventMessages));
List<AppointmentDetail> appointmentDetails = new ArrayList<>();
for(EventMessage eventMsg : eventMessages) {
AppointmentDetail appointmentDetail = JSON.parseObject(eventMsg.getMsgBody(), AppointmentDetail.class);
appointmentDetails.add(appointmentDetail);
}
appointmentDetailRepository.saveAll(appointmentDetails); //批量入庫
}
}
五、改進(jìn)與優(yōu)勢
相比V1版,相當(dāng)于把原來本地java內(nèi)存里的操作搬到redis上,然后一些個(gè)接口服務(wù)提前:由openresty調(diào)用redis,把一些業(yè)務(wù)邏輯直接在負(fù)載均衡層做掉。
V2版的優(yōu)勢還在于可以橫向擴(kuò)展算力來增加整體系統(tǒng)的性能。其實(shí)如果v1版單機(jī)承受范圍內(nèi)的話,直接讀寫本地內(nèi)存不見得比v2版性能差、可能還略好。但是請求量再大的話,v1就沒辦法了單機(jī)算力配置是有限的,而v2的優(yōu)勢就發(fā)揮出來了、因?yàn)榭梢岳^續(xù)擴(kuò)容算力,也就是說是可以橫向擴(kuò)展的架構(gòu)。