首页 > 其他分享 >kafka笔记

kafka笔记

时间:2022-11-20 23:11:45浏览次数:40  
标签:ProducerConfig 分区 笔记 kafka put CONFIG properties

基本概念

Broker

每个Broker相当于一个服务器,多个Broker构成了一个kafka集群

Topic

主题做消息分类,一个Broker可以包含多个Topic

Partition

分区,一个Topic包含多个分区,分区有leader和follower的区分,由于follower一般起到备份的作用,所以leader和follower一般不在同一台服务器上

kafka架构

0.9之前,offset消费偏移量存储在zk中,0.9之后存储在本地

三个特征

1、一个分区只能被一个消费值组内的一个消费者消费,如果一个消费者组中有消费者消费了该分区,那么同一消费者组中的其他消费者不可以再消费该分区(基于此,消费者组中的消费者个数不应大于分区数)
2、消费者消费消息是以分区为单位的,一次消费一个分区
3、同一个消费者组内,同一时刻只能有一个消费者消费消息

kafka消息写入

写入消息时,有key、partition、value三个参数,
如果只指定value,消息会轮询写入分区,
如果指定key,则会根据key值哈希写入对应的分区

写入流程

分区副本

配置:default.replication.factor=N
此配置可以指定分区的副本(follower)的数量,producer和consumer只与leader分区进行交互

api

生产者

public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka服务端的主机名和端口号
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //等待所有副本节点应答
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        //发送消息重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        //一批消息处理大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //请求延时
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //发送缓存区内存大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        //序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer(properties);
        // public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
        ProducerRecord record =
                new ProducerRecord<String, Object>("testInfoTopic", null, System.currentTimeMillis(), null,
                        "value");
        kafkaProducer.send(record);

        /**
         * 回调发送
         */
        kafkaProducer.send(record, (metadata, exception) -> {
            if (metadata != null) {
                System.err.println(metadata.partition() + " : " + metadata.offset());
            }
        });
        kafkaProducer.close();

    }
}

消费者

public class KafkaConsumerTest {
    public static void main(String[] args) {
        Properties properties = new Properties();
        //kafka服务端的主机名和端口号
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //消费者组
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
        //是否自动确认offset
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        //自动确认offset的时间间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        //序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        //定义consumer
        KafkaConsumer<Object, Object> kafkaConsumer = new KafkaConsumer<>(properties);
        //订阅topic
        kafkaConsumer.subscribe(Arrays.asList("testInfoTopic"));

        while (true) {
            //每隔100ms读取一次数据
            ConsumerRecords<Object, Object> poll = kafkaConsumer.poll(100);
            //读取的是一批数据,需要遍历
            for (ConsumerRecord<Object, Object> record : poll) {
                System.err.println(record.offset() + "--" + record.key() + "--" + record.value());
            }
        }
    }
}

标签:ProducerConfig,分区,笔记,kafka,put,CONFIG,properties
From: https://www.cnblogs.com/MorningBell/p/16909999.html

相关文章

  • dockerfile学习笔记
    FROM指定基础镜像MAINTAINER指定维护者的信息,可以没有RUN 你想让它干啥(在命令前面加上RUN即可)ADD 添加宿主机的文件到容器内COPY复制文件WORKDIR 设置当前工......
  • 《卓有成效的管理者》-读书笔记
       近期读完《卓有成效的管理者》,觉得在管理和成效的理解上有了一些进步。这里把在书中学到的内容和自己的思考记录下来,作为消化吸收的方式,也作为后续复习的根据。......
  • 【DL论文精读笔记】Object Detection in 20 Y ears: A Survey目标检测综述
    目标检测20年综述(2019)......
  • 【菜菜的sklearn课堂笔记】支持向量机-线性SVM决策过程的可视化
    视频作者:菜菜TsaiTsai链接:【技术干货】菜菜的机器学习sklearn【全85集】Python进阶_哔哩哔哩_bilibili我们可以使用sklearn中的式子来为可视化我们的决策边界,支持向量,以......
  • Python学习笔记:删除多级索引
    在Python中使用stack/unstack/melt/pivot_talbe等函数进行聚合之后,计算得到的结果具有多层索引。一般情况下可以通过额外指定columns或者通过reset_index()可重置......
  • 我是如何构建自己的笔记系统的?
    我是如何构建自己的笔记系统的?关于笔记系统的重要性互联网上有许多的资料,我这里将不再赘述.下面我将直接介绍我的笔记从记录到整理文章发布的所有详细步骤和工具我的......
  • php笔记
    php笔记参考资料https://zeo.cool/2020/12/31/webshell多种方法免杀/https://h3art3ars.github.io/2020/02/27/利用php新特性过静态查杀/函数记录<?phpphpinfo();<%......
  • docker操作笔记
    docker操作笔记https://www.runoob.com/docker/docker-container-usage.html 菜鸟教程安装使用官方安装脚本自动安装安装命令如下:curl-fsSLhttps://get.docker.co......
  • 用 Kafka + DolphinDB 实时计算K线
    Kafka是一个高吞吐量的分布式消息中间件,可用于海量消息的发布和订阅。当面对大量的数据写入时,以消息中间件接收数据,然后再批量写入到时序数据库中,这样可以将消息中间件的高......
  • Vue笔记 - 样式穿透原理及使用方法
    样式穿透目录样式穿透1.为什么需要样式穿透1.1为什么样式会失效2.如何使用样式穿透2.1实例1.为什么需要样式穿透在开发中引入了第三方组件库(如element-ui),但又想......