# *_*coding:utf-8 *_* # @Author : zyb # HttpUser :保持会话。FastHttpUser:高性能 from locust import TaskSet, task, HttpUser,FastHttpUser, between, constant_throughput, constant_pacing, constant #argument_parser 这个可以修改执行参数的 from gevent._semaphore import Semaphore all_locust_spawend = Semaphore() all_locust_spawend.acquire() #@seq_task(2)顺序执行,1-->10 1先执行 #依赖调用首要执行的函数执行后调用下面的方法 #self.schedule_task('依赖的task') #定义任务集类 class proBehavior(TaskSet): def on_start(self): #添加集合点、登录等场景,只执行一次 print('pro登录开始') # all_locust_spawend.wait(20) @task def my_task_one(self): url = '/pro/' #catch_response=True,允许请求标记为失败 with self.client.get(url=url,name='pro采购信息', verify=False,timeout=10,catch_response=True) as response: if response.status_code==200: response.success() else: response.failure("Failed!") @task def my_task_two(self): url = '/users/' # catch_response=True,允许请求标记为失败 with self.client.get(url=url, name='pro获取用户', verify=False, timeout=10, catch_response=True) as response: if response.status_code == 200: response.success() else: response.failure("Failed!") def on_stop(self): print('pro结束后每个用户执行一次') #定义用户类 class WebsiteUser(FastHttpUser): tasks = [proBehavior] wait_time = between(1, 5) # 恒定吞吐量无论任务执行时间如何,任务将始终每10秒执行一次: # wait_time = constant_throughput(0.1) if __name__ == "__main__": import os file_path = os.path.abspath(__file__) os.system(f"locust -f {file_path} --host=http://127.0.0.1:8000") os.system(f"locust -f --host=http://127.0.0.1:8000 --no-web -c 1000 -r 100 --run-time 1h30m --stop-timeout 99") #分布式 #主机: --mastert #执行机: --slave --mastert-host=主机ip地址 #命令行执行参数: # -f xxx.py:执行文件 # --host=https://www.xx.com :执行域名 # --web-host=0.0.0.0 :web界面的域名 # --no-web:不使用web界面 # -c:用户数 # -r:每秒孵化率(每秒启动数) # --expect-slaves:执行机数量 # --run-time:运行时间:(h:小时,m:分钟,s:秒) # --csv=:保存执行结果
标签:task,--,demo,self,locust,执行,response From: https://www.cnblogs.com/Mr-Simple001/p/18598874