首页 > 编程问答 >Avro:在 Python 中使用多处理进行解码

Avro:在 Python 中使用多处理进行解码

时间:2024-08-02 04:16:39浏览次数:11  
标签:python apache-kafka python-multiprocessing avro concurrent.futures

就最佳性能和更少延迟而言,哪种异步方法更适合解码从 Kafka 主题消耗的 Avro 消息?

我正在使用 Avro 库的并发 future,并且我仍然可以看到类似的执行时间与不使用并发 future 的情况相比。

from avro.io import BinaryDecoder, DatumReader
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient

class DoIt():
    def some_method():
        with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
                msg_cnt_type_futures = [executor.submit(CatalogExporter.decode_avro, avro, self.schema_url) for avro in avro_list]


    @staticmethod
    def decode_avro(payload_tuple, schema_url):
        # print("Process ID:", os.getpid())
        msg_id, current_msg_offset, payload = payload_tuple
        magic, schema_id = struct.unpack('>bi', payload[:5])
        register_client = CachedSchemaRegistryClient(url=schema_url)

        # Get Schema registry
        # Avro value format
        if magic == MAGIC_BYTES:
            schema = register_client.get_by_id(schema_id)
            reader = DatumReader(schema)
            output = BinaryDecoder(io.BytesIO(payload[5:]))
            decoded = reader.read(output)
            return msg_id, current_msg_offset, decoded, schema.name
        # no magic bytes, something is wrong
        else:
            raise ValueError


正在使用 Python 的 concurrent.futures.ProcessPoolExecutor 进行 Avro 解码,这是一个良好的开端,可以利用多处理来提高性能。但是,可能无法看到显著的速度提升,原因有很多:

  1. 开销过大: 进程创建和数据序列化/反序列化会带来开销。如果正在解码的小消息,则此开销可能会超过并行处理带来的任何性能提升。

  2. Schema Registry 延迟: decode_avro 函数中,正在为每条消息调用 register_client.get_by_id(schema_id) 。这可能会导致网络延迟,尤其是在 Schema Registry 远程托管的情况下。

  3. GIL 争用: 尽管正在使用进程池,但 Avro 库的某些部分(或其依赖项)可能仍会在解释器锁 (GIL) 上存在争用,从而限制了真正的并行性。

  4. CPU 饱和: 如果的 CPU 已经满负荷运行,则添加更多进程可能无济于事,甚至可能由于上下文切换增加而降低性能。

以下是可以采取的一些措施来提高 Avro 解码性能:

1. 批量解码: 与其单独解码每条消息,不如将消息批处理到更大的块中,然后将这些块传递给的进程池。这可以通过减少进程创建和数据序列化/反序列化的开销来提高性能。

2. 缓存 Schema: 避免为每条消息都从 Schema Registry 中获取 Schema。可以实现一个本地缓存来存储 Schema,并在需要时检索它们。

3. 使用 Avro SpecificRecord: 如果可能,请使用 Avro SpecificRecord 来反序列化的消息。这比使用 DatumReader 更快,因为它是代码生成的,并且避免了运行时类型检查。

4. 尝试其他并行处理方法: 除了 ProcessPoolExecutor 之外,还可以探索其他并行处理方法,例如: - 多线程: 如果 Avro 库不受 GIL 限制,则可以使用 ThreadPoolExecutor 。 - 异步 IO: 如果正在处理大量并发连接,则可以使用 asyncio 库来实现异步 IO。

5. 性能分析: 使用 Python 分析器(例如 cProfile py-spy )来识别代码中的性能瓶颈。这将帮助专注于需要优化的领域。

以下是使用批量解码和 Schema 缓存的改进代码示例:

from avro.io import BinaryDecoder, DatumReader
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from cachetools import TTLCache

MAGIC_BYTES = 0

class DoIt():
    def __init__(self, schema_url):
        self.schema_url = schema_url
        self.schema_cache = TTLCache(maxsize=100, ttl=3600)  # Cache schemas for 1 hour
        self.register_client = CachedSchemaRegistryClient(url=schema_url)

    def some_method(self):
        # ... your code to consume messages from Kafka ...

        # Batch messages for decoding
        batch_size = 100
        message_batches = [avro_list[i:i + batch_size] for i in range(0, len(avro_list), batch_size)]

        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(self.decode_avro_batch, batch) for batch in message_batches]
            for future in concurrent.futures.as_completed(futures):
                # Process decoded messages
                pass

    def decode_avro_batch(self, payload_tuples):
        decoded_messages = []
        for payload_tuple in payload_tuples:
            msg_id, current_msg_offset, payload = payload_tuple
            magic, schema_id = struct.unpack('>bi', payload[:5])

            # Get schema from cache or registry
            schema = self.schema_cache.get(schema_id)
            if schema is None:
                schema = self.register_client.get_by_id(schema_id)
                self.schema_cache[schema_id] = schema

            if magic == MAGIC_BYTES:
                reader = DatumReader(schema)
                output = BinaryDecoder(io.BytesIO(payload[5:]))
                decoded = reader.read(output)
                decoded_messages.append((msg_id, current_msg_offset, decoded, schema.name))
            else:
                raise ValueError

        return decoded_messages

请记住,最佳方法将取决于的特定用例和数据特征。建议尝试不同的方法并分析其性能,以确定最适合的方法。

标签:python,apache-kafka,python-multiprocessing,avro,concurrent.futures
From: 78822346

相关文章

  • 白盒测试基础与实践:Python示例及流程图设计
    文章目录前言一、白盒测试是什么?主要特点常用方法优点缺点二、白盒测试常用技术语句覆盖判定覆盖条件覆盖判定/条件覆盖条件组合覆盖路径覆盖三、程序流程图设计四、测试用例设计1.基本路径法2.语句覆盖3.判断覆盖4.条件覆盖5.判断/条件覆盖6.条件组合覆盖总结......
  • 【python的语法特点,如注释规则、代码缩进、编写规范等】
    介绍一下python的语法特点,如注释规则、代码缩进、编写规范等Python是一种广泛使用的高级编程语言,以其简洁易读的语法、丰富的标准库和强大的第三方库而闻名。下面我将详细介绍Python的一些基本语法特点,包括注释规则、代码缩进、以及编写规范等。一、注释规则Python......
  • 深圳大学-数据科学导论实验-python数据探索
    实验目的与要求掌握python编程基础。掌握数据探索基本操作。实验环境WindowsPyCharm实验数据salaries.csv"","rank","discipline","yrs.since.phd","yrs.service","sex","salary""1","Prof","B",......
  • 基于Python+Django协同过滤算法的招聘信息推荐系统设计与实现(源码+数据库+讲解)
    文章目录前言详细视频演示项目运行截图技术框架后端采用Django框架前端框架Vue可行性分析系统测试系统测试的目的系统功能测试数据库表设计代码参考数据库脚本为什么选择我?获取源码前言......
  • python 栈帧沙箱逃逸
    基础理论什么是生成器生成器是python中的一种特殊的迭代器,在每次生成值以后会保留当前状态,以便下次调用可以继续生成值.python中生成器通过yield关键词进行定义,每次调用的时候返回一个值,并保持当前状态的同时暂停函数的执行.当下一次调用生成器的时候,函数会从上次暂停的位......
  • Transformer预测模型及其Python和MATLAB实现
    ###一、背景在自然语言处理(NLP)领域,传统的序列到序列(Seq2Seq)模型大多依赖于循环神经网络(RNN)和长短期记忆(LSTM)网络。这些模型虽然在许多任务中取得了成功,但由于其计算效率低下以及长距离依赖关系处理的不足,导致模型训练时间漫长,并在处理较长文本时效果不佳。2017年,Vaswani等人......
  • 随机森林(Random Forest)预测模型及其特征分析(Python和MATLAB实现)
    ##一、背景在大数据和机器学习的快速发展时代,数据的处理和分析变得尤为重要。随着多个领域积累了海量数据,传统的统计分析方法常常无法满足复杂问题的需求。在这种背景下,机器学习方法开始广泛应用。随机森林(RandomForest)作为一种强大的集成学习方法,因其高效性和较强的泛化能......
  • 【python脚本打包成exe】
    python项目打包成exe安装包分为三部分:1.python项目打包成可执行文件,依赖于一个python插件包pyinstaller        通过pipinstallpyinstaller安装即可。2.将python项目打包成可执行的exe文件及其依赖包        2.1在项目根目录下,输出python项目的配置文件(x......
  • 在Python中,如果想要获取昨天和前天的日期时间,使用datetime模块中的datetime.now()来获
    问:在Python中,如果想要获取昨天和前天的日期时间,使用datetime模块中的datetime.now()来获取当前时间,然后通过减去相应的时间差来得到昨天和前天的日期时间答:在Python中,如果你想要获取昨天和前天的日期时间,你可以使用datetime模块中的datetime.now()来获取当前时间,然后通过减去相......
  • python中  datetime.now() 获取当前时间 例如:2023-04-01 12:34:56.789012
    问:python中 datetime.now()获取当前时间例如:2023-04-0112:34:56.789012答:在Python中,datetime.now()函数是用来获取当前日期和时间的。但是,需要注意的是,这个函数是datetime模块中datetime类的一个方法,因此你需要从datetime模块中导入datetime类(尽管这看起来有点......