壓力測試-Locust的一些常用類及方法

HttpUser類

使用此類時,表示進行壓力測試的Http虛擬用戶,每個用戶都獲得一個client屬性,可用于發(fā)出Http請求,使用getpost、put、delete等方法。在該類下,可以直接利用@task裝飾器聲明任務,也可由tasks屬性定義。

@task裝飾器

它在HttpUser類中起到聲明任務的作用,可選擇設置權重,從而控制執(zhí)行率。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @task(10)
    def one(self):
        self.client.get("/one")

    @task
    def two(self):
        self.client.get("/two")

上述腳本表示執(zhí)行one的概率是two的10倍

@tag裝飾器

@tag裝飾器用于標識在運行壓力測試時執(zhí)行什么任務。例如下方腳本中,啟動測試時應用--tags test參數(shù),則僅會執(zhí)行test任務,不會執(zhí)行test2任務。

class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @tag('test')
    @task
    def one(self):
        self.client.get("/one")
    @tag('test2')
    @task(2)
    def two(self):
        self.client.get("/two")

on_start/on_stop

開始/停止執(zhí)行所在任務列表時調用。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    def on_start(self):
        self.client.post("/about",{"username":"name","password":"password"})
        print("1111111")
    def on_stop(self):
        print("2222222")
    @task
    def test(self):
        print("55555555")

上述腳本表示,開始運行時直接執(zhí)行on_start任務,進行過程中執(zhí)行test任務,結束任務時執(zhí)行on_stop任務??捎糜诨诘卿洸僮鞯牧鞒虦y試。

weight

在一個文件中,建立多個HttpUser類時,weight代表執(zhí)行該類下測試的可能性,數(shù)值越大,執(zhí)行時被選擇的可能性越高。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    weight=3
    ...
class startUser2(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    weight=1

執(zhí)行startUser1的概率是startUser2的3倍。

tasks

tasks屬性可以是列表,每次隨機選擇任務;也可以是字典,按設置的比例執(zhí)行任務。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @task
    def one(self):
        self.client.get("/one")
    def two(self):
        self.client.get("/two")
    tasks=[one,two]
    #tasks ={one:3,two:1} 
    #one及two代表已建立TaskSet任務實例,執(zhí)行可能性是two的3倍

TaskSet類

目前可在HttpUser中直接定義任務,也可在TaskSet中定義壓力測試中用戶執(zhí)行的任務。

注意如果是繼承TaskSet定義任務,則需要引入User類,而后在啟動執(zhí)行User類或HttpUser類,否則只是聲明了任務,執(zhí)行會報錯No User Class Found。

開始運行時,會從tasks屬性中選擇一個任務,休眠wait_time定義的時間長度后,執(zhí)行下個任務。

interrupt(reschedule=True)

TaskSet可以嵌套,在tasks屬性中可以包含另一個TaskSet,如果需要中斷,則需要在子TaskSet中調用interrupt,中斷任務并將執(zhí)行控制移交給父TaskSet。
如果rescheduleTrue(默認),則父TaskSet將立即重新安排并執(zhí)行新任務。

from locust import  HttpUser,task,TaskSet,constant
class startUser1(TaskSet):
    wait_time = constant(1)
    @task
    def onetask(self):
        self.client.get("/")
        print("55555555")
    @task
    def stop(self):
        self.interrupt()

class startUser2(HttpUser):
    host='https://www.baidu.com'
    wait_time = constant(1)
    tasks = [startUser1]
    @task
    def twotask(self):
        self.client.get("/")
        print("33333")

SequentialTaskSet類

SequentialTaskSet類其實是TaskSet類,但該類下的任務是按順序執(zhí)行。

from locust import  User,task,SequentialTaskSet,constant
class startUser1(SequentialTaskSet):
    wait_time = constant(1)
    @task
    def fist_task(self):
        print("1")
    @task
    def second_task(self):
        print("2")
    @task
    def third_task(self):
        print("3")
class startUser2(User):
    wait_time = constant(1)
    tasks=[startUser1]
#執(zhí)行任務時,會依次打印出1、2、3

手動控制請求成功or失敗

通過使用catch_response參數(shù)(catch_response值為布爾類型,如果設置為True, 允許該請求被標記為成功或失敗)和 with 語句,設定請求失敗or成功條件。

標記失敗

雖然有時候最終接口訪問正常,但仍可設計一些條件,將其標記為失敗。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @task
    def one(self):
        with self.client.get("/",catch_response=True) as response:
            if response.elapsed.total_seconds() > 0.01:
                response.failure("Request took too long")

請求時長大于0.01秒,標記失敗
elapsed表示從發(fā)送請求到響應到達之間經過的時間

標記成功

有的時候雖然返回4**的狀態(tài)碼,但是是符合預期的,這個時候就需要我們主動將其標記為成功。

from locust import HttpUser,constant
class startUser1(HttpUser):
    host='https://www.***.com'
    wait_time = constant(1)
    @task
    def fifth_task(self):
        with self.client.get("/",catch_response=True) as response:
            if response.status_code==404:
                response.success()

明明是之前整理的知識點,但是重新寫一次仍然用了很久....甚至比新寫的用的時間還久....再次復習了一次...但是自己已經寫毛了....

自我記錄,有錯誤歡迎指正~

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容