用Python操作nanomsg(三)——PubSub

上篇文章:用Python操作nanomsg(二)——PipeLine講了PipeLine模式下的應用并分析了其優(yōu)缺點,顯然像PipeLine這種必須兩頭都連接上才能完成一次消息傳遞的模式并不適用于大部分場景,如果做一個消息訂閱器,難道消息發(fā)布者非得等到有人連接了才能繼續(xù)下一條消息的發(fā)送?顯然這是不可取的,為此本文將探討更加符合消息訂閱的一種通信模式——PubSub。

PubSub

PubSub,顧名思義就是Publish(發(fā)布)和Subscribe(訂閱)。

在這個模式下一個Pub Node可以被多個Sub Node連接,Pub Node發(fā)布的消息是不阻塞的:

你Sub Node沒在線的話沒收到就沒收到,它不管你

但是如果Sub Node在線,則一定不會錯過

另外PubSub中的一個特性是,Sub Node是可以指定消息前綴來過濾要接收的信息,也就是”訂閱“,默認這個消息前綴是"",此時默認接收所有消息。下面來看PubSub的基本用法。

PubSub基本用法

Pub Node的用法與PipeLine中的Push Node沒有太大區(qū)別,只是在新建對象時候要指定protocol為nnpy.PUB

# 2 create a object
pub_server = nnpy.Socket(nnpy.AF_SP, nnpy.PUB)
# 3 establish a sever to push message
pub_server.bind('tcp://*:4000')

Sub Node的用法也與PipeLine中的Pull Node相差不大,除了指定protocol為nnpy.SUB外,還需要進行一次訂閱設置,使用setsockopt()方法,level賦為nnpy.SUB,option賦為nnpy.SUB_SUBSCRIBE,value賦你想要的消息前綴:

# 2 create a object
sub_client = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
# subscribe message to receive by prefix, default is '' and receive all messages
sub_client.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, '')
# 3 establish a sever to push message
sub_client.connect('tcp://127.0.0.1:4000')

SUBSCRIBE對消息循環(huán)的影響

首先我們?yōu)榱朔奖悖燃s定消息前綴的定義規(guī)范,就是要以英文冒號":"結束,然后Pub Node發(fā)布消息時也要在消息前綴和消息本體之間用英文冒號":"進行間隔:

假設消息前綴為roleA,則Sub Node啟動時,參數(shù)--subscribe要設置為roleA:;

而Pub Node發(fā)送消息你還好嗎?時,則要在輸入框中輸入roleA:你還好嗎?。

OK,前面提到Sub Node通過指定消息前綴來接收消息,實際使用中發(fā)現(xiàn):

recv_data = sub_client.recv()這行代碼中就已經(jīng)有了對消息前綴的判斷,如果不符合消息前綴的比較是不會結束函數(shù)往下走的,如果往下走了那就是接受的消息已經(jīng)符合了你的消息內(nèi)容。

這就有意思了:

我們知道recv_data此刻還是二進制數(shù)據(jù),我們并沒有告訴nanomsg我們使用了何種編碼方式轉(zhuǎn)換成的二進制(這里用的是utf-8),但是sub_client.recv()卻能夠在:

不知道編碼格式的二進制數(shù)據(jù)我們給的前綴字符串 之間進行內(nèi)容判斷,尚且不知道怎么做到的,反正在sub_client.recv()一波神操作知曉了里面內(nèi)容后,我們還是要乖乖對recv_data進行解碼:

if recv_data:
    decoded_data = recv_data.decode(config.DATA_ENCODING)
    # 3 process received data
    # remove the subscribe prefix
    decoded_data = decoded_data[len(arguments.subscribe):]

因為已經(jīng)確定了頭部包含消息前綴,所以直接使用字符串截斷即可分離出消息本體,后面對消息本體的操作就跟之前的一樣了。

改進LAN-chat Program

將上述改動應用到前文中的LAN-chat Program,并完善config.py中的常量、變量后可以得到一個”消息訂閱模式“的程序原型,我們在WSL中測試一下。

測試

在IDE中打開Terminal,輸入bash啟動Linux子系統(tǒng):

啟動WSL

這里使用tmux進行終端復用,沒有安裝的話輸入命令:

sudo apt-get install tmux

tmux(terminal multiplexer)是Linux上的終端復用神器,可從一個屏幕上管理多個終端(準確說是偽終端)。使用該工具,用戶可以連接或斷開會話,而保持終端在后臺運行。類似的工具還有screen,個人對這二者的使用感受是,用過tmux就再也不想用screen了。(引用自Guanglin

進來以后我們用快捷鍵Ctrl + Shift + '將窗格最大化方便查看(再次按下Ctrl + Shift + '時可恢復原大?。琂etBrains大法好?。?/p>

最大化后我們輸入tmux

輸入tmux進入復用界面

這時候tmux只有一個終端,我們可以通過快捷鍵將其復用成多個獨立的終端窗格:


分成了0、1、2三個終端

三個窗格分別設置為:

窗格號 角色 配置 說明
0 Publish Node bind tcp *:4000 發(fā)布
1 Subscribe Node connect tcp 127.0.0.1:4000 --subscribe "roleA:" 訂閱"roleA:"
2 Subscribe Node connect tcp 127.0.0.1:4000 --subscribe "roleB:" 訂閱"roleB:"
分別啟動1個pub node和2個sub node

接下來測試發(fā)送如下消息:

驗證序號 內(nèi)容 響應對象 驗證內(nèi)容
1 你好,你們在嗎? 普通消息體
2 roleA:你在哪里? pannel 1 消息前綴消息體
3 roleB:告訴我A的事情。 pannel 2 消息前綴體
4 roleA:client-offline-now pannel 1 帶消息前綴的下線指令
5 roleB:client-offline-now pannel 2 帶消息前綴的下線指令
6 server-exit-now pannel 0 自下線指令
驗證內(nèi)容1

結果:正常,沒有符合的消息前綴,故A、B都沒有接收該消息。


驗證內(nèi)容2、3、4、5、6

結果:正常,相應消息前綴的消息都被訂閱者正確接收,包括對訂閱者和對發(fā)布者的指令也能正確執(zhí)行。

完整實現(xiàn)

GitHub

(尚未上傳)

Raw Files

以下基于3327f4cb時刻本地git 版本“PubSub v0.1.0 releases”

chat.py

# _*_coding:utf-8 _*_
# @Time    : 2020/2/5 18:41
# @Author  : Shek 
# @FileName: chat.py
# @Software: PyCharm
import argparse
from module.func import *
import config

parser = argparse.ArgumentParser(description=config.PROGRAM_DESCRIPTION)
subparsers = parser.add_subparsers()

# command 'bind'
cmd_bind = subparsers.add_parser('bind', help=config.H_BIND)
cmd_bind.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_BIND_PROTOCOL)
cmd_bind.add_argument('addr', action='store', nargs='?', default=config.BIND_ADDR, help=config.H_BIND_ADDR)
cmd_bind.set_defaults(func=sub_cmd_bind)

# command 'connect'
cmd_connect = subparsers.add_parser('connect', help=config.H_CONNECT)
cmd_connect.add_argument('protocol', action='store', nargs='?', default=config.PROTOCOL, help=config.H_CONNECT_PROTOCOL)
cmd_connect.add_argument('addr', action='store', nargs='?', default=config.CONNECT_ADDR, help=config.H_CONNECT_ADDR)
cmd_connect.add_argument('--subscribe', action='store', nargs='?', default=config.CONNECT_SUBSCRIBE,
                         help=config.H_CONNECT_SUBSCRIBE)
cmd_connect.add_argument('--keep-alive', action='store_true', help=config.H_CONNECT_KEEP_ALIVE)
cmd_connect.set_defaults(func=sub_cmd_connect)

args = parser.parse_args()  # 處理輸入的參數(shù)
if not hasattr(args, 'func'):
    # 無參數(shù)時跳轉(zhuǎn)到-h,否則會提示 namespace object has not attribute 'func',故這里用hasattr()判斷
    args = parser.parse_args(['-h'])
args.func(args)  # 跳轉(zhuǎn)到對應的函數(shù)

config.py

# _*_coding:utf-8 _*_
# @Time    : 2020/2/5 8:59
# @Author  : Shek 
# @FileName: config.py
# @Software: PyCharm

# universal configuration
DATA_ENCODING = 'utf-8'
PROTOCOL = 'tcp'
HOST_BIND = '*'
HOST_CONNECT = '127.0.0.1'
PORT = '4000'
CONNECT_ADDR = '{}:{}'.format(HOST_CONNECT, PORT)
BIND_ADDR = '{}:{}'.format(HOST_BIND, PORT)
CONNECT_SUBSCRIBE = ''
# program configuration
NN_MODE = 'PUB/SUB'
PROGRAM_URL = 'http://github.com/YourGithub/RepositoryAddress'
PROGRAM_DESCRIPTION = 'A {} LAN-chat program written in Python3 {}'.format(NN_MODE, PROGRAM_URL)
ROLE_NAME_PUSH = 'push'
ROLE_NAME_PULL = 'pull'
ROLE_NAME_PUB = 'pub'
ROLE_NAME_SUB = 'sub'
# info text
I_OP_SUCCESS = 'success'
I_OP_FAILED = 'failed'
I_KEEP_ALIVE_ENABLED = 'automatically reconnect enabled'

I_SUBSCRIBE_ALL = 'configured to receive all message'
I_SUBSCRIBE_SPECIFIC_PREFIX = 'configured to receive message start with:'
# help text
H_BIND = 'bind server'
H_BIND_PROTOCOL = 'communicate protocol'
H_BIND_ADDR = '<host>:<port>'
H_CONNECT = 'connect to a server'
H_CONNECT_PROTOCOL = 'communicate protocol'
H_CONNECT_ADDR = '<host>:<port>'
H_CONNECT_KEEP_ALIVE = 'automatically reconnect when corrupted'
H_CONNECT_SUBSCRIBE = 'subscribe the message that you are interested in'
# OneWayPipe DEFAULT CONFIGURATION
# count
COUNT_SEND_SUCCESS = 0
COUNT_SEND_FAILED = 0
# flag
FLAG_CLIENT_OFFLINE = 'client-offline-now'
FLAG_SERVER_EXIT = 'server-exit-now'
# log text
L_CLIENT_CTRL_C = 'closing...'
L_CLIENT_FLAG_OFFLINE_DETECTED = 'offline flag received from server, closing...'
L_CLIENT_CLOSED = 'client closed'

L_SERVER_EXIT = 'closing...'
L_SERVER_CLOSED = 'server closed'
L_SERVER_SEND_FAILED_PREFIX = 'SEND FAILED'

func.py

# _*_coding:utf-8 _*_
# @Time    : 2020/2/6 11:42
# @Author  : Shek 
# @FileName: func.py
# @Software: PyCharm
import nnpy
import time
import datetime
import config
from module import logger


def current_datetime():
    return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')


def sub_cmd_bind(arguments):
    # 1 initialize a logger
    log = logger.Logger(config.ROLE_NAME_PUB)
    # 2 create a object
    pub_server = nnpy.Socket(nnpy.AF_SP, nnpy.PUB)
    # 3 establish a sever to push message
    log.info('binding to {}://{} ...'.format(arguments.protocol, arguments.addr))
    result = pub_server.bind('{}://{}'.format(arguments.protocol, arguments.addr))
    # bind status
    log.info('success') if result else log.info('failed') and exit(0)

    # 4 push loop
    time.sleep(0.5)
    while True:
        content = input('Send({})>'.format(config.COUNT_SEND_SUCCESS))
        # process input message/command
        if content == config.FLAG_SERVER_EXIT:
            # exit command caught, break loop
            log.info(config.L_SERVER_EXIT)
            break
        else:
            # send message/data/command
            send_result = pub_server.send(bytes(content, encoding=config.DATA_ENCODING))
            if send_result:  # success
                config.COUNT_SEND_SUCCESS += 1
            else:  # failed (warning: in push / pull mode will never reach here)
                config.COUNT_SEND_FAILED += 1
                log.warning('{}:{}'.format(config.L_SERVER_SEND_FAILED_PREFIX, content))
    # 5 close server
    pub_server.close()
    log.info(config.L_SERVER_CLOSED)


def sub_cmd_connect(arguments):
    # 1 initialize a logger
    log = logger.Logger(config.ROLE_NAME_SUB)
    # 2 create a object
    sub_client = nnpy.Socket(nnpy.AF_SP, nnpy.SUB)
    # subscribe message to receive by prefix, default is '' and receive all messages
    sub_client.setsockopt(nnpy.SUB, nnpy.SUB_SUBSCRIBE, arguments.subscribe)
    if arguments.subscribe == '':
        log.info(config.I_SUBSCRIBE_ALL)
    else:
        log.info('{}{}'.format(config.I_SUBSCRIBE_SPECIFIC_PREFIX, str(arguments.subscribe)))
    # keep-alive: not completed yet
    if arguments.keep_alive:
        log.info(config.I_KEEP_ALIVE_ENABLED)
    # 3 connect to a server for receiving message
    log.info('connecting to {}://{}'.format(arguments.protocol, arguments.addr))
    result = sub_client.connect('{}://{}'.format(arguments.protocol, arguments.addr))
    # connect status
    log.info(config.I_OP_SUCCESS) if result else log.info(config.I_OP_FAILED) and exit(0)

    # 4 receive in loop
    time.sleep(0.5)
    while True:
        try:
            recv_data = sub_client.recv()
            if recv_data:
                decoded_data = recv_data.decode(config.DATA_ENCODING)
                # 3 process received data
                # remove the subscribe prefix
                decoded_data = decoded_data[len(arguments.subscribe):]
                # receive a go-offline flag from server, break loop
                if decoded_data == config.FLAG_CLIENT_OFFLINE:
                    log.info(config.L_CLIENT_FLAG_OFFLINE_DETECTED)
                    break
                # display message push by server
                print('{}|{}'.format(current_datetime(), decoded_data))
                # logging to text file
                log.debug(decoded_data)
        except KeyboardInterrupt:
            # ctrl + c detected
            log.info(config.L_CLIENT_CTRL_C)
            break

    # 5 close client
    sub_client.close()
    log.info(config.L_CLIENT_CLOSED)

總結

PubSub的消息機制非常適用于消息訂閱、消息發(fā)布之類的應用,雖然不能保證下線的用戶收到所有消息,但因為其發(fā)布消息時不是阻塞的,可以有更大的余地通過其他手段彌補,例如建立緩存數(shù)據(jù)庫向未接收消息的用戶繼續(xù)廣播,也可以建立expire機制精確地投送訂閱消息。

下一章我們將討論nanomsg的Pair機制,本系列其他文章:

內(nèi)容 文章地址 說明
準備 用Python操作nanomsg(一)——準備 2020.2.7更新
PipeLine 用Python操作nanomsg(二)——PipeLine 2020.2.7更新
Pair 用Python操作nanomsg(四)——Pair 未開始
ReqRep 用Python操作nanomsg(五)——ReqRep 未開始
Survey 用Python操作nanomsg(六)——Survey 未開始
Bus 用Python操作nanomsg(七)——Bus 未開始
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容