壓力測(cè)試-Locust的一些常用類及方法

HttpUser類

使用此類時(shí),表示進(jìn)行壓力測(cè)試的Http虛擬用戶,每個(gè)用戶都獲得一個(gè)client屬性,可用于發(fā)出Http請(qǐng)求,使用getpost、put、delete等方法。在該類下,可以直接利用@task裝飾器聲明任務(wù),也可由tasks屬性定義。

@task裝飾器

它在HttpUser類中起到聲明任務(wù)的作用,可選擇設(shè)置權(quán)重,從而控制執(zhí)行率。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @task(10)
    def one(self):
        self.client.get("/one")

    @task
    def two(self):
        self.client.get("/two")

上述腳本表示執(zhí)行one的概率是two的10倍

@tag裝飾器

@tag裝飾器用于標(biāo)識(shí)在運(yùn)行壓力測(cè)試時(shí)執(zhí)行什么任務(wù)。例如下方腳本中,啟動(dòng)測(cè)試時(shí)應(yīng)用--tags test參數(shù),則僅會(huì)執(zhí)行test任務(wù),不會(huì)執(zhí)行test2任務(wù)。

class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @tag('test')
    @task
    def one(self):
        self.client.get("/one")
    @tag('test2')
    @task(2)
    def two(self):
        self.client.get("/two")

on_start/on_stop

開始/停止執(zhí)行所在任務(wù)列表時(shí)調(diào)用。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    def on_start(self):
        self.client.post("/about",{"username":"name","password":"password"})
        print("1111111")
    def on_stop(self):
        print("2222222")
    @task
    def test(self):
        print("55555555")

上述腳本表示,開始運(yùn)行時(shí)直接執(zhí)行on_start任務(wù),進(jìn)行過程中執(zhí)行test任務(wù),結(jié)束任務(wù)時(shí)執(zhí)行on_stop任務(wù)??捎糜诨诘卿洸僮鞯牧鞒虦y(cè)試。

weight

在一個(gè)文件中,建立多個(gè)HttpUser類時(shí),weight代表執(zhí)行該類下測(cè)試的可能性,數(shù)值越大,執(zhí)行時(shí)被選擇的可能性越高。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    weight=3
    ...
class startUser2(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    weight=1

執(zhí)行startUser1的概率是startUser2的3倍。

tasks

tasks屬性可以是列表,每次隨機(jī)選擇任務(wù);也可以是字典,按設(shè)置的比例執(zhí)行任務(wù)。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @task
    def one(self):
        self.client.get("/one")
    def two(self):
        self.client.get("/two")
    tasks=[one,two]
    #tasks ={one:3,two:1} 
    #one及two代表已建立TaskSet任務(wù)實(shí)例,執(zhí)行可能性是two的3倍

TaskSet類

目前可在HttpUser中直接定義任務(wù),也可在TaskSet中定義壓力測(cè)試中用戶執(zhí)行的任務(wù)。

注意如果是繼承TaskSet定義任務(wù),則需要引入User類,而后在啟動(dòng)執(zhí)行User類或HttpUser類,否則只是聲明了任務(wù),執(zhí)行會(huì)報(bào)錯(cuò)No User Class Found

開始運(yùn)行時(shí),會(huì)從tasks屬性中選擇一個(gè)任務(wù),休眠wait_time定義的時(shí)間長度后,執(zhí)行下個(gè)任務(wù)。

interrupt(reschedule=True)

TaskSet可以嵌套,在tasks屬性中可以包含另一個(gè)TaskSet,如果需要中斷,則需要在子TaskSet中調(diào)用interrupt,中斷任務(wù)并將執(zhí)行控制移交給父TaskSet
如果rescheduleTrue(默認(rèn)),則父TaskSet將立即重新安排并執(zhí)行新任務(wù)。

from locust import  HttpUser,task,TaskSet,constant
class startUser1(TaskSet):
    wait_time = constant(1)
    @task
    def onetask(self):
        self.client.get("/")
        print("55555555")
    @task
    def stop(self):
        self.interrupt()

class startUser2(HttpUser):
    host='https://www.baidu.com'
    wait_time = constant(1)
    tasks = [startUser1]
    @task
    def twotask(self):
        self.client.get("/")
        print("33333")

SequentialTaskSet類

SequentialTaskSet類其實(shí)是TaskSet類,但該類下的任務(wù)是按順序執(zhí)行。

from locust import  User,task,SequentialTaskSet,constant
class startUser1(SequentialTaskSet):
    wait_time = constant(1)
    @task
    def fist_task(self):
        print("1")
    @task
    def second_task(self):
        print("2")
    @task
    def third_task(self):
        print("3")
class startUser2(User):
    wait_time = constant(1)
    tasks=[startUser1]
#執(zhí)行任務(wù)時(shí),會(huì)依次打印出1、2、3

手動(dòng)控制請(qǐng)求成功or失敗

通過使用catch_response參數(shù)(catch_response值為布爾類型,如果設(shè)置為True, 允許該請(qǐng)求被標(biāo)記為成功或失?。┖?with 語句,設(shè)定請(qǐng)求失敗or成功條件。

標(biāo)記失敗

雖然有時(shí)候最終接口訪問正常,但仍可設(shè)計(jì)一些條件,將其標(biāo)記為失敗。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @task
    def one(self):
        with self.client.get("/",catch_response=True) as response:
            if response.elapsed.total_seconds() > 0.01:
                response.failure("Request took too long")

請(qǐng)求時(shí)長大于0.01秒,標(biāo)記失敗
elapsed表示從發(fā)送請(qǐng)求到響應(yīng)到達(dá)之間經(jīng)過的時(shí)間

標(biāo)記成功

有的時(shí)候雖然返回4**的狀態(tài)碼,但是是符合預(yù)期的,這個(gè)時(shí)候就需要我們主動(dòng)將其標(biāo)記為成功。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @task
    def fifth_task(self):
        with self.client.get("/",catch_response=True) as response:
            if response.status_code==404:
                response.success()

明明是之前整理的知識(shí)點(diǎn),但是重新寫一次仍然用了很久....甚至比新寫的用的時(shí)間還久....再次復(fù)習(xí)了一次...但是自己已經(jīng)寫毛了....

自我記錄,有錯(cuò)誤歡迎指正~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容