首页 > 其他分享 >Day 18 18.1 并发爬虫之协程实现

Day 18 18.1 并发爬虫之协程实现

时间:2023-03-22 20:55:47浏览次数:42  
标签:之协程 task 18 print time 18.1 async page asyncio

并发爬虫之协程实现

  • 协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。

  • 协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:

  • 协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。

【1】yield与协程


def foo():
    print("OK1")
    yield 100  # 切换: 保存/恢复的功能
    print("OK2")
    yield 1000


def bar():
    print("OK3")
    yield 200
    print("OK4")
    yield 2000


gen = foo()
ret = next(gen)    # gen.__next__()
print(ret)

gen2 = bar()
ret2 = next(gen2)  # gen.__next__()
print(ret2)

ret = next(gen)    # gen.__next__()
print(ret)

ret2 = next(gen2)  # gen.__next__()
print(ret2)

【2】asyncio模块

  • asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。

  • 为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法asyncawait,可以让coroutine的代码更简洁易读。

  • asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

  • asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。

import asyncio


async def task(i):
    print(f"task {i} start")
    await asyncio.sleep(1)
    print(f"task {i} end")


# 创建事件循环对象
loop = asyncio.get_event_loop()
# 直接将协程对象加入时间循环中
tasks = [task(1), task(2)]
# asyncio.wait:将协程任务进行收集,功能类似后面的asyncio.gather
# run_until_complete阻塞调用,直到协程全部运行结束才返回
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
  • async: 定义一个方法(函数),这个方法在后面的调用中不会被立即执行而是返回一个协程对象;

  • coroutine: 协程对象,也可以将协程对象添加到时间循环中,它会被事件循环调用;

  • event_loop: 事件循环,相当于一个无限循环,可以把一些函数添加到这个事件中,函数不会立即执行, 而是满足某些条件的时候,函数就会被循环执行;

  • await: 用来挂起阻塞方法的执行;

  • task: 任务,对协程对象的进一步封装,包含任务的各个状态;asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数loop.create_task() 或 asyncio.ensure_future()创建。

import asyncio, time


async def work(i, n):  # 使用async关键字定义异步函数
    print('任务{}等待: {}秒'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print('任务{}在{}秒后返回结束运行'.format(i, n))
    return i + n


start_time = time.time()  # 开始时间

tasks = [asyncio.ensure_future(work(1, 1)),
         asyncio.ensure_future(work(2, 2)),
         asyncio.ensure_future(work(3, 3))]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

print('运行时间: ', time.time() - start_time)
for task in tasks:
    print('任务执行结果: ', task.result())
  • future: 代表以后执行或者没有执行的任务,实际上和task没有本质区别;这里就不做代码展示;

2.1 异步IO示例

  • run_until_complete():

    • 阻塞调用,直到协程运行结束才返回。参数是future,传入协程对象时内部会自动变为future
  • asyncio.sleep():

    • 模拟IO操作,这样的休眠不会阻塞事件循环,前面加上await后会把控制权交给主事件循环,在休眠(IO操作)结束后恢复这个协程。

提示:若在协程中需要有延时操作,应该使用 await asyncio.sleep(),而不是使用time.sleep(),因为使用time.sleep()后会释放GIL,阻塞整个主线程,从而阻塞整个事件循环。

import asyncio

async def test():
    print('异步任务test!')
c = test()  # 调用异步函数,得到协程对象coroutine--> c
loop = asyncio.get_event_loop()  # 创建事件循环
loop.run_until_complete(c)  # 把协程对象丢给循环,并执行异步函数内部代码

import asyncio


async def test():
    print('异步任务test')

if __name__ == '__main__':
    cor_test = test()   # 协程对象
    loop = asyncio.get_event_loop()
    # 创建任务
    # task = asyncio.ensure_future(cor_test)
    task = loop.create_task(cor_test)
    # 把任务注册到事件循环上
    loop.run_until_complete(task)
import asyncio, time


async def work(i, n):  # 使用async关键字定义异步函数
    print('任务{}等待: {}秒'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print('任务{}在{}秒后返回结束运行'.format(i, n))
    return i + n


start_time = time.time()  # 开始时间

tasks = [asyncio.ensure_future(work(1, 1)),
         asyncio.ensure_future(work(2, 2)),
         asyncio.ensure_future(work(3, 3))]
loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

print('运行时间: ', time.time() - start_time)
for task in tasks:
    print('任务执行结果: ', task.result())

2.2 py3.8版本+

运行协程的三种基本方式
async.run() 运行协程
async.create_task()创建task
async.gather()获取返回值

# 用create_task()创建task

import asyncio, time

async def work(i, n):  # 使用async关键字定义异步函数
    print('任务{}等待: {}秒'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print('任务{}在{}秒后返回结束运行'.format(i, n))
    return i + n


tasks = []
async def main():
    global tasks
    tasks = [asyncio.create_task(work(1, 1)),
             asyncio.create_task(work(2, 2)),
             asyncio.create_task(work(3, 3))]

    await asyncio.wait(tasks) # 阻塞


start_time = time.time()  # 开始时间
asyncio.run(main())
print('运行时间: ', time.time() - start_time)
for task in tasks:
    print('任务执行结果: ', task.result())

PS:

必须先通过asyncio.create_task将task创建到event loop中,

再通过await等待,

如果直接用await等待则会导致异步变同步

asyncio.create_task() 函数在 Python 3.7 中被加入。

# 用gather()收集返回值


import asyncio, time


async def work(i, n):  # 使用async关键字定义异步函数
    print('任务{}等待: {}秒'.format(i, n))
    await asyncio.sleep(n)  # 休眠一段时间
    print('任务{}在{}秒后返回结束运行'.format(i, n))
    return i + n


tasks = []


async def main():
    global tasks
    tasks = [asyncio.create_task(work(1, 1)),
             asyncio.create_task(work(2, 2)),
             asyncio.create_task(work(3, 3))]

    await asyncio.wait(tasks)
    response = await asyncio.gather(tasks[0], tasks[1], tasks[2])  # 将task作为参数传入gather,等异步任务都结束后返回结果列表
    print(response)


start_time = time.time()  # 开始时间

asyncio.run(main())

print('运行时间: ', time.time() - start_time)

【3】aiohttp

  • 我们之前学习过爬虫最重要的模块requests,但它是阻塞式的发起请求,每次请求发起后需阻塞等待其返回响应,不能做其他的事情。

    • 本文要介绍的aiohttp可以理解成是和requests对应Python异步网络请求库,它是基于 asyncio 的异步模块,可用于实现异步爬虫,有点就是更快于 requests 的同步爬虫。
    • 安装方式,pip install aiohttp。
  • aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于asyncio的异步库。

    • asyncio可以实现单线程并发IO操作,其实现了TCP、UDP、SSL等协议,
    • aiohttp就是基于asyncio实现的http框架。
import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("http://httpbin.org/headers") as response:
            print(await response.text())

asyncio.run(main())

【4】应用案例(异步爬虫)

  • 基于asyncio异步爬取斗图网的表情包
import requests
import os
import re
from lxml import etree
from fake_useragent import UserAgent
import time
import asyncio
import aiohttp

fake_ua = UserAgent()

headers = {
    "User-Agent": fake_ua.random
}


async def get_every_page_url():
    page_urls = []
    for i in range(1, 4):
        if i == 1:
            page_url = "https://www.pkdoutu.com/photo/list/"
            page_urls.append(page_url)
        else:
            page_url = f"https://www.pkdoutu.com/photo/list/?page={i}"
            # https://www.pkdoutu.com/photo/list/?page=2
            page_urls.append(page_url)
    return page_urls


async def get_first_page(page_url):
    print(f'------当前页的page_url是:{page_url}------')
    filename = 'dtb'
    if not os.path.exists(filename):
        os.mkdir(filename)
    async with aiohttp.ClientSession() as session:
        async with session.get(page_url, ssl=False, headers=headers) as response:
            # response.encoding = 'utf-8'
            page_text = await response.content.read()
            tree = etree.HTML(page_text)
            # img_urls = tree.xpath('//li[@class="list-group-item"]/div/div/a/img[@data-backup]/@data-backup')
            a_lists = tree.xpath('//*[@id="pic-detail"]/div/div[2]/div[2]/ul/li/div/div/a')
            img_urls = []
            for a in a_lists:
                img_src = a.xpath('./img/@data-backup')[0]
                img_urls.append(img_src)

    return img_urls


async def download_img(url):
    img_title = os.path.basename(url)
    filepath = os.path.join('dtb', img_title)
    async with aiohttp.ClientSession() as session:
        async with session.get(url, ssl=False) as response:
            response.encoding = 'utf-8'
            with open(filepath, 'wb') as f:
                f.write(await response.content.read())
                print(f'当前图片{img_title}下载完成')


async def main():
    # async版本
    page_urls = await get_every_page_url()
    for page_url in page_urls:
        img_urls = await get_first_page(page_url)
        tasks = []
        for url in img_urls:
            t = asyncio.create_task(download_img(url))
            tasks.append(t)
        await asyncio.wait(tasks)


async def main1():
    start = time.time()
    page_urls = await get_every_page_url()
    for page_url in page_urls:
        img_urls = await get_first_page(page_url)
        print(img_urls)


if __name__ == '__main__':
    start = time.time()
    # asyncio.run(main1())

    asyncio.run(main())
    print(f'当前总耗时为{time.time() - start}s')
    # 当前总耗时为45.654601097106934s

标签:之协程,task,18,print,time,18.1,async,page,asyncio
From: https://www.cnblogs.com/dream-ze/p/17245386.html

相关文章

  • 【技巧】Windows Server 2012/2016/2018桌面显示我的电脑图标
    从WindowsServer2012开始,微软取消了服务器桌面个性化设置功能,WindowsServer2012/2016用户桌面上默认只有一个回收站的图标,这让习惯于使用我的电脑图标功能的用户很不适......
  • 001-ksum 求符合条件的 k 个数 1. Two Sum/15. 3Sum/18. 4Sum/
    推荐阅读000-从零开始的数据结构与算法001-01-ksum求符合条件的k个数1.TwoSum/15.3Sum/18.4Sum/002-两数相加addtwonumbers003-无重复字符的最长子串Longe......
  • Zabbix“专家坐诊”第185期问答汇总
    问题一Q:Zabbix5.0版本,如图,请问这里怎么修改回localhost?  A:找到文件conf/zabbix.conf.php,改下图这个位置   问题二Q:大家好,我有个疑问请教下,zabbix用脚本监......
  • Ubuntu18.04设置bond0
    Ubuntu18.04设置bond6有两种方法,一是在安装系统的设置网卡聚合,二是修改/etc/netplan/*.yaml这个配置文件。一,安装系统时设置bond61,正常进入Ubuntu18.04的安装界面2,选择键......
  • 2023/03/18(六)雨,下一天;应该睡一天
    下雨天还有些冷,继续收拾屋子;休息日也就没喝咖啡,昏昏沉沉的;北京说是天儿很好。把小宝的写字台和我的对调了一下,这样他和姐姐的桌子面积一样了,再慢慢收拾其他的东西。前几......
  • esxi主机安装完毕后漏洞:CVE-2018-3646解决方法
    [解决方案]由于此漏洞属于芯片级漏洞,更新固件会造成较大的性能损失,在私有云环境下,此漏洞的影响范围可控,我们可以选择禁用此提示,暂缓漏洞的修复。esxi主机安装完毕后漏洞:C......
  • vue2 element ui 使用i18n 国际化配置
    前提npminstall vue-i18n第一步:vue项目下新建locals文件夹,里面配置index.js内容如下,同时新建zh.js和en.js作为语言配置文件importVuefrom'vue'importVueI18nf......
  • 离线安装 Nginx 1.18
    离线安装Nginx1.18安装依赖:gcc、gcc-c++、ssl、pcre、zlib。注意:一定要先安装gcc,再安装gcc-c++。然后再安装其他,其他的没有先后顺序。一、安装依赖gcc、gcc-c++等依赖......
  • 218服务器设置防火墙
    (1)启动防火墙systemctlstartiptables.service(2)禁止所有IPTCP连接端口2181iptables-IINPUT-ptcp--dport2181-jDROP  (3)配置保存serviceiptablessave(4)设置......
  • ubuntu18.04 脚本自启动方法
    1,修改/lib/systemd/system中文件rc-local.service文章末位加入这三行数据[Install]WantedBy=multi-user.targetAlias=rc-local.service 2,编辑/etc/rc.local添加需要......