首页 > 编程语言 >Python3 使用confluent_kafka实现异步向Kafka中写入数据

Python3 使用confluent_kafka实现异步向Kafka中写入数据

时间:2023-07-06 14:49:20浏览次数:51  
标签:__ self confluent Kafka topic ._ import kafka

使用的python包

pip install confluent-kafka

创建topic && 扩充partition

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time:2023/2/6 16:48
# @Software:PyCharm
__author__ = "JentZhang"

import json

from confluent_kafka.admin import AdminClient, NewPartitions, NewTopic


client = AdminClient({
    'bootstrap.servers': "172.25.114.8:9093,172.25.114.14:9094,172.25.114.57:9095",
})

topics = client.list_topics()
print(topics.topics)
# # 创建topic
new_topics = [NewTopic(topic, num_partitions=3, replication_factor=1) for topic in ["topic1", "topic2"]]
# Note: In a multi-cluster production scenario, it is more typical to use a replication_factor of 3 for durability.

# Call create_topics to asynchronously create topics. A dict
# of <topic,future> is returned.
fs = client.create_topics(new_topics)

# Wait for each operation to finish.
for topic, f in fs.items():
    try:
        f.result()  # The result itself is None
        print("Topic {} created".format(topic))
    except Exception as e:
        print("Failed to create topic {}: {}".format(topic, e))
# print(json.dumps(topics.topics, indent=4))

# 添加新的partition
client.create_partitions([NewPartitions("test01", 10)])

同步生产数据

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time:2023/2/6 13:40
# @Software:PyCharm
__author__ = "JentZhang"

import logging
# 引入生产者、消费者
import random

from confluent_kafka import Consumer, Producer
# 引入指针和kafka异常数据机制
from confluent_kafka import TopicPartition, KafkaError
import json


conf = {'bootstrap.servers': "172.25.114.8:9093,172.25.114.14:9094,172.25.114.57:9095", 'client.id': "zt01"}
# 初始化producer对象
producer = Producer(conf)


# 推送数据 topic为主题,json_data 为传送的数据
# 生产数据异常处理机制
def __publish_delivery_report(err, msg) -> None:
    """
    发布消息记录
    :param err: 消息的错误信息
    :param msg: 消息
    :return:
    """
    try:
        if err is not None:
            print('Message delivery failed: {}'.format(err))
        else:
            print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    except Exception as e:
        print(e.args)


for i in range(100000):
    need_push = {
        "name": f"张三-{i}",
        "age": random.randint(19, 59)
    }

    # json数据格式转化
    data = json.dumps(need_push)
    # # 推送数据 publisher_topic推送主题, data 数据, callback 召回处理机制
    producer.produce("test01", data, callback=__publish_delivery_report)
    producer.poll()  # kafka_producer_timeout 为超时时间
    producer.flush()

异步生产数据

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @Time:2023/2/6 13:40
# @Software:PyCharm
__author__ = "JentZhang"

import random
import asyncio
from threading import Thread
# 引入生产者、消费者
from confluent_kafka import Consumer, Producer, KafkaException
# 引入指针和kafka异常数据机制
from confluent_kafka import TopicPartition, KafkaError
import json


class AIOProducer:
    def __init__(self, configs, loop=None):
        self._loop = loop or asyncio.get_event_loop()
        self._producer = Producer(configs)
        self._cancelled = False
        self._poll_thread = Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, value):
        result = self._loop.create_future()

        def ack(err, msg):
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(
                    result.set_result, msg)

        self._producer.produce(topic, value, on_delivery=ack)
        return result


if __name__ == '__main__':
    conf = {'bootstrap.servers': "172.25.114.8:9093,172.25.114.14:9094,172.25.114.57:9095", 'client.id': "zt01"}
    producer = AIOProducer(conf)
    for i in range(10000):
        need_push = {
            "name": f"张三-{i}",
            "age": random.randint(19, 59)
        }

        # json数据格式转化
        data = json.dumps(need_push)
        # # 推送数据 publisher_topic推送主题, data 数据, callback 召回处理机制
        producer.produce("test01", data)

消费数据


import logging
import json
# 引入生产者、消费者
from confluent_kafka import Consumer, Producer
# 引入指针和kafka异常数据机制
from confluent_kafka import TopicPartition, KafkaError

conf = {'bootstrap.servers': "172.25.114.8:9093,172.25.114.14:9094,172.25.114.57:9095",  # 地址接口host1:9092
        'group.id': "test-group-id",  # 分组号
        'enable.auto.commit': True,  # 是否自动提交偏移量
        'topic.metadata.refresh.interval.ms': "3000",
        'auto.commit.interval.ms': "3000",
        'default.topic.config': {'auto.offset.reset': 'smallest'}  # 默认设置topic的消费的方式
        # 'default.topic.config': {'auto.offset.reset': 'latest'}
        }
subscriber = Consumer(conf)

subscriber.subscribe(["test01"])

while True:
    msg = subscriber.poll(1)
    if msg is None:
        continue
    else:
        if msg.error():
            print(msg.error())
        else:
            message = msg.value()
            print(msg.partition(), msg.offset())
            print(json.loads(message))

标签:__,self,confluent,Kafka,topic,._,import,kafka
From: https://www.cnblogs.com/JentZhang/p/17532043.html

相关文章

  • kafka安装和配置
    先安装zookeeper:Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeperDocker安装zookeeper:拉取镜像:dockerpullzookeeper:3.4.14创建容器:dockerrun-d--namezookeeper-p2181:2181zookeeper:3.4.14Docker安装kafka:......
  • Jmeter学习之八_测试kafka
    Jmeter学习之八_测试kafka背景最近在持续学习.昨天学习了grafana展示Jmeter测试数据库的结果今天想着能够测试一下kafka验证一下kafka的吞吐量等信息说干就干的.遇到的坑本来计划使用pepper-box或者是kafkameter进行相关的测试工作但是发现资料都比较陈旧,耗费了非......
  • docker安装kafka集群
    搭建docker基本环境搭建docker环境非本处详细讲解,了解或查阅资料即可拉取镜像(zookeeper以及kafka的)dockerpullwurstmeister/zookeeperdockerpullwurstmeister/kafka启动启动zookeeper容器dockerrun-d--namezookeeper-p2181:2181-twurstmeister/zookeep......
  • Kafka概述
    消息中间件对比: 选择建议:  Kafka:是一个分布式流媒体平台类似于消息队列或企业消息传递系统。kafka官网:http://kafka.apache.org/                            ......
  • Kafka 消费者
    目录消费者简介pull模式消费者消费者群组消费流程消费者API创建消费者订阅主题轮询获取消息手动提交偏移量同步提交异步提交重试异步提交同步和异步组合提交提交特定的偏移量从特定偏移量处开始处理关闭连接分区再均衡什么是分区再均衡何时生分区再均衡分区再均衡的过程如何判定......
  • Kafka 生产者
    目录生产者简介生产者传输实体生产者发送流程生产者API创建生产者同步发送异步发送异步响应发送关闭连接生产者的连接何时创建TCP连接何时关闭TCP连接序列化分区什么是分区为什么要分区分区策略自定义分区策略压缩Kafka的消息格式Kafka的压缩流程压缩过程解压缩的过程压缩......
  • 使用Kafka,实现Windows主机和Linux虚拟机的传输
    要在Windows主机和Linux虚拟机之间使用Kafka进行数据传输,需要按照以下详细步骤进行操作:在Windows主机上:1. 下载并安装Java Development Kit(JDK),确保在Windows上安装了Java。2. 下载Apache Kafka,可以从官方网站(https://kafka.apache.org/downloads)下载最新版本的Kafka。3. 解......
  • 在Windows下安装使用Kafka
    准备工作Java环境Kafka安装包(已包含zookeeper)安装步骤1Java安装自行百度2下载、安装Kafka 打开 下载地址 选择下图红框中的版本,Kafka包名组成:Scala版本-Kafka自身版本下载完成之后解压,目录如下图:3启动服务3.1启动ZooKeeper打开kafka_2.12-2.1.0\bin\w......
  • 记一次python消费kafka进程持续消耗内存问题
    前提:python写了一个kafka消费的脚本,脚本中消费kafka消息并将消费到的数据放在一个线程池中进行业务代码处理,使用supervisor管理这个脚本进程遇到问题:这个进程占用的内存会越来越大,知道将机器内存消耗完排查:网上找了一堆内存分析工具,好像都需要预埋代码,或者重新启动一个进程,全扯......
  • Kafka—生产者和消费者的内部结构
    生产者将数据发布到Kafka主题的应用程序称为生产者。应用程序集成了一个Kafka客户端库来写入Kafka。编写过程从创建ProducerRecird开始。KafkaProducers中的组件/流程拦截器——可以在发送之前改变记录的拦截器,例如Claim-check-interceptor。生产者元数据——管理生产者所需......