Odoo 即時(shí)通訊(IM)的實(shí)現(xiàn)方法

背景介紹

Odoo 是最好的開源企業(yè)應(yīng)用系統(tǒng),沒有之一。雖然有很多技術(shù)落伍了,但是經(jīng)過近20年的持續(xù)發(fā)展,Odoo積累了所有企業(yè)運(yùn)營(yíng)所需要的所有軟件;即使沒有提供企業(yè)所需要的功能,它本身還是一個(gè)完整的開發(fā)平臺(tái),可以開發(fā)出你能想象的應(yīng)用,當(dāng)然不會(huì)開發(fā)也沒有關(guān)系,可以盡情參考已經(jīng)開源的模塊是如何做到的。

這里就講一講 Odoo 如何使用‘過時(shí)的’技術(shù)實(shí)現(xiàn)的即時(shí)通訊,通過對(duì)即時(shí)通訊的支持,Odoo可以讓企業(yè)內(nèi)部人員可以通過 Odoo 進(jìn)行實(shí)時(shí)溝通,也可以讓企業(yè)內(nèi)部人員和網(wǎng)站客戶進(jìn)行實(shí)時(shí)溝通;同時(shí) Odoo 將即時(shí)通訊的消息與郵件和 Robot 整合,為營(yíng)銷自動(dòng)化和服務(wù)智能化乃至應(yīng)用智能化交互提供了基礎(chǔ)。

核心技術(shù)

通過數(shù)據(jù)庫(kù)消息隊(duì)列

先說最重要的,Odoo 即時(shí)通訊使用了 PostgreSQL 數(shù)據(jù)庫(kù)的 listen 和 notify 的機(jī)制完成。這個(gè)機(jī)制是 PostgreSQL 數(shù)據(jù)庫(kù)私有的,其它數(shù)據(jù)庫(kù)未必支持。所以要用 Odoo 是必須要用 PostgreSQL,這是原因之一。參考這里可以了解更多關(guān)于 PostgreSQL listen notify 的信息。

使用 listen 和 notify 可以讓連接數(shù)據(jù)庫(kù)的各個(gè)客戶端之間進(jìn)行實(shí)時(shí)通訊。

通過長(zhǎng)連接

連接數(shù)據(jù)庫(kù)的客戶端不是 Odoo 的客戶端,數(shù)據(jù)庫(kù)的客戶端實(shí)際上是 Odoo 的服務(wù)端,是 Python 代碼連接 數(shù)據(jù)庫(kù);而 Odoo 客戶端是通過 Javascript 實(shí)現(xiàn)的 Web 應(yīng)用,它通過長(zhǎng)連接方式與 Odoo 后臺(tái)保持信息的實(shí)時(shí)性。長(zhǎng)連接的鏈接地址是 longpolling/poll ,Odoo 客戶端會(huì)發(fā)起這個(gè)連接請(qǐng)求,Odoo 服務(wù)端處理這個(gè)請(qǐng)求,如果有這個(gè)請(qǐng)求關(guān)注的 Channels 的消息,那么這個(gè)請(qǐng)求就會(huì)立即返回,如果沒有消息,這個(gè)連接會(huì)保持 TIMEOUT 秒,目前 TIMEOUT 是50秒。Channels 就是會(huì)話標(biāo)記,可以理解為一個(gè)聊天室、一個(gè)群等等,客戶 poll 數(shù)據(jù)的時(shí)候要寫上它關(guān)注的 Channels。

異步處理

如果很多用戶同時(shí)使用 Odoo,那么 Odoo 為每個(gè)客戶保持一個(gè)連接,這是無(wú)疑問的,因?yàn)闆]有連接就沒有辦法讀寫數(shù)據(jù);但是每個(gè)連接是在一個(gè)線程里面呢,還是多個(gè)呢?簡(jiǎn)單說,Odoo 只為 longpolling 維護(hù)了一個(gè)線程或者一個(gè)進(jìn)程(gevent),如果你啟動(dòng) Odoo 的時(shí)候使用了 worker 參數(shù),就意味這 Odoo 要以多進(jìn)程方式運(yùn)作,如果沒有指定 woker 就是多線程方式,如果你啟動(dòng)的是線程模式,longpolling 將是一個(gè)線程,如果你啟動(dòng)的是 worker (進(jìn)程)模式那么 Odoo 會(huì)通過 Popen 一個(gè)全新進(jìn)程,這個(gè)全新進(jìn)程的命令行 加上 gevent,很怪異吧,確實(shí)就是這么干的。

    def long_polling_spawn(self):
        nargs = stripped_sys_argv()
        cmd = [sys.executable, sys.argv[0], 'gevent'] + nargs[1:]
        popen = subprocess.Popen(cmd)
        self.long_polling_pid = popen.pid

把原來的命令行插入一個(gè) gevent,再啟動(dòng)一遍。當(dāng)然后續(xù)的代碼會(huì)判斷如果是以 gevent 啟動(dòng)命令的,這是要啟動(dòng) longpolling。

gevent 在 Python 3 asyncio 的大環(huán)境下是個(gè)過時(shí)的技術(shù)了,它使用了 Monkey Patch 的方式對(duì) Python 庫(kù)進(jìn)行了異步化,感覺代碼的書寫方式還是一樣,但是已經(jīng)異步化了。好處是代碼在沒有 gevent 的時(shí)候可以同步跑,引入 gevent 后不用改變代碼邏輯就可以異步化。有人會(huì)問,異步化有啥好處???異步化可以讓 Odoo 同時(shí)處理多個(gè)連接,就這么簡(jiǎn)單,如果沒有異步化,一個(gè)連接就占用了 Odoo,別的連接進(jìn)不來,解決這個(gè)問題的老方法是啟動(dòng)更多進(jìn)程,但是進(jìn)程的方式太重了,隨著互聯(lián)網(wǎng)服務(wù)的普及,開發(fā)人員發(fā)現(xiàn)實(shí)際上只需要維護(hù) I/O 并不需要啟動(dòng)多個(gè)進(jìn)程或者多個(gè)進(jìn)程,只需要維護(hù)好文件描述符,并且能夠正確發(fā)現(xiàn)這些描述符什么時(shí)候該讀什么時(shí)候該寫。selelct,poll,epoll一步一步把異步I/O的性能榨干了最后一滴血。

在 Python 2 的時(shí)候,Python 沒有內(nèi)置異步 I/O 的功能,所以 gevent,Tornado 都是解決 Python 異步 I/O 問題的。Odoo 使用了 gevent,當(dāng) longpolling 服務(wù)正在服務(wù)一個(gè)客戶端的時(shí)候,也沒有任何消息給這個(gè)客戶端,那么這個(gè)客戶端將保持連接 50 秒,這時(shí)候 longpolling 服務(wù)端會(huì)把基于這個(gè)連接的處理 wait 讓出 CPU,讓其它連接能進(jìn)來。 當(dāng)這個(gè)連接的文件描述符準(zhǔn)備好讀寫的時(shí)候 Odoo 通過 select 調(diào)用得以了解。然后會(huì)通過 Thread 或者 gevent 的 Event 通知等待的客戶連接。

   def loop(self):
        """ Dispatch postgres notifications to the relevant polling threads/greenlets """
        _logger.info("Bus.loop listen imbus on db postgres")
        with odoo.sql_db.db_connect('postgres').cursor() as cr:
            conn = cr._cnx
            cr.execute("listen imbus")
            cr.commit();
            while True:
                if select.select([conn], [], [], TIMEOUT) == ([], [], []):
                    pass
                else:
                    conn.poll()
                    channels = []
                    while conn.notifies:
                        channels.extend(json.loads(conn.notifies.pop().payload))
                    # dispatch to local threads/greenlets
                    events = set()
                    for channel in channels:
                        events.update(self.channels.pop(hashable(channel), set()))
                    for event in events:
                        event.set()

上邊的這段代碼在 bus 模塊里面,Odoo 只有一個(gè)線程或者 gevent 程序去無(wú)差別的listen 系統(tǒng)所有的 imbus 上的消息,notify imbus 的消息都會(huì)讓 select 返回準(zhǔn)備好的文件描述符(不是空的,所以就不會(huì)等于 ([],[],[])),無(wú)差別就是它不判斷 Channel,而每個(gè)客戶端是需要關(guān)注 Channel的,所以這是系統(tǒng)級(jí)別的不是用戶級(jí)別的,它取出數(shù)據(jù)后通過 event set 來通知那些 wait 在具體 event (關(guān)聯(lián)了 Channel)上的客戶。

Channels 的 Overload

每次 longpolling 的 poll 請(qǐng)求都要帶上這個(gè)用戶想要關(guān)注的 channels,而 用戶怎么知道自己要 polling 什么channels呢?

Channels 一般來自兩種可能,一個(gè)是同一種應(yīng)用導(dǎo)致的會(huì)話數(shù)量的增加,比如在線客服,每個(gè)新訪客都有可能跟 Odoo 的用戶建立一個(gè) Channel 就是會(huì)話,這樣就會(huì)有很多會(huì)話。

還有一種可能就是,Odoo 有很多應(yīng)用,每個(gè)應(yīng)用都會(huì)有自己建立或者判斷 Channel 的方式,在線客服是 Odoo 的一個(gè)應(yīng)用,CRM 也是一個(gè)應(yīng)用,每個(gè)應(yīng)用對(duì) Channel 的標(biāo)記和維護(hù)方法各不相同,一般是一個(gè)元組 (db,table,id) 再 hashable 或者文本化一下,就變成字符串,作為 Channel 的唯一標(biāo)記,具體有多少個(gè)這樣的 Channels 也是存儲(chǔ)在各自應(yīng)用的表里面。所以 bus 應(yīng)用的 Controller 提供了一個(gè)可以 Overload 的機(jī)會(huì)來修改 Channels,就是 _load。

    # override to add channels
    def _poll(self, dbname, channels, last, options):
        # update the user presence
        if request.session.uid and 'bus_inactivity' in options:
            request.env['bus.presence'].update(options.get('bus_inactivity'))
        request.cr.close()
        request._cr = None        
        return dispatch.poll(dbname, channels, last, options)

它輕描淡寫的注釋暴露了它存在的意義。

再看 mail 應(yīng)用下的 controller 對(duì)這個(gè)函數(shù)的 overload。

    # --------------------------
    # Extends BUS Controller Poll
    # --------------------------
    def _poll(self, dbname, channels, last, options):
        if request.session.uid:
            partner_id = request.env.user.partner_id.id

            if partner_id:
                channels = list(channels)       # do not alter original list
                for mail_channel in request.env['mail.channel'].search([('channel_partner_ids', 'in', [partner_id])]):
                    channels.append((request.db, 'mail.channel', mail_channel.id))
                # personal and needaction channel
                channels.append((request.db, 'res.partner', partner_id))
                channels.append((request.db, 'ir.needaction', partner_id))
        return super(MailChatController, self)._poll(dbname, channels, last, options)

把在 mail (就是討論應(yīng)用)中需要的 channels 都圈出來提供給 bus 應(yīng)用去處理。

這里面又學(xué)到一個(gè) Odoo 的知識(shí),如何搜索 many2many 的字段 (channel partner ids),因?yàn)?many2many 是 Odoo 加了一個(gè)中間表實(shí)現(xiàn)的,沒看過這段代碼還不知道咋搜索呢。這里面的 channel partner ids 是在 mail channel 中對(duì)應(yīng)的 partner id,在 res partner 表中也有 partner 對(duì)應(yīng)的 mail channel。這是一個(gè)多對(duì)多的關(guān)系,一個(gè) mail channel 可以含有多個(gè) partner,一個(gè)partner 可以在多個(gè) mail channel 中,這很自然,人可以在很多對(duì)話中,對(duì)話中含有很多人 。但是如果你想搜索哪些對(duì)話中含有我這個(gè)人的怎么搜?

其它

Odoo 的即時(shí)通訊幾乎都在 bus 這個(gè) addon 下面,但是在odoo 全局的代碼中也有很多配合的 code,比如上文提到的 gevent 命令行;還有更加復(fù)雜的部分,就是 WSGI 和 數(shù)據(jù)連接的處理部分,由于 longpolling 同時(shí)重用了普通 httprequest 和數(shù)據(jù)庫(kù)運(yùn)行環(huán)境 (registry,Enviroments,Enviroment,cusor),這段代碼比較亂,不如 addon 里面的結(jié)構(gòu)清晰,當(dāng)然可能也是為了讓 addon 結(jié)構(gòu)清晰,不得不做出的妥協(xié)。值得說明的是,當(dāng) longpolling 的請(qǐng)求來的時(shí)候,WSGI 請(qǐng)求自帶的 Odoo 數(shù)據(jù)庫(kù)執(zhí)行環(huán)境會(huì)被拋棄,而是每次請(qǐng)求重新再次建立:

event.wait(timeout=timeout)
with registry.cursor() as cr:
  env = api.Environment(cr, SUPERUSER_ID, {})
  notifications = env['bus.bus'].poll(channels, last, options)

讓我們知道了 Odoo 如何每次建立數(shù)據(jù)環(huán)境。如果不是每次建立環(huán)境那么這里的數(shù)據(jù)操作別的客戶不會(huì)同時(shí)發(fā)現(xiàn)的。

通過分析 Odoo 的 IM 實(shí)現(xiàn)過程可以看出 Odoo 的技術(shù)的確有點(diǎn)過時(shí)了,跟蹤的不夠猛。因?yàn)?Python 3 已經(jīng)支持 syncio 了,關(guān)于 asyncio 可以讀讀這個(gè) blog

如果通過 asyncio 去實(shí)現(xiàn),我的思路是在 asyncio 中加入 postgresql connection 的描述符,就是上邊用來select 的,watching 這個(gè)描述符。當(dāng)有數(shù)據(jù)的時(shí)候 callback 就會(huì)運(yùn)行,再去通過 asyncio 的 locks 中的 Event 去 set()。用 asyncio.wait_for(event.wait(), timeout) 來響應(yīng)用戶的請(qǐng)求,用戶的 HTTP 請(qǐng)求就會(huì)被阻塞直到 Event 被 set 或者超時(shí),而 CPU 會(huì)被讓出,完美。

`loop.``add_reader`(*fd*, *callback*, **args*)

Start monitoring the *fd* file descriptor for read availability and invoke *callback* with the specified arguments once *fd* is available for reading.

這樣就用原生的 Python 3 解決了,不需要引入 gevent,也不需要引入異步的 PostgreSQL Python 庫(kù),重用原來的 psycopg2 阻塞庫(kù)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • # Python 資源大全中文版 我想很多程序員應(yīng)該記得 GitHub 上有一個(gè) Awesome - XXX 系列...
    小邁克閱讀 3,120評(píng)論 1 3
  • 必備的理論基礎(chǔ) 1.操作系統(tǒng)作用: 隱藏丑陋復(fù)雜的硬件接口,提供良好的抽象接口。 管理調(diào)度進(jìn)程,并將多個(gè)進(jìn)程對(duì)硬件...
    drfung閱讀 3,745評(píng)論 0 5
  • 一. 操作系統(tǒng)概念 操作系統(tǒng)位于底層硬件與應(yīng)用軟件之間的一層.工作方式: 向下管理硬件,向上提供接口.操作系統(tǒng)進(jìn)行...
    月亮是我踢彎得閱讀 6,144評(píng)論 3 28
  • 心好煩 總是胡思亂想 也不知道在煩什麼 多麼希望人生可以重來 從新活一次 我想重新開始 我很想我的讀書有同學(xué)可以一...
    wgzmb閱讀 128評(píng)論 0 1
  • 賣參閱讀 439評(píng)論 0 0

友情鏈接更多精彩內(nèi)容