首页 > 编程语言 >locust:Python 分布式压力测试(带WebUI)

locust:Python 分布式压力测试(带WebUI)

时间:2024-05-09 11:55:06浏览次数:22  
标签:task Python self locust -- WebUI time def

Locust 介绍

它采用纯 Python 实现,是一个分布式用户负载测试的工具。 使用基于 Requests 库的客户端发起请求,使编写脚本大大简化; 在模拟并发方面摒弃进程和线程,完全基于时间驱动,采用协程(gevent)提供的非阻塞 IO 和 coroutine 来实现网络层的并发请求。因此单台压力机也能产生数千并发请求数。

Locust官网

特点:

  1. 基于 python 的 Requests 和 Gevent , 脚本编写简单易读
  2. c/s 架构有一个简洁的用户界面,实时显示相关测试结果
  3. 支持分布式测试, 能跨平台易于扩展

安装及使用

安装

支持Python版本: 3.6, 3.7, 3.8

pip install -U locust==2.5.1
#pip install -U locust==2.5.1 -i https://pypi.tuna.tsinghua.edu.cn/simple

快速使用

  • locust 中通过 HttpUser 来表示一个用户,我们需要自定类继承自 HttpUser
  • HttpUser 的 client 属性可以模拟客户端请求,支持 get,post 等待方法,并且自动保持 session。(实际上他就是 requests 包的 session 对象)
    • client 的 get,post 等方法传递 url 地址时,不需要携带 http://www.example.com 这个前缀,只需要提供路径就行
  • wait_time 下一个任务执行之前等待的时间,用于模拟用户的思考时间,
    • wait_time = between(1, 5) 间隔是1-5秒随机值
    • wait_time = constant(1) 间隔固定1秒
  • host 表示服务器的地址,例如: http://www.baidu.com
  • 通过 @task 装饰一个方法,这个方法表示一个用户可能执行的动作,一个用户会有多种动作;
    • 一个用户下一步具体要执行什么动作,是随机的,也就是 locust 会从动作列表中随机选择一个
    • @task 接受一个参数,表示权重,数值越大,这个动作被选中的概率就越高
  • on_start 测试之前执行的操作
  • on_stop 测试结束执行的操作
from locust import HttpUser, task, between

# 相当于模拟一个用户
class WebUser(HttpUser):
    # 服务器的地址
    host = 'http://localhost:8000'

    # 下一个任务执行之前等待的时间,用于模式用户的思考时间
    wait_time = between(3, 5)  # 这里使用随机 3,5 秒钟

    # 定义一个测试任务,类似于一个取样器
    @task(1)
    def info(self):
        # 获取用户信息
        self.client.get('/info/')

    @task(10)
    def addresses(self):
        # 获取用户地址
        self.client.get('/addresses/')

    def on_start(self):
        # 测试之前执行的操作
        print('用户登录')
        auth = {'username': 'root', 'password': 'root1234'}
        self.client.post('/login/', json=auth)

    def on_stop(self):
        # 测试结束执行的操作
        print('用户退出')
        self.client.delete('/logout/')

启动locust

  • 在当前目录下, 可以直接运行locust 他会自动找到名为 locustfile.py 的文件
  • 为了子目录下或者不同级别目录下, 可以使用-f 命令启动 locust -f xxx/locustfile.py
# 直接运行
locust

# 指定文件
locust -f xxx/locustfile.py

然后打开浏览器, 直接在浏览器输入http://localhost:8089打开UI界面, 界面如下:

启动界面

测试报告

locust 无界面测试

locust -f locustfile.py --headless -u 100 -r 10 -t 10s

  • --headless 表示不启动界面
  • -u [数量] 设置虚拟用户数
  • -r [时间] 每秒钟启动虚拟用户数
  • -t [时间] 设置测试多长时间
    ----秒,数字后面加上 s ,例如 10s
    ----分钟,数字后面加上 m,例如 1m
    ----小时,数字后面加上 h,例如 1h

基本示例

from locust import User, events, task, between
import random


def success_call(name, recvText, total_time):
    events.request_success.fire(
        request_type="[Success]",
        name=name,
        response_time=total_time,
        response_length=len(recvText)
    )


def fail_call(name, total_time):
    events.request_failure.fire(
        request_type="[Fail]",
        name=name,
        response_time=total_time,
        response_length=0,
        exception=Exception(),
    )


class MyUser(User):
    wait_time = between(1, 5)

    @task(1)
    def job1(self):
        print('This is job 1')
        self.sendMsg("job1", 'This is job 2')

    @task(2)
    def job2(self):
        print('This is job 2')
        self.sendMsg("job2", 'This is job 2')

    def sendMsg(self, methodname, msg):
        num = random.randint(0, 2)
        time = random.randint(10, 50)
        if (num == 0):
            fail_call(methodname, time)
        else:
            success_call(methodname, msg, time);

执行

locust -f test4.py --autostart --autoquit 0 -u 1 -r 3 --run-time 10s

其中

  • request_type请求方式,这里为了方便理解,直接使用的[Success][Fail],来区分成功的请求和失败的请求,只有成功或失败的回调被调用后,locust才会对其统计。

注意:

所有的task不能是异步方法。

结果:

locust 分布式

运行多个进程的Locust, 使用 --master 命令启动主进程, 使用 --worker 启动从属进程

主进程控制从进程并收集从进程的测试结果;从进程负责执行测试,并反馈结果给主进程。

启动无界面主进程

locust -f locustfile.py --master --headless --master-bind-host=127.0.0.1 --master-bind-port=8090 -t 15s -u 5 -r 1
  • --master 指明当前为主进程
  • --master-bind-host=[ip 地址] 指明主进程绑定的地址
  • --master-bind-port=[端口号] 指明主进程的绑定的端口号

启动后,如果当前没有从进程的话会显示等待从进程。当从进程启动后便可以执行脚本了。

启动无界面的从进程:

locust -f locustfile.py --worker --master-host=127.0.0.1 --master-port=8090 --headless
  • --worker 指明当前为从进程
  • --master-host=[ip 地址] 指明需要连接的主进程的 ip 地址
  • --master-port=[端口号] 指明需要连接的主进程的端口

从进程启动后将开始执行测试,主进程将会统计结果。

动作集合 TaskSet

对于 web 网站来说,通常情况下,网站的一个页面中用户能够执行多个动作,并且这些动作通常是为了某个特定的目标,比如注册页面,用户可以执行的动作: 验证用户名,手机号码是否重复,更新图形验证码,获取手机验证码等。这些动作的目标就是为了完成用户注册。我们可以把这些动作组合在一起。

在 locust 中我们通过 TaskSet 类来组合多个动作,然后在 HttpUser 对象中引入 taskset ,引入方式是在 tasks 属性列表中,添加 taskset 类。

taskset 也是可以嵌套的,我们在 taskset 的 tasks 属性中添加要嵌套的其他 taskset 类。类似于: 网站首页包含用户登录页面,用户登录页面包含用户注册页面这种关系

  • 注意: taskset 相当于一个特殊的动作,如果 locust 进入到 taskset 中执行其中的动作,那么 locust 会一直执行 taskset 中的动作,那么 WebUser 中定义的动作就不会被执行。 需要在 taskset 中调用 self.ingterrupt() 来退出,一般可以定义一个动作,在这个动作中调用 self.interrupt()
import uuid

from locust import HttpUser, task, between, TaskSet

# 注册
class RegistTaskSet(TaskSet):
    # 用户名是否重复
    @task
    def username_count(self):
        username = 'zhangsan'
        self.client.get(f'/usernames/{username}/count/')

    # 手机号是否重复
    @task
    def mobile_count(self):
        mobile = '13712345678'
        self.client.get(f'/mobiles/{mobile}/count/')

    # 获取图形验证码
    @task
    def image_codes(self):
        image_code_id = str(uuid.uuid4())
        self.client.get(f'/image_codes/{image_code_id}/')

    @task
    def stop(self):
        print('退出当前动作集合')
        self.interrupt()

# 登录
class UserLoginTaskSet(TaskSet):
    # 嵌套用户注册
    tasks = [RegistTaskSet]

    @task
    def username_login(self):
        auth = {'username': 'root', 'password': 'root1234'}
        self.client.post('/login/', json=auth)

    @task
    def stop(self):
        print('退出当前动作集合')
        self.interrupt()

# 模拟用户
class WebUser(HttpUser):
    # 包含用户登录
    tasks = [UserLoginTaskSet]
    # 服务器的地址
    host = 'http://localhost:8000'

    # 下一个任务执行之前等待的时间,用于模式用户的思考时间
    wait_time = between(3, 5)  # 这里使用随机 3,5 秒钟

    @task
    def search(self):
        # 搜索
        self.client.get('/search/')

有序动作集合 SequentialTaskSet

我们在 HttpUser 以及 TaskSet 下定义动作后,locust 是从这些动作中随机选择一个动作来执行,有时候我们希望用户执行的动作是有序的,比如:

1.验证用户名是否重复
2.验证手机号码是否重复
3.验证获取图形验证码
4.获取手机验证码
5.注册
这时候就需要通过 SequentialTaskSet 来定义动作集合,他会按照定义 task 定义的顺序来一次调用。

  • 使用的方式和 TaskSet 一样,但是 task 装饰器的权重无效了
from locust import HttpUser, task, between, SequentialTaskSet

class RegistSeqTaskSet(SequentialTaskSet):
    # 用户名是否重复
    @task
    def username_count(self):
        print('验证用户名')
        username = 'zhangsan'
        self.client.get(f'/usernames/{username}/count/')

    # 手机号是否重复
    @task
    def mobile_count(self):
        print('验证手机号')
        mobile = '13712345678'
        self.client.get(f'/mobiles/{mobile}/count/')

    # 获取图形验证码
    @task
    def image_codes(self):
        print('获取图形验证码')
        image_code_id = str(uuid.uuid4())
        self.client.get(f'/image_codes/{image_code_id}/')

# 模拟用户
class WebUser(HttpUser):
    # 包含用户登录
    tasks = [RegistSeqTaskSet]
    # 服务器的地址
    host = 'http://localhost:8000'

    # 下一个任务执行之前等待的时间,用于模式用户的思考时间
    wait_time = between(3, 5)  # 这里使用随机 3,5 秒钟

断言

我们需要判断一个请求是失败还是成功,需要给请求参数携带 catch_response=True ,这样请求方法就会返回一个上下文管理器

这个上下文管理器返回的是响应对象,我们通过调用响应对象的 failure(消息) 来标记本次请求失败,

通过 catch_response=True 参数来进行断言。

from locust import HttpUser, task, between

class WebUser(HttpUser):
    wait_time = between(1, 5)
    host = 'http://localhost:8000'

    @task
    def info(self):
        with self.client.get('/info/', catch_response=True) as resp:
            res = resp.json()
            if res['code'] != 0:
                resp.failure(res['errmsg'])

WebSocket

压测WS前我们要先看看怎么连接WS

连接WS的库有的是支持异步IO的,项目中我们推荐这样的库,但是压测时还是要选择同步的库

异步

安装

pip install websockets

代码示例

import asyncio
import websockets
import json
import random

async def mytest():
    async with websockets.connect('wss://sockettest.xhkjedu.com/ws') as websocket:
        num = random.randint(0, 10000000)
        msg = {
            "b": {"num": num},
            "c": 123456,
        }

        msgstr = json.dumps(msg)

        await websocket.send(msgstr)
        print(f"↑: {msgstr}")

        greeting = await websocket.recv()
        print(f"↓: {greeting}")

asyncio.get_event_loop().run_until_complete(mytest())

同步

官网地址

https://pypi.org/project/websocket-client/

安装

pip install websocket-client

示例

from websocket import create_connection
import json
import random

ws = create_connection("wss://sockettest.xhkjedu.com/ws")

num = random.randint(0, 10000000)
msg = {
    "b": {"num": num},
    "c": 123456,
}

msgstr = json.dumps(msg)
print("Sending " + msgstr)
ws.send(msgstr)

result = ws.recv()
print("Received '%s'" % result)
ws.close()
import websocket

def on_message(ws, message):
    print(ws)
    print(message)

def on_error(ws, error):
    print(ws)
    print(error)

def on_close(ws):
    print(ws)
    print("### closed ###")

websocket.enableTrace(True)
ws = websocket.WebSocketApp("ws://127.0.0.1:8888/track",
                            on_message=on_message,
                            on_error=on_error,
                            on_close=on_close)

ws.run_forever()

Websocket压测

Jmeter要测试websocket接口,需要先下载安装一个websocket samplers by peter doornbosch的插件

而locust因为是代码实现,所以可以进行任何的测试,引用相应的库即可。

from locust import User, task, events
import time
from websocket import create_connection
import json
import random

def success_call(name, recvText, total_time):
    events.request_success.fire(
        request_type="[Success]",
        name=name,
        response_time=total_time,
        response_length=len(recvText)
    )

def fail_call(name, total_time, e):
    events.request_failure.fire(
        request_type="[Fail]",
        name=name,
        response_time=total_time,
        response_length=0,
        exception=e,
    )

class WebSocketClient(object):
    def __init__(self, host):
        self.host = host
        self.ws = None

    def connect(self, burl):
        self.ws = create_connection(burl)

    def recv(self):
        return self.ws.recv()

    def send(self, msg):
        self.ws.send(msg)

class WebsocketUser(User):
    abstract = True

    def __init__(self, *args, **kwargs):
        super(WebsocketUser, self).__init__(*args, **kwargs)
        self.client = WebSocketClient(self.host)
        self.client._locust_environment = self.environment

class ApiUser(WebsocketUser):
    host = "wss://sockettest.xhkjedu.com/"

    @task(1)
    def pft(self):
        # wss 地址
        self.url = 'wss://sockettest.xhkjedu.com/ws'
        print("连接前")
        start_time = time.time()
        try:
            self.client.connect(self.url)
            print("连接后")
            # 发送的订阅请求
            num = random.randint(0, 10000000)
            msg = {
                "b": {"num": num},
                "c": 123456,
            }

            msgstr = json.dumps(msg)

            self.client.send(msgstr)
            print(f"↑: {msgstr}")

            greeting = self.client.recv()
            print(f"↓: {greeting}")

        except Exception as e:
            total_time = int((time.time() - start_time) * 1000)
            fail_call("Send", total_time, e)
        else:
            total_time = int((time.time() - start_time) * 1000)
            success_call("Send", "success", total_time)

测试

locust -f main.py -u 5000 -r 300

网页上访问

http://localhost:8089/

或者不用图形化界面

locust -f main.py --autostart --autoquit 0 -u 1 -r 3 --run-time 10s

其中

  • --autostart 自动开始 不使用WebUI
  • --autoquit 0autostart搭配使用,测试完成后多长时间退出,后面的数字单位是秒,如果不设置只能CTRL+C才能退出
  • -u 1 最大用户数
  • -r 3 每秒创建的用户数,创建用户数和最大用户数一样后就不再创建
  • --run-time 10s 压测的执行时间

标签:task,Python,self,locust,--,WebUI,time,def
From: https://www.cnblogs.com/luckzack/p/18181824

相关文章

  • python适应不同场景的函数 - *args, **kwargs
    在Python中,*args和**kwargs是用来处理函数中不定数量的参数的特殊语法。*args和**kwargs介绍*args*args用于传递不定数量的位置参数,它会将传递给函数的位置参数收集到一个元组中。例如:defmy_func(*args):forarginargs:print(arg)my_func(1,2,3)在这个......
  • text-generation-webui 推理模型Qwen1.5-7B-Chat相关报错问题解决
    推理代码text-generation-webui推理模型Qwen1.5-7B-Chatsysinfo nvcc--versioncuda11.8importtorch>>>print(torch.__version__)1路径错误2依赖没安装ImportError:Thismodelingfilerequiresthefollowingpackagesthatwerenotfoundinyourenvironme......
  • python读写json文件
    1.新建json文件打开记事本,重命名为.json后缀使用的样例如下,注意看json文件格式:{"server":{"host":"example.com","port":443,"protocol":"https"},"authentication":{......
  • 解决 node-gyp 错误问题|node与python版本不匹配报错|node-gyp|vue
    项目要用到node-gyp(给爷爬)mac上没问题windows有问题看上面的日志,提到了python2.7看来这个模块用的上古时代的python2.7的技术windows的同学可以先安装python2这里下载:https://www.python.org/ftp/python/2.7/python-2.7.amd64.msi不要修改路径!!不要修改路径!!等......
  • python-利用ffmpeg快速提取视频的音频文件
    """利用ffmpeg快速提取视频的音频文件"""#参考"https://www.cnblogs.com/CodeAndMoe/p/13360011.htmlwindow安装ffmpeg并且设置环境变量https://zhuanlan.zhihu.com/p/118362010"importosdefmain(v_path):#v_path表示视频所在路径生成后的MP3音频文件会在该视频文件目录中......
  • Python字符串方法:字符串查找、替换、分割
    字符串查找Python提供了内置的字符串查找方法find(),利用该方法可以在一个较长的字符串中查找子字符串。如果该字符串中,有一个或者多个子字符串,则该方法返回第一个子串所在位置的最左端索引,若没有找到符合条件的子串,则返回-1。find()方法的基本使用语法如下:source_string.find(s......
  • python+opencv读取/保存图片
    读取图片导入opencv的包通过imread函数读取图片namedWindow创建一个窗口用于显示imshow将图片显示出来importcv2ascvsrc=cv.imread("D:\\myCode\\picture\\003.jpg")#参数为图片的路径#第一个参数会设置为窗口的标题同时他也会作为和下面imshow进行匹配,以确定将显......
  • [读书]-像计算机一样 思考python
    目录前述第二章:变量、表达式、和语句第三章:函数第五章:条件和递归第六章:有返回数值的函数第七章:迭代第八章:字符串第十章:列表第十一章:词典第十二章:tuple第十四章:文件14.2读和写14.3格式化字符串,两种方式14.4os模块14.5读写异常14.7pickle14.9模块相关14.10其他前述编程语......
  • 异常检测(Anomaly Detection)方法与Python实现
    异常检测(Anomalydetection)是机器学习中一种常见应用,其目标是识别数据集中的异常或不寻常模式。尽管通常被归类为非监督学习问题,异常检测却具有与监督学习相似的特征。在异常检测中,我们通常处理的是未标记的数据,即没有明确的标签指示哪些样本是异常的。相反,算法需要根据数据本身......
  • python在Scikit-learn中用决策树和随机森林预测NBA获胜者
    原文链接:http://tecdat.cn/?p=5222原文出处:拓端数据部落公众号 在本文中,我们将以Scikit-learn的决策树和随机森林预测NBA获胜者。美国国家篮球协会(NBA)是北美主要的男子职业篮球联赛,被广泛认为是首屈一指的男子职业篮球联赛在世界上。它有30个队(美国29个,加拿大1个)。在常规赛......