HttpUser類
使用此類時(shí),表示進(jìn)行壓力測(cè)試的Http虛擬用戶,每個(gè)用戶都獲得一個(gè)client屬性,可用于發(fā)出Http請(qǐng)求,使用get、post、put、delete等方法。在該類下,可以直接利用@task裝飾器聲明任務(wù),也可由tasks屬性定義。
@task裝飾器
它在HttpUser類中起到聲明任務(wù)的作用,可選擇設(shè)置權(quán)重,從而控制執(zhí)行率。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@task(10)
def one(self):
self.client.get("/one")
@task
def two(self):
self.client.get("/two")
上述腳本表示執(zhí)行one的概率是two的10倍
@tag裝飾器
@tag裝飾器用于標(biāo)識(shí)在運(yùn)行壓力測(cè)試時(shí)執(zhí)行什么任務(wù)。例如下方腳本中,啟動(dòng)測(cè)試時(shí)應(yīng)用--tags test參數(shù),則僅會(huì)執(zhí)行test任務(wù),不會(huì)執(zhí)行test2任務(wù)。
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@tag('test')
@task
def one(self):
self.client.get("/one")
@tag('test2')
@task(2)
def two(self):
self.client.get("/two")
on_start/on_stop
開始/停止執(zhí)行所在任務(wù)列表時(shí)調(diào)用。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
def on_start(self):
self.client.post("/about",{"username":"name","password":"password"})
print("1111111")
def on_stop(self):
print("2222222")
@task
def test(self):
print("55555555")
上述腳本表示,開始運(yùn)行時(shí)直接執(zhí)行on_start任務(wù),進(jìn)行過程中執(zhí)行test任務(wù),結(jié)束任務(wù)時(shí)執(zhí)行on_stop任務(wù)??捎糜诨诘卿洸僮鞯牧鞒虦y(cè)試。
weight
在一個(gè)文件中,建立多個(gè)HttpUser類時(shí),weight代表執(zhí)行該類下測(cè)試的可能性,數(shù)值越大,執(zhí)行時(shí)被選擇的可能性越高。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
weight=3
...
class startUser2(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
weight=1
執(zhí)行startUser1的概率是startUser2的3倍。
tasks
tasks屬性可以是列表,每次隨機(jī)選擇任務(wù);也可以是字典,按設(shè)置的比例執(zhí)行任務(wù)。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@task
def one(self):
self.client.get("/one")
def two(self):
self.client.get("/two")
tasks=[one,two]
#tasks ={one:3,two:1}
#one及two代表已建立TaskSet任務(wù)實(shí)例,執(zhí)行可能性是two的3倍
TaskSet類
目前可在HttpUser中直接定義任務(wù),也可在TaskSet中定義壓力測(cè)試中用戶執(zhí)行的任務(wù)。
注意如果是繼承TaskSet定義任務(wù),則需要引入User類,而后在啟動(dòng)執(zhí)行User類或HttpUser類,否則只是聲明了任務(wù),執(zhí)行會(huì)報(bào)錯(cuò)No User Class Found。
開始運(yùn)行時(shí),會(huì)從tasks屬性中選擇一個(gè)任務(wù),休眠wait_time定義的時(shí)間長度后,執(zhí)行下個(gè)任務(wù)。
interrupt(reschedule=True)
TaskSet可以嵌套,在tasks屬性中可以包含另一個(gè)TaskSet,如果需要中斷,則需要在子TaskSet中調(diào)用interrupt,中斷任務(wù)并將執(zhí)行控制移交給父TaskSet。
如果reschedule為True(默認(rèn)),則父TaskSet將立即重新安排并執(zhí)行新任務(wù)。
from locust import HttpUser,task,TaskSet,constant
class startUser1(TaskSet):
wait_time = constant(1)
@task
def onetask(self):
self.client.get("/")
print("55555555")
@task
def stop(self):
self.interrupt()
class startUser2(HttpUser):
host='https://www.baidu.com'
wait_time = constant(1)
tasks = [startUser1]
@task
def twotask(self):
self.client.get("/")
print("33333")
SequentialTaskSet類
SequentialTaskSet類其實(shí)是TaskSet類,但該類下的任務(wù)是按順序執(zhí)行。
from locust import User,task,SequentialTaskSet,constant
class startUser1(SequentialTaskSet):
wait_time = constant(1)
@task
def fist_task(self):
print("1")
@task
def second_task(self):
print("2")
@task
def third_task(self):
print("3")
class startUser2(User):
wait_time = constant(1)
tasks=[startUser1]
#執(zhí)行任務(wù)時(shí),會(huì)依次打印出1、2、3
手動(dòng)控制請(qǐng)求成功or失敗
通過使用catch_response參數(shù)(catch_response值為布爾類型,如果設(shè)置為True, 允許該請(qǐng)求被標(biāo)記為成功或失?。┖?with 語句,設(shè)定請(qǐng)求失敗or成功條件。
標(biāo)記失敗
雖然有時(shí)候最終接口訪問正常,但仍可設(shè)計(jì)一些條件,將其標(biāo)記為失敗。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@task
def one(self):
with self.client.get("/",catch_response=True) as response:
if response.elapsed.total_seconds() > 0.01:
response.failure("Request took too long")
請(qǐng)求時(shí)長大于0.01秒,標(biāo)記失敗
elapsed表示從發(fā)送請(qǐng)求到響應(yīng)到達(dá)之間經(jīng)過的時(shí)間
標(biāo)記成功
有的時(shí)候雖然返回4**的狀態(tài)碼,但是是符合預(yù)期的,這個(gè)時(shí)候就需要我們主動(dòng)將其標(biāo)記為成功。
from locust import HttpUser,constant
class startUser1(HttpUser):
host='https://www.***.com'
wait_time = constant(1)
@task
def fifth_task(self):
with self.client.get("/",catch_response=True) as response:
if response.status_code==404:
response.success()
明明是之前整理的知識(shí)點(diǎn),但是重新寫一次仍然用了很久....甚至比新寫的用的時(shí)間還久....再次復(fù)習(xí)了一次...但是自己已經(jīng)寫毛了....
自我記錄,有錯(cuò)誤歡迎指正~