首页 > 其他分享 >Kafka

Kafka

时间:2023-07-25 10:22:06浏览次数:25  
标签:springframework kafka topic import org message Kafka

目录

Kafka组成

基础组成

  • 生产者会将信息推送给topic并由topic决定要将该消息发送给哪一个分区
    • 轮询
    • key值分区: 生产者根据key的值来决定将信息发送给哪个分区
  • 消费者组
    • 组中的消费者不会接收到全量信息 但是全量信息在消费组中
  • offset
    • kafka的存储文件都是按照offset.kafka来命名,用offset做名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.kafka的文件即可。当然the first offset就是00000000000.kafka

Kafka的配置

配置kafka
  kafka:
    bootstrap-servers: 139.224.49.36:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。      batch-size: 16384      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: 1
    consumer:
      group-id: test
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: false
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 5
      #listner负责ack,每调用一次,就立即commit
      ack-mode: manual_immediate
      missing-topics-fatal: false

简单的Kafka生产和消费者

生产者

  package com.han.springbatch.common;

import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic,Object message) {

        String json = JSONObject.toJSONString(message);
        // 发送消息
        kafkaTemplate.send(topic, json);
    }


}

消费者

同一个组中的消费者组

package com.han.springbatch.common;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaConsumer {
    @KafkaListener(topics = "springbatch_topic_1", id = "topic.group1")
    public void topicTest(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("客户端 A 消费了: Topic[{}] Message[{}]", topic, msg);
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = "springbatch_topic_1", id = "topic.group2")
    public void topicTest1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("客户端 B 消费了: Topic[{}] Message[{}]", topic, msg);
            ack.acknowledge();
        }
    }

}

标签:springframework,kafka,topic,import,org,message,Kafka
From: https://www.cnblogs.com/AIxuexiH/p/17579083.html

相关文章

  • 【项目实战】Kafka 的 Leader 选举和负载均衡
    ......
  • 【项目实战】Kafka 重平衡 Consumer Group Rebalance 机制
    ......
  • Kafka核心API -- Connect
    Connect基本概念KafkaConnect是Kafka流式计算的一部分KafkaConnect主要用来与其他中间件建立流式通道KafkaConnect支持流式和批量处理集成 环境准备创建两个表createtableusers_bak(`uuid`intprimarykeyauto_increment,`name`VARCHAR(20),`ag......
  • Kafka客户端操作
    五类API Kafka客户端API类型AdminClientAPI:允许管理和检测Topic、broker以及其它Kafka对象(类似于命令行的createtopic)ProducerAPI:发送消息到1个或多个TopicConsumerAPI:订阅一个或多个Topic,并处理产生的消息StreamsAPI:高效地将输入流转换到输出流ConnectorAPI:从一......
  • debezium同步postgresql数据至kafka
    0实验环境全部部署于本地虚拟机debeziumdocker部署postgresql、kafka本机部署1postgresql1.1配置设置postgres密码为123仿照example,创建databasepostgres,schemeinventory,tablecustomers因为postgres用户有replication权限,所以可以直接使用修改postgresql.conf文......
  • kafka基础操作
    什么是kafkakafka本身并不是消息队列,而是一份分布式流平台(高并发,低延迟。高吞吐量)。kafka是基于zookeeper的分布式消息系统。kafka具有高吞吐率、高性能、实时及高可靠等特点。kafka基本概念Topic:一个虚拟的概念,由一个到多个Partitions组成Partition:实际消息存储单位P......
  • Kafka - kafka为啥这么快?(基于磁盘存储的,为何还能拥有高性能)
    总结1.顺序读写磁盘读写有两种方式:顺序读写或者随机读写。Kafka是磁盘顺序读写,利用了一种分段式的、只追加(Append-Only)的日志,基本上把自身的读写操作限制为顺序I/O,磁盘的顺序读写速度和内存持平(见图1.1)。kafkatopic的每一个Partition其实都是一个文件,收到消息后Kaf......
  • MQTT 与 Kafka|物联网消息与流数据集成实践
    MQTT如何与Kafka一起使用?MQTT(MessageQueuingTelemetryTransport)是一种轻量级的消息传输协议,专为受限网络环境下的设备通信而设计。ApacheKafka是一个分布式流处理平台,旨在处理大规模的实时数据流。Kafka和MQTT是实现物联网数据端到端集成的互补技术。通过结合使用......
  • 【项目实战】Kafka 生产者幂等性和事务
    ......
  • 【项目实战】Kafka 生产者写入分区的策略
    ......