Code
- ts.py
#!/usr/bin/env python
"""
FastDFS并发测试脚本
Usage::
$ python <me>.py 200 --show
"""
import functools
import itertools
import json
import os
import pickle
import sys
import time
from pathlib import Path
from typing import Callable, NoReturn, TypeVar
# pip install -U asynctor httpx rich fastdfs-client tqdm
import tqdm
from asynctor import bulk_gather, run, timeit
from asynctor.tasks import ThreadGroup
from httpx import AsyncClient
from rich import print
from fastdfs_client import FastdfsClient
T = TypeVar("T")
def catch_cost(func: Callable[..., T]) -> Callable[..., tuple[float, T]]:
@functools.wraps(func)
def wrapper(*args, **kw) -> tuple[float, T]:
start = time.time()
rv = func(*args, **kw)
cost = round(time.time() - start, 1)
return cost, rv
return wrapper
def catch_async_cost(func):
@functools.wraps(func)
async def wrapper(*args, **kw):
start = time.time()
rv = await func(*args, **kw)
cost = round(time.time() - start, 1)
print(rv)
return cost, rv
return wrapper
@timeit
async def show_result(output: Path, dfs: FastdfsClient) -> None:
"""展示上传结果,验证返回的URL"""
results = json.loads(output.read_bytes())
print("Upload result:")
print(results)
urls = [url for _, url in results]
if not (_nf := os.getenv("NO_FETCH")) or _nf == "0":
# 使用协程并发请求图片URL,验证是否能按预期拿到图片
async with AsyncClient(follow_redirects=True, timeout=80) as client:
checks = (client.get(i) for i in urls)
rs = await bulk_gather(checks, limit=50) # 同一时刻的并行协程数为50
print("URL concurrency result:\nidx\tstatus_code\telapsed\turl\tContentLength")
for i, r in enumerate(rs, 1):
print(
i,
r.status_code,
r.elapsed,
r.url,
len(r.content) if r.status_code == 200 else r.text,
)
else:
print(f"{len(results) = }")
if "-d" in sys.argv or "--delete" in sys.argv:
print("=" * 20)
await delete_all(urls, dfs)
@timeit
async def delete_all(urls: list[str], dfs: FastdfsClient) -> None:
# results = multi_threads_delete(urls, dfs)
results = await bulk_gather([catch_async_cost(dfs.delete)(url) for url in urls])
for res in results:
print(res)
print(f"total={len(results)}; success={sum(isinstance(i, tuple) for i in results)}")
def multi_threads_delete(urls, dfs):
"""使用多线程批量删除远程文件"""
with ThreadGroup(max_workers=30) as tg: # 控制并发线程数为50
for url in urls:
tg.soonify(catch_cost(dfs.delete_file))(url)
return tg.results
def abort(msg: str) -> NoReturn:
print(f"[red]ERROR:[/red] {msg}")
sys.exit(1)
def multi_threads_upload(client, total, images):
# 多线程并发上传文件
with ThreadGroup() as tg:
for index, p in tqdm.tqdm(zip(range(total), itertools.cycle(images))):
tg.soonify(catch_cost(client.upload_as_url))(p.read_bytes())
return tg.results
async def upload_many(client, total, images):
return await bulk_gather(
[
catch_async_cost(client.upload)(p.read_bytes())
for _, p in tqdm.tqdm(zip(range(total), itertools.cycle(images)))
]
)
@timeit
async def main() -> None:
total = 10
client = FastdfsClient(["dfs.waketzheng.top"])
if args := sys.argv[1:]:
if (a1 := args[0]).isdigit():
total = int(a1)
elif (p := Path(a1)).is_file():
await show_result(p, client)
return
else:
abort("Invalid argument `{a1}`! Must be int or filepath.")
d = Path.home() / "Pictures"
assert d.exists(), f"文件夹({d})不存在"
images = list(d.rglob("*.jp*g")) + list(d.rglob("*.JP*G"))
assert images, f"{d}中没有jpeg图片"
# results = multi_threads_upload(client, total, images)
results = await upload_many(client, total, images)
try:
res = json.dumps(results)
except TypeError:
print(results)
success = [i for i in results if isinstance(i, tuple)]
print(f"total={len(results)}; success={len(success)}")
p = Path("err.pickle")
size = p.write_bytes(pickle.dumps(results))
print(f"Failed to dump results: Write err info to {p} with {size=}")
res = json.dumps(success)
(p := Path("output.json")).write_text(res)
print(f"{total = }\nSave results to '{p}'.")
if "--show" in args:
await show_result(p, client)
if __name__ == "__main__":
run(main)
Usage
- Upload
python ts.py
- Delete uploaded files
python ts.py output.json -d
标签:trio,python,results,client,print,import,total,def
From: https://www.cnblogs.com/waketzheng/p/18303886