首页 > 其他分享 >kafka备忘录

kafka备忘录

时间:2024-05-30 16:10:39浏览次数:18  
标签:消费者 分区 Kafka 备忘录 实例 import kafka

生产者:

package org.example;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class KafkaP {
    public static void main(String[] args) {
        // 创建一个hashmap作为生产者的配置对象
        Map<String,Object> configMap = new HashMap<>();
        configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 创建一个kafka生产者,泛型是对k,v定义和限制
        KafkaProducer<String, String> producer = new KafkaProducer<>(configMap);

        // 把数据发送到kafka
        for (int i = 0; i < 10; i++){
            // 创建一个消息对象
            ProducerRecord<String, String> record = new ProducerRecord<>("test", "key"+i,"hello kafka"+i);
            producer.send(record);
        }

        // 关闭生产者
        producer.close();
    }
}

消费者:

package org.example;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Map;

public class KafkaC {
    public static void main(String[] args) {
        Map<String,Object> configMap = new HashMap<>();
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG,"test_group");
        // 创建kafka消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configMap);
        // 订阅主题
        consumer.subscribe(java.util.Arrays.asList("test"));
        // 消费数据
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }

        // 关闭消费者
        // consumer.close();
    }
}

 

先启动消费者等待着

再启动生产者即可得到测试数据

如果是windows可以使用kafkatool可以便利操作

 

核心概念补充:

Kafka的topic分区是一个核心概念,它允许Kafka将数据分散到多个broker上,以实现更高的吞吐量和容错性。以下是关于Kafka topic分区的详细解释:

Kafka Topic分区
定义:
Kafka的topic是一个记录流,可以看作是一个消息队列。而每个topic都可以被划分为一个或多个分区(partition)。
作用:
分区的主要目的是提高Kafka的吞吐量和容错性。通过将topic划分为多个分区,Kafka可以实现数据的并行处理,因为不同的分区可以部署在不同的broker上,并由这些broker独立处理。
同时,分区也是Kafka实现数据冗余和容错的关键。通过将同一个topic的多个分区复制到不同的broker上,Kafka可以确保即使某个broker出现故障,数据也不会丢失,因为其他broker上的副本仍然可用。
分区与数据:
不同分区存储不同数据:在Kafka中,每个分区都是一个有序的消息序列。不同的分区之间不会存储相同的数据。这意味着,当生产者向topic发送消息时,这些消息会被分配到该topic的一个或多个分区中,但同一消息不会出现在多个分区中。
分区与消费者:消费者组(consumer group)中的消费者可以并行地从不同的分区中读取数据。但是,对于同一个消费者组内的消费者,Kafka会确保每个分区只被一个消费者读取,以避免数据重复消费。不过,不同的消费者组可以独立地读取同一个分区的数据。
分区数与性能:
分区数过多或过少都可能影响Kafka的性能。过多的分区会导致过多的文件句柄打开,增加系统的I/O压力;而过少的分区则可能导致单个broker的负载过高,无法充分利用集群的资源。
因此,在选择分区数时需要根据实际场景进行权衡。一般来说,可以根据topic的数据量、消费者的并发量以及集群的规模来确定合适的分区数。
总结
Kafka的topic分区是Kafka实现高性能、高吞吐量和容错性的关键。通过将topic划分为多个分区,Kafka可以实现数据的并行处理和冗余存储,从而满足各种实际场景的需求。同时,不同的分区之间存储的是不同的数据,确保了数据的一致性和正确性。

 

 

Kafka中的消费者组(Consumer Group)是一个非常重要的概念,它允许多个消费者实例共同消费一个或多个Kafka topic的数据,同时保证每条消息只被消费一次。以下是关于消费者组的详细解释:

消费者组(Consumer Group)
定义:
消费者组是一个或多个消费者的集合,它们共享一个共同的消费逻辑(通常是一个应用程序)。消费者组内的消费者实例可以并行地消费数据,但每个分区内的数据只能被组内的一个消费者实例消费。
作用:
水平扩展:通过增加消费者组内的消费者实例数量,可以水平扩展应用程序的吞吐量,从而处理更多的数据。
容错性:如果消费者组内的某个消费者实例失败,其他消费者实例可以继续消费数据,确保数据不会丢失。
数据隔离:不同的消费者组可以独立地消费同一个topic的数据,实现数据的隔离和独立处理。
分区与消费者组:
Kafka保证每个分区只能被消费者组内的一个消费者实例消费。这意味着,如果消费者组内的消费者实例数量少于分区的数量,那么一些消费者实例将消费多个分区的数据;如果消费者实例数量多于分区的数量,那么一些消费者实例将处于空闲状态。
Kafka通过消费者组的协调器(coordinator)来管理消费者与分区之间的映射关系。当消费者加入或离开消费者组时,协调器会重新分配分区给消费者实例,以确保每个分区都有且仅有一个消费者实例在消费。
偏移量(Offset):
Kafka使用偏移量来跟踪消费者已经消费的数据位置。每个消费者实例都会维护一个偏移量,表示它已经消费到了分区中的哪个位置。当消费者实例消费数据时,它会更新自己的偏移量,并将新的偏移量提交给Kafka服务器。这样,即使消费者实例失败或重启,它也可以从上次提交的偏移量位置继续消费数据。
消费者组ID:
每个消费者组都有一个唯一的ID,用于标识该消费者组。在创建消费者实例时,需要指定消费者组ID,以便Kafka服务器将其加入到相应的消费者组中。
消费者组配置:
Kafka提供了许多与消费者组相关的配置选项,如session.timeout.ms(消费者与协调器之间的会话超时时间)、max.poll.interval.ms(消费者两次轮询之间的最大时间间隔)等。这些配置选项可以根据实际场景进行调整,以优化消费者组的性能和容错性。
总结
消费者组是Kafka实现水平扩展、容错性和数据隔离的关键。通过合理地配置和管理消费者组,可以确保Kafka系统的高效运行和数据的可靠处理。

 

生产者(Producer):负责将数据发送到Kafka服务中。
消费者(Consumer):从Kafka中提取数据进行消费。
Broker(集群实例):Kafka支持集群分布式部署,每个服务器上的Kafka服务即为一个Broker。
Topic(主题):Kafka中的关键词,单个Topic代表一种类型的消息。
Partition(分区):Topic下的更小元素,每个Topic可以有多个Partition,每个Partition在磁盘上对应一个文件。
Offset(偏移量):用于标记消息在Partition中的位置。

 

标签:消费者,分区,Kafka,备忘录,实例,import,kafka
From: https://www.cnblogs.com/jiangbei/p/18222578

相关文章

  • kafka 偏移量坑点
    auto.offset.reset值含义解释earliest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费latest当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据nonetopic各分区都存在已提交的offset时,从offset后......
  • Kafka的结构
    Kafka的结构与工作原理Kafka是一种分布式流处理平台,广泛应用于实时数据处理和数据管道。它的核心组件包括Producer、Topic、Partition、Broker、Consumer和ConsumerGroup。以下是Kafka从生产到消费端的工作流程及其关键概念的解释。1.生产者(Producer)功能:生产者负责将数据发送......
  • kafka单机安装及性能测试
    kafka单机安装及性能测试ApacheKafka是一个分布式流处理平台,最初由LinkedIn开发,并于2011年开源,随后成为Apache项目。Kafka的核心概念包括发布-订阅消息系统、持久化日志和流处理平台。它主要用于构建实时数据管道和流处理应用,广泛应用于日志聚合、数据传输、实时监控和分......
  • golang kafka例子
    packagemain//生产者代码import( "fmt" "github.com/IBM/sarama" "time")//基于sarama第三方库开发的kafkaclientvarbrokers=[]string{"127.0.0.1:9092"}vartopic="hello_kafka0"//同步消息模式funcsyncProducer(conf......
  • MQTT和kafka搭配使用 集成 emq iot 物联网
    MQTT历史MQTT协议于1999年发明,用于石油和天然气行业。工程师需要一种协议来实现最小带宽和最小电池损耗,以通过卫星监控石油管道。最初,该协议被称为MessageQueuingTelemetryTransport(消息队列遥测传输),得名于首先支持其初始阶段的IBM产品MQ系列。2010年,IBM发布了......
  • 开发 备忘录 - 杂
    对于用blob格式存储在数据库中的数据,在各层怎样传输?dto-JSONArrayservice-JSONArray.toString().getBytes()dao-byte[]MySQL-blob/longblobdao-byte[]service-JSONUtill.parseArray(newString(bytes))vo-JSONArraylinux命令-服务top查看进程状态......
  • RabbitMQ、RocketMQ、Kafka对比(消息组件的作用)
    在高并发的系统中,消息组件是最为常见的一款应用对比RacketMQ要比RabbitMQ性能高,但是不合适进行日志数据的采集(大数据采集最好别用)利用消息组件可以有效地实现数据缓冲的处理操作,例如:现在有一个抢购系系统,是需要考虑高并发状态下的用户请求正常处理问题服务器一旦接收远......
  • kafka多线程顺序消费
    一、单线程顺序消费为了避免有的小伙伴第一次接触顺序消费的概念,我还是先介绍一下顺序消费是个什么东西。双十一,大量的用户抢在0点下订单。为了用户的友好体验,我们把订单生成逻辑与支付逻辑包装成一个个的MQ消息发送到Kafka中,让kafka积压部分消息,防止瞬间的流量压垮服务。那么......
  • kafka解决重复消费问题
    Kafka避免消息重复消费通常依赖于以下策略和机制:  总结就是通过消费者组+手动提交偏移量+处理消息的幂等性(数据库redis分布式锁等)1.ConsumerGroupIDKafka使用ConsumerGroupID来跟踪每个消费者所读取的消息。确保每个消费者都具有唯一的GroupID。如果多个消费者属......
  • kafka 保证消息有序性
    具体需要从生产者和消费者两个方面来讲:生产者:1.分区机制:Kafka的核心机制之一是分区(Partition)。每个主题(Topic)可以被分割成多个分区,而消息在发布时会被追加到特定的分区中。在每个分区内部,消息是按照它们被追加的顺序来存储的,因此保证了分区内的消息顺序性。 2.分区器:生......