首页 > 编程语言 >python异步爬虫

python异步爬虫

时间:2023-05-05 20:12:27浏览次数:40  
标签:异步 协程 get python 爬虫 print response loop asyncio

异步爬虫

基础知识

阻塞

​ 阻塞状态指程序未得到所需计算资源时被挂起的状态。程序在等待某个操作完成期间,自身无法继续干别的事情,则该程序在操作上是阻塞的。

​ 常见的阻塞形式有:网络I/O阻塞、磁盘I/O阻塞、用户输入阻塞等。阻塞是无处不在的,包括在CPU切换上下文时,所有进程都无法真正干事情,它们也会被阻塞。在多核CPU的情况下,正在执行上下文切换操作的核不可被利用

非阻塞

​ 程序在等待某操作的过程中,自身不被阻塞,可以继续干别的事情,则称该程序在该操作系统上是非阻塞的。

​ 非阻塞并不是在任何程序级别、任何情况下都存在的。仅当程序封装的级别可以囊括独立的子程序单元时,程序才能存在非阻塞状态。

​ 非阻塞因阻塞的存在而存在,正因为阻塞导致程序运行的耗时增加与效率低下,我们才需要将它变为非阻塞的。

同步

不同程序单元为了共同完成某个任务,在执行过程中需要靠某种通信方式保持协调一致,此时这些程序单元是同步执行的。

例如在购物系统中更新商品库存时,需要用“行锁”作为通信信号,强制让不同的更新请求排队并按顺序执行,这里的更新库存操作就是同步的。

异步

​ 为了完成某个任务,不同程序单元之间无需通信协调也能完成任务,此时不限同的程序单元之间可以是异步的。

​ 例如,爬取下载网页。调度程序调用下载程序后,既可调度其他任务,无须与该下载任务保持通信以协调行为。不同网页的下载,保存等操作都是无关的,也无需相互通知协调。

多进程

多进程利用CPU的多核优势,在同一时间并行执行多个任务,可以大大提高执行效率

协程

​ 协程是一种运行在用户态的轻量级线程。

​ 协程拥有自己的寄存器上下文和栈。协程在调度切换时,将寄存器上下文和栈保存到其他地方,等切回来的时候,在恢复先前保存的寄存器上下文和栈。因此,协程能保留上一次调用时的状态,即所有局部状态的一个特定组合,每次过程重入,就相当于进入上一次调用的状态。

​ 协程本质上是一个单进程,相对多线程来说,它没有线程上下文切换的开销,没有原子操作锁定及同步到开销。

定义协程

  1. event_loop:事件循环,相当于一个无限循环,可以把一些函数注册到这个事件循环上,当满足发生条件的时候,就调用对应的处理方法。
  2. coroutine:协程。在Python中指协程对象,将协程对象注册到事件循环中,它会被事件循环调用。可以使用async关键字定义一个方法,这个方法在调用时不会立即被执行,而是返回一个协程对象。
  3. task:任务,这是对协程对象的进一步封装,包含协程对象的各个状态
  4. future:代表将来执行或者没有执行的任务的结果,实际上和task没有本质区别
import asyncio

async def execute(x):
    print('Number', x)

# 返回一个协程对象    
coroutine = execute(1)
print('Coroutine', coroutine)
print('After calling execute')

# 创建一个时间循环loop
loop = asyncio.get_event_loop()
# 将协程对象注册到事件循环中启动
loop.run_until_complete(coroutine)
print('After calling loop')

# Coroutine <coroutine object execute at 0x1045e5be0>
# After calling execute
# Number 1
# After calling loop

# async定义的方法会变成一个无法直接执行的协程对象,必须要注册到事件循环中才能运行

使用task封装对象

import asyncio

async def execute(x):
    print('Number', x)
    return x

# 返回一个协程对象    
coroutine = execute(1)
print('Coroutine', coroutine)
print('After calling execute')

# 创建一个时间循环loop
loop = asyncio.get_event_loop()
# 将协程对象转换为task对象
task = loop.create_task(coroutine)
print('Task:',task)

# 将协程对象注册到事件循环中启动
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

# Coroutine <coroutine object execute at 0x102891be0>
# After calling execute
# Task: <Task pending name='Task-1' coro=<execute() running at >
# Number 1
# Task: <Task finished name='Task-1' coro=<execute() done, defined at  result=1>
# After calling loop

# 将task对象打印,从pending状态变为了finished,并且result变成了1

调用asyncio包的ensure_future方法,返回结果也为task对象,这样可以不用借助loop对象。即使没有声明loop,也可以提前定义task

import asyncio

async def execute(x):
    print('Number', x)
    return x

# 返回一个协程对象    
coroutine = execute(1)
print('Coroutine', coroutine)
print('After calling execute')

# 使用方法创建一个task对象
task = asyncio.ensure_future(coroutine)
print('Task:',task)
# 创建一个时间循环loop
loop = asyncio.get_event_loop()

# 将协程对象注册到事件循环中启动
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

多任务协程

import asyncio

tasks = [asyncio.ensure_future(req_baidu()) for _ in range(5)]
print('Tasks:', tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
    print('Task Result:', task.result())
    
# Tasks: [<Task pending name='Task-1' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-2' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-3' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-4' coro=<req_baidu() running at asysncioDemo.py:15>>, <Task pending name='Task-5' coro=<req_baidu() running at asysncioDemo.py:15>>]
# Task Result: <Response [200]>
# Task Result: <Response [200]>
# Task Result: <Response [200]>
# Task Result: <Response [200]>
# Task Result: <Response [200]>

协程实现

当遇到一个网站需要等待页面响应返回结果时,合理的利用协程

import asyncio
import requests
import time

start = time.time()

async def req():
  	url = 'https://www.httpbin.org/delay/5'
    print('Waiting for',url)
    response = request.get(url)
    print('Get response from',url,'response',response)

tasks = [asyncio.ensure_future(req_baidu()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:',end - start )

请求耗时66秒,并没有实现异步处理,使用await关键字将耗时等待挂起,让出控制权,如果协程在执行的时候遇到await,事件循环就会将本次协程挂起,转而执行别的协程,直到其他协程挂起或执行完毕。

async def req():
  	url = 'https://www.httpbin.org/delay/5'
    print('Waiting for',url)
    response = await requests.get(url)
    print('Get response from',url,'response',response)

# Task exception was never retrieved 
# future: <Task finished coro=<request() done,defined at ...
# TypeError: object Response can't be used in 'await' expression 

错误吓死requests返回的Response对象不能喝await一起使用:

  • 一个原生协程对象
  • 一个由types.coroutine修饰的生成器,这个生成器可以返回协程对象
  • 由一个包含**_await _ **方法的对象返回的迭代器

可以将async请求改成一个协程对象

async def req_baidu():
    return requests.get('https://www.baidu.com/')

async def await_demo():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for', url)
    response = await req_baidu()
    print('Get response from', url, 'response', response)

start = time.time()
tasks = [asyncio.ensure_future(await_demo()) for _ in range(10)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
end = time.time()
print('Cost time', end - start)

aiohttp

import asyncio
import time

import aiohttp

start = time.time()


async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    await response.text()
    await session.close()
    return response


async def request():
    url = 'https://www.httpbin.org/delay/5'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'response:', response)


if __name__ == '__main__':
    tasks = [asyncio.ensure_future(request()) for _ in range(10)]
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    end = time.time()
    print('Cost time', end - start)

开始运行时,事件循环会运行第一个task,对于第一个task来说,当执行到第一个await跟着的get方法时,它会被挂起,但这个get方法第一步的执行是非阻塞的,挂起之后会立马被唤醒,立即又进入执行。接着遇到第二个await,调用Session.get请求,然后被挂起。然后事件循环会寻找当前未被挂起的协程继续执行,依次类推。

基本爬取

import asyncio

import aiohttp

async def fetch(session, url):
    async with session.get(url,ssl=False) as response:
        return await response.text(), response.status


async def main():
    async with aiohttp.ClientSession() as session:
        html, status = await fetch(session, 'https://www.baidu.com')
        print(f'html:{html[:100]}')
        print(f'status:{status}')
        
if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

这里完成了一次基本的HTTP请求,和requests请求不同的地方:

  1. 除了必须引入aiohtto库,还需要引入asyncio。因为要实现异步爬取需要启动协程,而协程则需要借助asyncio里的事件循环才能执行。除了事件循环asyncio也提供了很多基础的异步操作
  2. 异步爬取方法的没个前面都要统一的加async来修饰
  3. with as语句同样需要async来修饰,在Python中,with as语句用于声明一个上下文管理器,帮助我们自动分配和释放资源。在异步方法中,with as前面加上async代表声明一个支持异步的上下文管理器
  4. 对于返回协程对象的操作,前面需要加await来修饰。例如Response调用的text()方法,返回的是一个协程对象,而状态码则是一串数值。

URL参数设置

import aiohttp
import asyncio

async def main():
  	# 利用params参数传入一个字典
  	params={'name':'kang','age',18}
    async with aiohttp.ClientSession() as session:
      	async with session.get('https://www.htpbin.org/get',params=params) as response:
          	print(await response.text())
            
if __name__=='__main__':
  	asyncio.get_event_loop().run_until_complete(main())
{
  "args": {
    "age": "18", 
    "name": "kang"
  }, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Host": "www.httpbin.org", 
    "User-Agent": "Python/3.11 aiohttp/3.8.4", 
    "X-Amzn-Trace-Id": "Root=1-6454e482-0599efda04b2a55f6a7a0e14"
  }, 
  "origin": "139.5.108.229", 
  "url": "https://www.httpbin.org/get?name=kang&age=18"
}

其他类型请求

# 与requests库中的请求方法类似
session.post('http://www.httpbin.org/post',data=b'data')
session.put('http://www.httpbin.org/put',data=b'data')
session.delete('http://www.httpbin.org/delete',data=b'data')
session.head('http://www.httpbin.org/get')
session.options('http://www.httpbin.org/options')
session.patch('http://www.httpbin.org/patch',data=b'data')

POST请求

对于post表单提交,对于请求头中的Content-Type=application/x-www-form-urlencoded

在session请求中改为 data={'name':'kang','age':18} session.post('https://www.httpbin.org/post',data=data)

json数据提交对应请求头中的Content-type=application/json

session.post('https://www.httpbin.org/post',json=data)

响应

import aiohttp
import asyncio

async def main():
  	# 利用params参数传入一个字典
  	params={'name':'kang','age',18}
    async with aiohttp.ClientSession() as session:
      	async with session.get('https://www.htpbin.org/get',params=params) as response:
          	print('status:', response.status)
            print('body:', await response.text())
            print('headers:', response.headers)
            print('bytes:', await response.read())
            print('json:', await response.json())
            
if __name__=='__main__':
  	asyncio.get_event_loop().run_until_complete(main())
status: 200
body: {
  "args": {
    "age": "18", 
    "name": "kang"
  }, 
  "headers": {
    "Accept": "*/*", 
    "Accept-Encoding": "gzip, deflate", 
    "Host": "www.httpbin.org", 
    "User-Agent": "Python/3.11 aiohttp/3.8.4", 
    "X-Amzn-Trace-Id": "Root=1-6454e730-7a6e883f275648b50e2d39c6"
  }, 
  "origin": "139.5.108.229", 
  "url": "https://www.httpbin.org/get?name=kang&age=18"
}

headers: <CIMultiDictProxy('Date': 'Fri, 05 May 2023 11:23:31 GMT', 'Content-Type': 'application/json', 'Content-Length': '375', 'Connection': 'keep-alive', 'Server': 'gunicorn/19.9.0', 'Access-Control-Allow-Origin': '*', 'Access-Control-Allow-Credentials': 'true')>
bytes: b'{\n  "args": {\n    "age": "18", \n    "name": "kang"\n  }, \n  "headers": {\n    "Accept": "*/*", \n    "Accept-Encoding": "gzip, deflate", \n    "Host": "www.httpbin.org", \n    "User-Agent": "Python/3.11 aiohttp/3.8.4", \n    "X-Amzn-Trace-Id": "Root=1-6454e730-7a6e883f275648b50e2d39c6"\n  }, \n  "origin": "139.5.108.229", \n  "url": "https://www.httpbin.org/get?name=kang&age=18"\n}\n'
json: {'args': {'age': '18', 'name': 'kang'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate', 'Host': 'www.httpbin.org', 'User-Agent': 'Python/3.11 aiohttp/3.8.4', 'X-Amzn-Trace-Id': 'Root=1-6454e730-7a6e883f275648b50e2d39c6'}, 'origin': '139.5.108.229', 'url': 'https://www.httpbin.org/get?name=kang&age=18'}

在返回一个协程对象时,有些字段需要使用await修饰。具体如下:

  1. 发送请求的方法名前需要加上await关键字,例如:await session.get(url)
  2. 响应的数据需要使用await修饰的部分,包括:
  • response.text(): 用于获取响应体中的文本内容。
  • response.json(): 用于获取响应体中的JSON格式数据。
  • response.read(): 用于获取响应体的原始字节流。
  1. 释放底层连接资源和关闭会话对象时,需要使用await修饰的方法,包括:
  • response.release(): 用于释放底层连接资源。
  • response.raise_for_status(): 用于检查响应状态码是否为200,并在不是200的情况下抛出异常。
  • session.close(): 用于关闭会话对象并释放所有底层资源。

aiohttp 官方文档

超时设置

利用ClienTimeout对象设置超时 timeout=aiohttp.ClientTimeout(total=1),aiohttp.ClientSession(timeout=timeout)

设置一秒的超时时间,如果超时会抛出TimeoutError,其类型为asyncio.TimeoutError

并发限制

import asyncio

import aiohttp

# 最大并发量
CONCURRENCY = 5
URL = 'https://www.baidu.com/'

semaphore = asyncio.Semaphore(CONCURRENCY)
session = None

async def scrape_api():
    async with semaphore:
         print('scraping', URL)
         async with session.get(URL) as response:
             await asyncio.sleep(1)
             return await response.text()

async def main():              
    global session
    session = aiohttp.ClientSession()
    scrape_index_tasks = [asyncio.ensure_future(con) for _ in range(100)]
    await asyncio.gather(*scrape_index_tasks)   
    
if __name__ == '__main__':
		asyncio.get_event_loop().run_until_complete(main())    
    
# 声明了100个task,传入gather方法后如果不限制将会同时执行,设置最大并发量后,会被控制在5个

asyncio.gather() 可以接受零个或多个协程对象作为输入,并返回一个协程对象,该协程对象将在所有输入协程完成后运行完成。如果有任何输入协程引发异常,则 asyncio.gather() 返回的协程也会引发相同的异常。

标签:异步,协程,get,python,爬虫,print,response,loop,asyncio
From: https://www.cnblogs.com/kangkin/p/17375243.html

相关文章

  • python 串口读取gps
    #coding:utf-8#lastmodified:20220824importtimeimportserialimportre utctime=''lat=''ulat=''lon=''ulon=''numSv=''msl=''cogt=''cogm='&#......
  • Python 脚本部署和发布 Django 应用程序的示例代码及注释
    代码说明:1、在脚本中定义了要部署的Django应用程序名称、Docker镜像名称和标签。2、使用字符串模板定义了KubernetesDeployment和Service的YAML文件。在字符串模板中使用了变量,用于替换实际的值。3、将Deployment和Service的YAML文件保存到本地文件中,并使用kube......
  • python如何表格对齐, 表格输出对齐, 输出内容对齐
    #我们输出的要求是对齐,但是名字的长度不一样defdeal_data(var):foriinrange(12-len(var)):#为什么是12呢?必须保证里面单个长度不能大于12,具体多少可以看自己的每个单元格的数据,不超过就可以var+=""returnvarlist1=[["Jacky","79","1......
  • 一Python flask框架
    一:Pythonflask框架前言1.Python面向对象的高级编程语言,以其语法简单、免费开源、免编译扩展性高,同时也可以嵌入到C/C++程序和丰富的第三方库,Python运用到大数据分析、人工智能、web后端等应用场景上。2.Python目前主要流行的web框架:flask、Django、Tornado补充一下,我们......
  • Python简易学生管理系统
    Python简易学生管理系统Projectmain.pymanager.pystudent.pystudents_info.json1、学生文件student.py#学生类classStudent(object):#存放学生信息student_info={}#学生初始化方法def__init__(self,id,name,addr,tel):self.id=......
  • Python+Pandas批量合并大量excel文件
    requirments.txtet-xmlfile==1.1.0numpy==1.24.3openpyxl==3.1.2pandas==2.0.1python-dateutil==2.8.2pytz==2023.3six==1.16.0tzdata==2023.3main.pyimportosimportpandasaspddir_path=os.path.dirname(os.path.abspath(__file__))source_location=o......
  • Python实现遍历读取文件或文件夹
    参考:https://www.jb51.net/article/258341.htmos.walk本身已经是遍历读取,包含所有的子文件(夹)path=u'.'#文件路径defnewWalkFile2(file):#main_dir当前路径,sub_dir_list当前路径下的子文件夹是个数组,sub_file_list当前路径下具体文件formain_dir,sub_dir_l......
  • Python自动寻路算法
    一、题目描述在一个迷宫游戏里,有一些小怪物要去攻击主角,现在希望给这些小怪物加上聪明的AI,让他们可以自动绕过迷宫中的障碍物,寻找到主角所在。二、解题思路迷宫游戏里的场景通常都是由小方格组成。假设我们有一个7*5大小的迷宫,图中红色格子是终点,绿色格子是起点,蓝色格子是一堵墙......
  • Python教程:pandas读写txt文件——DataFrame和Series
    大家用pandas一般都是读写csv文件或者tsv文件,读写txt文件时一般就withopen了,其实pandas数据类型操作起来更加方便,还是建议全用pandas这一套。读txt文件代码如下,主要是设置正则表达式的分隔符(sep参数),和列名取消(header参数),以及不需要列索引(index_col)。1df=pd.read_csv("workl......
  • python笔记-数据类型
    获取数据类型type(val)iftype(1)==int:print('1是int类型')iftype('hello')==str:print('1是字符串类型')iftype(1.5)==float:print('1是float类型')iftype([1,2])==list:print('1是list类型')类型转换prin......