需求
某個(gè)請(qǐng)求會(huì)有時(shí)候突然很大的訪問(wèn),這些訪問(wèn)如果都是一個(gè)個(gè)處理,會(huì)導(dǎo)致很高的 DB 與 程序的 CPU 占用。
解決
通過(guò) redis 隊(duì)列存儲(chǔ)進(jìn)來(lái)的內(nèi)容,然后批量進(jìn)行處理。
備注:
使用 redis.rpop 主要是為了避免通過(guò) ruby 的 loop 一直遍歷讀取 redis 的值,判斷是否有更新。
WS_CHANNEL = 'ws_channel_1'
MESSAGE_BULK_SIZE = 20 # 批量處理的消息數(shù)量
# 讀取隊(duì)列頂部(最早)的信息(阻塞的,直到有信息才繼續(xù)執(zhí)行)
while (msg = redis.rpop(WS_CHANNEL))
messages = [msg[1]] # 消息格式如下:["ws_channel", "{:id=>4}"]
# sleep(0.5) # 如果有需要,可以加等待,等足夠多的信息再進(jìn)行處理,但影響實(shí)時(shí)性能
# 批量讀取頂部的信息(假設(shè)突然間有多個(gè)信息進(jìn)入到 redis, 使用 pile_lines 優(yōu)化性能)
pipe_lines = redis.pipelined do
# 讀取信息,并按時(shí)間的遠(yuǎn)到近排序
redis.lrange(WS_CHANNEL, 0, MESSAGE_BULK_SIZE)
# 清空
redis.ltrim(WS_CHANNEL, MESSAGE_BULK_SIZE + 1, -1)
end
ext_messages = pipe_lines[0]
# 按最早到最晚排序
messages += ext_messages.reverse if ext_messages.present?
# 輸入內(nèi)容
puts messages
end
其他用途
也可以參考上述代碼,通過(guò)隊(duì)列監(jiān)聽(tīng)取代 pub/sub。非推薦,只是作為簡(jiǎn)化的可選方案。