Redis 使用隊(duì)列進(jìn)行批量處理

需求

某個(gè)請(qǐng)求會(huì)有時(shí)候突然很大的訪問(wèn),這些訪問(wèn)如果都是一個(gè)個(gè)處理,會(huì)導(dǎo)致很高的 DB 與 程序的 CPU 占用。

解決

通過(guò) redis 隊(duì)列存儲(chǔ)進(jìn)來(lái)的內(nèi)容,然后批量進(jìn)行處理。

備注:
使用 redis.rpop 主要是為了避免通過(guò) ruby 的 loop 一直遍歷讀取 redis 的值,判斷是否有更新。

  WS_CHANNEL = 'ws_channel_1'
  MESSAGE_BULK_SIZE = 20 # 批量處理的消息數(shù)量

  # 讀取隊(duì)列頂部(最早)的信息(阻塞的,直到有信息才繼續(xù)執(zhí)行)
  while (msg = redis.rpop(WS_CHANNEL))
    messages = [msg[1]] # 消息格式如下:["ws_channel", "{:id=>4}"]
    
    # sleep(0.5) # 如果有需要,可以加等待,等足夠多的信息再進(jìn)行處理,但影響實(shí)時(shí)性能

    # 批量讀取頂部的信息(假設(shè)突然間有多個(gè)信息進(jìn)入到 redis, 使用 pile_lines 優(yōu)化性能)
    pipe_lines = redis.pipelined do
      # 讀取信息,并按時(shí)間的遠(yuǎn)到近排序
      redis.lrange(WS_CHANNEL, 0, MESSAGE_BULK_SIZE)

      # 清空
      redis.ltrim(WS_CHANNEL, MESSAGE_BULK_SIZE + 1, -1)
    end
    ext_messages = pipe_lines[0]

    # 按最早到最晚排序
    messages += ext_messages.reverse if ext_messages.present?
    
    # 輸入內(nèi)容
    puts messages
  end

其他用途

也可以參考上述代碼,通過(guò)隊(duì)列監(jiān)聽(tīng)取代 pub/sub。非推薦,只是作為簡(jiǎn)化的可選方案。

參考

Redis 參考:https://www.runoob.com/redis/redis-lists.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容