首页 > 其他分享 >Kafka Topic 中明明有可拉取的消息,为什么 poll 不到

Kafka Topic 中明明有可拉取的消息,为什么 poll 不到

时间:2024-08-28 08:57:21浏览次数:8  
标签:消费者 props kafka Topic 镜像 可拉取 docker poll Kafka

开心一刻

今天小学女同学给我发消息
她:你现在是毕业了吗
我:嗯,今年刚毕业
她给我发了一张照片,怀里抱着一只大橘猫
她:我的眯眯长这么大了,好看吗
我:你把猫挪开点,它挡住了,我看不到
她:你是 sb 吗,滚
我解释道:你说的是猫呀
可消息刚发出,就出现了红色感叹号,并提示:消息已发出,但被对方拒收了

你也没说猫叫眯眯呀

kafka搭建

出于简单考虑,基于 docker 搭建一个 kafka 节点;因为一些原因,国内的 Docker Hub 镜像加速器都不可用了,目前比较靠谱的做法是搭建个人镜像仓库,可参考:Docker无法拉取镜像解决办法,我已经试过了,是可行的,但还是想补充几点

  1. sync-image-example.yml 只需要修改最后的镜像拷贝,其他内容不需要改

    sync-image-example_改动点

    支持一次配置多个镜像的拷贝

  2. 镜像拷贝

    docker 镜像拷贝命令的格式

    skopeo copy docker://docker.io/命名空间/镜像名:TAG docker://阿里云镜像地址/命名空间/镜像名:TAG

    我们以 kafka 为例,去 Docker Hub 一搜,好家伙,搜出来上万个

    上万个kafka镜像

    我们将搜索条件精确化一些,搜 wurstmeister/kafka

    wurstmeister_kafka

    点进去,它在 Docker Hub 的地址是:

    https://hub.docker.com/r/wurstmeister/kafka

    那它的 docker 地址就是

    docker://docker.io/wurstmeister/kafka

    其他的镜像用类似的方式去找,所以最终的拷贝命令类似如下:

    skopeo copy docker://docker.io/wurstmeister/kafka:latest docker://registry.cn-hangzhou.aliyuncs.com/qingshilu/wurstmeister_kafka:latest

    如果一切顺利,那么在我们的阿里云个人镜像仓库就能看到我们拷贝的镜像了

    阿里云个人镜像
  3. 如何 pull

    在个人仓库点镜像名,会看到 操作指南

    如何pull

    我们只关注前两步,就可以将镜像 pull 下来

    镜像过滤_wurstmeister

镜像获取到之后,就可以搭建 kafka 了;因为依赖 zookeeper,我们先启动它

docker run -d --name zookeeper-test -p 2181:2181 \
--env ZOO_MY_ID=1 \
-v zookeeper_vol:/data \
-v zookeeper_vol:/datalog \
-v zookeeper_vol:/logs \
registry.cn-hangzhou.aliyuncs.com/qingshilu/wurstmeister_zookeeper

然后启动 kafka

docker run -d --name kafka-test -p 9092:9092 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.2.118:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.2.118 \
--env KAFKA_ADVERTISED_PORT=9092  \
--env KAFKA_LOG_DIRS=/kafka/logs \
-v kafka_vol:/kafka  \
registry.cn-hangzhou.aliyuncs.com/qingshilu/wurstmeister_kafka

不出意外的话,都启动成功

kafka启动成功

如果出意外了,大家也别慌,用 docker log 去查看日志,然后找对应的解决方案

# 1.先找到启动失败的容器id
docker ps -a
# 2.用 docker log 查看容器启动日志
docker log 容器id

如果需要开启 kafkaSASL 认证,可参考:Docker-Compose搭建带SASL用户密码验证的Kafka 来搭建

Kafka Tool

详情可查看:kafka可视化客户端工具(Kafka Tool)的基本使用

kafka_tool连接成功

创建 Topic:test-topic,并发送一条消息

创建test_topic并发送一条消息

此时 test-topic 中有 1 条消息

消费者 poll

代码很简单

/**
 * @author: 青石路
 */
public class MsgConsumer {

    private static final Logger LOGGER = LoggerFactory.getLogger(MsgConsumer.class);

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.2.118:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test_group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // 如果在kafka中找不到当前消费者的偏移量,则设置为最旧的
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String,String>(props);
        // 订阅主题
        consumer.subscribe(Collections.singleton("test-topic"));
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        LOGGER.info("records count = {}", records.count());
        records.forEach(record -> LOGGER.info("{} - {} - {}", record.offset(), record.key(), record.value()));
        // consumer.commitAsync();
        consumer.close();
    }
}

我们执行下,输出日志如下

poll没拉取到消息_日志

竟然 poll 不到消息,为什么呀?

思考考

我们调整下代码,循环 poll

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    LOGGER.info("records count = {}", records.count());
    records.forEach(record -> LOGGER.info("{} - {} - {}", record.offset(), record.key(), record.value()));
}

我们再执行下,输出日志如下

while循环后日志

消费者 poll 的过程中会先判断当前消费者是否在 消费者组 中,如果不在,会先加入消费者组,在加入过程中,ConsumerCoordinator 会对这个消费者组 Rebalance,整个过程中该消费者组内的所有消费者都不能工作,而 poll 又配置了超时时间(100 毫秒),如果在超时时间内,当前消费者还未正常加入消费者组中,那么 poll 肯定是拉取不到数据的;根据日志可以看出,第 3 次 poll 的时候,消费者已经正常加入消费者组中,那么就能 poll 到数据了

很多小伙伴可能可能会有这样的疑问

平时在项目中使用的时候,从来没有感受到这样的问题,为什么呢

原因有以下几点

  1. poll 的超时时间设置比较长,超时时间内消费者能够正常加入到消费者组中
  2. 消费者随项目的启动创建,存活周期与项目一致,那么只有前几次 poll 的时候,可能会因为消费者未加入到消费者组中而拉取不到数据,而一旦消费者成功加入到消费者组之后,那么只要 Topic 中有数据,poll 肯定能拉取到数据;从整个次数占比来看,poll 拉取不到数据的异常情况(Topic 中有可拉取的数据,但 poll 不到)占比非常小,小到可以忽略不计了

所以你们感受不到这样;但如果某些场景下,比如 DataX 从 kafka 读数据

异源数据同步 → DataX 为什么要支持 kafka?

消费者要不断新建,那么 poll 不到数据的异常情况的占比就会上来了,那就需要通过一些机制来降低其所造成的的影响了,比如说重试机制

总结

  1. 示例代码:kafka-demo
  2. 如果大家平时用 docker 比较多,推荐通过搭建个人镜像仓库来解决镜像拉取超时的问题
  3. kakfa 消费者 poll 的时候,消费者如果不在消费者组中,会先加入消费者组,那么超时时间内可能 poll 不到数据,可以通过增大超时时间,或者重试机制来降低 poll 不到数据的异常次数(Topic 中没有可拉取的数据而 poll 不到的情况不算异常情况)

标签:消费者,props,kafka,Topic,镜像,可拉取,docker,poll,Kafka
From: https://www.cnblogs.com/youzhibing/p/18383403

相关文章

  • SRE 必备知识 - Kafka 探秘之零拷贝技术
    如果你了解过Kafka,那么它用到的一个性能优化技术可能会引起你的注意--操作系统的零拷贝(zero-copy)优化。零拷贝操作可以避免对数据的非必要拷贝,当然,并非是说完全没有拷贝。在Kafka的场景下,操作系统可以从pagecache拷贝数据到socketbuffer,直接绕过Kafkabroker这个......
  • Kafka的基本概念
    目录1.Kafka的介绍1.1介绍1.2Kafka的概念1.3.Kafka实现的日志聚合1.4简单的收发消息1.5其他消费模式1.5.1指定消费进度1.5.2分组消费1.5.3查看消费者组的偏移量1.6基于Zookeeper的Kafka集群1.6.1使用集群的原因1.6.2Kafka集群架构1.6.3Topic下的Partition分布情况......
  • Kafka的生产者和消费者机制
    目录1.基础的客户端1.1消息发送者的主流程1.2消息消费者主流程2.客户端工作机制2.1消费者分组消费机制2.2生产者拦截器机制2.3消息序列化机制2.4消息分区路由机制2.5生产者消息缓存机制2.6发送应答机制2.7生产者消息幂等性(1)生产者消息幂等性介绍(2)解决方案2.8......
  • 图解Kafka | 16张图讲透生产者交付语义
    交付(传递)语义交付语义是在分布式消息系统(如Kafka)中,用来描述消息从生产者到达消息系统并最终被消费者消费时的可靠性保证。它主要涉及到消息是否能正确地被投递,及在什么情况下可能会出现消息丢失或重复的问题。根据Kafkabroker和生产者的配置,支持“最多一次”、“至少一......
  • 图解Kafka | 28张图彻底搞懂消费者
    消费者消费者角色Kafka消费者与生产者一样,是优化Kafka数据处理的重要角色。消费者的主要任务是与Kafka集群建立连接,并基于配置的消费者属性从相应的Kafkabroker读取记录。多应用消费多个应用程序可以从同一个Kafka主题中消费记录,每个应用程序都会获取该数据的......
  • 利用kafka和kafka connect插件debezium实现oracle表同步
    1.kafka安装1.1.java安装openjdk下载,建议使用17,至少应该高于版本11#进入家目录,解压下载的java包,配置环境变量tarvxfopenjdk-20.0.1_linux-x64_bin.tar.gz-C/usr/local/vi.bash_profile#注意要把JAVA的目录放到$PATH之前exportJAVA_HOME=/usr/local/jdk-20exportP......
  • 异源数据同步 → DataX 为什么要支持 kafka?
    开心一刻昨天发了一条朋友圈:酒吧有什么好去的,上个月在酒吧当服务员兼职,一位大姐看上了我,说一个月给我10万,要我陪她去上海,我没同意朋友评论道:你没同意,为什么在上海?我回复到:上个月没同意前情回顾关于DataX,官网有很详细的介绍,鄙人不才,也写过几篇文章异构数据源同步之数据......
  • linux下试验中间件canal的example示例-binlog日志的实时获取显示以及阿里巴巴中间件ca
    一、linux下试验中间件canal的example示例-binlog日志的实时获取显示    今天重装mysql后,进行了canal的再次试验,原来用的mysql5.7,今天重装直接换了5.6算了。反正测试服务器的mysql也不常用。canal启动后日志显示examplepreparetofindstartpositionjustshowmaste......
  • rocketmq 是参考了 kafka架构, 为什么rocketmq吞吐量是10万/秒, kafka吞吐量是17万/秒
    我们都知道,为了防止消息在服务器丢失,一般都是进行持久化(保存在磁盘),在发送消失时那就涉及到从磁盘拷贝到内核空间,从内核空间到用户态,再从用户态到socket缓存区,从socket缓存区到网卡四次拷贝。kafka使用的是零拷贝-sendfile,把内核态数据发送到网卡,减少两次拷......
  • kafka
    消息队列的流派MQ是什么MessageQueue(MQ)是一种消息队列中间件。MQ的主要作用是通过分离消息的发送和接收来实现应用程序的异步和解耦。然而,MQ的核心目的是通信:它屏蔽了底层复杂的通信协议,并定义了一套更简单的应用层通信协议。在分布式系统中,模块间通信通常使用HTTP......