首页 > 其他分享 >【大数据学习 | kafka】简述kafka的消费者consumer

【大数据学习 | kafka】简述kafka的消费者consumer

时间:2024-11-05 18:44:55浏览次数:3  
标签:消费者 pro kafka topic 简述 import consumer

1. 消费者的结构

能够在kafka中拉取数据进行消费的组件或者程序都叫做消费者。

这里面要涉及到一个动作叫做拉取。

首先我们要知道kafka这个消息队列主要的功能就是起到缓冲的作用,比如flume采集数据然后交给spark或者flink进行计算分析,但是flume采用的就是消息的push方式,这个方式不能够保证推送的数据消费者端一定会消费完毕,会出现数据的反压问题,这个问题很难解决,所以才出现了消息队列kafka,它可以起到一个缓冲的作用,生产者部分将数据直接全部推送到kafka,然后消费者从其中拉取数据,这边如果也采用推送的方式,那么也就在计算端会出现反压问题,所以kafka的消费者一般都是采用拉的方式pull,并不是push

1.1 消费者组

在一个topic中存在多个分区,可以分摊压力实现负载均衡,那么整体topic中的数据会很多,如果消费者只有一个的话很难全部消费其中的数据压力也会集中在一个消费者中,并且在大数据行业中几乎所有的计算架构都是分布式的集群模式,那么这个集群模式中,计算的节点也会存在多个,这些节点都是可以从kafka中拉取数据的,所有消费者不可能只有一个,一般情况下都会有多个消费者。

正因为topic存在多个分区,每个分区中的数据是独立的,那么消费者最好也是一个一个和分区进行一一对应的,所以有几个分区应该对应存在几个消费者是最好的。

这个和分蛋糕是一样的,一个蛋糕分成几块,那么有几个人吃,应该是对应关系的

消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。

消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

2. 消费者实现

在实现消费者的时候我们需要知道几个消费者的配置重要参数

参数解释
bootstrap.servers集群地址
key.deserializerkey反序列化器
value.deserializervalue反序列化器
group.id消费者组id

首先创建消费者对象

消费者对象订阅相应的topic然后拉取其中的数据进行消费

整体代码如下

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class Consumer1 {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");
        //设定组id
        pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //设定key的反序列化器
        pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //设定value的反序列化器
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
        List<String> topics = Arrays.asList("topic_a","topic_b");
        //一个消费者可以消费多个分区的数据
        consumer.subscribe(topics);
        //订阅这个topic
        while (true){
            //死循环要一直消费数据
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            //间隔一秒钟消费一次数据,拉取一批数据过来
            Iterator<ConsumerRecord<String, String>> it = records.iterator();
            while(it.hasNext()){
                ConsumerRecord<String, String> record = it.next();
                System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
            }
        }
    }
}
[hexuan@hadoop106 datas]$ kafka-console-producer.sh --bootstrap-server hadoop106:9092 --topic topic_b

>>1
>2
>3
>4
>5
>

3. 消费者与分区之间的对应关系

一个消费者组中的消费者和分区是一一对应的关系,一个分区应该对应一个消费者,但是如果消费者多了,那么有的消费者就没有分区消费,如果消费者少了那么会出现一个消费者消费多个分区的情况。

# 首先创建topic_c 用于测试分区和消费者的对应关系
kafka-topics.sh --bootstrap-server hadoop106:9092 --create --topic topic_c --partitions 3 --replication-factor 2
# 启动两个消费者 刚才我们写的消费者main方法运行两次
# 然后分别在不同的分区使用生产者发送数据,看数据在消费者中的打印情况

首先选择任务可以并行执行

选择任务修改配置

我们可以看到允许多实例并行执行

启动两次,这个时候我们就有了两个消费者实例

生产者线程:分别向三个分区中发送1 2 3元素

package com.hainiu.kafka.consumer;

/**
 * ClassName : test3_producer
 * Package : com.hainiu.kafka.consumer
 * Description
 *
 * @Author HeXua
 * @Create 2024/11/3 23:40
 * Version 1.0
 */

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class test3_producer {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop106:9092");
        pro.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        pro.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(pro);
        ProducerRecord<String, String> record1 = new ProducerRecord<>("topic_d", 0,null,"1");
        ProducerRecord<String, String> record2 = new ProducerRecord<>("topic_d", 1,null,"2");
        ProducerRecord<String, String> record3 = new ProducerRecord<>("topic_d", 2,null,"3");
        producer.send(record1);
        producer.send(record2);
//        producer.send(record3);
        producer.close();
    }
}

可以看到有的消费者消费了两个分区的数据

如果启动三个消费者会发现每个人消费一个分区的数据

如果启动四个消费者

我们发现有一个消费者没有数据

3. 1 消费多topic的数据

不同组消费不同的topic或者一个组可以消费多个topic都是可以的

3.2 多个组消费一个topic

同一个topic可以由多个消费者组进行消费数据,并且相互之间是没有任何影响的

修改同一份代码的组标识不同。启动两个实例查看里面的消费信息

   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group1");
   pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group2");
   //分别修改消费者组的id不同
package com.hainiu.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class Consumer1 {
    public static void main(String[] args) {
        Properties pro = new Properties();
        pro.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"nn1:9092");
        pro.put(ConsumerConfig.GROUP_ID_CONFIG,"hainiu_group");
        pro.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        pro.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(pro);
        List<String> topics = Arrays.asList("topic_c");
        //订阅多个topic的数据变化
        consumer.subscribe(topics);

        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            Iterator<ConsumerRecord<String, String>> it = records.iterator();
            while(it.hasNext()){
                ConsumerRecord<String, String> record = it.next();
                System.out.println(record.topic()+"->"+record.partition()+"->"+ record.offset()+"->"+record.key()+"->"+record.value());
            }
        }
    }
}

标签:消费者,pro,kafka,topic,简述,import,consumer
From: https://blog.csdn.net/2301_80912559/article/details/143442808

相关文章

  • Linux中Kafka单机部署
    一、安装JDK请看:Linux中安装JDK1.8二、安装kafka下载地址 https://kafka.apache.org/downloads1、上传解压到/usr/local/kafkatar-zxvfkafka_2.13-2.6.3.tgzmvkafka_2.13-2.6.3kafka2、创建数据与日志目录zk数据目录(如依据配置中ip和server.[1|2|3]中的数字对应......
  • CentOS部署Kafka中间件
    CentOS部署Kafka中间件 1.环境及版本说明:系统版本:CentOSLinuxrelease7.6.1810(Core)Kafka版本:kafka_2.12-2.2.0JDK版本:1.8.0_2122.安装下载#wget下载安装包wgethttp://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.2.0/kafka_2.12-2.2.0.tgz#无法......
  • Kafka 消息丢失如何处理?
    今天给大家分享一个在面试中经常遇到的问题:Kafka消息丢失该如何处理?这个问题啊,看似简单,其实里面藏着很多“套路”。来,咱们先讲一个面试的“真实”案例。面试官问:“Kafka消息丢失如何处理?”小明一听,反问:“你是怎么发现消息丢失了?”面试官顿时一愣,沉默了片刻后,可能......
  • 从安装到实战:Spring Boot与kafka终极整合指南
    docker环境下部署kafka前置条件ApacheKafka自2.8.0版本开始引入了不依赖Zookeeper的“KafkaRaftMetadataMode”,本文章依然使用Zookeeper作为集群管理的插件。#拉去zookeeper镜像dockerpullwurstmeister/zookeeper#运行zookeeper容器dockerrun-d--na......
  • Kafka笔记系列-概念相关
    消息队列的主要功能连接服务、消息路由、消息传递、数据持久化、日志记录消息队列基本分类1、点对点生产者发送消息到队列中,消费者从队列中取出并消费。消息在消费以后,队列中不再有存储,队列可以有多个消费者,但是一个消息只能被一个消费者消费2、发布订阅模式生产者发布消息......
  • 如何选择最适合的消息队列?详解 Kafka、RocketMQ、RabbitMQ 的使用场景
    引言在日常开发中,消息队列已经成为业务场景中几乎不可或缺的一部分。无论是订单系统、日志收集、分布式事务,还是大数据实时流处理,消息队列都在支撑着这些关键环节。目前市面上常用的消息队列有三种(ActiveMQ虽然在企业集成中仍有应用,但由于性能和扩展性在高并发、大数据量......
  • Python向kafka发消息
    后端研发可以提供一个向kafka发消息的接口,用requests向接口post消息就行:importrequestsimportjsonimporttimenow=int(time.time())n=10whilen>0:tt=now-n*60data={"queue":"alarm-dog-alarm-dog-test","payload&......
  • Kafka 之消息广播消费
    前言:上一篇我们分享了Kafka批量消息相关的知识,本篇我们继续分享Kafka的广播消费。Kafka系列文章传送门Kafka简介及核心概念讲解SpringBoot整合Kafka详解Kafka@KafkaListener注解的详解及使用Kafka客户端工具使用分享【offsetexplorer】Kafka之消息同步......
  • Kafka安装
    说明:kafka是一款消息中间件,本文介绍如何安装启动kafka下载首先,去官网下载(https://kafka.apache.org/downloads)配置下载好了,解压,放到一个没有中文、没有空格的路径下,修改下面几个配置修改1:\kafka_2.12-3.8.1\config\server.properties第62行:修改成相对路径修改2:\k......
  • Java面向对象简述
    Java是一门面向对象的编程语言,那么我们要先了解什么是面向对象。编程语言分为:面向机器语言(例如汇编语言),面向过程语言(例如c语言),以及面向对象语言。而面向对象编程主要体现在:封装性,继承性和多态性。封装性:将数据和对数据的操作封装在一起,通过抽象,形成一般概念(类)。举例而言,公......