envoy 模塊源碼精析

介紹

requests 模塊一樣,envoy 也是 Reitz 的作品,連官方描述都類似——Python Subprocesses for Humans。

實際上,envoy 的核心代碼非常少,總共只有不到 300 行代碼,只是簡單的對標準庫 subprocess 的封裝。但是,所謂短小精干,envoy 實現(xiàn)的接口簡單易用,比裸用 subprocess 方便不少。

背景知識

在講 envoy 的代碼之前,我們先回顧一些背景知識。

程序和進程

在計算機科學及相關(guān)領(lǐng)域,經(jīng)常能看到程序和進程的概念。有些人不清楚它們的差別,混為一談。這是不好的。

  • 程序:一般是一組CPU指令的集合構(gòu)成的文件,靜態(tài)存儲在諸如硬盤之類的存儲設備上。
  • 進程:當一個程序要被計算機運行時,就是在內(nèi)存中產(chǎn)生該程序的一個運行時實例,我們就把這個實例叫做進程。

簡單來說,程序就是編譯出來的二進制可執(zhí)行文件,比如 Windows 里的 .exe 文件,nix 里的 ELF 文件。操作系統(tǒng)將它們裝載到內(nèi)存空間并執(zhí)行時的實例,就是進程。程序和進程之間隔著一個「裝載」的步驟*。

Linux 里的進程

以下實驗均在 CentOS 5.4 環(huán)境下操作。

首先,我們在終端里執(zhí)行
ps -eo pid,ppid,comm,cmd | less

這里 ps 命令用來查詢正在運行的進程,-e 表示我們想要查看所有的進程,-o 則選擇我們想查看的列名稱。這里我們查看 pid, ppid, comm, cmd。

在這個輸出結(jié)果中,每一行代表一個進程(表頭除外),共分為 4 列。

  • PID: Process IDentity,進程在當前系統(tǒng)中的唯一識別碼,相當于我們的身份證號。
  • PPID: Parent PID,父進程的 PID。
  • COMMAND: 進程的簡稱。
  • CMD: 進程對應的程序及其運行時所帶的參數(shù)。

從計算機啟動到進程的創(chuàng)建

計算機啟動時,首先會從主板上的 BIOS (Basic Input/Output System) 中執(zhí)行程序,從某個設備(比如軟盤、硬盤、光盤、網(wǎng)絡等)上啟動計算機。而后,計算機會定位到所選的設備上,讀取開頭的 512 字節(jié)里的 MBR (Master Boot Record)。MBR 里記錄著從存儲設備啟動 Boot Loader 的具體分區(qū)和位置。Boot Loder 里記錄著操作系統(tǒng)名稱、內(nèi)核所在位置等信息,啟動 Boot Loader 之后,它會幫我們加載 Kernel。內(nèi)核負責兩件事:對下負責管理硬件,對上負責提供系統(tǒng)調(diào)用。于是,內(nèi)核首先會預留自己運行所需的內(nèi)存空間,然后調(diào)用驅(qū)動程序 (drivers)檢測計算機硬件,最后啟動 init 進程,并將控制權(quán)交給這個進程。在 Linux 里,init 的 PID 是 1。init 進程負責設置計算機名稱、時區(qū),檢測文件系統(tǒng),掛載硬盤,清空臨時文件,設置網(wǎng)絡等操作。通常意義上說,當 init 完成這些工作,計算機就算啟動完成了。

我們小結(jié)一下,計算機啟動的流程是:

BIOS -> MBR -> Boot Loader -> Kernel -> 預留內(nèi)存空間 -> drivers -> init -> settings

我們知道,運行于操作系統(tǒng)上的進程(包括 init)與操作系統(tǒng)交互,都是通過系統(tǒng)調(diào)用來完成的。然而 Linux 并沒有提供創(chuàng)建新進程的系統(tǒng)調(diào)用。實際上,Linux 里創(chuàng)建新的進程這一動作,是通過 forkexec 兩個函數(shù)來實現(xiàn)的。

我們先來看看 fork 函數(shù)的用法。

pid_t pid;
if (pid = fork()) {
    // ...
} else {
    // ...
}

調(diào)用 fork 函數(shù)后,新的進程(任務)和當前進程一起從代碼的同一位置開始執(zhí)行:從 fork 函數(shù)獲得返回值。在這里,新的進程稱為子進程 (Child Process),當前進程相對應稱之為父進程 (Parent Process)。不過,在子進程中,fork 函數(shù)返回 0;在父進程中,fork 函數(shù)則返回子進程的 PID。因此,在子進程中,表達式 pid = fork()false,跳轉(zhuǎn)到后續(xù)的 else 語句塊繼續(xù)執(zhí)行;在父進程中,表達式 pid = fork()true,繼續(xù)執(zhí)行語句塊。

fork 函數(shù)的產(chǎn)生子進程的速度非???。這是因為,通過 fork 產(chǎn)生的子進程,只是簡單地分配了內(nèi)存空間,并與父進程共享寫時復制 (Copy on Write, COW)內(nèi)存空間。這意味著,通過 fork 產(chǎn)生子進程的過程中,并沒有內(nèi)存中內(nèi)容的復制,因此速度非???。

fork 產(chǎn)生的子進程,只是父進程的鏡像。通過 fork 的返回值,我們可以在代碼里判斷是否是子進程。如果是子進程,就可以調(diào)用 exec 函數(shù),使用新的程序(可執(zhí)行映像文件)覆蓋當前的映像,從而執(zhí)行新的任務。

不難發(fā)現(xiàn),Linux 中所有的進程,不斷追溯其父進程,都會最終追溯到 init 進程。

進程的終止

當一個進程執(zhí)行 exit 函數(shù)之后,內(nèi)核會釋放它所打開的文件、占用的內(nèi)存等資源,然后在操作系統(tǒng)內(nèi)核中保留一些退出信息

  • PID
  • Exit Code
  • CPU time taken by the process

簡而言之,進程退出后,會釋放資源,然后在內(nèi)核里留下一些診斷信息,成為僵尸進程 (Zombie Process)。進程退出后,將 PID 留在了操作系統(tǒng)內(nèi)核中尚未釋放。因此,該 PID 是不可以被后續(xù)的新進程使用的。因此,在 Linux 的設計中,父進程需要調(diào)用 wait 或者 waitpid 函數(shù)從內(nèi)核中獲取并處理子進程的診斷信息,并釋放 PID(清掃僵尸進程)。

如果子進程退出時,父進程尚在,但父進程始終不處理子進程退出后留下的僵尸進程,而不斷因為業(yè)務邏輯產(chǎn)生新的子進程,那么僵尸進程就會不斷積累,最終占滿所有可用的 PID(沒有進程槽了)。這樣一來,在操作系統(tǒng)中就無法產(chǎn)生新的子進程了。(參見 fork 炸彈)因此,通過 fork 函數(shù)創(chuàng)建子進程之后,一定要注意 wait 子進程。

如果父進程退出時,子進程尚在。這時候,沒爹沒娘的孤兒進程(Orphand Process)就會被 init 進程收養(yǎng),直到它退出后被 init 處理。

envoy 源碼剖析

Reitz 的 envoy 項目地址是 https://github.com/kennethreitz/envoy。為了保證本文的長期有效性,我將它 fork 到了這里 https://github.com/reviewlib/envoy

envoy 的核心代碼保存在 ./envoy/core.py 當中。我們先就這份代碼的語法點做分析,然后討論它的結(jié)構(gòu)。

import os
import sys
import shlex
import signal
import subprocess
import threading
import traceback

最頭上的兩個 ossys 是常用的標準庫,不必多說。

shlex 的名字可以分為兩部分:sh 代表 shell;lex 是一個著名的詞法分析器的生成器(lexical analyzer)。運用這個標準庫,我們可以很容易地解析出用戶需要在子進程中執(zhí)行的命令。

signal 是 Python 里處理 Linux 內(nèi)核信號的標準庫。我們這里主要用它內(nèi)部定義的信號的值,不涉及它的具體用法。

subprocess 是 Python 中實現(xiàn)子進程的標準庫,是 envoy 封裝的實際內(nèi)容。

threading 是 Python 中實現(xiàn)多線程的一個標準庫。在 envoy 里,我們實際用它來執(zhí)行 subprocess.Popen() 創(chuàng)建子進程并執(zhí)行任務。

traceback 是 Python 中用來追溯異常的標準庫。

Command

我們來看 Command 類。這是一個模塊內(nèi)部使用的類,Command 類的每個實例都能執(zhí)行 run() 方法,在一個子進程里執(zhí)行 Shell 命令。

初始化函數(shù) __init__() 直截了當,只是簡單地對各個數(shù)據(jù)成員賦值。

整個類的主要部分是 run() 函數(shù),我們仔細深入進去觀察一下。

environ = dict(os.environ)
environ.update(env or {})

第一個值得注意的地方,是對環(huán)境變量的處理。

首先,作者將 os.environ 轉(zhuǎn)換成一個 Python 內(nèi)建字典,保存在 environ 中。而后,用字典的 update() 方法,將用戶傳入的環(huán)境變量補充到 environ 中。這里,update() 方法有兩個特點

  • 輸入必須是一個非空的字典,因此作者利用短路求值 env or {} 的方式確?!阜强铡?/strong>;
  • 輸入的 env 如果有與 os.environ 同名的環(huán)境變量,則會以 env 中的值為準,否則直接在 environ 中添加鍵值對。

利用這兩個特點,作者巧妙地實現(xiàn)了程序邏輯。

第二個值得注意的地方,是在 run() 函數(shù)的內(nèi)部,嵌套定義了 target() 函數(shù)。

def target():

    try:
        self.process = subprocess.Popen(self.cmd,
            universal_newlines=True,
            shell=False,
            env=environ,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            bufsize=0,
            cwd=cwd,
        )

        if sys.version_info[0] >= 3:
            self.out, self.err = self.process.communicate(
                input = bytes(self.data, "UTF-8") if self.data else None
            )
        else:
            self.out, self.err = self.process.communicate(self.data)
    except Exception as exc:
        self.exc = exc

在 Python 中,函數(shù)定義是允許嵌套的,不過

  • 各個函數(shù)有自己的作用域;
  • 內(nèi)層函數(shù)優(yōu)先訪問內(nèi)層作用域的變量,如果內(nèi)層沒有所需變量,則逐層向外尋找所需變量;
  • 外層函數(shù)不能訪問內(nèi)層函數(shù)的變量(對外層函數(shù)來說,這是局部變量);除非內(nèi)層函數(shù)聲明變量時加上了 global 關(guān)鍵字修飾,并且在訪問它時已經(jīng)調(diào)用過內(nèi)層函數(shù)。

這里的 target() 函數(shù)定義了我們接到一個執(zhí)行 Shell 命令的需求時,我們要做哪些事情。依其定義,我們要首先使用 subprocess.Popen() 創(chuàng)建一個子進程,并在相應的條件下執(zhí)行 self.cmd。然后調(diào)用 self.process.communicate() 方法,將 self.data 通過管道傳給正在 Shell 中執(zhí)行的程序,并獲取程序的標準輸出和標準錯誤。在整個過程中,但凡出現(xiàn)任何問題,都保存在 self.exc 當中。這里作者使用了所有異常的基類 Exception,這是因為對于作者來說 self.cmd 是不可控的,在執(zhí)行 self.cmd 的過程中可能出現(xiàn)任何形式的異常。為了能夠處理所有異常,作者必須使用 Exception 來處理。

第三個值得注意的地方,是作者在工作線程中去實際執(zhí)行 target() 完成的任務。


if self.exc:
    raise self.exc
if _is_alive(thread) :
    _terminate_process(self.process)
    thread.join(kill_timeout)
    if _is_alive(thread):
        _kill_process(self.process)
        thread.join()
self.returncode = self.process.returncode
return self.out, self.err

首先,作者創(chuàng)建了一個線程,將 target() 函數(shù)作為參數(shù)傳入構(gòu)造。也就是說,thread.start() 實際會執(zhí)行 target() 函數(shù)的代碼。而后,作者用 thread.join(timeout) 的方式,來處理上層傳下來的超時限制。這樣,主進程將會阻塞住,直到

  • 線程中的任務完成(也就是 target() 中創(chuàng)建的子進程的任務完成);或者
  • 達到超時時間限制。

第四個值得注意的地方,是作者回收和處理在線程中運行的子進程任務的執(zhí)行狀態(tài)信息。

if self.exc:
    raise self.exc
if _is_alive(thread) :
    _terminate_process(self.process)
    thread.join(kill_timeout)
    if _is_alive(thread):
        _kill_process(self.process)
        thread.join()
self.returncode = self.process.returncode
return self.out, self.err

首先,子進程可能拋出異常,因此需要捕獲和繼續(xù)向上拋出異常。

其次,線程 thread 可能因為超時而執(zhí)行到當前代碼,因此通過預定義的 _is_alive() 函數(shù)來判斷線程是正常退出還是扔在超時運行。如果確實超時,那么首先應該終止子進程,然后嘗試等待線程超時終止。如果線程仍然還活著,說明線程內(nèi)的子進程沒有被正確終止,那么首先殺死子進程,然后阻塞線程直到它完成。這樣的設計,是確保子進程和線程都完全停止,防止僵尸進程的出現(xiàn)

最后,函數(shù)返回標準輸出和標準錯誤的內(nèi)容。

Response

我們來看 Response 類。這是一個模塊內(nèi)部使用的類,Response 類的每個實例都是 Command 類的實例調(diào)用 run() 方法后的執(zhí)行結(jié)果信息。


class Response(object):
    """A command's response"""

    def __init__(self, process=None):
        super(Response, self).__init__()

        self._process = process
        self.command = None
        self.std_err = None
        self.std_out = None
        self.status_code = None
        self.history = []


    def __repr__(self):
        if len(self.command):
            return '<Response [{0}]>'.format(self.command[0])
        else:
            return '<Response>'

從只有一個 __repr__() 方法可以看出,Response 類幾乎只是一個簡單的數(shù)據(jù)結(jié)構(gòu),提供了可供打印的功能,僅此而已。那么作者為什么要設計這樣一個類呢?這里我們留給讀者思考。

expand_args 函數(shù)

expand_args(command) 函數(shù)接收一個字符串作為參數(shù),并將之解析為一個個的命令。

Prepare arguments.

if isinstance(command, (str, unicode)):
splitter = shlex.shlex(command.encode('utf-8'))
splitter.whitespace = '|'
splitter.whitespace_split = True
command = []

while True:
token = splitter.get_token()
if token:
command.append(token)
else:
break

command = list(map(shlex.split, command))

return command

我們以 'cat inFile | sort | uniq' 為引數(shù),傳入 expand_args 函數(shù),分析一下會發(fā)生什么。

首先,作者用 shlex.shlex() 構(gòu)造了一個詞法分析器,并設置以管道符號 | 為標志,分割傳入的字符串(或者 unicode 類型的實例,后不再重復)。加上之后的 while 循環(huán),這基本相當于執(zhí)行了 command = command.split('|') 的效果。

而后,執(zhí)行 command = list(map(shlex.split, command)),調(diào)用 shlex.split 函數(shù),作用在 command 的每一個元素上,并返回一個列表,保存在 command 當中。最后以 returncommand 返回給調(diào)用函數(shù)。

這里的 map() 函數(shù)接收兩個參數(shù)

  • 一個函數(shù)
  • 一個可迭代的列表

然后將函數(shù)作用在列表的每一個元素上,并返回一個列表。類似的函數(shù)還有 reduce() 函數(shù)(參考 Google 的 MapReduce 架構(gòu))。這里給出兩個示例,供體會它們的作用

map.py

#!/usr/bin/env python
inIter = ['adam', 'LISA', 'barT']
regNames = lambda iter: map ((lambda inStr: inStr.capitalize()), iter)
print regNames (inIter)

最后,輸入 'cat inFile | sort | uniq' 有輸出 [['cat', 'inFile'], ['sort'], ['uniq']]。

run 函數(shù)

run(command, data=None, timeout=None, kill_timeout=None, env=None, cwd=None) 函數(shù)是 envoy 模塊的主要接口,用來在子進程里執(zhí)行 Shell 命令。

首先解釋一下 run() 函數(shù)的各個參數(shù)的含義

  • command 需要執(zhí)行的 Shell 命令(可以包含管道,但是不允許包含 && 或者 ; 之類的符號);
  • data 通過管道傳入 Shell 命令的內(nèi)容;
  • timeout 子進程執(zhí)行超時時間;
  • kill_timeout 終止子進程失敗的超時時間,超過這個時間將直接殺死子進程;
  • env 環(huán)境變量;
  • cwd Current Working Directory,工作目錄。

run 函數(shù)的實現(xiàn)相對來說是平鋪直敘的,這里用注釋簡單說明一下各個部分都做了什么即可。

'''對 command 做詞法分析,拆分命令'''
command = expand_args(command)

history = []
for c in command:

'''模擬管道的作用,傳入上一程序的標準輸出'''
if len(history):

due to broken pipe problems pass only first 10 KiB

data = history[-1].std_out[0:10*1024]

'''實際在子進程里執(zhí)行命令'''
cmd = Command(c)
try:
out, err = cmd.run(data, timeout, kill_timeout, env, cwd)
status_code = cmd.returncode
except OSError as e:
out, err = '', u"\n".join([e.strerror, traceback.format_exc()])
status_code = 127

'''將執(zhí)行結(jié)果保存在 history 當中'''
r = Response(process=cmd)

r.command = c
r.std_out = out
r.std_err = err
r.status_code = status_code

history.append(r)

'''函數(shù)返回最后一個管道(如果有)之后命令的輸出和詳細情況'''
r = history.pop()
r.history = history

return r
</pre>

|

模塊設計分析

Kenneth Reitz 不愧是公認的這個世界上 Python 代碼寫得最好的人之一——雖然 envoy 是對 subprocess 的簡單封裝,功能有限,但是代碼結(jié)構(gòu)非常優(yōu)雅,內(nèi)部實現(xiàn)的逐層封裝十分完善。

對于模塊的用戶(程序員)來說,envoy 幾乎只有 run 這一個入口。而它的作用也很明確:開一個子進程,執(zhí)行一條 Shell 命令,然后在規(guī)定時間內(nèi)取得執(zhí)行結(jié)果——中間的臟活累活(處理異常、超時、主進程阻塞等待、保存歷史等等),envoy 都幫你做好了。

對于 run() 函數(shù)來說,它只需要知道執(zhí)行 out, err = cmd.run() 就能在子進程里執(zhí)行用戶需要的命令,然后將結(jié)果存在 Response 里就可以了。

對于 Command.run() 函數(shù)來說,它只需要處理好環(huán)境變量,執(zhí)行 target() 最后處理超時、異常、收集結(jié)果信息就可以了。

對于 target() 來說,這是一個嵌套定義的函數(shù),它才是真正 fork 子進程并執(zhí)行 Shell 命令的函數(shù)。

不難發(fā)現(xiàn),每個層次完成的任務,幾乎都可以用簡單一句話解釋清楚

  • envoy.run() - 將 Shell 命令交給它,就會在子進程里執(zhí)行這些命令并處理好返回結(jié)果;
  • Command.run() - 將一個具體的 Shell 命令(不包含管道)交給它,就會在子進程里執(zhí)行這些命令并處理好返回結(jié)果;
  • target() - fork 一個子進程,然后在子進程里開心地執(zhí)行命令。

這種符合 *nix 哲學的設計,造就了優(yōu)雅好用的 envoy 庫。對于程序員來說,將命令交給它,然后坐等結(jié)果就可以了。無愧于 Python Subprocesses for Humans 的豪言壯語。

轉(zhuǎn)自:
https://blog.csdn.net/junli_chen/article/details/78295454

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容