异步爬虫
基础知识
阻塞
阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则该程序在操作上是阻塞的。
常见的阻塞形式有:网络I/O阻塞、磁盘I/O阻塞、用户输入阻塞等。阻塞是无处不在的,包括在CPU切换上下文时,所有进程都无法真正干事情,它们也会被阻塞。在多核CPU的情况下,正在执行上下文切换操作的核不可被利用
非阻塞
程序在等待某操作的过程中,自身不被阻塞,可以继续干别的事情,则称该程序在该操作系统上是非阻塞的。
非阻塞并不是在任何程序级别、任何情况下都存在的。仅当程序封装的级别可以囊括独立的子程序单元时,程序才能存在非阻塞状态。
非阻塞因阻塞的存在而存在,正因为阻塞导致程序运行的耗时增加与效率低下,我们才需要将它变为非阻塞的。
同步
不同程序单元为了共同完成某个任务,在执行过程中需要靠某种通信方式保持协调一致,此时这些程序单元是同步执行的。
例如在购物系统中更新商品库存时,需要用“行锁”作为通信信号,强制让不同的更新请求排队并按顺序执行,这里的更新库存操作就是同步的。
异步
为了完成某个任务,不同程序单元之间无需通信协调也能完成任务,此时不限同的程序单元之间可以是异步的。
例如,爬取下载网页。调度程序调用下载程序后,既可调度其他任务,无须与该下载任务保持通信以协调行为。不同网页的下载,保存等操作都是无关的,也无需相互通知协调。
多进程
多进程利用CPU的多核优势,在同一时间并行执行多个任务,可以大大提高执行效率
协程
协程是一种运行在用户态的轻量级线程。
协程拥有自己的寄存器上下文和栈。协程在调度切换时,将寄存器上下文和栈保存到其他地方,等切回来的时候,在恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入,就相当于进入上一次调用的状态。
协程本质上是一个单进程,相对多线程来说,它没有线程上下文切换的开销,没有原子操作锁定及同步到开销。
定义协程
- event_loop:事件循环,相当于一个无限循环,可以把一些函数注册到这个事件循环上,当满足发生条件的时候,就调用对应的处理方法。
- coroutine:协程。在Python中指协程对象,将协程对象注册到事件循环中,它会被事件循环调用。可以使用async关键字定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
- task:任务,这是对协程对象的进一步封装,包含协程对象的各个状态
- future:代表将来执行或者没有执行的任务的结果,实际上和task没有本质区别
import asyncio
async def execute(x):
print('Number', x)
# 返回一个协程对象
coroutine = execute(1)
print('Coroutine', coroutine)
print('After calling execute')
# 创建一个时间循环loop
loop = asyncio.get_event_loop()
# 将协程对象注册到事件循环中启动
loop.run_until_complete(coroutine)
print('After calling loop')
# Coroutine <coroutine object execute at 0x1045e5be0>
# After calling execute
# Number 1
# After calling loop
# async定义的方法会变成一个无法直接执行的协程对象,必须要注册到事件循环中才能运行
使用task封装对象
import asyncio
async def execute(x):
print('Number', x)
return x
# 返回一个协程对象
coroutine = execute(1)
print('Coroutine', coroutine)
print('After calling execute')
# 创建一个时间循环loop
loop = asyncio.get_event_loop()
# 将协程对象转换为task对象
task = loop.create_task(coroutine)
print('Task:',task)
# 将协程对象注册到事件循环中启动
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')
# Coroutine <coroutine object execute at 0x102891be0>
# After calling execute
# Task: <Task pending name='Task-1' coro=<execute() running at >
# Number 1
# Task: <Task finished name='Task-1' coro=<execute() done, defined at result=1>
# After calling loop
# 将task对象打印,从pending状态变为了finished,并且result变成了1
调用asyncio包的ensure_future方法,返回结果也为task对象,这样可以不用借助loop对象。即使没有声明loop,也可以提前定义task
import asyncio
async def execute(x):
print('Number', x)
return x
# 返回一个协程对象
coroutine = execute(1)
print('Coroutine', coroutine)
print('After calling execute')
# 使用方法创建一个task对象
task = asyncio.ensure_future(coroutine)
print('Task:',task)
# 创建一个时间循环loop
loop = asyncio.get_event_loop()
# 将协程对象注册到事件循环中启动
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')
多任务协程
import asyncio
tasks = [asyncio.ensure_future(req_baidu()) for _ in range(5)]
print('Tasks:', tasks)
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
print('Task Result:', task.result())
# Tasks: [<Task pending name='Task-1' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-2' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-3' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-4' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-5' coro=<req_baidu() running at asysncioDemo.py:15>>]
# Task Result: <Response [200]>
# Task Result: <Response [200]>
# Task Result: <Response [200]>
# Task Result: <Response [200]>
# Task Result: <Response [200]>
协程实现
当遇到一个网站需要等待页面响应返回结果时,合理的利用协程
import asyncio
import requests
import time
start = time.time()
async def req():
url = 'https://www.httpbin.org/delay/5'
print('Waiting for',url)
response = request.get(url)
print('Get response from',url,'response',response)
tasks = [asyncio.ensure_future(req_baidu()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time:',end - start )
请求耗时66秒,并没有实现异步处理,使用await关键字将耗时等待挂起,让出控制权,如果协程在执行的时候遇到await,事件循环就会将本次协程挂起,转而执行别的协程,直到其他协程挂起或执行完毕。
async def req():
url = 'https://www.httpbin.org/delay/5'
print('Waiting for',url)
response = await requests.get(url)
print('Get response from',url,'response',response)
# Task exception was never retrieved
# future: <Task finished coro=<request() done,defined at ...
# TypeError: object Response can't be used in 'await' expression
错误吓死requests返回的Response对象不能喝await一起使用:
- 一个原生协程对象
- 一个由types.coroutine修饰的生成器,这个生成器可以返回协程对象
- 由一个包含**_await _ **方法的对象返回的迭代器
可以将async请求改成一个协程对象
async def req_baidu():
return requests.get('https://www.baidu.com/')
async def await_demo():
url = 'https://www.httpbin.org/delay/5'
print('Waiting for', url)
response = await req_baidu()
print('Get response from', url, 'response', response)
start = time.time()
tasks = [asyncio.ensure_future(await_demo()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time', end - start)
aiohttp
import asyncio
import time
import aiohttp
start = time.time()
async def get(url):
session = aiohttp.ClientSession()
response = await session.get(url)
await response.text()
await session.close()
return response
async def request():
url = 'https://www.httpbin.org/delay/5'
print('Waiting for', url)
response = await get(url)
print('Get response from', url, 'response:', response)
if __name__ == '__main__':
tasks = [asyncio.ensure_future(request()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time', end - start)
开始运行时,事件循环会运行第一个task,对于第一个task来说,当执行到第一个await跟着的get方法时,它会被挂起,但这个get方法第一步的执行是非阻塞的,挂起之后会立马被唤醒,立即又进入执行。接着遇到第二个await,调用Session.get请求,然后被挂起。然后事件循环会寻找当前未被挂起的协程继续执行,依次类推。
基本爬取
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url,ssl=False) as response:
return await response.text(), response.status
async def main():
async with aiohttp.ClientSession() as session:
html, status = await fetch(session, 'https://www.baidu.com')
print(f'html:{html[:100]}')
print(f'status:{status}')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
这里完成了一次基本的HTTP请求,和requests请求不同的地方:
- 除了必须引入aiohtto库,还需要引入asyncio。因为要实现异步爬取需要启动协程,而协程则需要借助asyncio里的事件循环才能执行。除了事件循环asyncio也提供了很多基础的异步操作
- 异步爬取方法的没个前面都要统一的加async来修饰
- with as语句同样需要async来修饰,在Python中,with as语句用于声明一个上下文管理器,帮助我们自动分配和释放资源。在异步方法中,with as前面加上async代表声明一个支持异步的上下文管理器
- 对于返回协程对象的操作,前面需要加await来修饰。例如Response调用的text()方法,返回的是一个协程对象,而状态码则是一串数值。
URL参数设置
import aiohttp
import asyncio
async def main():
# 利用params参数传入一个字典
params={'name':'kang','age',18}
async with aiohttp.ClientSession() as session:
async with session.get('https://www.htpbin.org/get',params=params) as response:
print(await response.text())
if __name__=='__main__':
asyncio.get_event_loop().run_until_complete(main())
{
"args": {
"age": "18",
"name": "kang"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.11 aiohttp/3.8.4",
"X-Amzn-Trace-Id": "Root=1-6454e482-0599efda04b2a55f6a7a0e14"
},
"origin": "139.5.108.229",
"url": "https://www.httpbin.org/get?name=kang&age=18"
}
其他类型请求
# 与requests库中的请求方法类似
session.post('http://www.httpbin.org/post',data=b'data')
session.put('http://www.httpbin.org/put',data=b'data')
session.delete('http://www.httpbin.org/delete',data=b'data')
session.head('http://www.httpbin.org/get')
session.options('http://www.httpbin.org/options')
session.patch('http://www.httpbin.org/patch',data=b'data')
POST请求
对于post表单提交,对于请求头中的Content-Type=application/x-www-form-urlencoded
在session请求中改为
data={'name':'kang','age':18}
session.post('https://www.httpbin.org/post',data=data)
json数据提交对应请求头中的Content-type=application/json
session.post('https://www.httpbin.org/post',json=data)
响应
import aiohttp
import asyncio
async def main():
# 利用params参数传入一个字典
params={'name':'kang','age',18}
async with aiohttp.ClientSession() as session:
async with session.get('https://www.htpbin.org/get',params=params) as response:
print('status:', response.status)
print('body:', await response.text())
print('headers:', response.headers)
print('bytes:', await response.read())
print('json:', await response.json())
if __name__=='__main__':
asyncio.get_event_loop().run_until_complete(main())
status: 200
body: {
"args": {
"age": "18",
"name": "kang"
},
"headers": {
"Accept": "*/*",
"Accept-Encoding": "gzip, deflate",
"Host": "www.httpbin.org",
"User-Agent": "Python/3.11 aiohttp/3.8.4",
"X-Amzn-Trace-Id": "Root=1-6454e730-7a6e883f275648b50e2d39c6"
},
"origin": "139.5.108.229",
"url": "https://www.httpbin.org/get?name=kang&age=18"
}
headers: <CIMultiDictProxy('Date': 'Fri, 05 May 2023 11:23:31 GMT', 'Content-Type': 'application/json', 'Content-Length': '375', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
bytes: b'{\n "args": {\n "age": "18", \n "name": "kang"\n }, \n "headers": {\n "Accept": "*/*", \n "Accept-Encoding": "gzip, deflate", \n "Host": "www.httpbin.org", \n "User-Agent": "Python/3.11 aiohttp/3.8.4", \n "X-Amzn-Trace-Id": "Root=1-6454e730-7a6e883f275648b50e2d39c6"\n }, \n "origin": "139.5.108.229", \n "url": "https://www.httpbin.org/get?name=kang&age=18"\n}\n'
json: {'args': {'age': '18', 'name': 'kang'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.11 aiohttp/3.8.4', 'X-Amzn-Trace-Id': 'Root=1-6454e730-7a6e883f275648b50e2d39c6'}, 'origin': '139.5.108.229', 'url': 'https://www.httpbin.org/get?name=kang&age=18'}
在返回一个协程对象时,有些字段需要使用await修饰。具体如下:
- 发送请求的方法名前需要加上
await
关键字,例如:await session.get(url)
。- 响应的数据需要使用
await
修饰的部分,包括:
response.text()
: 用于获取响应体中的文本内容。response.json()
: 用于获取响应体中的JSON格式数据。response.read()
: 用于获取响应体的原始字节流。
- 释放底层连接资源和关闭会话对象时,需要使用
await
修饰的方法,包括:
response.release()
: 用于释放底层连接资源。response.raise_for_status()
: 用于检查响应状态码是否为200,并在不是200的情况下抛出异常。session.close()
: 用于关闭会话对象并释放所有底层资源。
aiohttp 官方文档
超时设置
利用ClienTimeout对象设置超时 timeout=aiohttp.ClientTimeout(total=1)
,aiohttp.ClientSession(timeout=timeout)
设置一秒的超时时间,如果超时会抛出TimeoutError,其类型为asyncio.TimeoutError
并发限制
import asyncio
import aiohttp
# 最大并发量
CONCURRENCY = 5
URL = 'https://www.baidu.com/'
semaphore = asyncio.Semaphore(CONCURRENCY)
session = None
async def scrape_api():
async with semaphore:
print('scraping', URL)
async with session.get(URL) as response:
await asyncio.sleep(1)
return await response.text()
async def main():
global session
session = aiohttp.ClientSession()
scrape_index_tasks = [asyncio.ensure_future(con) for _ in range(100)]
await asyncio.gather(*scrape_index_tasks)
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())
# 声明了100个task,传入gather方法后如果不限制将会同时执行,设置最大并发量后,会被控制在5个
标签:异步,协程,get,python,爬虫,print,response,loop,asyncio From: https://www.cnblogs.com/kangkin/p/17375243.html
asyncio.gather()
可以接受零个或多个协程对象作为输入,并返回一个协程对象,该协程对象将在所有输入协程完成后运行完成。如果有任何输入协程引发异常,则asyncio.gather()
返回的协程也会引发相同的异常。