首页 > 其他分享 >asyncio应用

asyncio应用

时间:2022-10-10 18:01:55浏览次数:60  
标签:__ get url 应用 time loop asyncio

#1

#asyncio 没有提供http协议的接口 aiohttp
'''
import asyncio
import socket
from urllib.parse import urlparse


async def get_url(url):
    # 通过socket请求html
    url = urlparse(url)
    host = url.netloc
    path = url.path
    if path == "":
        path = "/"

    reader,writer=await asyncio.open_connection(host,80)
    writer.write("GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host).encode("utf8"))
    all_lines=[]
    async for raw_line in reader:
        data=raw_line.decode('utf8')
        all_lines.append(data)
    html="\n".join(all_lines)
    return html


if  __name__=="__main__":
    import time
    start_time=time.time()
    loop=asyncio.get_event_loop()
    tasks=[]
    for url in range(20):
        url="http://shop.projectsedu.com/goods/{}/".format(url)
        tasks.append(get_url(url))
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time()-start_time)
'''
import asyncio
import socket
from urllib.parse import urlparse


async def get_url(url):
    """Fetch ``url`` with a hand-written HTTP/1.1 GET over an asyncio stream.

    The raw response (status line, headers and body) is decoded as UTF-8,
    printed, and returned as one string.

    Args:
        url: Absolute ``http://`` URL. A port in the URL is honoured;
            otherwise port 80 is used.

    Returns:
        The complete response text exactly as received.
    """
    # Request the HTML through a raw socket connection.
    parts = urlparse(url)
    host = parts.netloc
    path = parts.path or "/"

    # Open the connection; hostname/port are split out so "host:8080" works.
    reader, writer = await asyncio.open_connection(
        parts.hostname or host, parts.port or 80
    )
    try:
        request = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host)
        writer.write(request.encode("utf8"))
        await writer.drain()
        all_lines = []
        # "Connection: close" makes the server end the stream, which ends the loop.
        async for raw_line in reader:
            all_lines.append(raw_line.decode("utf8"))
    finally:
        # Always release the socket, even if reading or decoding fails.
        writer.close()
        await writer.wait_closed()
    # Each line already carries its own line terminator, so join without
    # inserting extra newlines between them.
    html = "".join(all_lines)
    print(html)
    return html

async def main():
    """Start 20 concurrent downloads and print each page as soon as it is ready."""
    pending = [
        asyncio.ensure_future(
            get_url("http://shop.projectsedu.com/goods/{}/".format(goods_id))
        )
        for goods_id in range(20)
    ]
    # as_completed yields awaitables in finishing order, not submission order.
    for finished in asyncio.as_completed(pending):
        print(await finished)




# async def main():
#     tasks = []
#     for url in range(20):
#         url = "http://shop.projectsedu.com/goods/{}/".format(url)
#         tasks.append(asyncio.ensure_future(get_url(url)))
#     for task in asyncio.as_completed(tasks):
#         result = await task
#         print(result)

# Script entry point: drive main() on the event loop until it returns.
if __name__ == "__main__":
    # import time
    # start_time = time.time()
    loop = asyncio.get_event_loop()
    # Blocks here until main() has awaited every download it scheduled.
    loop.run_until_complete(main())

    # print(time.time() - start_time)

#2 asyncio lock

'''
total=0

async def add():
    #1.dosomething
    #2.io操作
    #1.dosomenthing3
    global total
    for i in range(1000):
        total+=1
async def desc():
    global total
    for i in range(1000):
        total -= 1
if __name__=="__main__":
    import asyncio
    tasks=[add(),desc()]
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print(total)
'''

import asyncio,aiohttp
from asyncio import Lock,Queue
# Maps a URL to whatever get_stuff fetched for it.
cache={}
# Held by get_stuff around the cache lookup and the fetch.
lock=Lock()

async def get_stuff(url):
    """Return the cached result for ``url``, fetching it first on a cache miss.

    The lock is held across both the lookup and the request, so two callers
    asking for the same URL cannot both issue the request.
    """
    async with lock:
        if url not in cache:
            # NOTE(review): recent aiohttp releases document aiohttp.request()
            # as an async context manager - confirm that awaiting it directly
            # works with the installed version.
            cache[url] = await aiohttp.request ('GET',url)
        return cache[url]
async def parse_stuff():
    """Fetch the shared resource through get_stuff and parse it (parsing not written yet)."""
    # NOTE(review): get_stuff is declared with a required `url` parameter, but
    # no argument is passed here, so this call raises TypeError as written.
    stuff=await get_stuff()
    # do some parsing

async def use_stuff():
    """Placeholder coroutine: the original body is missing from the source.

    The bare ``def`` line had no body, which is a syntax error. This docstring
    is the body, so the coroutine is valid and simply returns None.
    """

#3 asyncio scrapy

#asyncio 爬虫、去重、入库
#asyncio爬虫、去重、入库
import asyncio
import re

import aiohttp
import aiomysql
from pyquery import PyQuery

# consumer() keeps looping while this is False.
stopping = False
# First page fetched by main().
start_url = "http://www.jobbole.com/"
# URLs discovered by extract_urls() and not yet taken by consumer(),
# which pops from the end of the list.
waitting_urls = []
# URLs that have already been fetched.
seen_urls = set()

# Held inside fetch(): at most three requests are in flight at once.
sem = asyncio.Semaphore(3)

async def fetch(url, session):
    """Download ``url`` through ``session`` and return the response text.

    Returns None when the status is neither 200 nor 201, or when any
    exception is raised (the exception is printed, not propagated).
    The module-level semaphore bounds how many calls run concurrently.
    """
    async with sem:
        try:
            async with session.get(url) as resp:
                print("url status: {}".format(resp.status))
                if resp.status not in [200, 201]:
                    return None
                return await resp.text()
        except Exception as e:
            print(e)

def extract_urls(html):
    """Collect unseen absolute links from ``html`` and queue them for crawling.

    Args:
        html: Page markup, or None/empty when the download failed
            (fetch() returns None on errors and non-200/201 responses).

    Returns:
        The list of newly found URLs. The same URLs are also appended to
        the module-level ``waitting_urls`` queue.
    """
    # Nothing to parse for a failed download; avoid handing None to PyQuery.
    if not html:
        return []
    document = PyQuery(html)
    hrefs = (anchor.attr("href") for anchor in document.items("a"))
    urls = [
        href for href in hrefs
        if href and href.startswith("http") and href not in seen_urls
    ]
    waitting_urls.extend(urls)
    return urls

async def init_urls(url, session):
    """Download a non-article page, mark it as seen, and queue the links on it."""
    page = await fetch(url, session)
    seen_urls.add(url)
    extract_urls(page)

async def article_handler(url, session, pool):
    """Fetch an article page, queue its links, and store its title in MySQL.

    Args:
        url: Article URL to download.
        session: HTTP client session passed through to fetch().
        pool: Connection pool used to obtain a database connection.
    """
    # Fetch the article detail page, parse it and write it to the database.
    html = await fetch(url, session)
    seen_urls.add(url)
    if not html:
        # fetch() returns None on errors and non-200/201 responses:
        # there is nothing to parse or store.
        return
    extract_urls(html)
    title = PyQuery(html)("title").text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            # Retained from the original; its result is never read.
            await cur.execute("SELECT 42;")
            # The title comes from a scraped page, so pass it as a bound
            # parameter instead of formatting it into the SQL text.
            await cur.execute(
                "insert into article_test(title) values(%s)", (title,)
            )

async def consumer(pool):
    """Drain ``waitting_urls`` forever, scheduling a handler task per new URL.

    Article URLs (matching the jobbole pattern) go to article_handler();
    everything else goes to init_urls(). Runs until the module-level
    ``stopping`` flag becomes true.

    Args:
        pool: Database connection pool handed to article_handler().
    """
    async with aiohttp.ClientSession() as session:
        while not stopping:
            if not waitting_urls:
                # Queue is empty: yield to the loop so running fetches can refill it.
                await asyncio.sleep(0.5)
                continue

            url = waitting_urls.pop()
            print("start get url: {}".format(url))
            if url in seen_urls:
                continue
            # Mark the URL now, not only when its fetch finishes; otherwise a
            # duplicate queue entry would be scheduled a second time.
            seen_urls.add(url)
            # Raw string keeps "\d" a regex escape, not an invalid string escape.
            if re.match(r'http://.*?jobbole.com/\d+/', url):
                asyncio.ensure_future(article_handler(url, session, pool))
            else:
                asyncio.ensure_future(init_urls(url, session))


async def main(loop):
    """Create the MySQL pool, seed the URL queue from the start page, and launch the consumer."""
    # Wait until the MySQL connection pool has been established.
    db_options = dict(
        host='127.0.0.1', port=3306,
        user='root', password='root',
        db='aiomysql_test', loop=loop,
        charset="utf8", autocommit=True,
    )
    pool = await aiomysql.create_pool(**db_options)

    async with aiohttp.ClientSession() as session:
        first_page = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(first_page)

    # The consumer is scheduled, not awaited; it keeps running after main() returns.
    asyncio.ensure_future(consumer(pool))


# Script entry point for the crawler.
if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # main() is only scheduled here; it starts running once the loop does.
    asyncio.ensure_future(main(loop))
    # Nothing in this script stops the loop, so this call does not return on its own.
    loop.run_forever()



'''


import asyncio
import re

import aiohttp
import aiomysql
from pyquery import PyQuery

start_url="http://www.jobbole.com"
waiting_urls=[]
seen_urls=set()

stopping=False
async def fetch(url,session):
    # async with aiohttp.ClientSession() as session:
    async with session.get(url) as resp:
        try:
            print('url status:{}'.format(resp.status))
            # print(resp.status)
            # print(await resp.text())
            if resp.status in [200,201]:
                data=await resp.text()
                return data
        except Exception as e:
            print(e)

def extract_urls(html):
    urls=[]
    pq=PyQuery(html)
    for link in pq.items("a"):
        url=link.attr("href")
        if url and url.startswith("http") and url not in seen_urls:
            urls.append(url)
            waiting_urls.append(url)
    return urls



async def init_urls(url,session):
    async with aiohttp.ClientSession() as session:
        html=await fetch(url,session)
        seen_urls.add(url)
        extract_urls(html)

async def article_handler(url,session,pool):
    #获取文章详情并解析入库

    html=await fetch(url,session)
    extract_urls(html)
    pq=PyQuery(html)
    title=pq("title").text()
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute("SELECT 42;")
            insert_sql="insert into article_test(title) values('{}')".format(title)
            await cur.execute(insert_sql)
            # print(cur.description)
            # (r,)=await cur.fetchone()
            # assert r==42


async def consumer(pool):
    while not stopping:
        if len(waiting_urls)==0:
            await asyncio.sleep(0.5)
            continue

        url=waiting_urls.pop()
        print("start get url:{}".format(url))
        if re.match('http://.*?jobbole.com/\d+/',url):
            if url not in seen_urls:
                asyncio.ensure_future(article_handler(url,session,pool))
                await asyncio.sleep(2)
        # else:
        #     if url not in seen_urls:
        #         asyncio.ensure_future(init_urls(url,session))


async def main(loop):
    #等待mysql连接建立好
    pool=await aiomysql.create_pool(host='127.0.0.1',port=3306,
                                    user='root',password='123',
                                    db='aiomysql_test',loop=loop,
                                    charset="utf8",autocommit=True)

    async with aiohttp.ClientSession() as session:
        html = await fetch(start_url, session)
        seen_urls.add(start_url)
        extract_urls(html)
    asyncio.ensure_future(consumer(pool))

        # asyncio.ensure_future(init_urls(start_url,session))
        # asyncio.ensure_future(consumer(pool))

if __name__=="__main__":
    loop=asyncio.get_event_loop()
    asyncio.ensure_future(main(loop))
    loop.run_forever()

'''

#4 call_test

"""
import asyncio

def callback(sleep_times):
    print("sleep{} success".format(sleep_times))

def stoploop(loop):
    loop.stop()
'''

#call_later
if __name__=="__main__":
    loop=asyncio.get_event_loop()
    # loop.call_soon(callback,2)
    # loop.call_soon(stoploop,loop)
    loop.call_later(2,callback,2)
    loop.call_later(1,callback,1)
    loop.call_later(3,callback,3)
    loop.call_soon(callback,2)
    loop.run_forever()
'''
if __name__=="__main__":
    loop=asyncio.get_event_loop()
    now=loop.time()
    # loop.call_soon(callback,2)
    # loop.call_soon(stoploop,loop)
    loop.call_at(now+2,callback,2)
    loop.call_at(now+1,callback,1)
    loop.call_at(now+3,callback,3)
    loop.call_soon(callback,4)
    loop.run_forever()
"""
import asyncio

def callback(sleep_times, loop):
    """Print the loop's current clock reading; ``sleep_times`` is accepted but unused."""
    fired_at = loop.time()
    print("success {}".format(fired_at))

def stoploop(loop):
    """Stop ``loop``; intended to be scheduled on the loop as a callback."""
    loop.stop()
'''

#call_later
if __name__=="__main__":
    loop=asyncio.get_event_loop()
    # loop.call_soon(callback,2)
    # loop.call_soon(stoploop,loop)
    loop.call_later(2,callback,2)
    loop.call_later(1,callback,1)
    loop.call_later(3,callback,3)
    loop.call_soon(callback,2)
    loop.run_forever()
'''
# call_at demo: schedule callbacks at absolute times on the loop's own clock.
if __name__=="__main__":
    loop=asyncio.get_event_loop()
    # call_at takes absolute loop time, so offsets are added to the current reading.
    now=loop.time()
    # loop.call_soon(callback,2)
    # loop.call_soon(stoploop,loop)
    loop.call_at(now+2,callback,2,loop)
    loop.call_at(now+1,callback,1,loop)
    loop.call_at(now+3,callback,3,loop)
    loop.call_soon(callback,4,loop)
    # stoploop is never scheduled above, so nothing here stops the loop.
    loop.run_forever()

#coroutine_nest   coroutine_nest

#1.run_until_complete
# import asyncio
# loop=asyncio.get_event_loop()
# loop.run_forever()
# loop.run_until_complete()

#1.loop会被放到future中
#2.取消future(task)
import asyncio
import time

# async def get_html(sleep_times):
#     print('waiting')
#     await asyncio.sleep(sleep_times)
#     print("done after {}s".format(sleep_times))
#
# if __name__=="__main__":
#     task1=get_html(3)
#     task2=get_html(3)
#     task3=get_html(3)
#
#     tasks=[task1,task2,task3]
#
#     loop=asyncio.get_event_loop()
#     try:
#         loop.run_until_complete(asyncio.wait(tasks))
#     except KeyboardInterrupt as e:
#         all_tasks=asyncio.Task.all_tasks()
#         for task in all_tasks:
#             print("cancel task")
#             print(task.cancel())
#         loop.stop()
#         # loop.run_forever()
#     finally:
#         loop.close()



import asyncio
import time

async def get_html(sleep_times):
    """Print "waiting", sleep for ``sleep_times`` seconds, then report the delay."""
    print("waiting")
    finished_message = "done after {}s".format(sleep_times)
    await asyncio.sleep(sleep_times)
    print(finished_message)


# Task-cancellation demo: press Ctrl+C while the tasks sleep to cancel them.
if __name__ == "__main__":
    loop = asyncio.get_event_loop()

    # Wrap the coroutines in Tasks up front: asyncio.wait() no longer accepts
    # bare coroutine objects on current Python versions.
    task1 = loop.create_task(get_html(2))
    task2 = loop.create_task(get_html(3))
    task3 = loop.create_task(get_html(3))

    tasks = [task1, task2, task3]

    try:
        loop.run_until_complete(asyncio.wait(tasks))
    except KeyboardInterrupt:
        # asyncio.Task.all_tasks() was removed in Python 3.9. The loop must be
        # passed explicitly because no loop is running at this point.
        for task in asyncio.all_tasks(loop):
            print("cancel task")
            print(task.cancel())
        # stop() followed by run_forever() runs the loop once more so the
        # cancellations are delivered before the loop is closed.
        loop.stop()
        loop.run_forever()
    finally:
        loop.close()

#loop_test

#事件循环+回调(驱动生成器)+epoll(IO多路复用)
#asyncio是python用于解决异步io编程的一整套解决方案
#tornado,gevent,twisted(scrapy,django channels)
#tornado(实现web服务器),django+flask(uwsgi,gunicorn+nginx)
#tornado可以直接部署,nginx+tornado

#使用 asyncio
'''
import asyncio
import time ,datetime

async def get_html(url):
    print("start get url")
    # await asyncio.sleep(2)
    time.sleep(2)
    print("end get url")
if __name__=="__main__":
    start_time=time.time()
    loop=asyncio.get_event_loop()
    tasks=[get_html("http://www.baidu.com") for i in range(5)]
    # loop.run_until_complete(get_html("http://www.baidu.com"))
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time()-start_time)
'''

'''
#获取协程的返回值
import asyncio
import time ,datetime

async def get_html(url):
    print("start get url")
    await asyncio.sleep(2)
    # time.sleep(2)
    return "bobby"
if __name__=="__main__":
    start_time=time.time()
    loop=asyncio.get_event_loop()
    # get_future=asyncio.ensure_future(get_html("http://www.baidu.com"))
    # loop.create_task()
    # loop.run_until_complete(asyncio.wait(get_future))
    # print(get_future.result())
    # print(time.time()-start_time)
    task=loop.create_task(get_html("http://www.baidu.com"))
    loop.run_until_complete(asyncio.wait(task))
    print(task.result())

'''
# import asyncio
# import time
# async def main():
#     print('hello')
#     await asyncio.sleep(1)
#     print('world')
# asyncio.run(main())
'''

import asyncio
import time

async def say_after(delay,what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f'started at {time.strftime("%X")}')
    await say_after(1,'hello')
    await say_after(2,'world')
    # await asyncio.sleep(1)
    print(f'finished at{time.strftime("%X")}')

asyncio.run(main())

'''
'''
import asyncio
import time

async def say_after(delay,what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    task1=asyncio.create_task(
        say_after(1,'hello')
    )
    task2=asyncio.create_task(
        say_after(2, 'world')
    )
    print(f'started at {time.strftime("%X")}')
    await task1
    await task2
    # await asyncio.sleep(1)
    print(f'finished at{time.strftime("%X")}')

asyncio.run(main())

'''

# import asyncio
# import time
# async def get_html(url):
#     print("start get url")
#     await asyncio.sleep(2)
#     print("end get url")
#
# if __name__ == "__main__":
#     start_time = time.time()
#     loop = asyncio.get_event_loop()
#     tasks = [get_html("http://www.imooc.com") for i in range(10)]
#     loop.run_until_complete(asyncio.wait(tasks))
#     print(time.time()-start_time)

#获取协程的返回值
# import asyncio
# import time
# from functools import partial
# async def get_html(url):
#     print("start get url")
#     await asyncio.sleep(2)
#     return "bobby"
#
# def callback(url, future):
#     print(url)
#     print("send email to bobby")
#
# if __name__ == "__main__":
#     start_time = time.time()
#     loop = asyncio.get_event_loop()
#     # get_future = asyncio.ensure_future(get_html("http://www.imooc.com"))
#     task = loop.create_task(get_html("http://www.imooc.com"))
#     task.add_done_callback(partial(callback, "http://www.imooc.com"))
#     loop.run_until_complete(task)
#     print(task.result())


# import asyncio
# import time ,datetime
# from functools import partial
# async def get_html(url):
#     print("start get url")
#     await asyncio.sleep(2)
#     # time.sleep(2)
#     return "bobby"
# def callback(url,future):
#     print(url)
#     print("send mail to bobby")
#
# if __name__=="__main__":
#     start_time=time.time()
#     loop=asyncio.get_event_loop()
#     # get_future=asyncio.ensure_future(get_html("http://www.imooc.com"))
#     # loop.run_until_complete(get_future)
#     # print(get_future.result())
#     # print(time.time()-start_time)
#     task=loop.create_task(get_html("http://www.baidu.com"))
#     task.add_done_callback(partial(callback,"http://www.imooc.com"))
#     loop.run_until_complete(task)
#     print(task.result())




#await 和 gather
import asyncio
import time

async def get_html(url):
    """Simulate a two-second download; ``url`` is accepted but not used."""
    pause_seconds = 2
    print("start get url")
    await asyncio.sleep(pause_seconds)
    print("end get url")


# gather demo: build two groups of coroutines, cancel one group, run both.
if __name__ == "__main__":
    start_time = time.time()
    loop = asyncio.get_event_loop()
    # tasks = [get_html("http://www.imooc.com") for i in range(10)]
    # loop.run_until_complete(asyncio.gather(*tasks))
    # print(time.time()-start_time)
    # Difference between gather and wait:
    # gather is the more high-level of the two.
    group1=[get_html("http://projectsedu.com") for i in range(2)]
    group2=[get_html("http://www.imooc.com") for i in range(3)]
    # Each list is replaced by a single future covering the whole group.
    group1=asyncio.gather(*group1)
    group2=asyncio.gather(*group2)
    # A whole group can be cancelled through its gather future.
    # NOTE(review): group2 is cancelled before the loop runs - confirm whether
    # the outer gather then raises CancelledError, which would skip the print below.
    group2.cancel()
    loop.run_until_complete(asyncio.gather(group1,group2))
    print(time.time()-start_time)

#thread_asyncio

#使用多线程:在协程中集成阻塞io
import asyncio
import socket
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse


def get_url(url):
    """Fetch ``url`` with a blocking socket and print the response body.

    Sends a hand-written HTTP/1.1 GET, reads until the server closes the
    connection, and prints everything after the first blank line (the end
    of the headers). Intended to be run in a worker thread.

    Args:
        url: Absolute ``http://`` URL. A port in the URL is honoured;
            otherwise port 80 is used.
    """
    # Request the HTML through a raw socket connection.
    parts = urlparse(url)
    host = parts.netloc
    path = parts.path or "/"

    # Open the socket; the with-block closes it even if an error occurs.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        # connect() blocks the calling thread without burning CPU.
        client.connect((parts.hostname or host, parts.port or 80))

        request = "GET {} HTTP/1.1\r\nHost:{}\r\nConnection:close\r\n\r\n".format(path, host)
        # sendall() retries until the full request is written; send() may stop early.
        client.sendall(request.encode("utf8"))

        chunks = []
        while True:
            chunk = client.recv(1024)
            if not chunk:
                # Empty read: the server closed the connection.
                break
            chunks.append(chunk)

    data = b"".join(chunks).decode("utf8")
    # Split on the first header/body separator only, so a body containing
    # blank lines is kept whole and a response without one yields "".
    _, _, html_data = data.partition("\r\n\r\n")
    print(html_data)

# Entry point: run 20 blocking downloads on a thread pool and time the batch.
if __name__ == "__main__":
    import time
    start_time=time.time()
    loop=asyncio.get_event_loop()
    executor=ThreadPoolExecutor()
    tasks=[]
    for url in range(20):
        url="http://shop.projectsedu.com/goods/{}/".format(url)
        # run_in_executor hands the blocking get_url call to the pool and
        # returns an awaitable that the loop can wait on.
        task=loop.run_in_executor(executor,get_url,url)
        tasks.append(task)
    loop.run_until_complete(asyncio.wait(tasks))
    print(time.time()-start_time)

 

标签:__,get,url,应用,time,loop,asyncio
From: https://www.cnblogs.com/mengdie1978/p/16776663.html

相关文章

  • 大数据采集技术工具及应用场景
       大数据采集可以细分为数据抽取、数据清洗、数据集成、数据转换等过程,将分散、零乱、不统一的数据整合到一起,以一种结构化、可分析的形态加载到数据仓库中,从而为后......
  • web安全 应用表单密码类型输入启用了自动完成操作
    https://blog.csdn.net/CHS007chs/article/details/52525326在web应用form表单中,如果input标签没有指定“autocomplete”属性为“off”,则“autocomplete”的属性会自动默......
  • Fast.Framework(ORM) 应用
    前言 有小伙伴让我出一个应用到框架的博客,我抽空花了点时间,大致的项目搭建起来,为了大家更好的理解采用经典的三层架构。了解CQRS架构的小伙伴也可以自己尝试一下。演示......
  • 【AGC+FPGA】基于FPGA的数字AGC自适应增益设计,应用在BPSK调制解调系统中
           AGC测试,这里我们主要通过产生一个信号,输入到AGC中,来分析AGC的工作效果,其仿真结果如下图所示: 这里,我们使用测试信号的时候,通过输入一个正弦信号,实现AGC的功......
  • 小波变换在图像分割中的应用
    1.1 空域图像分割空域是指图像平面本身,空域图像分割就是直接对图像的像素进行处理分割。研究者经过几十年的研究与努力,研究出了很多种空域图像分割方法。归纳起来大致包括......
  • Matlab在双边带调幅(DSB-SC)和解调的应用
    调制和解调单边带调制和解调的方法有多种,其中最常用的是滤波法。用滤波法实现单边带调制,是分双边带信号形成和无用边带抑制两步完成的。双边带信号由平衡调制器形成。由于调......
  • 光纤温度传感器在电力系统的应用
    温度是电力设备的重要运行参数,通过监测电力设备温度信息获取电力设备的运行状况是电力系统故障预报与诊断的研究热点,研究内容包括各种新型的温度传感器的应用、电力设备的故......
  • 实战 | 实时的目标检测与识别简单应用
    吃粽子迎端午计算机视觉研究院专栏作者:Edison_G最近总是有很多入门的朋友问我,我进入计算机视觉这个领域难不难?是不是要学习很多知识?到底哪个方向比较好?长按扫描二维码关注我......
  • SQL操作:WITH表达式及其应用
    SQL标准1999中,在传统SQL语法的基础上增加了with表达式的使用,使得SQL语句的编程可以更加灵活和具备可扩展性。本文将围绕with,以及更高阶的withrecursive表达式介绍其语法特......
  • DFP 数据转发协议应用实例 3.LoRA 中继-使用频道实现
    DFP数据转发协议应用实例 3.LoRA中继-使用频道实现DFP是什么?稳控科技编写的一套数据转发规则,取自“自由转发协议FFP(FreeForwardProtocol)”,或者DFP(DoubleFProt......