就最佳性能和更少延迟而言,哪种异步方法更适合解码从 Kafka 主题消耗的 Avro 消息?
我正在使用 Avro 库的并发 future,并且我仍然可以看到类似的执行时间与不使用并发 future 的情况相比。
from avro.io import BinaryDecoder, DatumReader
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
class DoIt():
def some_method():
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
msg_cnt_type_futures = [executor.submit(CatalogExporter.decode_avro, avro, self.schema_url) for avro in avro_list]
@staticmethod
def decode_avro(payload_tuple, schema_url):
# print("Process ID:", os.getpid())
msg_id, current_msg_offset, payload = payload_tuple
magic, schema_id = struct.unpack('>bi', payload[:5])
register_client = CachedSchemaRegistryClient(url=schema_url)
# Get Schema registry
# Avro value format
if magic == MAGIC_BYTES:
schema = register_client.get_by_id(schema_id)
reader = DatumReader(schema)
output = BinaryDecoder(io.BytesIO(payload[5:]))
decoded = reader.read(output)
return msg_id, current_msg_offset, decoded, schema.name
# no magic bytes, something is wrong
else:
raise ValueError
正在使用 Python 的
concurrent.futures.ProcessPoolExecutor
进行 Avro 解码,这是一个良好的开端,可以利用多处理来提高性能。但是,可能无法看到显著的速度提升,原因有很多:
-
开销过大: 进程创建和数据序列化/反序列化会带来开销。如果正在解码的小消息,则此开销可能会超过并行处理带来的任何性能提升。
-
Schema Registry 延迟: 在
decode_avro
函数中,正在为每条消息调用register_client.get_by_id(schema_id)
。这可能会导致网络延迟,尤其是在 Schema Registry 远程托管的情况下。 -
GIL 争用: 尽管正在使用进程池,但 Avro 库的某些部分(或其依赖项)可能仍会在解释器锁 (GIL) 上存在争用,从而限制了真正的并行性。
-
CPU 饱和: 如果的 CPU 已经满负荷运行,则添加更多进程可能无济于事,甚至可能由于上下文切换增加而降低性能。
以下是可以采取的一些措施来提高 Avro 解码性能:
1. 批量解码: 与其单独解码每条消息,不如将消息批处理到更大的块中,然后将这些块传递给的进程池。这可以通过减少进程创建和数据序列化/反序列化的开销来提高性能。
2. 缓存 Schema: 避免为每条消息都从 Schema Registry 中获取 Schema。可以实现一个本地缓存来存储 Schema,并在需要时检索它们。
3. 使用 Avro SpecificRecord: 如果可能,请使用 Avro SpecificRecord 来反序列化的消息。这比使用 DatumReader 更快,因为它是代码生成的,并且避免了运行时类型检查。
4. 尝试其他并行处理方法:
除了
ProcessPoolExecutor
之外,还可以探索其他并行处理方法,例如:
-
多线程:
如果 Avro 库不受 GIL 限制,则可以使用
ThreadPoolExecutor
。
-
异步 IO:
如果正在处理大量并发连接,则可以使用
asyncio
库来实现异步 IO。
5. 性能分析:
使用 Python 分析器(例如
cProfile
或
py-spy
)来识别代码中的性能瓶颈。这将帮助专注于需要优化的领域。
以下是使用批量解码和 Schema 缓存的改进代码示例:
from avro.io import BinaryDecoder, DatumReader
from confluent_kafka.avro.cached_schema_registry_client import CachedSchemaRegistryClient
from cachetools import TTLCache
MAGIC_BYTES = 0
class DoIt():
def __init__(self, schema_url):
self.schema_url = schema_url
self.schema_cache = TTLCache(maxsize=100, ttl=3600) # Cache schemas for 1 hour
self.register_client = CachedSchemaRegistryClient(url=schema_url)
def some_method(self):
# ... your code to consume messages from Kafka ...
# Batch messages for decoding
batch_size = 100
message_batches = [avro_list[i:i + batch_size] for i in range(0, len(avro_list), batch_size)]
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(self.decode_avro_batch, batch) for batch in message_batches]
for future in concurrent.futures.as_completed(futures):
# Process decoded messages
pass
def decode_avro_batch(self, payload_tuples):
decoded_messages = []
for payload_tuple in payload_tuples:
msg_id, current_msg_offset, payload = payload_tuple
magic, schema_id = struct.unpack('>bi', payload[:5])
# Get schema from cache or registry
schema = self.schema_cache.get(schema_id)
if schema is None:
schema = self.register_client.get_by_id(schema_id)
self.schema_cache[schema_id] = schema
if magic == MAGIC_BYTES:
reader = DatumReader(schema)
output = BinaryDecoder(io.BytesIO(payload[5:]))
decoded = reader.read(output)
decoded_messages.append((msg_id, current_msg_offset, decoded, schema.name))
else:
raise ValueError
return decoded_messages
请记住,最佳方法将取决于的特定用例和数据特征。建议尝试不同的方法并分析其性能,以确定最适合的方法。
标签:python,apache-kafka,python-multiprocessing,avro,concurrent.futures From: 78822346