并发爬虫之协程实现
-
协程,又称微线程,纤程。英文名Coroutine。一句话说明什么是线程:协程是一种用户态的轻量级线程。
-
协程拥有自己的寄存器上下文和栈。协程调度切换时,将寄存器上下文和栈保存到其他地方,在切回来的时候,恢复先前保存的寄存器上下文和栈。因此:
-
协程能保留上一次调用时的状态(即所有局部状态的一个特定组合),每次过程重入时,就相当于进入上一次调用的状态,换种说法:进入上一次离开时所处逻辑流的位置。
【1】yield与协程
def foo():
print("OK1")
yield 100 # 切换: 保存/恢复的功能
print("OK2")
yield 1000
def bar():
print("OK3")
yield 200
print("OK4")
yield 2000
gen = foo()
ret = next(gen) # gen.__next__()
print(ret)
gen2 = bar()
ret2 = next(gen2) # gen.__next__()
print(ret2)
ret = next(gen) # gen.__next__()
print(ret)
ret2 = next(gen2) # gen.__next__()
print(ret2)
【2】asyncio模块
-
asyncio即Asynchronous I/O是python一个用来处理并发(concurrent)事件的包,是很多python异步架构的基础,多用于处理高并发网络请求方面的问题。
-
为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。
-
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
-
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择。
import asyncio
async def task(i):
print(f"task {i} start")
await asyncio.sleep(1)
print(f"task {i} end")
# 创建事件循环对象
loop = asyncio.get_event_loop()
# 直接将协程对象加入时间循环中
tasks = [task(1), task(2)]
# asyncio.wait:将协程任务进行收集,功能类似后面的asyncio.gather
# run_until_complete阻塞调用,直到协程全部运行结束才返回
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
-
async
: 定义一个方法(函数),这个方法在后面的调用中不会被立即执行而是返回一个协程对象; -
coroutine
: 协程对象,也可以将协程对象添加到时间循环中,它会被事件循环调用; -
event_loop
: 事件循环,相当于一个无限循环,可以把一些函数添加到这个事件中,函数不会立即执行, 而是满足某些条件的时候,函数就会被循环执行; -
await
: 用来挂起阻塞方法的执行; -
task
: 任务,对协程对象的进一步封装,包含任务的各个状态;asyncio.Task用于实现协作式多任务的库,且Task对象不能用户手动实例化,通过下面2个函数loop.create_task() 或 asyncio.ensure_future()
创建。
import asyncio, time
async def work(i, n): # 使用async关键字定义异步函数
print('任务{}等待: {}秒'.format(i, n))
await asyncio.sleep(n) # 休眠一段时间
print('任务{}在{}秒后返回结束运行'.format(i, n))
return i + n
start_time = time.time() # 开始时间
tasks = [asyncio.ensure_future(work(1, 1)),
asyncio.ensure_future(work(2, 2)),
asyncio.ensure_future(work(3, 3))]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
print('运行时间: ', time.time() - start_time)
for task in tasks:
print('任务执行结果: ', task.result())
future
: 代表以后执行或者没有执行的任务,实际上和task
没有本质区别;这里就不做代码展示;
2.1 异步IO示例
-
run_until_complete()
:- 阻塞调用,直到协程运行结束才返回。参数是future,传入协程对象时内部会自动变为future
-
asyncio.sleep()
:- 模拟IO操作,这样的休眠不会阻塞事件循环,前面加上await后会把控制权交给主事件循环,在休眠(IO操作)结束后恢复这个协程。
提示:若在协程中需要有延时操作,应该使用 await asyncio.sleep(),而不是使用time.sleep(),因为使用time.sleep()后会释放GIL,阻塞整个主线程,从而阻塞整个事件循环。
import asyncio
async def test():
print('异步任务test!')
c = test() # 调用异步函数,得到协程对象coroutine--> c
loop = asyncio.get_event_loop() # 创建事件循环
loop.run_until_complete(c) # 把协程对象丢给循环,并执行异步函数内部代码
import asyncio
async def test():
print('异步任务test')
if __name__ == '__main__':
cor_test = test() # 协程对象
loop = asyncio.get_event_loop()
# 创建任务
# task = asyncio.ensure_future(cor_test)
task = loop.create_task(cor_test)
# 把任务注册到事件循环上
loop.run_until_complete(task)
import asyncio, time
async def work(i, n): # 使用async关键字定义异步函数
print('任务{}等待: {}秒'.format(i, n))
await asyncio.sleep(n) # 休眠一段时间
print('任务{}在{}秒后返回结束运行'.format(i, n))
return i + n
start_time = time.time() # 开始时间
tasks = [asyncio.ensure_future(work(1, 1)),
asyncio.ensure_future(work(2, 2)),
asyncio.ensure_future(work(3, 3))]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
print('运行时间: ', time.time() - start_time)
for task in tasks:
print('任务执行结果: ', task.result())
2.2 py3.8版本+
运行协程的三种基本方式
async.run()
运行协程
async.create_task()
创建task
async.gather()
获取返回值
# 用create_task()创建task
import asyncio, time
async def work(i, n): # 使用async关键字定义异步函数
print('任务{}等待: {}秒'.format(i, n))
await asyncio.sleep(n) # 休眠一段时间
print('任务{}在{}秒后返回结束运行'.format(i, n))
return i + n
tasks = []
async def main():
global tasks
tasks = [asyncio.create_task(work(1, 1)),
asyncio.create_task(work(2, 2)),
asyncio.create_task(work(3, 3))]
await asyncio.wait(tasks) # 阻塞
start_time = time.time() # 开始时间
asyncio.run(main())
print('运行时间: ', time.time() - start_time)
for task in tasks:
print('任务执行结果: ', task.result())
PS:
必须先通过asyncio.create_task将task创建到event loop中,
再通过await等待,
如果直接用await等待则会导致异步变同步
asyncio.create_task() 函数在 Python 3.7 中被加入。
# 用gather()收集返回值
import asyncio, time
async def work(i, n): # 使用async关键字定义异步函数
print('任务{}等待: {}秒'.format(i, n))
await asyncio.sleep(n) # 休眠一段时间
print('任务{}在{}秒后返回结束运行'.format(i, n))
return i + n
tasks = []
async def main():
global tasks
tasks = [asyncio.create_task(work(1, 1)),
asyncio.create_task(work(2, 2)),
asyncio.create_task(work(3, 3))]
await asyncio.wait(tasks)
response = await asyncio.gather(tasks[0], tasks[1], tasks[2]) # 将task作为参数传入gather,等异步任务都结束后返回结果列表
print(response)
start_time = time.time() # 开始时间
asyncio.run(main())
print('运行时间: ', time.time() - start_time)
【3】aiohttp
-
我们之前学习过爬虫最重要的模块requests,但它是阻塞式的发起请求,每次请求发起后需阻塞等待其返回响应,不能做其他的事情。
- 本文要介绍的aiohttp可以理解成是和requests对应Python异步网络请求库,它是基于 asyncio 的异步模块,可用于实现异步爬虫,有点就是更快于 requests 的同步爬虫。
- 安装方式,pip install aiohttp。
-
aiohttp是一个为Python提供异步HTTP 客户端/服务端编程,基于
asyncio
的异步库。- asyncio可以实现单线程并发IO操作,其实现了TCP、UDP、SSL等协议,
- aiohttp就是基于asyncio实现的http框架。
import aiohttp
import asyncio
async def main():
async with aiohttp.ClientSession() as session:
async with session.get("http://httpbin.org/headers") as response:
print(await response.text())
asyncio.run(main())
【4】应用案例(异步爬虫)
- 基于asyncio异步爬取斗图网的表情包
import requests
import os
import re
from lxml import etree
from fake_useragent import UserAgent
import time
import asyncio
import aiohttp
fake_ua = UserAgent()
headers = {
"User-Agent": fake_ua.random
}
async def get_every_page_url():
page_urls = []
for i in range(1, 4):
if i == 1:
page_url = "https://www.pkdoutu.com/photo/list/"
page_urls.append(page_url)
else:
page_url = f"https://www.pkdoutu.com/photo/list/?page={i}"
# https://www.pkdoutu.com/photo/list/?page=2
page_urls.append(page_url)
return page_urls
async def get_first_page(page_url):
print(f'------当前页的page_url是:{page_url}------')
filename = 'dtb'
if not os.path.exists(filename):
os.mkdir(filename)
async with aiohttp.ClientSession() as session:
async with session.get(page_url, ssl=False, headers=headers) as response:
# response.encoding = 'utf-8'
page_text = await response.content.read()
tree = etree.HTML(page_text)
# img_urls = tree.xpath('//li[@class="list-group-item"]/div/div/a/img[@data-backup]/@data-backup')
a_lists = tree.xpath('//*[@id="pic-detail"]/div/div[2]/div[2]/ul/li/div/div/a')
img_urls = []
for a in a_lists:
img_src = a.xpath('./img/@data-backup')[0]
img_urls.append(img_src)
return img_urls
async def download_img(url):
img_title = os.path.basename(url)
filepath = os.path.join('dtb', img_title)
async with aiohttp.ClientSession() as session:
async with session.get(url, ssl=False) as response:
response.encoding = 'utf-8'
with open(filepath, 'wb') as f:
f.write(await response.content.read())
print(f'当前图片{img_title}下载完成')
async def main():
# async版本
page_urls = await get_every_page_url()
for page_url in page_urls:
img_urls = await get_first_page(page_url)
tasks = []
for url in img_urls:
t = asyncio.create_task(download_img(url))
tasks.append(t)
await asyncio.wait(tasks)
async def main1():
start = time.time()
page_urls = await get_every_page_url()
for page_url in page_urls:
img_urls = await get_first_page(page_url)
print(img_urls)
if __name__ == '__main__':
start = time.time()
# asyncio.run(main1())
asyncio.run(main())
print(f'当前总耗时为{time.time() - start}s')
# 当前总耗时为45.654601097106934s
标签:之协程,task,18,print,time,18.1,async,page,asyncio
From: https://www.cnblogs.com/dream-ze/p/17245386.html