Python網(wǎng)絡(luò)編程筆記(六):服務(wù)器架構(gòu)

網(wǎng)絡(luò)服務(wù)面臨兩個基本問題:

  1. 編碼:編寫出正確處理請求和響應(yīng)的代碼。
  2. 部署:使用守護進程,活動日志持久化存儲,防止各種失敗或者在失敗后立即重啟。

部署

  • 服務(wù)地址:硬編碼IP、DNS解析、在服務(wù)前端配置負(fù)載均衡器。
  • 兩種部署思路:
    • 每個服務(wù)器程序編寫服務(wù)的所有功能:成為unix守護進程或者windows服務(wù),安排系統(tǒng)級的日志,支持配置文件,提供啟動、關(guān)閉、重啟功能。這些功能不一定自己編寫,也使用第三方庫。
    • 服務(wù)器程序只實現(xiàn)最小功能,只是一個普通的前臺程序,而不是守護進程。一般重環(huán)境變量中獲得配置。不過可以添加框架使思路二變成思路一的守護進程。Paas服務(wù)即是如此,重復(fù)的功能平臺方搞定。
    • 推薦supervisord

參考:十二要素應(yīng)用宣言

單線程服務(wù)器

以一個簡單的TCP協(xié)議為例:
即使 listen()的參數(shù)大于0,第一個對話未完成時,第二個對話仍舊會在隊列中,只是減少了切換時建立連接的時間。

缺點:

  • 拒絕服務(wù)攻擊
  • 嚴(yán)重浪費CPU資源,因為在等待客戶端時,服務(wù)端什么都不能做。

tips:性能測試使用 trace 模塊:python -m trace -tg --ignore-dir=/usr main.py

多線程與多進程服務(wù)器

利用操作系統(tǒng)的多路復(fù)用??梢詣?chuàng)建多個共享相同內(nèi)存的線程,或者完全獨立的進程。

優(yōu)點:簡單,復(fù)用單線程代碼。

缺點:

  • 服務(wù)器同時通信的客戶端數(shù)量受OS限制。
  • 即使某個客戶端空閑或者運行緩慢,仍會獨占線程或進程。
  • 大量客戶端時,上下文切換成本很大。

模塊:threadingmultiprocessing

異步服務(wù)器

利用服務(wù)端向客戶端發(fā)送響應(yīng)后等待下一次響應(yīng)的時間。

異步(asyhchronous),表示從不停下來,區(qū)別于同步(synchronized)

操作系統(tǒng)級異步

傳統(tǒng)的 select(), 后續(xù) Linux 的 poll() 和 BSD 的 epoll()

看下面這段簡單的異步代碼:精髓在于自己設(shè)計數(shù)據(jù)結(jié)構(gòu)保存客戶端狀態(tài),而不依賴操作系統(tǒng)的上下文切換。

#!/usr/bin/env python3
# Foundations of Python Network Programming, Third Edition
# https://github.com/brandon-rhodes/fopnp/blob/m/py3/chapter07/srv_async.py
# Asynchronous I/O driven directly by the poll() system call.

# zen_utils 是自己編寫的處理各種業(yè)務(wù)邏輯的包
import select, zen_utils

# 兩層循環(huán),while 不斷調(diào)用 poll(), 針對poll返回的不通事件再循環(huán)
# 為了代碼簡潔,用生成器寫
def all_events_forever(poll_object):
    while True:
        for fd, event in poll_object.poll():
            yield fd, event

def serve(listener):
    # 維護 sockets 字典和 address 字典
    sockets = {listener.fileno(): listener} 
    addresses = {}
    # 要接受和要發(fā)送的緩存字典。這四個字典是核心。
    bytes_received = {}
    bytes_to_send = {}

    poll_object = select.poll()
    # 監(jiān)聽套接字始終在 sockets 字典里,且狀態(tài)始終未 POLLIN
    poll_object.register(listener, select.POLLIN)

    for fd, event in all_events_forever(poll_object):
        sock = sockets[fd]

        # Socket closed: remove it from our data structures.
        # 出錯、關(guān)閉、異常等

        if event & (select.POLLHUP | select.POLLERR | select.POLLNVAL):
            address = addresses.pop(sock)
            rb = bytes_received.pop(sock, b'')
            sb = bytes_to_send.pop(sock, b'')
            if rb:
                print('Client {} sent {} but then closed'.format(address, rb))
            elif sb:
                print('Client {} closed before we sent {}'.format(address, sb))
            else:
                print('Client {} closed socket normally'.format(address))
            poll_object.unregister(fd)
            del sockets[fd]

        # New socket: add it to our data structures.
        # 監(jiān)聽套接字,accept

        elif sock is listener:
            sock, address = sock.accept()
            print('Accepted connection from {}'.format(address))
            sock.setblocking(False)     # force socket.timeout if we blunder
            sockets[sock.fileno()] = sock
            addresses[sock] = address
            poll_object.register(sock, select.POLLIN)

        # Incoming data: keep receiving until we see the suffix.
        # POLLIN狀態(tài),recv()

        elif event & select.POLLIN:
            more_data = sock.recv(4096)
            if not more_data:  # end-of-file
                sock.close()  # next poll() will POLLNVAL, and thus clean up
                continue
            data = bytes_received.pop(sock, b'') + more_data
            if data.endswith(b'?'):
                bytes_to_send[sock] = zen_utils.get_answer(data)
                poll_object.modify(sock, select.POLLOUT)
            else:
                bytes_received[sock] = data

        # Socket ready to send: keep sending until all bytes are delivered.
        # POLLOUT狀態(tài),send

        elif event & select.POLLOUT:
            data = bytes_to_send.pop(sock)
            n = sock.send(data)
            if n < len(data):
                bytes_to_send[sock] = data[n:]
            else:
                poll_object.modify(sock, select.POLLIN)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('low-level async server')
    listener = zen_utils.create_srv_socket(address)
    serve(listener)

回調(diào)風(fēng)格的 asynio

把 select 調(diào)用的細節(jié)隱藏起來。

通過對象實例來維護每個打開的客戶端鏈接,使用對象的方法調(diào)用。

#!/usr/bin/env python3
# Foundations of Python Network Programming, Third Edition
# https://github.com/brandon-rhodes/fopnp/blob/m/py3/chapter07/srv_asyncio1.py
# Asynchronous I/O inside "asyncio" callback methods.

import asyncio, zen_utils

# 一個對象實例維護一個客戶端鏈接
class ZenServer(asyncio.Protocol):

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.data = b''
        print('Accepted connection from {}'.format(self.address))

    def data_received(self, data):
        self.data += data
        if self.data.endswith(b'?'):
            answer = zen_utils.get_answer(self.data)
            # 響應(yīng)通過 self.transport.write() 即可
            self.transport.write(answer)
            self.data = b''

    def connection_lost(self, exc):
        if exc:
            print('Client {} error: {}'.format(self.address, exc))
        elif self.data:
            print('Client {} sent {} but then closed'
                  .format(self.address, self.data))
        else:
            print('Client {} closed socket'.format(self.address))

if __name__ == '__main__':
    address = zen_utils.parse_command_line('asyncio server using callbacks')
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ZenServer, *address)
    server = loop.run_until_complete(coro)
    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

協(xié)程風(fēng)格的 asyncio

協(xié)程(coroutine)是一個函數(shù),在進行IO操作是不會阻塞,而是暫停,將控制權(quán)轉(zhuǎn)移回調(diào)用方。python支持協(xié)程的標(biāo)準(zhǔn)形式就是生成器 yield。

#!/usr/bin/env python3
# Foundations of Python Network Programming, Third Edition
# https://github.com/brandon-rhodes/fopnp/blob/m/py3/chapter07/srv_asyncio2.py
# Asynchronous I/O inside an "asyncio" coroutine.

import asyncio, zen_utils

@asyncio.coroutine
def handle_conversation(reader, writer):
    address = writer.get_extra_info('peername')
    print('Accepted connection from {}'.format(address))
    while True:
        data = b''
        while not data.endswith(b'?'):
            # 注意 yield from 
            more_data = yield from reader.read(4096)
            if not more_data:
                if data:
                    print('Client {} sent {!r} but then closed'
                          .format(address, data))
                else:
                    print('Client {} closed socket normally'.format(address))
                return
            data += more_data
        answer = zen_utils.get_answer(data)
        writer.write(answer)

if __name__ == '__main__':
    address = zen_utils.parse_command_line('asyncio server using coroutine')
    loop = asyncio.get_event_loop()
    coro = asyncio.start_server(handle_conversation, *address)
    server = loop.run_until_complete(coro)
    print('Listening at {}'.format(address))
    try:
        loop.run_forever()
    finally:
        server.close()
        loop.close()

完美方案

異步的缺點是:所有操作都在單個線程中完成。即使多核機器,也只會使用一個核。

方案:檢查核數(shù),有幾個核,就啟動幾個事件循環(huán)進程。在每個CPU上,使用異步(回調(diào)或者協(xié)程)方案。操作系統(tǒng)負(fù)責(zé)新建立的連接分配給某個服務(wù)器進程。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容