首页 > 编程语言 >使用Python协程并发测试cdn响应速度

使用Python协程并发测试cdn响应速度

时间:2023-11-22 16:27:14浏览次数:41  
标签:协程 Python cdn url ui urls import swagger httpx

代码干净清爽才能看着赏心悦目:

#!/usr/bin/env python3.11
import time
from contextlib import contextmanager
from enum import StrEnum

import anyio
import httpx


@contextmanager
def timeit(msg: str):
    start = time.time()
    yield
    cost = time.time() - start
    print(msg, f"{cost = }")


class CdnHost(StrEnum):
    jsdelivr = "https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9.0/swagger-ui.css"
    unpkg = "https://unpkg.com/swagger-ui-dist@5.9.0/swagger-ui.css"
    cloudflare = (
        "https://cdnjs.cloudflare.com/ajax/libs/swagger-ui/5.9.0/swagger-ui.css"
    )


async def find_fastest_host(loop_interval=0.1) -> str:
    urls = list(CdnHost)
    results = [None] * len(urls)

    async def fetch(httpx_client, url, index):
        try:
            r = await httpx_client.get(url)
        except (httpx.ConnectError, httpx.ReadError):
            ...
        else:
            print(f"{url = }\n{r.elapsed = }")
            if r.status_code < 300:
                results[index] = r.content

    async with (anyio.create_task_group() as tg, httpx.AsyncClient() as client):
        for i, url in enumerate(urls):
            tg.start_soon(fetch, client, url, i)
        for _ in range(int(5 / loop_interval) + 1):
            if any(r is not None for r in results):
                tg.cancel_scope.cancel()
                break
            await anyio.sleep(0.1)
    for url, res in zip(urls, results):
        if res is not None:
            return url
    return urls[0]


async def main():
    with timeit("Sniff hosts"):
        url = await find_fastest_host()
    print("result:", url)


if __name__ == "__main__":
    anyio.run(main)

执行结果:

url = <CdnHost.unpkg: 'https://unpkg.com/swagger-ui-dist@5.9.0/swagger-ui.css'>
r.elapsed = datetime.timedelta(seconds=1, microseconds=418428)
Sniff hosts cost = 1.4775612354278564
result: https://unpkg.com/swagger-ui-dist@5.9.0/swagger-ui.css

标签:协程,Python,cdn,url,ui,urls,import,swagger,httpx
From: https://www.cnblogs.com/waketzheng/p/17849583.html

相关文章

  • python wordcloud生成词云
    #!/usr/bin/envpython#coding:utf-8#pipinstallwordcloud#pipinstallmatplotlibimportwordcloudimportmatplotlib.pyplotaspltimportnumpyasnpfromPILimportImagetext="""给你一瓶魔法药水喝下去就不需要氧气给你一瓶魔法药水喝下去就不怕身体......
  • Python全局解释器锁GIL机制
    全局解释器锁GlobalInterpreterLock,CPython在解释器级别的一把锁,叫GIL全局解释器锁。程序编译成字节码,程序想跑多线程,但是GIL保证CPython进程中,同一时刻只能有一个线程执行字节码。所以,哪怕是在多CPU的情况下,即使每个线程恰好调度到了每个CPU上,有了这把大锁,同时只能有一个CPU......
  • 《最新出炉》系列初窥篇-Python+Playwright自动化测试-32-JavaScript的调用执行-下篇
    1.简介 在实际工作中,我们需要对处理的元素进行高亮显示,或者有时候为了看清楚操作过程和步骤我们需要跟踪鼠标点击了哪些元素需要标记出来。虽然很少遇到,但是为了以后大家可以参考或者提供一种思路,今天宏哥就在这里把这种测试场景playwright是如何处理的讲解和分享一下。2.用法......
  • [951] Understanding the pattern of "(.*?)" in Python's re package
    InPython'sregularexpressions, (.*?)isacapturinggroupwithanon-greedyquantifier. Let'sbreakdownthecomponents:(and ):Parenthesesareusedtocreateacapturinggroup.Thisallowsustocaptureaportionofthematchedtext..*?:......
  • [950] Python RegEx (re library)
    ref:PythonRegExARegEx,orRegularExpression,isasequenceofcharactersthatformsasearchpattern.RegExcanbeusedtocheckifastringcontainsthespecifiedsearchpattern.RegExModulePythonhasabuilt-inpackagecalled re,whichcanbeu......
  • 19.python 创建一个本地web服务器
    编写一个server.py文件1importhttp.server2importsocketserver34PORT=800056Handler=http.server.SimpleHTTPRequestHandler78withsocketserver.TCPServer(("",PORT),Handler)ashttpd:9print("Serverstartedatlocalhos......
  • 聪明办法学python
    数据类型:整数(int)         浮点数(float)         布尔值(bool):truefalse         类型(type):print(type(2)) print(type(2.2))  print(type(2<2.2))         isinstance(a,int)常数:truefalsenone......
  • 大数据开发要学什么java还是python?
    在大数据开发领域,Java和Python都是备受青睐的编程语言。它们分别具有各自独特的特点和优势,在大数据处理方面也有不同的应用场景。以下是对Java和Python在大数据开发中的应用、优势以及学习建议的详细描述。Java在大数据开发中的应用和优势1.应用场景Hadoop生态圈:Java广泛......
  • Python在使用pandas时内存使用过大导致服务器宕机,有哪些优化方法?
    当使用pandas处理大规模数据时,内存使用量可能会迅速增加,导致服务器宕机。为了解决这个问题,可以采用以下几个优化方法:数据类型优化:使用更小的数据类型,例如将int64转换为int32或int16,节省内存空间。对于字符串类型,尽量使用'category'类型,它会使用更少的内存。分块处理:使......
  • python+pytest写测试用例后置清理数据操作
    一、teardown_function函数是为了在每个测试函数def执行后进行数据清理。#引入DbConnect类或者确保它已经被定义fromyour_db_moduleimportDbConnectdefteardown_function():try:print("后置操作-做数据清理,把批注通知删掉")db......