首页 > 编程语言 >python操作kafka

python操作kafka

时间:2024-03-21 16:44:27浏览次数:31  
标签:python self value kafka topic 操作 message consumer

目录

一、python 操作 kafka

说明:关于 kafka 的启动与安装,命令行的使用,请查看前面的几篇文章。本篇文章主要描述python 如何操作 kafka

1. python 使用 kafka 生产者

  • 说明:python 在操作 kafka 写入数据的时候,分为发送往已经存在的主题或者是不存在的主题,当主题不存在的时候,生产者会自动创建该主题,并将消息存贮在默认的 0 分区;

  • 使用 python 操作 kafka 首先安装如下的包

 # pip install kafka  # 可以不安装
 pip install kafka-python  # 由于 python 3.7 后的版本中 async 的关键字发生了变化,因此需要安装该包;
  • 常规的使用主要就是根据,第三方包的介绍使用,网上有许多基本的案例,此处不做介绍,下面直接将常用的方法进行封装;
 import json
 import kafka

 
 class Producer(object):
     """ kafka 的生产者模型
     """
 
     _coding = "utf-8"
 
     def __init__(self,
                  broker='192.168.74.136:9092',
                  topic="add_topic",
                  max_request_size=104857600,
                  batch_size=0,  # 即时发送,提高并发可以适当增加,但是会造成消息的延迟;
                  **kwargs):
         """初始化设置 kafka 生产者连接对象;参数不存在的情况下使用配置文件中的默认连接;
         """
         self.broker = broker
         self.topic = topic
         self.max_request_size = max_request_size
         # 实例化生产者对象
         self.producer_json = kafka.KafkaProducer(
             bootstrap_servers=self.broker,
             max_request_size=self.max_request_size,
             batch_size=batch_size,
             key_serializer=lambda k: json.dumps(k).encode(self._coding),  # 设置键的形式使用匿名函数进行转换
             value_serializer=lambda v: json.dumps(v).encode(self._coding),  # 当需要使用 json 传输地时候必须加上这两个参数
             **kwargs
         )
 
         self.producer = kafka.KafkaProducer(
             bootstrap_servers=broker,
             max_request_size=self.max_request_size,
             batch_size=batch_size,
             api_version=(0, 10, 1),
             **kwargs
         )
 
     def send(self, message: bytes, partition: int = 0):
         """
         写入普通的消息;
         Args:
             message: bytes; 字节流数据;将字符串编码成 utf-8的格式;
             partition: int; kafka 的分区,将消息发送到指定的分区之中;
         Returns:
             None
         """
         future = self.producer.send(self.topic, message, partition=partition)
         record_metadata = future.get(timeout=30)
         if future.failed():  # 发送失败,记录异常到日志;
             raise Exception("send message failed:%s)" % future.exception)
 
     def send_json(self, key: str, value: dict, partition: int = 0):
         """
         发送 json 形式的数据;
         Args:
             key: str; kafka 中键的值
             value: dict; 发送的具体消息
             partition: int; 分区的信息
         Returns:
             None
         """
         future = self.producer_json.send(self.topic, key=key, value=value, partition=partition)
         record_metadata = future.get(timeout=30)
         if future.failed():  # 发送失败记录异常;
             raise Exception("send json message failed:%s)" % future.exception)
 
     def close(self):
         """
         关闭kafka的连接。
         Returns:
             None
         """
         self.producer_json.close()
         self.producer.close()
 
 
 if __name__ == '__main__':
     '''脚本调用执行;'''
     kafka_obj = Producer()
     print(kafka_obj.broker)
     kafka_obj.send("自动生成".encode())
 
  • 发送的消息,主要是普通的字符串消息,和字典形式的消息,方便对接kafka

2. python 使用 kafka 消费者

  • 由于 kafka 消费者的特性,阻塞循环是一个必然的过程,可以使用 python 中的生成器进行优化,但是循环阻塞是无可避免的;

  • 操作 kafka 的消费者依旧只需要安装上述的两个第三方依赖包;

  • 封装指定的操作

 import json
 
 from kafka import KafkaConsumer, KafkaProducer
 from kafka.structs import TopicPartition
 
 
 class KConsumer(object):
     """kafka 消费者; 动态传参,非配置文件传入;
        kafka 的消费者应该尽量和生产者保持在不同的节点上;否则容易将程序陷入死循环中;
     """
 
     _encode = "UTF-8"
 
     def __init__(self, topics="start_server", bootstrap_server=None, group_id="start_task", partitions=None, **kwargs):
         """ 初始化kafka的消费者;
             1. 设置默认 kafka 的主题, 节点地址, 消费者组 id(不传入的时候使用默认的值)
             2. 当需要设置特定参数的时候可以直接在 kwargs 直接传入,进行解包传入原始函数;
             3. 手动设置偏移量
         Args:
             topics: str; kafka 的消费主题;
             bootstrap_server: list; kafka 的消费者地址;
             group_id: str; kafka 的消费者分组 id,默认是 start_task 主要是接收并启动任务的消费者,仅此一个消费者组id;
             partitions: int; 消费的分区,当不使用分区的时候默认读取是所有分区;
             **kwargs: dict; 其他原生kafka消费者参数的;
         """
 
         if bootstrap_server is None:
             bootstrap_server = ["192.168.74.136:9092", ]
         self.consumer = KafkaConsumer(bootstrap_servers=bootstrap_server)
         exist = self.exist_topics(topics)
         if not exist:  # 需要的主题不存在;
             # 创建一条
             self.create_topics(topics)
         if partitions is not None:
             self.consumer = KafkaConsumer(
                 bootstrap_servers=bootstrap_server,
                 group_id=group_id,  
                 # 目前只有一个消费者,根据情况是否需要进行修改;当扩展多个消费者的时候需要进行扩展;
                 **kwargs
             )
             # print("指定分区信息:", partitions, topics, type(partitions))
             self.topic_set = TopicPartition(topics, int(partitions))
             self.consumer.assign([self.topic_set])
         else:
             # 默认读取主题下的所有分区, 但是该操作不支持自定义 offset, 因为 offset 一定是在指定的分区中进行的;
             self.consumer = KafkaConsumer(
                 topics,
                 bootstrap_servers=bootstrap_server,
                 group_id=group_id,
                 **kwargs
             )
 
     def exist_topics(self, topics):
         """
         检查 kafka 中的主题是否存在;
         Args:
             topics: 主题名称;
 
         Returns:
             bool: True/False ; True,表示存在,False 表示不存在;
         """
         topics_set = set(self.consumer.topics())
         if topics not in topics_set:
             return False
         return True
 
     @staticmethod
     def create_topics(topics):
         """
         创建相关的 kafka 主题信息;说明本方法可以实现用户自定义 kafka 的启动服务,默认是使用的是 start_server;
         Args:
             topics: str; 主题的名字;
 
         Returns:
             None
         """
         producer = KafkaProducer(
             bootstrap_servers='192.168.74.136:9092',
             key_serializer=lambda k: json.dumps(k).encode('utf-8'),
             value_serializer=lambda v: json.dumps(v).encode("utf-8")
         )
         producer.send(topics, key="start", value={"msg": "aaaa"})
         producer.close()
 
     def recv(self):
         """
         接收消费中的数据
         Returns:
             使用生成器进行返回;
         """
         for message in self.consumer:  
             # 这是一个永久阻塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都会有偏移
             # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (
             #     message.topic, message.partition, message.offset, message.key, message.value))
             yield {"topic": message.topic, "partition": message.partition, "key": message.key,
                    "value": message.value.decode(self._encode)}
 
     def recv_seek(self, offset):
         """
         接收消费者中的数据,按照 offset 的指定消费位置;
         Args:
             offset: int; kafka 消费者中指定的消费位置;
 
         Returns:
             generator; 消费者消息的生成器;
         """
         self.consumer.seek(self.topic_set, offset)
         for message in self.consumer:
             # print("主题:%s 分区:%d:连续值:%d: 键:key=%s 值:value=%s" % (
             #     message.topic, message.partition, message.offset, message.key, message.value))
             yield {"topic": message.topic, "partition": message.partition, "key": message.key,
                    "value": message.value.decode(self._encode)}
 
 
 if __name__ == '__main__':
     """ 测试使用;
     """
 
     obj = KConsumer("exist_topic", bootstrap_server=['192.168.74.136:9092'])
     for i in obj.recv():
         print(i)
  • 该消费者封装时多增加了一个需求,消费的主题不存在的时候会默认创建,下次就可以继续消费

3. 使用 docker 中的 kafka

  • 以上两种脚本适用于 Kafka 的生产者和消费者在大多数情况下的使用,在使用的时候只需要将相关的配置信息修改即可;

  • docker 中使用 kafka 的时候与前面的配置稍有不同,当使用docker-compose部署 Kafka 的时候,地址在文件中经过修改,可能会被改变,因此只需要将相关的地址配好,即可;代码信息无需修改;

  • 一般情况下如果是在 docker 中配置相关的参数,需要将端口映射出来,然后如果是 windows 可能需要将host的网络地址解析,与docker 中 kafka 的名称对应;

 host 文件
 
 127.0.0.1 kafka
  • 当需要远程连接的时候,将地址改成该计算机在内网中的地址即可

二、python操作kafka细节

  • # 需要安装的库如下
    pip install kafka
    pip install kafka-python
    

2.1 生产者demo

from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['broker1:1234'])

# Asynchronous by default
future = producer.send('my-topic', b'raw_bytes')

# Block for 'synchronous' sends
try:
    record_metadata = future.get(timeout=10)
except KafkaError:
    # Decide what to do if produce request failed...
    log.exception()
    pass

# Successful result returns assigned partition and offset
print (record_metadata.topic)
print (record_metadata.partition)
print (record_metadata.offset)

# produce keyed messages to enable hashed partitioning
producer.send('my-topic', key=b'foo', value=b'bar')

# encode objects via msgpack
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})

# produce json messages
producer = KafkaProducer(value_serializer=lambda m: json.dumps(m).encode('ascii'))
producer.send('json-topic', {'key': 'value'})

# produce asynchronously
for _ in range(100):
    producer.send('my-topic', b'msg')

def on_send_success(record_metadata):
    print(record_metadata.topic)
    print(record_metadata.partition)
    print(record_metadata.offset)

def on_send_error(excp):
    log.error('I am an errback', exc_info=excp)
    # handle exception

# produce asynchronously with callbacks
producer.send('my-topic', b'raw_bytes').add_callback(on_send_success).add_errback(on_send_error)

# block until all async messages are sent
producer.flush()

# configure multiple retries
producer = KafkaProducer(retries=5)

2.2 消费者demo

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])  #参数为接收主题和kafka服务器地址

# 这是一个永久堵塞的过程,生产者消息会缓存在消息队列中,并且不删除,所以每个消息在消息队列中都有偏移
for message in consumer:  # consumer是一个消息队列,当后台有消息时,这个消息队列就会自动增加.所以遍历也总是会有数据,当消息队列中没有数据时,就会堵塞等待消息带来
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

2.3 消费者(消费群组)

from kafka import KafkaConsumer
# 使用group,对于同一个group的成员只有一个消费者实例可以读取数据
consumer = KafkaConsumer('test',group_id='my-group',bootstrap_servers=['127.0.0.1:9092'])
for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
  • 启动多个消费者,只有其中某一个成员可以消费到,消费组可以横向扩展提高处理能力

2.4 消费者(读取目前最早可读的消息)

from kafka import KafkaConsumer

consumer = KafkaConsumer('test',auto_offset_reset='earliest',bootstrap_servers=['127.0.0.1:9092'])

for message in consumer:
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
  • auto_offset_reset:重置偏移量,earliest 移到最早的可用消息,latest 最新的消息,默认为 latest
    源码定义: {‘smallest’: ‘earliest’, ‘largest’: ‘latest’}

2.5 消费者(手动设置偏移量)

# ==========读取指定位置消息===============
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer('test',bootstrap_servers=['127.0.0.1:9092'])

print(consumer.partitions_for_topic("test"))  #获取test主题的分区信息
print(consumer.topics())  #获取主题列表
print(consumer.subscription())  #获取当前消费者订阅的主题
print(consumer.assignment())  #获取当前消费者topic、分区信息
print(consumer.beginning_offsets(consumer.assignment())) #获取当前消费者可消费的偏移量
consumer.seek(TopicPartition(topic='test', partition=0), 5)  #重置偏移量,从第5个偏移量消费
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

2.6 消费者(订阅多个主题)

# =======订阅多个消费者==========

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))  #订阅要消费的主题
print(consumer.topics())
print(consumer.position(TopicPartition(topic='test', partition=0))) #获取当前主题的最新偏移量
for message in consumer:
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))

2.7 消费者(手动拉取消息)

from kafka import KafkaConsumer
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test','test0'))
while True:
    msg = consumer.poll(timeout_ms=5)   #从kafka获取消息
    print(msg)
    time.sleep(2)

2.8 消费者(消息挂起与恢复)

2.8.1 消息挂起和恢复的实现

# ==============消息恢复和挂起===========

from kafka import KafkaConsumer
from kafka.structs import TopicPartition
import time

consumer = KafkaConsumer(bootstrap_servers=['127.0.0.1:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(TopicPartition(topic=u'test', partition=0))  # pause执行后,consumer不能读取,直到调用resume后恢复。
num = 0
while True:
    print(num)
    print(consumer.paused())   #获取当前挂起的消费者
    msg = consumer.poll(timeout_ms=5)
    print(msg)
    time.sleep(2)
    num = num + 1
    if num == 10:
        print("resume...")
        consumer.resume(TopicPartition(topic='test', partition=0))
        print("resume......")
  • pause 执行后,consumer不能读取,直到调用 resume 后恢复

2.8.2 完整示例

from kafka import KafkaConsumer

# To consume latest messages and auto-commit offsets
consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                          message.offset, message.key,
                                          message.value))

# consume earliest available messages, don't commit offsets
KafkaConsumer(auto_offset_reset='earliest', enable_auto_commit=False)

# consume json messages
KafkaConsumer(value_deserializer=lambda m: json.loads(m.decode('ascii')))

# consume msgpack
KafkaConsumer(value_deserializer=msgpack.unpackb)

# StopIteration if no message after 1sec
KafkaConsumer(consumer_timeout_ms=1000)

# Subscribe to a regex topic pattern
consumer = KafkaConsumer()
consumer.subscribe(pattern='^awesome.*')

# Use multiple consumers in parallel w/ 0.9 kafka brokers
# typically you would run each on a different server / process / CPU
consumer1 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')
consumer2 = KafkaConsumer('my-topic',
                          group_id='my-group',
                          bootstrap_servers='my.server.com')

2.9 Python创建自定义的Kafka Topic

client = KafkaClient(bootstrap_servers=brokers)

if topic not in client.cluster.topics(exclude_internal_topics=True):  # Topic不存在

    request = admin.CreateTopicsRequest_v0(
        create_topic_requests=[(
            topic,
            num_partitions,
            -1,  # replication unset.
            [],  # Partition assignment.
            [(key, value) for key, value in configs.items()],  # Configs
        )],
        timeout=timeout_ms
    )

    future = client.send(2, request)  # 2是Controller,发送给其他Node都创建失败。
    client.poll(timeout_ms=timeout_ms, future=future, sleep=False)  # 这里

    result = future.value
    # error_code = result.topic_error_codes[0][1]
    print("CREATE TOPIC RESPONSE: ", result)  # 0 success, 41 NOT_CONTROLLER, 36 ALREADY_EXISTS
    client.close()
else:  # Topic已经存在
    print("Topic already exists!")
    return

标签:python,self,value,kafka,topic,操作,message,consumer
From: https://www.cnblogs.com/Mcoming/p/18087704

相关文章

  • Kafka集群部署
    目录Kafka集群部署1.1服务器资源1.1.1安装JDK(所有设备)1.1.2配置ip和主机名映射(所有服务器)(可不做)1.1.3配置主机名(所有设备)(可不做)1.2在node1上安装、配置kafka1.2.1安装kafka1.2.2修改配置文件1.2.2.1修改zookeeper.properties1.2.2.2配置Zookeeper的id1.2.2.3......
  • python 多进程并发:生产者+多消费者模式
    多任务场景中,为了节省大量子任务串行执行的耗时,通常采用并发方式充分利用cpu和内存来节省整体任务运行时间。对于多任务并发,常见的做法自然是抽象出功能函数,借助multiprocess类在主进程中并发出多个子进程,或者构建进程池,将任务构造好后丢入进程池中来实现并发。这种方式对于......
  • python 异常捕获、断言(assert 、finally) 与日志(loguru.logger)
    异常捕获常见的异常类型代码执行顺序从上到下依次运行的,如果出错了,后面的代码不会出错。--所以要对异常做处理。常见的异常的类型,不需要记;平时写代码的时候经常会报错,积累常见错误,排查问题。常见异常的报错的类型:NameError,IndexError,KeyError,ValueError,ZeroDivisionE......
  • python 之 垃圾回收机制(Garbage Collector,简称 GC)
    垃圾回收机制有三种,主要采用引用计数机制为主,标记-清除和分代回收机制为辅的策略。其中,标记-清除机制用来解决计数引用带来的循环引用而无法释放内存的问题,分代回收机制是为提升垃圾回收的效率。1.引用计数:Python中的每个对象都有一个引用计数,每当对象被引用时,其引用......
  • python 函数(解包、互相调用、作用域、函数的封装、内置函数:eval()、zip()、open())
    函数解包"""1、函数的注释:参数和返回值在注释里可以自动添加显示,只需手动加说明。2、函数的解包【拆包】:函数的参数要传递数据有多个值的时候,中间步骤拿到数据保存在元组或者列表或者字典里。-传递参数的时候加一个*或者**解包-一次拿到元组列表字典的......
  • ElasticSearch中使用ik分词器进行实现分词操作
    简介:在默认的情况下,ES中只存在Stander分词器,但是这个分词器往往不满足我们的分词需求,这里通过ik分词器进行自定义我们的分词操作1、第一步将ik分词器进行下载下载地址:https://github.com/medcl/elasticsearch-analysis-ik需要注意,需要选择和自己的ES版本对应的版本2、将ik分词......
  • 身份证ocr,python身份证识别ocr接口代码,实名认证接口
    基于文字识别技术产物的身份证识别接口现已成熟,通过手机、电脑或者摄像头终端设备拍照或者上传身份证图片即可实现身份证照片上文字的识别,从而提取到身份证信息。翔云除了提供身份证识别接口外,还完善了实名认证接口方案,搭配翔云身份证实名认证接口可谓是效率翻倍。身份证......
  • 操作系统综合题之“用记录型信号量机制的wait和signal操作来解决了由北向南和由南向北
    1.问题:假设系统有三个并发进程read、move和print共享缓冲区B1和B2。进程read负责从输入设备上读取信息,每读取一条记录后把它存如缓冲区B1中;进程move负责从缓冲区B1中取出一条记录,整理后放入缓冲区B2;进程print负责将缓冲区B2中的记录取出并打印输出。缓冲区B1和B2每次只能存放1个......
  • 基于Python3的数据结构与算法 - 17 哈希表
    一、哈希表哈希表是一个通过哈希函数来计算数据存储位置的数据结构,通常支持如下操作:insert(key,value):插入键值对(key,value)。get(key):如果存在键值对为key的键值对则返回其value,否则返回空值。delete(key):删除键为key的键值对。1.直接寻址法当关键字的全域U比较小......
  • 从零开始学Spring Boot系列-集成Kafka
    Kafka简介ApacheKafka是一个开源的分布式流处理平台,由LinkedIn公司开发和维护,后来捐赠给了Apache软件基金会。Kafka主要用于构建实时数据管道和流应用。它类似于一个分布式、高吞吐量的发布-订阅消息系统,可以处理消费者网站的所有动作流数据。这种动作流数据包括页面浏览、搜索......