主要思路是用一個set做前端去重緩沖, 若干個list做后端的多優(yōu)先級消息隊列, 用一個進程來進行分發(fā), 即從set中分發(fā)消息到隊列.
set緩沖的設(shè)計為當天有效, 所以有個零點問題,有可能在零點前set中剛放進去的消息沒有分發(fā)即失效, 這一點可以用另一個進程彌補處理前一天的遺留消息和刪除前一天的緩沖
<?php
class MsgQuery {
?// TODO - Insert your code here
const KEY_CACHE_PREFIX = 'mass.query.cache'; // 消息緩沖key前綴
const KEY_QUERY_PREFIX = 'mass.query.lv'; // 消息key
const KEY_CACHE_DEAL_PREFIX = 'mass.query.deal'; // 已處理緩沖key前綴
const SCORE_NUM = 5; // 優(yōu)先級劃分數(shù)目
const MIN_SCORE = 1; // 最小優(yōu)先級
static $MAX_SCORE;
static $instance = null;
private $redis;
public static function getInstance($redis) {
if (null == self::$instance) {
self::$instance = new MsgQuery ( $redis );
}
return self::$instance;
}
/**
* 添加消息到消息緩沖區(qū)
* @param int $score 優(yōu)先級(1-5)
* @param string $msg 消息
*/
public function add($score, $msg) {
// 添加到消息緩沖
$socre = intval ( $score );
if ($socre < self::MIN_SCORE) {
$score = self::MIN_SCORE;
}
if ($score > self::$MAX_SCORE) {
$score = self::$MAX_SCORE;
}
$cacheKey = self::KEY_CACHE_PREFIX . date ( 'Ymd' );
$cacheData = array (
'score' => $score,
'msg' => $msg
);
$this->redis->sAdd ( $cacheKey, serialize ( $cacheData ) );
}
/**
* 將消息從緩沖區(qū)移動到相應(yīng)的優(yōu)先級隊列中
*/
public function moveToQuery() {
// 獲取當前緩沖區(qū)沒有入隊列的消息
$dealKey = self::KEY_CACHE_DEAL_PREFIX.date('Ymd');
$cacheKey = self::KEY_CACHE_PREFIX.date('Ymd');
$msgs = $this->redis->sDiff($cacheKey, $dealKey);
foreach ($msgs as $cachedData){
// 放入已處理集合
$this->redis->sAdd ( $dealKey, $cachedData );
// 壓入相應(yīng)的優(yōu)先級隊列
$cachedData = unserialize($cachedData);
$score = $cachedData['score'];
$msg = $cachedData['msg'];
$queryKey = self::KEY_QUERY_PREFIX.$score;
$this->redis->rPush($queryKey, $msg);
}
unset($cachedData);
}
/**
* 從隊列阻塞式出棧一個最高優(yōu)先級消息
* @return string msg
*/
public function bPop(){
$queryKeys = array();
for($score=self::$MAX_SCORE;$score>=self::MIN_SCORE;$score--){
$queryKeys[] = self::KEY_QUERY_PREFIX.$score;
}
$msg = $this->redis->blPop($queryKeys, 0);
return $msg[1];
}
private function __construct($redis) {
$this->redis = $redis;
$this->redis->connect ();
self::$MAX_SCORE = self::MIN_SCORE + self::SCORE_NUM - 1;
}
private function __destruct() {
$this->redis->close ();
}
}
?>