邏輯比較簡單,大概是用戶點擊商品,對應(yīng)的為每個商品統(tǒng)計一下被點擊次數(shù),然后根據(jù)商品的次數(shù)降序排序得到一個熱門查看排行榜。
邏輯挺簡單,只是考慮排行榜的數(shù)據(jù)屬于熱點數(shù)據(jù),并發(fā)過多時,直接到數(shù)據(jù)庫可能會引起性能問題,所以這邊用redis隊列方式進行統(tǒng)計,所以數(shù)據(jù)還是會延遲更新到到數(shù)據(jù)庫的,生成排行榜時依舊是走數(shù)據(jù)庫(其實就是一種思路嘗鮮)。
所以這邊通過redis的List作為消息隊列,多個進程向隊列寫,一個進程讀取隊列,并將其更新到數(shù)據(jù)庫中,然后通過nodejs事件的方式進行消息的消化:
import * as events from "events";
import { RedisService } from "./redis.service";
export default class TestEvent extends events.EventEmitter {
static client: TestEvent;
private constructor() {
super();
this.setMaxListeners(0);
this.on("cache_record", this.cacheRecord);
}
async cacheRecord(result: { result: any}) {
// 將消息從右邊壓入隊列
await RedisService.rpush("cache:key", values);
}
static getClient() {
if (!TestEvent.client) {
TestEvent.client = new TestEvent();
}
return TestEvent.client;
}
}
通過事件把消息緩存到redis:
import TestEvent from "./test.event";
@Path("/detail")
export default class TestController {
@GET("/:id")
async searchPlant(req: Request, res: Response, next: NextFunction): Promise<void> {
try {
// 處理完業(yè)務(wù)邏輯后,通過事件將用戶點擊行為緩存到redis
TestEvent.getClient().emit("cache_record", "");
res.send();
} catch (e) {
console.log(e);
next(new ErrorResponse("system error"));
}
}
}
現(xiàn)在消息的添加已經(jīng)有了,下一步進行消息緩存,通過setImmediate()去不斷的消費數(shù)據(jù):
// 通過redis包封裝的一些redis指令service
import { RedisService } from "./redis.service";
export async function persistenceRecord() {
if (INSTANCE_ID === 0) {
return setImmediate(persistenceRecord);
}
try {
// 從隊列里取出消息,進行相應(yīng)處理
// 這里需要注意,blpop是阻塞指令,不要和非阻塞指令共用一個連接
const record: string = await RedisService.blpop(cache:key, 1800);
} catch (e) {
console.log(e);
}
// 自注冊,進行下一次消費
setImmediate(persistenceRecord);
}
由于是用pm2開啟的cluster模式,所以必須通過pm2定義的cluster實例ID:INSTANCE_ID進行指定某個進程進行消息消費(詳情配置可看這里)。
至于blpop為什么不要和非阻塞指令共用一個連接,看這里。