Node.js中多線程,多主機(jī)下數(shù)據(jù)的處理

先看代碼

demo代碼背景:通過Socket,實(shí)時同步筆記內(nèi)容

const interviewDict: Record<string, InterviewDetail> = {};

export const interviewWebSocket: expressWs.WebsocketRequestHandler = (ws: IWebSocket, req) => {
  const id = req.query.id as string;
  ws.interviewId = id;
  ws.uuid = uuidv4();
  ws.on('message', msg => {
    if (!ArrayBuffer.isView(msg)) {
      return;
    }
    const [type] = msg;
    const valueBytes = msg.slice(1);
    const value = decode.decode(valueBytes);
    switch (type) {
      case encode.encode(WSTypes.heartbeat)[0]:
        ws.send(getBuffer(WSTypes.heartbeat, id));
        return;
      case encode.encode(WSTypes.setValue)[0]:
        if (value && interviewDict[id]) {
          interviewDict[id].value = value;
          getWsInstance()
            ?.getWss()
            ?.clients?.forEach((wsItem: IWebSocket) => {
            console.log('ws', wsItem.interviewId, wsItem.uuid, ws.uuid);
            if (wsItem.interviewId === id && wsItem.uuid !== ws.uuid) {
              wsItem.send(getBuffer(WSTypes.getValue, value));
            }
          });
        }
        return;
      case encode.encode(WSTypes.getValue)[0]:
        if (interviewDict[id]) {
          ws.send(getBuffer(WSTypes.getValue, interviewDict[id].value));
        }
        return;
    }
  });
};
復(fù)制代碼

會不會出現(xiàn)問題?

什么時候出現(xiàn)問題?

場景一:

為了充分利用cpu的多核,需要通過fork方式進(jìn)行多進(jìn)程運(yùn)行。

例:Egg默認(rèn)執(zhí)行start會創(chuàng)建和 CPU 核數(shù)相當(dāng)?shù)?app worker進(jìn)程。

解決方法:進(jìn)程間通信

Egg中的Master、Agent和Worker

當(dāng)一個應(yīng)用啟動時,會同時啟動這三類進(jìn)程。

類型 進(jìn)程數(shù)量 作用 穩(wěn)定性 是否運(yùn)行業(yè)務(wù)代碼
Master 1 進(jìn)程管理,進(jìn)程間消息轉(zhuǎn)發(fā) 非常高
Agent 1 后臺運(yùn)行工作(長連接客戶端) 少量
Worker 一般設(shè)置為 CPU 核數(shù) 執(zhí)行業(yè)務(wù)代碼 一般

interviewDict需要放在Agent中

// 發(fā)送
app.messenger.once('egg-ready', () => {
    app.messenger.sendToAgent('agent-event', { foo: 'bar' });
    app.messenger.sendToApp('app-event', { foo: 'bar' });
  });
};
// 接收
app.messenger.on(action, (data) => {
  // process data
});
app.messenger.once(action, (data) => {
  // process data
});
復(fù)制代碼

擴(kuò)展:為什么Node.js不適合cpu密集型任務(wù)

進(jìn)程間通信與線程間通訊:

node是單線程應(yīng)用(一個進(jìn)程一個線程),單線程的進(jìn)程只能運(yùn)行在一個 CPU內(nèi)核 上,node通過創(chuàng)建多進(jìn)程來進(jìn)行實(shí)現(xiàn)多線程。

線程:由于同一進(jìn)程中的多個線程具有相同的地址空間,線程間可以直接讀寫進(jìn)程數(shù)據(jù)段(如全局變量)來進(jìn)行通信只需要進(jìn)程同步和互斥手段的輔助,保證數(shù)據(jù)的一致性。

進(jìn)程:進(jìn)程通訊需要通過系統(tǒng)IPC等方式進(jìn)行通信,遠(yuǎn)比線程間通訊開銷大。

場景二:

生產(chǎn)環(huán)境部署服務(wù)的應(yīng)用,一般會部署到兩臺以上(可以是實(shí)體機(jī)也可以是虛擬機(jī))。然后nginx負(fù)載均衡到每臺機(jī)器上。

解決方法:redis、其它數(shù)據(jù)庫。

這種情況下更適合redis

Redis數(shù)據(jù)全部存在內(nèi)存,定期寫入磁盤,當(dāng)內(nèi)存不夠時,通過LRU算法刪除數(shù)據(jù)。

Egg中redis的使用

// 配置redis
config.redis = {
  client: {
    host: 'www.yzapp.cn',
    port: 6379,
    password: process.env.NESTOR_REDIS,
    db: 0
  }
};

config.websocket = {
  // 配置 websocket 使用 redis 作消息廣播
  redis: config.redis,
};
復(fù)制代碼
import { Service } from 'egg';
const prefix = '00353:';

/**
 * 調(diào)用redis的服務(wù)
 */
export default class RedisService extends Service {
    /**
     * 根據(jù)key獲得值
     * @param key key
     */
    public async get(key: string) {
        const { redis, logger } = this.app;
        const t = Date.now();
        let data = await redis.getBuffer(prefix + key);
        if (!data) return;
        const duration = Date.now() - t;
        logger.debug('Cache', 'get', key, duration + 'ms');
        return data;
    }

    /**
     * 根據(jù)key存值
     * @param key key
     * @param value value
     */
    public async set(key: string, value: any) {
        const { redis, logger } = this.app;
        const t = Date.now();
        await redis.set(prefix + key, value);
        const duration = Date.now() - t;
        logger.debug('Cache', 'set', key, duration + 'ms');
    }

    /**
     * 根據(jù)key存值并設(shè)置過期時間
     * @param key key
     * @param value value
     * @param seconds 過期時間
     */
    public async setex(key: string, value: any, seconds: number) {
        const { redis, logger } = this.app;
        const t = Date.now();
        await redis.set(prefix + key, value, 'EX', seconds);
        const duration = Date.now() - t;
        logger.debug('Cache', 'set', key, value, duration + 'ms');
    }

    /**
     * 根據(jù)key刪除緩存
     * @param key key
     */
    public async del(key: string) {
        const { redis, logger } = this.app;
        const t = Date.now();
        await redis.del(prefix + key);
        const duration = Date.now() - t;
        logger.debug('Cache', 'del', key, duration + 'ms');
    }

    /**
     * 遞增值并設(shè)定過期時間
     * @param key
     * @param seconds
     */
    public async incr(key: string, seconds: number) {
        const { redis, logger } = this.app;
        const t = Date.now();
        const result = await redis
            .multi()
            .incr(prefix + key)
            .expire(key, seconds)
            .exec();
        const duration = Date.now() - t;
        logger.debug('Cache', 'set', key, duration + 'ms');
        return result[0][1];
    }
}
復(fù)制代碼
/**
 * 同步答題內(nèi)容
 * @param websocket
 * @param id
 */
public async syncValue(websocket: EggWsClient, id: string): Promise<string | null> {
    const uuid = uuidV4();
    // 根據(jù)interview id創(chuàng)建房間
    websocket.room.join(id, ({message}) => {
        const data = JSON.parse(message.toString());
        if (!data || !data.value || data.from === uuid) {
            return;
        }
        // 將消息發(fā)送到除自己外的其它房間成員
        websocket.send(encode.encode(data.value));
    });

    // 從數(shù)據(jù)庫里讀取
    const info = await this.findInterviewById(Number(id));
    if (info === null) {
        return `沒有找到${id}的數(shù)據(jù)`;
    }
    // 從redis讀取最新的筆試信息
    const value = await this.service.redis.get("room" + id);
    websocket
        .on('message', async (msg) => {
            // console.log('receive', msg);
            if (!ArrayBuffer.isView(msg)) {
                return;
            }
            const type = msg[0];
            switch (type) {
                case encode.encode(WSTypes.heartbeat)[0]:
                    websocket.send(this.getBuffer(WSTypes.heartbeat, encode.encode(id)));
                    return;
                case encode.encode(WSTypes.setValue)[0]:
                    // 存入redis最新的筆試信息
                    await this.service.redis.set("room" + id, msg.slice(1));
                    if (msg.slice(1)) {
                        this.app.ws.sendJsonTo(id, {
                            from: uuid,
                            value: decode.decode(this.getBuffer(WSTypes.getValue, msg.slice(1)))
                        });
                    }
                    return;
                case encode.encode(WSTypes.getValue)[0]:
                    if (!value) {
                        return;
                    }
                    websocket.send(this.getBuffer(WSTypes.getValue, value));
                    return;
            }
        })
        .on('close', (code, reason) => {
            console.log('websocket closed', code, reason);
        });
    return null;
}
復(fù)制代碼

具體代碼見:github.com/nesror/biki…

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容