500 lines or less學(xué)習(xí)筆記(四)——持續(xù)集成系統(tǒng)(ci)

本文介紹了一個(gè)簡(jiǎn)單的分布式持續(xù)集成系統(tǒng),包含監(jiān)聽器、調(diào)度器和測(cè)試運(yùn)行器三個(gè)組件,主要由 Python 語言實(shí)現(xiàn),并包含一部分 shell 腳本。我是在 Windows 下運(yùn)行的代碼,所以對(duì)部分地方進(jìn)行了修改。

原文作者

Malini Das。 Malini Das是一位軟件工程師,他熱衷于快速開發(fā)(但保證安全?。┮约敖鉀Q交叉編程問題。她曾在 Mozilla 擔(dān)任工具工程師,當(dāng)前在 Twitch 的技能。你可以通過關(guān)注 Malini 的 Twitter 或她的博客來了解她的最新動(dòng)態(tài)。

什么是持續(xù)集成系統(tǒng)?

在開發(fā)軟件時(shí),我們希望能夠驗(yàn)證我們的新功能或錯(cuò)誤修復(fù)是否安全,是否按預(yù)期工作。我們通過對(duì)代碼運(yùn)行測(cè)試來做到這一點(diǎn)。有時(shí),開發(fā)人員會(huì)在本地運(yùn)行測(cè)試來驗(yàn)證他們的更改是安全的,但是開發(fā)人員可能沒有時(shí)間在運(yùn)行軟件的每個(gè)系統(tǒng)上測(cè)試他們的代碼。此外,隨著越來越多的測(cè)試被添加,運(yùn)行所有這些測(cè)試(即使只是在本地)變得不太可行。正因?yàn)槿绱?,持續(xù)集成系統(tǒng)被創(chuàng)造出來。

持續(xù)集成(CI)系統(tǒng)是用來測(cè)試新代碼的專用系統(tǒng)。新代碼提交到代碼存儲(chǔ)庫后,持續(xù)集成系統(tǒng)負(fù)責(zé)驗(yàn)證此提交不會(huì)導(dǎo)致任何測(cè)試失敗。為此,系統(tǒng)必須能夠獲取新更改的代碼,運(yùn)行測(cè)試并報(bào)告結(jié)果。和其他系統(tǒng)一樣,它也應(yīng)該有良好的穩(wěn)定性。這意味著,如果系統(tǒng)的任何部分發(fā)生故障,它應(yīng)該能夠從故障點(diǎn)恢復(fù)并繼續(xù)運(yùn)行。

這個(gè)測(cè)試系統(tǒng)應(yīng)該能夠很好地處理負(fù)載均衡,這樣如果提交的速度比測(cè)試的運(yùn)行速度還要快,我們就可以在合理的時(shí)間內(nèi)得到測(cè)試結(jié)果。我們可以通過分布式和并行化測(cè)試工作來實(shí)現(xiàn)這一點(diǎn)。這個(gè)項(xiàng)目將展示一個(gè)小型的、基本的分布式連續(xù)集成系統(tǒng),它是為可擴(kuò)展性而設(shè)計(jì)的。

項(xiàng)目限制和注意事項(xiàng)

本項(xiàng)目使用 Git 作為測(cè)試代碼的存儲(chǔ)庫,但只會(huì)使用標(biāo)準(zhǔn)的源代碼管理調(diào)用指令,因此如果你不熟悉 Git,但熟悉其他版本控制系統(tǒng)(VCS),如 svn 或 Mercurial,你仍然可以繼續(xù)學(xué)習(xí)。

由于代碼長(zhǎng)度和單元測(cè)試的限制,我簡(jiǎn)化了測(cè)試發(fā)現(xiàn)機(jī)制。我們將只運(yùn)行存儲(chǔ)庫中 tests 目錄中的測(cè)試。

持續(xù)集成系統(tǒng)監(jiān)視主存儲(chǔ)庫,該存儲(chǔ)庫通常托管在 Web 服務(wù)器上,而不是 CI 文件系統(tǒng)的本地存儲(chǔ)庫。對(duì)于我們的示例,我們將使用本地存儲(chǔ)庫而不是遠(yuǎn)程存儲(chǔ)庫。

持續(xù)集成系統(tǒng)不需要按固定的、定期的時(shí)間表運(yùn)行。你還可以讓它們每隔幾次提交或每次提交運(yùn)行一次。對(duì)于我們的示例,CI 系統(tǒng)將定期運(yùn)行。這意味著,如果將其設(shè)置為在五秒鐘內(nèi)檢查更改,則它將針對(duì)五秒鐘后所做的最新提交運(yùn)行測(cè)試。它不會(huì)測(cè)試在這段時(shí)間內(nèi)所做的每一次提交,只測(cè)試最近的一次。

此 CI 系統(tǒng)旨在定期檢查存儲(chǔ)庫中的更改。在真實(shí)的 CI 系統(tǒng)中,你還可以讓存儲(chǔ)庫監(jiān)聽器得到托管存儲(chǔ)庫的通知。例如,Github 提供了“post commit hooks”,用于向 URL 發(fā)送通知。按照此模型,位于該 URL 的 Web 服務(wù)器將調(diào)用存儲(chǔ)庫監(jiān)聽器來響應(yīng)該通知。由于這在本地建模很復(fù)雜,所以我們使用了一個(gè)觀察者模型,在這個(gè)模型中,存儲(chǔ)庫監(jiān)聽器將檢查更改,而不是得到通知。

CI 系統(tǒng)也有一個(gè)報(bào)告器組件,測(cè)試運(yùn)行程序?qū)⑵浣Y(jié)果報(bào)告給一個(gè)組件,以便人們可以在網(wǎng)頁上看到這些結(jié)果。為了簡(jiǎn)單起見,本項(xiàng)目收集測(cè)試結(jié)果并將其作為文件存儲(chǔ)在調(diào)度程序進(jìn)程本地的文件系統(tǒng)中。

注意,這個(gè) CI 系統(tǒng)使用的架構(gòu)只是眾多可能性中的一種。選擇這種方法是為了將我們的案例研究簡(jiǎn)化為三個(gè)主要部分。

引言

持續(xù)集成系統(tǒng)的基礎(chǔ)結(jié)構(gòu)包括三個(gè)部分:監(jiān)聽器,測(cè)試任務(wù)調(diào)度器,和測(cè)試運(yùn)行器。首先監(jiān)聽器會(huì)監(jiān)視代碼庫,當(dāng)發(fā)生提交時(shí),監(jiān)聽器會(huì)通知調(diào)度器。之后,調(diào)度器會(huì)分配給一個(gè)可用的測(cè)試運(yùn)行器來完成對(duì)應(yīng)提交版本號(hào)的測(cè)試。

構(gòu)建 CI 系統(tǒng)的方式有很多。我們可以將他們?nèi)窟\(yùn)行在一臺(tái)電腦的同一個(gè)線程之中。但是這樣一來,我們的 CI 系統(tǒng)就會(huì)缺少了處理大負(fù)載的能力,當(dāng)很多的提交帶來了大量的測(cè)試內(nèi)容時(shí),這種方案非常容易引起工作的積壓。同時(shí)這種方案的容錯(cuò)率非常低,一旦運(yùn)行該系統(tǒng)的計(jì)算機(jī)發(fā)生故障或是斷電,沒有后備的系統(tǒng)完成中斷的工作。我們希望我們的 CI 系統(tǒng)應(yīng)該根據(jù)需求盡可能的同時(shí)完成多項(xiàng)測(cè)試工作,并且在機(jī)器發(fā)生意外停機(jī)時(shí)有很好的后備運(yùn)行方案。

為了構(gòu)建一個(gè)負(fù)載能力強(qiáng)并且容錯(cuò)率又高的 CI 系統(tǒng),在本項(xiàng)目中,上述的每一個(gè)組件都以獨(dú)立的進(jìn)程運(yùn)行。每個(gè)進(jìn)程之間完全獨(dú)立,并且每個(gè)進(jìn)程可以同時(shí)運(yùn)行多個(gè)實(shí)例。這種方案在很多的測(cè)試工作需要同時(shí)展開時(shí)很有用。我們可以并行運(yùn)行多個(gè)測(cè)試運(yùn)行器的實(shí)例,每個(gè)測(cè)試運(yùn)行器獨(dú)立工作,這樣就可以有效的解決測(cè)試隊(duì)列積壓的問題。

在本項(xiàng)目中這些組件雖然運(yùn)行在獨(dú)立的進(jìn)程上,但是相互之間可以通過套接字進(jìn)行通信,這樣我們就可以在網(wǎng)絡(luò)中的不同主機(jī)上分別運(yùn)行這些進(jìn)程。我們會(huì)為每一個(gè)進(jìn)程分配一個(gè)地址/端口,這樣每個(gè)進(jìn)程之間就可以通過向分配到的地址發(fā)送消息來互相通信。

通過分布式的架構(gòu),我們可以做到在硬件發(fā)生故障時(shí)即時(shí)的進(jìn)行處理。我們可以把監(jiān)聽器,測(cè)試任務(wù)調(diào)度器,和測(cè)試運(yùn)行器分別運(yùn)行在不同的機(jī)器上,他們可以通過網(wǎng)絡(luò)保持相互通信。當(dāng)他們之中的任何一個(gè)發(fā)生問題時(shí),我們可以安排一臺(tái)新的主機(jī)上線運(yùn)行發(fā)生問題的進(jìn)程。這樣一來這個(gè)系統(tǒng)就會(huì)有非常高的容錯(cuò)率。

在本項(xiàng)目并沒有包含自動(dòng)恢復(fù)的代碼。自動(dòng)恢復(fù)的功能取決于你使用的分布式系統(tǒng)的結(jié)構(gòu)。在實(shí)際的使用中,CI 系統(tǒng)通常運(yùn)行在支持故障轉(zhuǎn)移(舉個(gè)例子,當(dāng)分布式系統(tǒng)中的一個(gè)機(jī)器發(fā)生故障,我們?cè)O(shè)定好的后備機(jī)器會(huì)自動(dòng)接手中斷的工作)的分布式系統(tǒng)之中。

對(duì)于本項(xiàng)目,這些進(jìn)程中的每一個(gè)都將在本地以不同的本地端口手動(dòng)啟動(dòng)。

項(xiàng)目文件結(jié)構(gòu)

項(xiàng)目中每個(gè)組件的 Python 文件結(jié)構(gòu)如下:監(jiān)聽器 (repo_observer.py),測(cè)試任務(wù)調(diào)度器(dispatcher.py),測(cè)試運(yùn)行器(test_runner.py)。上述每個(gè)進(jìn)程之間通過套接字通信,我們將用于實(shí)現(xiàn)通信功能的代碼統(tǒng)一的放在 helpers.py 中。這樣就可以讓每個(gè)組件直接從這個(gè)文件中導(dǎo)入通信函數(shù),而不用在每個(gè)組件中重復(fù)的寫這段代碼。

另外,我們還用到了 bash 腳本。這些腳本用來執(zhí)行一些簡(jiǎn)單的 bash 和 git 的操作,直接通過 bash 腳本要比利用 Python 提供的系統(tǒng)級(jí)別的模塊(比如,os 或者 subprocess 之類的)要更方便一些。

最后,我們還建立了一個(gè) tests 目錄來存放我們需要 CI 系統(tǒng)運(yùn)行的測(cè)試樣例。在這個(gè)目錄中包含兩個(gè)用于測(cè)試的樣例,其中一個(gè)樣例模擬了樣例通過時(shí)的情況,另一個(gè)則模擬了失敗時(shí)的情況。

初始設(shè)置

雖然我們的 CI 系統(tǒng)是為分布式的運(yùn)行而設(shè)計(jì)的,但是為了在理解 CI 系統(tǒng)運(yùn)行原理的過程中不受網(wǎng)絡(luò)因素的影響,我們會(huì)在同一臺(tái)計(jì)算機(jī)上運(yùn)行所有的組件。當(dāng)然,如果你想要試一試分布式的運(yùn)行環(huán)境,你也可以將每一個(gè)組件分別運(yùn)行到不同的主機(jī)上。

持續(xù)集成系統(tǒng)通過監(jiān)聽代碼的變動(dòng)來觸發(fā)測(cè)試,所以在開始之前我們需要設(shè)置一個(gè)用于監(jiān)聽的代碼儲(chǔ)存庫。

我們稱這個(gè)用于測(cè)試的項(xiàng)目為 test_repo:

$ mkdir test_repo 
$ cd test_repo 
$ git init

這是開發(fā)人員遷入代碼的主存儲(chǔ)庫。監(jiān)聽器模塊通過檢查 commit (提交)來進(jìn)行代碼更新的監(jiān)聽,所以我們至少需要一次的 commit 才能進(jìn)行監(jiān)聽器模塊的測(cè)試。

將 tests 文件夾拷貝到 test_repo 中,然后提交:

$ cp -r /this/directory/tests /path/to/test_repo/ 
$ cd /path/to/test\_repo 
$ git add tests/ 
$ git commit -m ”add tests”

現(xiàn)在,我們的代碼倉庫中的 master 分支上有了一次提交。

監(jiān)聽器組件需要一份單獨(dú)的代碼拷貝來檢測(cè)新的提交。讓我們從 master 分支做一份代碼拷貝,起名為 test_repo_clone_obs

$ git clone /path/to/test_repo test_repo_clone_obs

測(cè)試運(yùn)行器同樣需要一個(gè)自己的代碼拷貝,這樣它才能在 commit 發(fā)生時(shí)運(yùn)行相關(guān)的測(cè)試。我們同樣從 master 分支做一份代碼拷貝,并起名為 test_repo_clone_runner

$ git clone /path/to/test_repo test_repo_clone_runner

組件

監(jiān)聽器(repo_observer.py

監(jiān)聽器的任務(wù)是監(jiān)聽代碼庫中的改動(dòng),并在發(fā)現(xiàn)新提交時(shí)通知測(cè)試任務(wù)分配器。為了保證我們的 CI 系統(tǒng)與各種版本控制系統(tǒng)(并不是所有的 VCS 都有內(nèi)置的通知系統(tǒng))都能夠兼容,我們?cè)O(shè)定 CI 系統(tǒng)定時(shí)檢查代碼庫是否有新的提交,而不是等待 VCS 在代碼提交時(shí)發(fā)送通知。

監(jiān)聽器會(huì)定時(shí)輪詢存儲(chǔ)庫,當(dāng)觀察到更改時(shí),監(jiān)聽器會(huì)向分配器推送需要運(yùn)行測(cè)試的代碼的提交ID。監(jiān)聽器通過獲取當(dāng)前的提交 ID 來檢查新的提交,然后將本地庫更新至這個(gè)版本,最后將這個(gè)版本與遠(yuǎn)程庫最近一次的提交 ID 進(jìn)行比對(duì)。這樣,監(jiān)聽器中本地的當(dāng)前版本與遠(yuǎn)程的最新版本不一致時(shí)就判定為發(fā)生了新的提交。在我們的 CI 系統(tǒng)中,監(jiān)聽器只會(huì)向分配器推送最近的一次提交。這意味著,如果在一次的輪詢周期內(nèi)發(fā)生了兩次提交,監(jiān)聽器只會(huì)為最近的一次運(yùn)行測(cè)試。通常來講,CI 系統(tǒng)會(huì)為自上一次更新以來的每一次的提交運(yùn)行相應(yīng)的測(cè)試。但是為了簡(jiǎn)單起見,這次我們搭建的 CI 系統(tǒng)采取了僅為最后一次提交運(yùn)行測(cè)試的方案。

監(jiān)聽器必須清楚自己監(jiān)聽的到底是哪一個(gè)存儲(chǔ)庫,我們之前已經(jīng)在 /path/to/test_repo_clone_obs 建立了一份用于監(jiān)聽的存儲(chǔ)庫拷貝。我們的監(jiān)聽器會(huì)使用這份拷貝進(jìn)行檢測(cè)。為了監(jiān)聽器能夠使用這份拷貝,我們?cè)谡{(diào)用 repo_observer.py 時(shí)會(huì)傳入這個(gè)拷貝的路徑。監(jiān)聽器會(huì)利用這份拷貝從主倉庫中拉取最新的代碼。

同樣,我們還需要為監(jiān)聽器提供測(cè)試任務(wù)分配器的地址,這樣監(jiān)聽器推送的消息才能傳遞到分配器中。在運(yùn)行監(jiān)聽器時(shí),可以通過命令行參數(shù) --dispatcher-server 來傳遞分配器的地址。如果不手動(dòng)傳入地址,分配器的默認(rèn)地址取值為:localhost:8888

def poll():
    parser = argparse.ArgumentParser()
    parser.add_argument("--dispatcher-server",
                        help="dispatcher host:port, " \
                        "by default it uses localhost:8888",
                        default="localhost:8888",
                        action="store")
    parser.add_argument("repo", metavar="REPO", type=str,
                        help="path to the repository this will observe")
    args = parser.parse_args()
    dispatcher_host, dispatcher_port = args.dispatcher_server.split(":")

當(dāng)調(diào)用了監(jiān)聽器腳本后,會(huì)直接從 poll() 開始運(yùn)行。這個(gè)函數(shù)會(huì)將命令行的參數(shù)傳遞進(jìn)來,并開始一個(gè)無限的 while 循環(huán)。這個(gè) while 循環(huán)會(huì)定期的檢查存儲(chǔ)庫的變化。這個(gè)循環(huán)中所做的第一個(gè)工作就是運(yùn)行 bash 腳本update_repo.sh[1]。

    while True:
        try:
            # 調(diào)用更新存儲(chǔ)庫的 bash 腳本并檢查更新。如果發(fā)現(xiàn)更新,它會(huì)刪除
            # 當(dāng)前工作目錄的 .commit_id 文件
            subprocess.check_output(["./update_repo.sh", args.repo])
        except subprocess.CalledProcessError as e:
            raise Exception("Could not update and check repository. " +
                            "Reason: %s" % e.output)

update_repo.sh 用于識(shí)別新的提交并通知監(jiān)聽器。它首先記錄當(dāng)前所在的提交 ID,然后拉取最新的代碼,接著檢查最新的提交 ID。如果二者匹配,說明代碼沒有變動(dòng),所以監(jiān)聽器不會(huì)作出任何響應(yīng)。但是,如果提交 ID 間存在不同,就意味著有了新的提交。這時(shí),update_repo.sh 會(huì)創(chuàng)建一個(gè)叫 .commit_id 的文件來記錄最新的提交 ID。

update_repo.sh 的具體步驟如下:首先,我們的腳本源自于一個(gè)叫 run_or_fail.sh 的文件。run_or_fail.sh 提供了一些 shell 腳本的輔助函數(shù)。通過這些函數(shù)我們可以運(yùn)行指定的腳本并可以在運(yùn)行出錯(cuò)時(shí)輸出錯(cuò)誤信息。

#!/bin/bash

source run_or_fail.sh 

接下來,我們的腳本會(huì)試圖刪除 .commit_id 文件。因?yàn)?repo_observer.py 會(huì)不斷循環(huán)的調(diào)用 updaterepo.sh,如果在上一次的調(diào)用中產(chǎn)生了 .commit_id 文件,并且其中儲(chǔ)存的版本ID我們?cè)谏弦淮屋喸冎幸呀?jīng)完成了測(cè)試,就會(huì)造成混亂。所以我們?cè)诿看味紩?huì)先刪除上一次的 .commit_id 文件。

bash rm -f .commit_id

在刪除了文件之后(在文件已經(jīng)存在的情況下),它會(huì)驗(yàn)證我們正在觀察的存儲(chǔ)庫是否存在,然后將其重置為最新的提交,以防任何原因?qū)е滤煌健?/p>

run_or_fail "Repository folder not found!" pushd $1 1> /dev/null
run_or_fail "Could not reset git" git reset --hard HEAD

再之后,讀取 git 的日志,將其中最后一次的提交 ID 解析出來。

COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

接下來,拉取存儲(chǔ)庫,獲取最近所有的更改,并得到最新的提交ID。

run_or_fail "Could not pull from repository" git pull
COMMIT=$(run_or_fail "Could not call 'git log' on repository" git log -n1)
if [ $? != 0 ]; then
  echo "Could not call 'git log' on repository"
  exit 1
fi
NEW_COMMIT_ID=`echo $COMMIT | awk '{ print $2 }'`

最后,如果新得到的提交 ID 與上一次的 ID 不匹配,我們就知道在兩次輪詢間發(fā)生了新的提交,所以我們的腳本應(yīng)該將新的提交ID儲(chǔ)存在 .commit_id 文件中。

# if the id changed, then write it to a file
if [ $NEW_COMMIT_ID != $COMMIT_ID ]; then
  popd 1> /dev/null
  echo $NEW_COMMIT_ID > .commit_id
fi

當(dāng) repo_observer.py 中的 update_repo.sh 腳本運(yùn)行結(jié)束后,監(jiān)聽器會(huì)檢查 .commit_id 是否存在。如果文件存在,我們就知道在上一次的輪詢后又發(fā)生了新的提交,我們需要通知測(cè)試樣例調(diào)度器來開始測(cè)試。監(jiān)聽器會(huì)通過連接并發(fā)送一個(gè)'status'請(qǐng)求來檢查調(diào)度器服務(wù)的運(yùn)行狀態(tài),以保證它處在可以正常接受指令的狀態(tài)正常工作狀態(tài)。

        if os.path.isfile(".commit_id"):
            try:
                response = helpers.communicate(dispatcher_host,
                                               int(dispatcher_port),
                                               "status")
            except socket.error as e:
                raise Exception("Could not communicate with dispatcher server: %s" % e)

如果調(diào)度器返回一個(gè)“OK”,監(jiān)聽器就會(huì)讀取 .commit_id 文件中最新的提交ID,并使用 dispatch:<commit ID> 請(qǐng)求將 ID 發(fā)送到調(diào)度器中。監(jiān)聽器會(huì)每隔 5 秒發(fā)送一次指令。如果發(fā)生任何錯(cuò)誤,監(jiān)聽器同樣會(huì)每隔 5s 進(jìn)行一次重試。

           if response == "OK":
                # 調(diào)度器已存在,讓我們發(fā)送測(cè)試
                commit = ""
                with open(".commit_id", "r") as f:
                    commit = f.readline()
                response = helpers.communicate(dispatcher_host,
                    int(dispatcher_port), "dispatch:%s" % commit)
                if response != "OK":
                    raise Exception("Could not dispatch the test: %s" % response)
                print("dispatched!")
            else:
                # 調(diào)度器出現(xiàn)了錯(cuò)誤
                raise Exception("Could not dispatch the test: %s" % response) 
        time.sleep(20)

監(jiān)聽器會(huì)永遠(yuǎn)重復(fù)這一操作,直到你使用 KeyboardInterrupt (Ctrl+C)終止監(jiān)聽器發(fā)送進(jìn)程或發(fā)送終止信號(hào),。

測(cè)試任務(wù)調(diào)度器(dispatcher.py

測(cè)試任務(wù)調(diào)度器是一個(gè)用來為測(cè)試運(yùn)行器分配測(cè)試任務(wù)的獨(dú)立進(jìn)程。它會(huì)在一個(gè)指定端口監(jiān)聽來自存儲(chǔ)庫監(jiān)聽器及測(cè)試運(yùn)行器的請(qǐng)求。調(diào)度器允許測(cè)試運(yùn)行器主動(dòng)注冊(cè),當(dāng)監(jiān)聽器發(fā)送一個(gè)提交 ID 時(shí),它會(huì)將測(cè)試工作分配給一個(gè)已經(jīng)注冊(cè)的測(cè)試運(yùn)行器。同時(shí),它還可以平穩(wěn)的處理測(cè)試運(yùn)行器遇到的各種問題,當(dāng)一個(gè)運(yùn)行器發(fā)生故障,它可以立即將該運(yùn)行器運(yùn)行測(cè)試的提交 ID 重新分配給一個(gè)新的測(cè)試運(yùn)行器。

dispatch.py 腳本從 serve 函數(shù)開始運(yùn)行。首先,它會(huì)解析你設(shè)定的分配器的地址及端口:

def serve():
    parser = argparse.ArgumentParser()
    parser.add_argument("--host",
                        help="dispatcher's host, by default it uses localhost",
                        default="localhost",
                        action="store")
    parser.add_argument("--port",
                        help="dispatcher's port, by default it uses 8888",
                        default=8888,
                        action="store")
    args = parser.parse_args()

這里我們會(huì)開啟分配器進(jìn)程以及一個(gè) runner_checker 函數(shù)進(jìn)程,和一個(gè) redistribute 函數(shù)進(jìn)程。

    server = ThreadingTCPServer((args.host, int(args.port)), DispatcherHandler)
    print("serving on %s:%s" % (args.host, int(args.port)))

    ...

    runner_heartbeat = threading.Thread(target=runner_checker, args=(server,))
    redistribute = threading.Thread(target=redistribute, args=(server,))
    try:
        runner_heartbeat.start()
        redistribute.start()
        # 激活 server; 一直運(yùn)行直到
        # 用 Ctrl+C 或 Cmd+C 打斷程序
        server.serve_forever()
    except(KeyboardInterrupt, Exception):
        # 如果發(fā)生異常,則殺死進(jìn)程
        server.dead = True
        runner_heartbeat.join()
        redistribute.join()

runner_checker 函數(shù)會(huì)定期的 ping 每一個(gè)注冊(cè)的運(yùn)行器,來確保他們都處于正常工作的狀態(tài)。如果有運(yùn)行器沒有響應(yīng),該函數(shù)就會(huì)將其從注冊(cè)的運(yùn)行器池中刪除,并且之前分配給它的提交 ID 會(huì)被重新分配給一個(gè)新的可用的運(yùn)行器。函數(shù)會(huì)在 pending_commits 變量中記錄運(yùn)行受到運(yùn)行器失去響應(yīng)影響的提交ID。

    def runner_checker(server):
        def manage_commit_Lists(runner):
            for commit, assigned_runner in server.dispatched_commits.items():
                if assigned_runner == runner:
                    del server.dispatched_commits[commit]
                    server.pending_commits.append(commit)
                    break
            server.runners.remove(runner)

        while not server.dead:
            time.sleep(10)
            for runner in server.runners:               
                s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                try:
                    response = helpers.communicate(runner["host"], int(runner["port"]),
                        "ping")
                    if response != "pong":
                        print("removing runner %s" % runner)
                        manage_commit_Lists(runner)
                except socket.error as e:
                    manage_commit_Lists(runner)

redistribute 用來將 pending_commits 中記錄的提交 ID 進(jìn)行重新分配。redistribute 運(yùn)行時(shí)會(huì)不斷的檢查 pending_commits 文件,一旦發(fā)現(xiàn) pending_commits 中存在提交 ID,函數(shù)會(huì)調(diào)用 dispatch_tests 方法來分配這個(gè)提交 ID。

    def redistribute(server):
        while not server.dead:
            for commit in server.pending_commits:
                print "running redistribute"
                print server.pending_commits
                dispatch_tests(server, commit)
                time.sleep(5)

dispatch_tests 函數(shù)用來從已注冊(cè)的運(yùn)行器池中返回一個(gè)可用的運(yùn)行器。如果得到了一個(gè)可用的運(yùn)行器,函數(shù)會(huì)發(fā)送一個(gè)帶有提交 ID 的運(yùn)行測(cè)試指令。如果當(dāng)前沒有可用的運(yùn)行器,函數(shù)會(huì)在 2s 的休眠之后重復(fù)上述過程。如果分配成功了,函數(shù)會(huì)在 dispatched_commits 變量中記錄提交 ID 及該提交 ID 的測(cè)試正在由哪一個(gè)運(yùn)行器運(yùn)行。如果提交ID在 pending_commits 中, dispatch_tests 函數(shù)會(huì)在重新分配后將提交ID從 pending_commits 中刪除。

def dispatch_tests(server, commit_id):
    # NOTE: 通常我們不會(huì)永遠(yuǎn)運(yùn)行
    while True:
        print("trying to dispatch to runners")
        for runner in server.runners:
            response = helpers.communicate(runner["host"],
                int(runner["port"]), "runtest:%s" % commit_id)
            if response == "OK":
                print("adding id %s" % commit_id)
                server.dispatched_commits[commit_id] = runner
                if commit_id in server.pending_commits:
                    server.pending_commits.remove(commit_id)
                return
        time.sleep(5)

調(diào)度器服務(wù)用到了標(biāo)準(zhǔn)庫中的一個(gè)叫 SocketServer 的非常簡(jiǎn)單的網(wǎng)絡(luò)服務(wù)器模塊。SocketServer 模塊中有四種基本的服務(wù)器類型:TCP,UDP, UnixStreamServerUnixDatagramServer。為了保證我們的數(shù)據(jù)傳輸連續(xù)穩(wěn)定,我們使用基于 TCP 協(xié)議的套接字(UDP 并不能保證數(shù)據(jù)的穩(wěn)定和連續(xù))。

SocketServer 中提供的默認(rèn)的 TCPServer 最多只支持同時(shí)處理一個(gè)連接。所以當(dāng)調(diào)度器與一個(gè)運(yùn)行器建立連接之后,就無法再與監(jiān)聽器建立連接了。此時(shí)來自監(jiān)聽器的連接只能等待第一個(gè)連接完成并斷開才能建立與調(diào)度器的連接。這對(duì)于我們的項(xiàng)目而言并不是很理想,在我們預(yù)想中,調(diào)度器應(yīng)該直接而迅速的同時(shí)與所有運(yùn)行器及監(jiān)聽器進(jìn)行通信。

為了使調(diào)度器可以同時(shí)處理多個(gè)連接,我們使用了一個(gè)自定義的類 ThreadingTCPServer 來為默認(rèn)的 SocketServer 類增加多線程運(yùn)行的功能。也就是說無論何時(shí)調(diào)度器接收到連接請(qǐng)求,它都會(huì)新建一個(gè)線程來處理這個(gè)連接。這使調(diào)度器可以同時(shí)處理多個(gè)連接。

class ThreadingTCPServer(socketserver.ThreadingTCPServer):
    runners = [] # 測(cè)試運(yùn)行器池
    dead = False # 向其它線程指示是否還在運(yùn)行
    dispatched_commits = {} # 我們派遣的 commit
    pending_commits = [] # 尚未派遣的任務(wù)

調(diào)度器通過為每一個(gè)請(qǐng)求定義處理程序來工作。在繼承自 SocketServerBaseRequestHandler 類的 DispatcherHandler 類中定義了處理的方法。這個(gè)基類只要求我們定義一個(gè) handle 函數(shù),每當(dāng)有連接請(qǐng)求時(shí)就會(huì)調(diào)用它。我們將這個(gè)函數(shù)的寫在 DispatcherHandler中,并且確保在每一次請(qǐng)求到來時(shí),這個(gè)函數(shù)能夠被調(diào)用。這個(gè)函數(shù)會(huì)不斷地監(jiān)聽發(fā)來的請(qǐng)求(self.request 會(huì)攜帶請(qǐng)求信息),并解析請(qǐng)求中的指令。

class DispatcherHandler(socketserver.BaseRequestHandler):
    """
    調(diào)度器的 RequestHandler 類
    它會(huì)將傳入的 commit 派遣給測(cè)試運(yùn)行器并處理它們發(fā)過來的請(qǐng)求和
    測(cè)試結(jié)果。
    """

    command_re = re.compile(r"(\w+)(:.+)*")
    BUF_SIZE = 1024

    def handle(self):
        # self.request 是連接這個(gè)客戶端的 TCP socket
        self.data = self.request.recv(self.BUF_SIZE).decode('utf-8').strip()
        print('receive data: ' + self.data)
        command_groups = self.command_re.match(self.data)
        if not command_groups:
            self.request.sendall("Invalid command")
            return
        command = command_groups.group(1)

這個(gè)函數(shù)可以處理如下指令:status,registerdispatch,以及 results。其中 status 函數(shù)用來檢測(cè)調(diào)度器服務(wù)是否處于運(yùn)行狀態(tài)。

        if command == "status":
            print("in status")
            self.request.sendall("OK".encode('utf-8'))

為了讓調(diào)度器的功能生效,我們需要注冊(cè)至少一個(gè)測(cè)試運(yùn)行器。當(dāng) register 命令和一個(gè)主機(jī)端口對(duì)被調(diào)用時(shí),它將運(yùn)行器的信息存儲(chǔ)在一個(gè)列表,(運(yùn)行器對(duì)象會(huì)附件到 ThreadingTCPServer 對(duì)象中)。以便在以后需要為運(yùn)行器提供一個(gè)提交 ID 來運(yùn)行測(cè)試時(shí)與運(yùn)行器通信。

        elif command == "register":
            # 將測(cè)試運(yùn)行器添加到我們的池子中
            print("register")
            address = command_groups.group(2)
            host, port = re.findall(r":(\w*)", address)
            runner = {"host": host, "port": port}
            self.server.runners.append(runner)
            print(" runner %s has registered" % runner)
            self.request.sendall("OK".encode('utf-8'))

dispatch 由存儲(chǔ)庫監(jiān)聽器用于為提交分配測(cè)試運(yùn)行器。此命令的格式為 dispatch:<commit ID>。調(diào)度器從該消息中解析出提交 ID 并將其發(fā)送給測(cè)試運(yùn)行器。

        elif command == "dispatch":
            print("going to dispatch")
            commit_id = command_groups.group(2)[1:]
            if not self.server.runners:
                self.request.sendall("No runners are registered".encode('utf-8'))
            else:
                # The coordinator can trust us to dispatch the test
                self.request.sendall("OK".encode('utf-8'))
                dispatch_tests(self.server, commit_id)

results 指令會(huì)由測(cè)試運(yùn)行器在上報(bào)測(cè)試結(jié)果時(shí)調(diào)用。此命令的用法為 results:<commit ID>:<length of results data in bytes>:<results> 。<commit ID>用于標(biāo)識(shí)測(cè)試報(bào)告對(duì)應(yīng)的提交ID。<length of results data in bytes> 用于計(jì)算結(jié)果數(shù)據(jù)使用需要多大的緩沖區(qū)。最后,<results> 中是實(shí)際報(bào)告信息。

        elif command == "results":
            print("got test results")
            results = command_groups.group(2)[1:]
            results = results.split(":")
            commit_id = results[0]
            length_msg = int(results[1])
            # 3 代表發(fā)送命令中的冒號(hào)數(shù)量
            remaining_buffer = self.BUF_SIZE - (len(command) + len(commit_id) + len(results[1]) + 3)
            if length_msg > remaining_buffer:
                self.data += self.request.recv(length_msg - remaining_buffer).strip()
            del self.server.dispatched_commits[commit_id]
            if not os.path.exists("test_results"):
                os.makedirs("test_results")
            with open("test_results/%s" % commit_id, "w") as f:
                data = self.data.split(":")[3:]
                data = "\n".join(data)
                f.write(data)
            self.request.sendall("OK".encode('utf-8'))

測(cè)試運(yùn)行器(test_runner.py

測(cè)試運(yùn)行器負(fù)責(zé)對(duì)給定的提交 ID 運(yùn)行測(cè)試并上報(bào)測(cè)試結(jié)果。它僅與調(diào)度器通信,調(diào)度器負(fù)責(zé)為其提供需要運(yùn)行測(cè)試的提交ID,并接收測(cè)試結(jié)果。

test_runner.py 文件被調(diào)用后會(huì)首先調(diào)用 serve 函數(shù)以啟動(dòng)測(cè)試運(yùn)行器服務(wù),并啟動(dòng)一個(gè)線程來運(yùn)行 dispatcher_checker 函數(shù)。由于此啟動(dòng)過程與 repo_observer.pydispatcher.py 的啟動(dòng)過程非常相似,因此我們?cè)谶@里就不再贅述。

dispatcher_checker 函數(shù)每五秒對(duì)調(diào)度器執(zhí)行一次 ping 操作,以確保它仍然在正常運(yùn)行。這個(gè)操作主要是出于資源管理上的考慮。如果對(duì)應(yīng)的調(diào)度器已經(jīng)關(guān)閉,那么測(cè)試運(yùn)行器也會(huì)關(guān)閉。否則測(cè)試運(yùn)行器就只能空跑,無法接收新的任務(wù)也無法提交之前任務(wù)產(chǎn)生報(bào)告。

     def dispatcher_checker(server):
        # 檢查調(diào)度器是否出故障。如果出現(xiàn)問題,我們將關(guān)閉它
        # 當(dāng)調(diào)度器程序返回后它可能沒有相同的主機(jī)/端口
        while not server.dead:
            time.sleep(30) # 稍微改長(zhǎng)點(diǎn)
            if(time.time() - server.last_communication) > 10:
                try:
                    response = helpers.communicate(
                        server.dispatcher_server["host"],
                        int(server.dispatcher_server["port"]),
                        "status"
                    )
                    if response != "OK":
                        print("Dispatcher is no longer functional")
                        server.shutdown()
                        return
                except socket.error as e:
                    print("Can't communicate with dispatcher: %s" % e)
                    server.shutdown()
                    return

測(cè)試運(yùn)行器的服務(wù)與調(diào)度器相同都是 ThreadingTCPServer,它需要多線程運(yùn)行,因?yàn)檎{(diào)度器既會(huì)向它下發(fā)提交 ID,也可能在測(cè)試運(yùn)行的期間 ping 它是否在運(yùn)行狀態(tài)。

class ThreadingTCPServer(socketserver.ThreadingTCPServer):
    dispatcher_server = None # 保存調(diào)度服務(wù)器的 host/port 信息
    last_communication = None # 追蹤來自調(diào)度器的最后一次v通信
    busy = False # 狀態(tài)標(biāo)志
    dead = False # 狀態(tài)標(biāo)

整個(gè)通信流是從調(diào)度器向測(cè)試運(yùn)行器發(fā)送需要運(yùn)行測(cè)試的提交 ID 開始的。如果測(cè)試運(yùn)行器可以運(yùn)行測(cè)試,它會(huì)發(fā)送確認(rèn)消息響應(yīng)調(diào)度器,然后關(guān)閉第一個(gè)連接。為了使測(cè)試運(yùn)行器在運(yùn)行測(cè)試的同時(shí)還能接受來自調(diào)度器的請(qǐng)求,它會(huì)單獨(dú)啟動(dòng)一個(gè)線程來運(yùn)行測(cè)試。

這樣,當(dāng)調(diào)度器在測(cè)試運(yùn)行器正在運(yùn)行測(cè)試的時(shí)候發(fā)來一個(gè)請(qǐng)求(比如一個(gè) ping 請(qǐng)求),運(yùn)行器服務(wù)將在一個(gè)單獨(dú)的線程上作出響應(yīng),測(cè)試運(yùn)行器的測(cè)試在另一個(gè)線程上仍在運(yùn)行。這樣測(cè)試運(yùn)行器就可支持同時(shí)運(yùn)行多個(gè)任務(wù)了。還有一種替代多線程運(yùn)行的設(shè)計(jì)是在調(diào)度器與測(cè)試運(yùn)行器間建立一個(gè)長(zhǎng)連接。但這樣會(huì)在調(diào)度器端消耗大量的內(nèi)存來維持連接,另外這種方式還容易受網(wǎng)絡(luò)影響,比如突然的斷線。

測(cè)試運(yùn)行器會(huì)從調(diào)度器接收兩種消息。第一種是 ping 消息 ,調(diào)度器用這個(gè)消息來驗(yàn)證測(cè)試運(yùn)行器是否仍處于活躍狀態(tài)。

class TestHandler(SocketServer.BaseRequestHandler):
    ...

    def handle(self):
        ....
        if command == "ping":
            print("pinged")
            self.server.last_communication = time.time()
            self.request.sendall("pong".encode('utf-8'))

另一個(gè)是 runtest,它的格式是 runtest:<commit ID> 。這條指令用于分配器下發(fā)需要測(cè)試的提交 ID。當(dāng)接收到 runtest 時(shí),測(cè)試運(yùn)行器將檢查當(dāng)前是否有正在運(yùn)行的測(cè)試。如果有,它會(huì)給調(diào)度器返回 BUSY 的響應(yīng)。如果沒有,它會(huì)返回 OK,將其狀態(tài)設(shè)置為 busy 并運(yùn)行其 run_tests 函數(shù)。

        elif command == "runtest":
            print("got runtest command: am I busy? %s" % self.server.busy)
            if self.server.busy:
                self.request.sendall("BUSY".encode('utf-8'))
            else:
                self.request.sendall("OK".encode('utf-8'))
                print("running")
                commit_id = command_groups.group(2)[1:]
                self.server.busy = True
                self.run_tests(commit_id, self.server.repo_folder)
                self.server.busy = False

這個(gè)函數(shù)會(huì)調(diào)用一個(gè)叫 test_runner_script.sh 的 shell 腳本,該腳本會(huì)將存儲(chǔ)庫更新到給定的提交ID。腳本返回后,如果存儲(chǔ)庫已經(jīng)被成功的更新,運(yùn)行器會(huì)使用 unittest 運(yùn)行測(cè)試并將結(jié)果收集到一個(gè)文件中。測(cè)試運(yùn)行完畢后,測(cè)試運(yùn)行器將讀入結(jié)果報(bào)告文件,并將報(bào)告發(fā)送給調(diào)度器。

    def run_tests(self, commit_id, repo_folder):
        # update repo
        output = subprocess.check_output(["./test_runner_script.sh",
                                        repo_folder, commit_id])
        print(output)
        # 運(yùn)行測(cè)試
        test_folder = os.path.join(repo_folder, "tests")
        suite = unittest.TestLoader().discover(test_folder)
        result_file = open("results", "w")
        unittest.TextTestRunner(result_file).run(suite)
        result_file.close()
        result_file = open("results", "r")
        # 將結(jié)果發(fā)給調(diào)度器
        output = result_file.read()
        helpers.communicate(self.server.dispatcher_server["host"],
            int(self.server.dispatcher_server["port"]),
            "results:%s:%s:%s" % (commit_id, len(output), output))

test_runner_script.sh 的內(nèi)容如下 :

#!/bin/bash
REPO=$1
COMMIT=$2
source run_or_fail.sh
run_or_fail "Repository folder not found" pushd "$REPO" 1> /dev/null
run_or_fail "Could not clean repository" git clean -d -f -x
run_or_fail "Could not call git pull" git pull
run_or_fail "Could not update to given commit hash" git reset --hard "$COMMIT"

要運(yùn)行 test_runner.py ,必須將其指向存儲(chǔ)庫的副本。你可以使用我們先前創(chuàng)建的 /path/to/test_repo test_repo_clone_runner 副本作為啟動(dòng)參數(shù)。默認(rèn)情況下, test_runner.py將在 localhost 的 8900-9000 端口上啟動(dòng),并嘗試連接到 localhost:8888 上的調(diào)度服務(wù)器。你可以通過一些可選參數(shù)來更改這些值。--host--port 參數(shù)用于指定運(yùn)行測(cè)試運(yùn)行器服務(wù)器的地址和端口,--dispatcher-server 參數(shù)指定調(diào)度器的地址。

控制流程圖

下圖是該系統(tǒng)的概述圖。圖中假設(shè)所有三個(gè)文件( repo_observer.py , dispatcher.py和test_runner.py )都已在運(yùn)行,并描述了每個(gè)進(jìn)程在新的提交發(fā)生時(shí)所采取的操作。

diagram.png

運(yùn)行代碼

我們可以在本地運(yùn)行這個(gè)簡(jiǎn)單的 CI 系統(tǒng),為每個(gè)進(jìn)程使用不同的終端 shell。我們首先啟動(dòng)調(diào)度器,它默認(rèn)運(yùn)行在端口 8888 上:

$ python dispatcher.py

打開一個(gè)新的的 shell,我們啟動(dòng)測(cè)試運(yùn)行器(這樣它就可以在調(diào)度器中注冊(cè)了):

$ python test_runner.py <path/to/test_repo_clone_runner>

測(cè)試運(yùn)行器將自動(dòng)為自己分配端口,范圍為 8900-9000。你可以根據(jù)需求運(yùn)行多個(gè)測(cè)試運(yùn)行器。

最后,在另一個(gè)新 shell 中,讓我們啟動(dòng)代碼庫監(jiān)聽器:

$ python repo_observer.py --dispatcher-server=localhost:8888 <path/to/repo_clone_obs>

現(xiàn)在一切準(zhǔn)備就緒,讓我們觸發(fā)一些測(cè)試玩一下吧!根據(jù)設(shè)計(jì)我們需要?jiǎng)?chuàng)建一個(gè)新的提交來觸發(fā)測(cè)試。切換到你的主存儲(chǔ)庫中, 隨便改點(diǎn)什么:

$ cd /path/to/test_repo
$ touch new_file
$ git add new_file
$ git commit -m"new file" new_file

然后 repo_observer.py 識(shí)別到有一個(gè)新的提交產(chǎn)生了,之后通知調(diào)度器。你可以在它們各自的 shell 窗口中查看它們的運(yùn)行日志。當(dāng)調(diào)度器收到測(cè)試結(jié)果,它就會(huì)將它們保存在此存儲(chǔ)庫中的 test_results/ 文件夾中,并使用提交ID作為文件名。

錯(cuò)誤處理

該 CI 系統(tǒng)中包括一些簡(jiǎn)單的錯(cuò)誤處理。

如果你將 test_runner.py 進(jìn)程殺掉,dispatcher.py 能夠識(shí)別該運(yùn)行器已經(jīng)不再活躍,并將其從運(yùn)行器池中移除。

你也可以模擬網(wǎng)絡(luò)或系統(tǒng)故障,在測(cè)試運(yùn)行器執(zhí)行測(cè)試的時(shí)候?qū)⑺鼩⑺?。這時(shí),調(diào)度器會(huì)識(shí)別到運(yùn)行器已經(jīng)掛了,它會(huì)將掛掉的運(yùn)行器從運(yùn)行器池中移除,并將這個(gè)運(yùn)行器之前在執(zhí)行的任務(wù)分配給池中其它的運(yùn)行器。

如果你殺掉調(diào)度器,那么監(jiān)聽器會(huì)直接報(bào)錯(cuò)。測(cè)試運(yùn)行器也會(huì)發(fā)現(xiàn)調(diào)度器不再運(yùn)行,并自動(dòng)關(guān)閉。

總結(jié)

通過將關(guān)注點(diǎn)分解到各自的流程,我們構(gòu)建了一個(gè)分布式的持續(xù)集成系統(tǒng)的的基礎(chǔ)。通過套接字請(qǐng)求實(shí)現(xiàn)進(jìn)程間的通信,我們的 CI 系統(tǒng)可以分布式的運(yùn)行在不同的機(jī)器上,這增強(qiáng)了我們的系統(tǒng)可靠性和可擴(kuò)展性。

這套 CI 系統(tǒng)現(xiàn)在的功能仍然非常簡(jiǎn)單,你可以對(duì)它進(jìn)行各種擴(kuò)展以實(shí)現(xiàn)更多功能。以下是一些改進(jìn)建議。

每次提交運(yùn)行測(cè)試

當(dāng)前系統(tǒng)將定期檢查是否有新的提交并對(duì)最近的一次提交運(yùn)行測(cè)試。這個(gè)設(shè)計(jì)可以改為每次提交都觸發(fā)測(cè)試。你可以修改定期檢查程序,獲取在兩次輪詢中發(fā)生的所有提交來實(shí)現(xiàn)這個(gè)功能。

更智能的運(yùn)行器

如果測(cè)試運(yùn)行器檢測(cè)到調(diào)度器沒有響應(yīng),則它將停止運(yùn)行。當(dāng)測(cè)試運(yùn)行器正在運(yùn)行測(cè)試時(shí),也會(huì)立即關(guān)閉!如果測(cè)試運(yùn)行器可以有一段時(shí)的等待期或者長(zhǎng)期運(yùn)行(如果你并不在乎它對(duì)資源的占用)來等待調(diào)度器恢復(fù)可能會(huì)更好一些。這樣當(dāng)調(diào)度器恢復(fù)時(shí),運(yùn)行器既可以將之前執(zhí)行的測(cè)試的報(bào)告重新發(fā)回調(diào)度器。這樣可以避免因調(diào)度器故障而引起的重復(fù)任務(wù),在對(duì)每一個(gè)提交都執(zhí)行測(cè)試時(shí),這將很大程度上節(jié)約運(yùn)行器資源。

報(bào)告展示

在真正的 CI 系統(tǒng)中,測(cè)試報(bào)告一般會(huì)發(fā)送到一個(gè)單獨(dú)的收集結(jié)果的報(bào)告服務(wù)中。報(bào)告服務(wù)會(huì)將結(jié)果發(fā)送到某個(gè)地方供查看,或是設(shè)置一些通知規(guī)則,在遇到故障或其他一些特殊的情況下通知相關(guān)人員。你可以為我們的 CI 系統(tǒng)創(chuàng)建一個(gè)獨(dú)立的報(bào)告進(jìn)程,替換掉調(diào)度器的報(bào)告收集功能。這個(gè)新的進(jìn)程可以是一個(gè) Web 服務(wù)(或鏈接到一個(gè)Web服務(wù)上), 這樣我們就可以在網(wǎng)頁上直接在線查看測(cè)試報(bào)告,甚至可以用一個(gè)郵件服務(wù)器來實(shí)現(xiàn)測(cè)試失敗時(shí)的提醒。

測(cè)試運(yùn)行器管理器

在當(dāng)前的系統(tǒng)中,我們必須手動(dòng)運(yùn)行 test_runner.py 文件來啟動(dòng)測(cè)試運(yùn)行器。你可以創(chuàng)建一個(gè)測(cè)試運(yùn)行器管理器進(jìn)程,通過這個(gè)進(jìn)程來管理查看所有運(yùn)行器上的負(fù)載和來自調(diào)度器的請(qǐng)求,對(duì)運(yùn)行器的數(shù)量進(jìn)行相應(yīng)的調(diào)整。這個(gè)進(jìn)程會(huì)接受所有的測(cè)試任務(wù),根據(jù)任務(wù)啟動(dòng)測(cè)試運(yùn)行器,并在任務(wù)少的時(shí)候減少運(yùn)行器的實(shí)例。

遵循這些建議,你可以使這個(gè)簡(jiǎn)單的 CI 系統(tǒng)更加健壯并且容錯(cuò)率更高,并且具有與其他系統(tǒng)(比如一個(gè)網(wǎng)頁版的報(bào)告查看器)集成的能力。

如果你希望了解現(xiàn)在的持續(xù)集成系統(tǒng)可以實(shí)現(xiàn)到什么樣的靈活性,我建議你去看看 Jenkins,這是一個(gè)用 Java 編寫的非常強(qiáng)大的開源 CI 系統(tǒng)。它提供了一個(gè)基本的 CI 系統(tǒng),同時(shí)也允許使用插件進(jìn)行擴(kuò)展。你可以通過 GitHub 訪問其源代碼。另一個(gè)推薦的項(xiàng)目是Travis CI ,它是用 Ruby 編寫的,其源代碼也可以通過 GitHub 得到。

這是了解 CI 系統(tǒng)如何工作以及如何自己構(gòu)建 CI 系統(tǒng)的嘗試。現(xiàn)在你應(yīng)該對(duì)制作一個(gè)可靠的分布式系統(tǒng)所需的內(nèi)容有了更深入的了解,希望你可以利用這些知識(shí)開發(fā)更復(fù)雜的解決方案。


  1. 使用 bash 是因?yàn)槲覀冃枰獧z查文件是否存在、創(chuàng)建文件和使用 Git,而 shell 腳本是實(shí)現(xiàn)這一點(diǎn)最直接、最簡(jiǎn)單的方法?;蛘撸憧梢允褂每缙脚_(tái)的 Python 包;例如,Python 的 os 內(nèi)置模塊可以用于訪問文件系統(tǒng),GitPython 可以用于 Git 訪問,但它們執(zhí)行操作的方式?jīng)]那么直接。 ?

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容