首页 > 编程语言 >python并发与并行(八) ———— 用协程实现高并发的I/O

python并发与并行(八) ———— 用协程实现高并发的I/O

时间:2024-08-31 15:27:08浏览次数:13  
标签:协程 get python self 用协程 并发 state grid def


在前面几条里,我们以生命游戏为例,试着用各种方案解决I/O并行问题,这些方案在某些情况下确实可行,但如果同时需要执行的I/O任务有成千上万个,那么这些方案的效率就不太理想了

像这种在并发方面要求比较高的I/O需求,可以用Python的协程(coroutine)来解决。协程能够制造出一种效果,让我们觉得Python程序好像真的可以同时执行大量任务。这种效果需要使用async与await关键字来实现,它的基本原理与生成器(generator)类似,也就是不立刻给出所有的结果,而是等需要用到的时候再一项一项地获取

启动协程是有代价的,就是必须做一次函数调用。协程激活之后,只占用不到1KB内存,所以只要内存足够,协程稍微多一些也没关系。与线程类似,协程所要执行的任务也是用一个函数来表示的,在执行这个函数的过程中,协程可以从执行环境里面获取输入值,并把输出结果放到这个执行环境之中。协程与线程的区别在于,它不会把这个函数从头到尾执行完,而是每遇到一个await表达式,就暂停一次,下次继续执行的时候,它会先等待await所针对的那项awaitable操作有了结果(那项操作是用async函数表示的),然后再推进到下一个await表达式那里(这跟生成器函数的运作方式有点像,那种函数也是一遇到yield就暂停)。

Python系统可以让数量极多的async函数各自向前推进,看起来像很多条Python线程那样,能够并发地运行。然而,这些协程并不会像线程那样占用大量内存,启动和切换的开销也比较小,而且不需要用复杂的代码来实现加锁或同步。这种强大的机制是通过事件循环(event loop)打造的,只要把相关的函数写对,这种循环就可以穿插着执行许多个这样的函数,并且执行得相当快,从而高效地完成并发式的I/O任务。

现在就用协程来实现生命游戏。我们的目标是让游戏能够高效地执行game_logic函数里面的I/O操作,同时又不像前面提到的Thread方案与Queue方案那样,有那么多缺点。首先修改game_logic函数,这次必须在定义函数所用的那个def关键字前面,加上async,表示该函数是一个协程,这样我们就可以在函数里面用await做I/O了(例如从套接字(socket)之中异步读取一份数据)。同理,给step_cell函数也添上async关键字,把它变为协程,并在调用game_logic的那个地方使用await关键字。

ALIVE = '*'
EMPTY = '-'

class Grid:
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)

    def get(self, y, x):
        return self.rows[y % self.height][x % self.width]

    def set(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state

    def __str__(self):
        output = ''
        for row in self.rows:
            for cell in row:
                output += cell
            output += '\n'
        return output

def count_neighbors(y, x, get):
    n_ = get(y - 1, x + 0)  # North
    ne = get(y - 1, x + 1)  # Northeast
    e_ = get(y + 0, x + 1)  # East
    se = get(y + 1, x + 1)  # Southeast
    s_ = get(y + 1, x + 0)  # South
    sw = get(y + 1, x - 1)  # Southwest
    w_ = get(y + 0, x - 1)  # West
    nw = get(y - 1, x - 1)  # Northwest
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count

# async def game_logic(state, neighbors):
#     # Do some input/output in here:
#     data = await my_socket.read(50)

async def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2:
            return EMPTY     # Die: Too few
        elif neighbors > 3:
            return EMPTY     # Die: Too many
    else:
        if neighbors == 3:
            return ALIVE     # Regenerate
    return state


async def step_cell(y, x, get, set):
    state = get(y, x)
    neighbors = count_neighbors(y, x, get)
    next_state = await game_logic(state, neighbors)
    set(y, x, next_state)

simulate函数也同样需要变为协程。

import asyncio

async def simulate(grid):
    next_grid = Grid(grid.height, grid.width)

    tasks = []
    for y in range(grid.height):
        for x in range(grid.width):
            task = step_cell(
                y, x, grid.get, next_grid.set)      # Fan out
            tasks.append(task)

    await asyncio.gather(*tasks)                    # Fan in

    return next_grid

async版本的simulate函数,有以下几个地方需要解释:
▪ 第一,它在调用step_cell的时候,系统并不会立刻执行这个函数,而是会返回一个协程实例,稍后会把这个实例写在await表达式里面。这里的step_cell,好比那种用yield写的生成器函数一样,调用时并不立刻执行它,而是返回一个生成器实例。这样就可以实现任务fan-out(分派)模式了。
▪ 第二,内置的asyncio模块提供了gather函数,可以用来实现fan-in(归集)模式。把gather写在await表达式里面,可以让系统用事件循环去并发地执行那些step_cell协程,并在全部执行完之后,再往下推进simulate协程。
▪ 第三,由于这些代码都是在同一条线程里执行的,因此不需要给Grid(网格)实例加锁,至于怎样让这些I/O操作表现出平行的效果,则是由asyncio所提供的事件循环来负责的。

最后,要调整原范例之中用来推动游戏流程的那段代码。我们只需要修改一行代码,也就是把simulate(grid)这个协程交给asyncio.run去运行,从而利用事件循环机制去执行推进单元格状态所需的那些I/O操作。

class ColumnPrinter:
    def __init__(self):
        self.columns = []

    def append(self, data):
        self.columns.append(data)

    def __str__(self):
        row_count = 1
        for data in self.columns:
            row_count = max(
                row_count, len(data.splitlines()) + 1)

        rows = [''] * row_count
        for j in range(row_count):
            for i, data in enumerate(self.columns):
                line = data.splitlines()[max(0, j - 1)]
                if j == 0:
                    padding = ' ' * (len(line) // 2)
                    rows[j] += padding + str(i) + padding
                else:
                    rows[j] += line

                if (i + 1) < len(self.columns):
                    rows[j] += ' | '

        return '\n'.join(rows)

logging.getLogger().setLevel(logging.ERROR)

grid = Grid(5, 9)
grid.set(0, 3, ALIVE)
grid.set(1, 4, ALIVE)
grid.set(2, 2, ALIVE)
grid.set(2, 3, ALIVE)
grid.set(2, 4, ALIVE)

columns = ColumnPrinter()
for i in range(5):
    columns.append(str(grid))
    grid = asyncio.run(simulate(grid))   # Run the event loop

print(columns)

协程的优点是,能够把那些与外部环境交互的代码(例如I/O调用)与那些实现自身需求的代码(例如事件循环)解耦。这让我们可以把重点放在实现需求所用的逻辑上面,而不用专门花时间去写一些代码来确保这些需求能够并发地执行。

我们同样按之前的测一下性能。

配置

性能(s)

单线程

线程方案实现多线程

队列方案实现多线程

ThreadPoolExecutor方案实现多线程

协程方式

Grid(500,900)

step 100次

线程数 5

55.45792198181152

170.44810271263123

6410.6107659339905

382.2381818294525

Grid(50,90)

step 100次

线程数 5

0.6782510280609131

14.29249095916748

2.5422348976135254

5.175195217132568

2.2674009799957275


标签:协程,get,python,self,用协程,并发,state,grid,def
From: https://blog.51cto.com/u_15302822/11882938

相关文章

  • python并发与并行(五.1) ———— 不要在每次fan-out时都新建一批Thread实例
    我们使用康威生命游戏的例子来解释这个专题。首先我们要实现一个康威生命游戏。这是个经典的有限状态自动机。它的规则很简单:在任意长宽的二维网格中,每个单元格都必须处于ALIVE或EMPTY状态,前者表示这个单元格里有生命存在,后者表示这里没有生物(或者原有生物已经死亡)。时钟每走一格......
  • Selenium+Python自动化测试环境搭建
    1.什么是Selenium?        Selenium主要用于web应用程序的自动化测试,但并不局限于此,它还支持所有基于web的管理任务自动化。2、selenium自动化流程如下:自动化程序调用Selenium客户端库函数(比如点击按钮元素)客户端库会发送Selenium命令给浏览器的驱动程序浏览......
  • 【Python-办公自动化】1秒解决海量查找替换难题
    欢迎来到"花花ShowPython",一名热爱编程和分享知识的技术博主。在这里,我将与您一同探索Python的奥秘,分享编程技巧、项目实践和学习心得。无论您是编程新手还是资深开发者,都能在这里找到有价值的信息和灵感。自我介绍:我热衷于将复杂的技术概念以简单易懂的方式呈现给大家,......
  • 02python
    1.布尔类型和比较运算符1.1布尔(bool)类型布尔(bool)表达现实生活中的逻辑,即真和假:True表示真;False表示假。True本质上是一个数字记作1,False记作01.1.1布尔类型字面量True表示真(是、肯定)False表示假(否、否定)1.1.2定义变量存储布尔类型数据变量名称=布尔类型字面量布尔类型不......
  • A-计算机毕业设计定制:10508民大校园美食推荐系统的设计与实现(免费领源码)可做计算机毕
    摘要 随着数字化时代的到来,校园美食推荐系统的设计与实现具有重要意义。针对民大校园中商家、普通用户和管理员之间的信息交互和服务需求,开发这样一个系统能够有效促进校园内美食资源的共享和利用,提供美食介绍和美食推荐的渠道,提高校园内美食行业的服务水平,增强校园内外用户......
  • A-计算机毕业设计定制:18099居家养老服务系统(免费领源码)可做计算机毕业设计JAVA、PHP
    摘  要1绪论1.1研究背景1.2研究意义1.3主要研究内容1.4论文章节安排2 相关技术介绍2.1Node.JS编程语言2.2MySQL数据库3 系统分析3.1可行性分析3.1.1技术可行性分析3.1.2经济可行性分析3.1.3操作可行性分析3.2系统流程分析3.2.1 ......
  • [开题报告]flask框架的殡仪馆信息管理系统设计与实现(python+程序+论文)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着社会的进步和人口老龄化的加剧,殡葬服务行业面临着前所未有的挑战与机遇。传统的手工记录与管理方式已难以满足现代殡仪馆高效、规范、......
  • [开题报告]flask框架的毕业生求职系统的设计与实现k2r16(程序+论文+python)
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表开题报告内容研究背景随着高等教育的普及和就业市场的日益竞争,毕业生在求职过程中面临着信息获取不对称、求职渠道有限、面试流程繁琐等挑战。传统的求职方式往......
  • MacOS 中 python无法正常使用turtle或tkinter 解决方案(备份文章)
    将以前在win机子上写的python文件拿到mac上复习时发现的问题直接运行turtle文件出现了以下报错原文:DEPRECATIONWARNING:ThesystemversionofTkisdeprecatedandmayberemovedinafuturerelease.Pleasedon’trelyonit.SetTK_SILENCE_DEPRECATION=1t......
  • 使用python基于fastapi发布接口(二)-连接mysql数据库查询数据
    上一章在这里操作MySQL数据库使用mysql-connector-python库安装mysql-connector-pythonpipinstallmysql-connector-python代码编写在原来代码基础上添加数据库连接配置fromtypingimportUnionfromfastapiimportFastAPIapp=FastAPI(......