Redis+PHP實現(xiàn)的一個優(yōu)先級去重隊列

主要思路是用一個set做前端去重緩沖, 若干個list做后端的多優(yōu)先級消息隊列, 用一個進程來進行分發(fā), 即從set中分發(fā)消息到隊列.

set緩沖的設(shè)計為當天有效, 所以有個零點問題,有可能在零點前set中剛放進去的消息沒有分發(fā)即失效, 這一點可以用另一個進程彌補處理前一天的遺留消息和刪除前一天的緩沖


<?php

class MsgQuery {

?// TODO - Insert your code here

const KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息緩沖key前綴

const KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key

const KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已處理緩沖key前綴

const SCORE_NUM = 5; // 優(yōu)先級劃分數(shù)目

const MIN_SCORE = 1; // 最小優(yōu)先級

static $MAX_SCORE;

static $instance = null;

private $redis;

public static function getInstance($redis) {

if (null == self::$instance) {

self::$instance = new MsgQuery ( $redis );

}

return self::$instance;

}

/**

* 添加消息到消息緩沖區(qū)

* @param int $score 優(yōu)先級(1-5)

* @param string $msg 消息

*/

public function add($score, $msg) {

// 添加到消息緩沖

$socre = intval ( $score );

if ($socre < self::MIN_SCORE) {

$score = self::MIN_SCORE;

}

if ($score > self::$MAX_SCORE) {

$score = self::$MAX_SCORE;

}

$cacheKey = self::KEY_CACHE_PREFIX . date ( 'Ymd' );

$cacheData = array (

'score' => $score,

'msg' => $msg

);

$this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );

}

/**

* 將消息從緩沖區(qū)移動到相應(yīng)的優(yōu)先級隊列中

*/

public function moveToQuery() {

// 獲取當前緩沖區(qū)沒有入隊列的消息

$dealKey = self::KEY_CACHE_DEAL_PREFIX.date('Ymd');

$cacheKey = self::KEY_CACHE_PREFIX.date('Ymd');

$msgs = $this->redis->sDiff($cacheKey, $dealKey);

foreach ($msgs as $cachedData){

// 放入已處理集合

$this->redis->sAdd ( $dealKey, $cachedData );

// 壓入相應(yīng)的優(yōu)先級隊列

$cachedData = unserialize($cachedData);

$score = $cachedData['score'];

$msg = $cachedData['msg'];

$queryKey = self::KEY_QUERY_PREFIX.$score;

$this->redis->rPush($queryKey, $msg);

}

unset($cachedData);

}

/**

* 從隊列阻塞式出棧一個最高優(yōu)先級消息

* @return string msg

*/

public function bPop(){

$queryKeys = array();

for($score=self::$MAX_SCORE;$score>=self::MIN_SCORE;$score--){

$queryKeys[] = self::KEY_QUERY_PREFIX.$score;

}

$msg = $this->redis->blPop($queryKeys, 0);

return $msg[1];

}

private function __construct($redis) {

$this->redis = $redis;

$this->redis->connect ();

self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;

}

private function __destruct() {

$this->redis->close ();

}

}


?>

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,551評論 19 139
  • 轉(zhuǎn)載地址:http://gnucto.blog.51cto.com/3391516/998509 Redis與Me...
    Ddaidai閱讀 21,547評論 0 82
  • 1.1 資料 ,最好的入門小冊子,可以先于一切文檔之前看,免費。 作者Antirez的博客,Antirez維護的R...
    JefferyLcm閱讀 17,309評論 1 51
  • 轉(zhuǎn)至元數(shù)據(jù)結(jié)尾創(chuàng)建: 董瀟偉,最新修改于: 十二月 23, 2016 轉(zhuǎn)至元數(shù)據(jù)起始第一章:isa和Class一....
    40c0490e5268閱讀 2,043評論 0 9
  • 非原創(chuàng),記錄。 1、連接操作相關(guān)的命令 quit:關(guān)閉連接(connection) auth:簡單密碼認證 2、對...
    FocusOnMyself閱讀 439評論 0 0

友情鏈接更多精彩內(nèi)容