背景
当前的应用都使用了前后端分离的架构,前后端系统需要协同以实现各种功能。后端系统通常负责处理业务逻辑、数据存储和与其他服务的交互,而前端则负责用户界面和用户交互。而在前后端数据交互的过程中,各种异常和错误都有可能发生,确认异常发生时前后端系统的处理是否合理是测试验证中非常重要的一环。
在上一篇博客【Python】从同步到异步多核:测试桩性能优化,加速应用的开发和验证
中我们介绍了如何使用测试桩来隔离对环境的依赖,这次我们一起看看如何使用异常注入来应对联调中的异常场景。
使用iptables实现
在系统异常中,数据库连接失败、第三方服务不可用等都是比较典型的场景。常见的验证手段往往是前端的同学告知后台同学开启网络隔离,然后再进行验证。
利用iptables
丢弃某ip数据包
使用 -L 列出所有规则
具体操作:
$ iptables -L
Chain INPUT (policy ACCEPT)
target prot opt source destination
Chain FORWARD (policy ACCEPT)
target prot opt source destination
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
IP 连通性 通信 测试
# 检查发现能 是否能正常 ping通
$ ping {数据库/后端地址IP}
PING 10.71.170.100 (1.1.1.1) 56(84) bytes of data.
64 bytes from 1.1.1.*: icmp_seq=1 ttl=61 time=0.704 ms
64 bytes from 1.1.1.*: icmp_seq=2 ttl=61 time=0.802 ms
^C
--- 10.71.170.100 ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1001ms
rtt min/avg/max/mdev = 0.704/0.753/0.802/0.049 ms
插入一条规则,丢弃此ip 的所有协议请求
$ iptables -I INPUT -p all -s {数据库/后端地址IP} -j DROP
列出所有规则
$ iptables -L
Chain INPUT (policy ACCEPT)
target prot opt source destination
DROP all -- 1.1.1.* anywhere
Chain FORWARD (policy ACCEPT)
target prot opt source destination
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
测试 丢弃规则内的IP 连通性
$ ping 1.1.1.*
PING 10.71.170.100 (1.1.1.*) 56(84) bytes of data.
^C
--- 10.71.170.100 ping statistics ---
85 packets transmitted, 0 received, 100% packet loss, time 84312ms
清除 规则列表的 限制
$ iptables -F
模拟ip进行丢包50%的处理。
iptables -I INPUT -s {后端IP} -m statistic --mode random --probability 0.5 -j DROP
上面这种方式能实现相关的联调,但是有两点可以改进的地方:
- 这个方式最大的限制是会影响所有的调用这个系统的测试人员。
- 需要人为的介入,每次都需要人为操作
那么有没有侵入更小,更优雅的异常场景验证方式呢?答案是肯定的。
mysql proxy 代理
更优雅的方式:写一个mysql proxy
代理,让后端svr 直接连接这个proxy,proxy再连接真实的mysql。
- 普通请求经过proxy时,proxy直接转发给真实mysql,并把mysql 的回包正常返回给调用端。
- 当收到某个关键字(
timeout
)时,直接断开TCP连接,模拟连接DB超时场景
proxy代码
import asyncio
# 真实mysqlIP端口
mysql_host = settings.MYSQL_HOST
mysql_port = settings.MYSQL_PORT
# 处理客户端连接
async def handle_connection(client_reader, client_writer):
# 连接到实际的 MySQL 服务器
mysql_reader, mysql_writer = await asyncio.open_connection(mysql_host, mysql_port)
# 转发握手包
handshake_packet = await mysql_reader.read(4096)
client_writer.write(handshake_packet)
await client_writer.drain()
# 处理客户端认证请求
auth_packet = await client_reader.read(4096)
mysql_writer.write(auth_packet)
await mysql_writer.drain()
# 转发认证响应
auth_response = await mysql_reader.read(4096)
client_writer.write(auth_response)
await client_writer.drain()
# 转发请求和响应
while True:
data = await client_reader.read(4096)
if not data:
break
sql_query = data[5:].decode('utf-8')
if "timeout" in sql_query: # sql 包含timeout 关键字时,直接关闭连接
await asyncio.sleep(1)
break
else:
mysql_writer.write(data)
await mysql_writer.drain()
response = await mysql_reader.read(4096)
client_writer.write(response)
await client_writer.drain()
# 关闭连接
client_writer.close()
mysql_writer.close()
async def main(host, port):
server = await asyncio.start_server(handle_connection, host, port)
async with server:
await server.serve_forever()
# 使用示例
if __name__ == "__main__":
# 本地监听 3307, svr 连接到3307
host = "0.0.0.0"
port = 3307
asyncio.run(main(host, port))
直接使用pymysql 测试
下面的代码会在执行到第二条select 语句时超时:
import pymysql
connection = pymysql.connect(
host="192.168.31.76",
port=8899,
# 真实mysql 账户信息
user="{user}",
password="{password}",
database="mydb",
)
curs = connection.cursor()
curs.execute("select * from user where name='bingo';")
print(curs.fetchall())
curs.execute("insert into user set name='bingtime';")
connection.commit()
curs.execute("select * from user where name='timeoutbingo';")
print(curs.fetchall())
curs.close()
connection.close()
Python 版本低于3.7
低版本的Python没有asyncio.run
和server.serve_forever()
需要修改main函数.
# 主函数,启动服务器并监听连接
async def main(host, port):
server = await asyncio.start_server(handle_connection, host, port)
try:
# Python 3.6.5 中没有 server.serve_forever() 方法,所以需要使用一个无限循环来保持服务器运行。
# 我们使用 asyncio.sleep() 来避免阻塞事件循环,使其可以处理其他任务,如新连接。
while True:
await asyncio.sleep(3600) # 1 hour
except KeyboardInterrupt:
pass
finally:
server.close()
await server.wait_closed()
# 使用示例
if __name__ == "__main__":
host = "0.0.0.0"
port = 3307
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(main(host, port))
except KeyboardInterrupt:
pass
finally:
loop.close()
其他扩展
写一个proxy,监听来往的SQL 语句:
import pymysql
import socket
import threading
# 配置
SERVER_ADDRESS = (settings.MYSQL_HOST, settings.MYSQL_PORT) # 真实MySQL 服务器地址
PROXY_ADDRESS = ('0.0.0.0', 8899) # 监听代理服务器地址
def print_query(data):
try:
command = data[4]
if command == pymysql.constants.COMMAND.COM_QUERY:
query = data[5:].decode("utf-8")
print(f"SQL: {query}")
except Exception as e:
print(f"Error parsing packet: {e}")
def forward_data(src_socket, dst_socket, parser=None):
while True:
try:
data = src_socket.recv(4096)
if not data:
break
if parser:
parser(data)
dst_socket.sendall(data)
except OSError as e:
if e.errno == 9:
break
else:
raise
def main():
# 创建代理服务器套接字
proxy_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
proxy_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
proxy_socket.bind(PROXY_ADDRESS)
proxy_socket.listen(5)
print(f"Proxy server listening on {PROXY_ADDRESS}")
while True:
client_socket, client_address = proxy_socket.accept()
print(f"Client {client_address} connected")
# 连接到 MySQL 服务器
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.connect(SERVER_ADDRESS)
try:
# 创建线程转发客户端数据到服务器
client_to_server = threading.Thread(target=forward_data, args=(client_socket, server_socket, print_query))
client_to_server.start()
# 创建线程转发服务器数据到客户端
server_to_client = threading.Thread(target=forward_data, args=(server_socket, client_socket))
server_to_client.start()
# 等待线程完成
client_to_server.join()
server_to_client.join()
finally:
client_socket.close()
server_socket.close()
print(f"Client {client_address} disconnected")
if __name__ == "__main__":
main()
如果使用上面的pymysql测试代码执行,会打印如下的信息:
Client ('127.0.0.1', 57184) connected
SQL: SET AUTOCOMMIT = 0
SQL: select * from user where name='bingo';
SQL: insert into user set name='bing21211';
SQL: COMMIT
SQL: select * from user where name='bingo';
SQL: select * from user where name='bingo';
SQL: select * from user where name='timeoutbingo';
Client ('127.0.0.1', 57184) disconnected
总结
通过实现合适的异常处理机制,可以确保用户在遇到问题时获得有用的反馈,验证这些处理机制能提高系统的稳定性和安全性。iptables
功能强大但是需要手动操作,mysql proxy
代理功能直接,但是应用场景较为有限,大家可以根据实际情况进行选择。