-
集合点
要求系统能够承受1000 人同时提交数据,可以通过在提交数据操作前面加入集合点,这样当虚拟用户运行到提交数据的集合点时,就检查同时有多少用户运行到集合点,如果不到1000 人,已经到集合点的用户在此等待,当在集合点等待的用户达到1000 人时,1000 人同时去提交数据,从而达到测试计划中的需求。
注意:框架本身没有直接封装集合点的概念 ,间接通过gevent并发机制,使用gevent的锁来实现
-
方法一
semaphore是一个内置的计数器: 每当调用acquire()时,内置计数器-1 每当调用release()时,内置计数器+1 计数器不能小于0,当计数器为0时,acquire()将阻塞线程直到其他线程调用release()
两步骤:
- all_locusts_spawned 创建钩子函数
- 将locust实例挂载到监听器 events.spawning_complete.add_listener
- Locust实例准备完成时触发
import os from locust import HttpUser, TaskSet, task,between,events from gevent._semaphore import Semaphore all_locusts_spawned = Semaphore() all_locusts_spawned.acquire()# 阻塞线程 def on_hatch_complete(**kwargs): """ Select_task类的钩子方法 :param kwargs: :return: """ all_locusts_spawned.release() # # 创建钩子方法 events.spawning_complete.add_listener(on_hatch_complete) #挂在到locust钩子函数(所有的Locust示例产生完成时触发) n = 0 class UserBehavior(TaskSet): def login(self): global n n += 1 print("%s个虚拟用户开始启动,并登录"%n) def logout(self): print("退出登录") def on_start(self): self.login() all_locusts_spawned.wait() # 同步锁等待 @task(4) def test1(self): url = '/list' param = { "limit":8, "offset":0, } with self.client.get(url,params=param,headers={},catch_response = True) as response: print("用户浏览登录首页") @task(6) def test2(self): url = '/detail' param = { 'id':1 } with self.client.get(url,params=param,headers={},catch_response = True) as response: print("用户同时执行查询") @task(1) def test3(self): """ 用户查看查询结果 :return: """ url = '/order' param = { "limit":8, "offset":0, } with self.client.get(url,params=param,headers={},catch_response = True) as response: print("用户查看查询结果") def on_stop(self): self.logout() class WebsiteUser(HttpUser): host = 'http://www.baidu.com' tasks = [UserBehavior] wait_time = between(1, 2) if __name__ == '__main__': os.system("locust -f locustfile08.py")
-
方法二:
from locust import HttpUser, task, between, SequentialTaskSet class UserBehavior(SequentialTaskSet): @task def login(self): self.client.post("/login", {"username": "test", "password": "password"}) self.wait_between(1, 3) @task def view_product_list(self): self.client.get("/product_list") self.wait_between(1, 2) @task def choose_product(self): self.client.get("/product/1") self.wait_between(2, 3) @task def fill_address(self): self.client.post("/address", {'address': 'xxx'}) self.wait_between(3, 4) @task def checkout(self): self.client.post("/checkout", {'confirm': 'yes'}) self.wait_between(4, 5) def wait_between(self, min_wait, max_wait): self.wait_time = between(min_wait, max_wait) class WebsiteUser(HttpUser): tasks = [UserBehavior] wait_time = between(1, 3)
在这里,我们首先定义了一个名为UserBehavior的任务集合点类,其中定义了几个测试任务:登录、浏览商品列表、选择商品、填写收货地址、确认购买。在每个测试任务中,我们使用client发送HTTP请求,并设置适当的等待时间。
然后,我们定义了一个名为WebsiteUser的用户类,继承自HttpUser并设置了一个等待时间的范围。在这个用户类中,我们指定了使用UserBehavior任务集合点类作为任务,并设置并发用户数为100。在测试过程中,每个用户将按照UserBehavior类中定义的测试任务顺序依次执行,直至所有测试任务完成或测试时间到达。
控制台执行以下命令:
locust -f your_file_name.py --host=http://127.0.0.1:8000 --web-host=127.0.0.1 --users 100 --spawn-rate 10
在这里,我们指定了测试文件、目标网站URL、web UI地址,并设置并发用户数为100,每秒生成10个用户。测试过程中,我们可以在web UI界面上实时监控性能指标,以确定系统的瓶颈和性能问题。
标签:task,self,Locust,集合点,设置,between,def,wait From: https://www.cnblogs.com/shukeshu/p/17460674.html