首页 > 其他分享 >KAFKA

KAFKA

时间:2023-09-05 16:11:23浏览次数:30  
标签:处理 分区 Kafka 生产者 消息 new KAFKA

Kafka消息队列的两种模型:

  1. 点对点模式:

    • 在点对点模式中,有一个消息生产者(Producer)将消息发送到特定的消息队列(Queue),然后消息消费者(Consumer)从该队列中获取消息。每个消息只能被一个消费者接收,即使有多个消费者监听同一个队列,每条消息也只会被其中一个消费者消费。这种模式适用于一对一的通信场景。

  2. 发布/订阅模式:

 

Kafka基本概念:

  • Producer(生产者)

  • Consumer(消费者)

  • Consumer Group(消费者组)

  • Broker(代理服务器)

  • Topic(主题)

  • Partition(分区)

  • Replica(副本)

  • Leader 和 Follower(领导者和从属): 副本中的一个负责处理读写请求,其他副本从中同步数据。

  • img

     

生产者角度的幂等性

如果在Kafka中不设置幂等性,可能会在以下情况下出现消息重复消费:

生产者角度的幂等性实现:

  1. 幂等性配置: Kafka生产者可以通过设置enable.idempotence属性来启用幂等性。启用后,生产者会自动重试发送失败的消息,确保消息最终发送成功。

  2. 序列号(Sequence Number): 生产者在发送每条消息时都会分配一个唯一的序列号(Sequence Number)。

  3. 确认消息: 当生产者发送消息成功后,Kafka broker会返回一个确认消息(acknowledgment)。如果生产者重试发送相同消息,Kafka broker会识别重复的序列号并返回相同的确认消息,而不会产生新的消息。

消费者角度的幂等性实现:

  1. 幂等性处理逻辑: 消费者在处理消息时,可以采用幂等的处理逻辑。这意味着相同消息的多次处理不会引发不一致的结果。例如,在更新数据库记录时,确保相同的更新操作不会产生不同的影响。

  2. 消息去重: 消费者可以使用消息的唯一标识符(如消息ID)来判断是否已经处理过相同的消息。如果已经处理过,可以选择忽略重复消息,以实现消息去重。

总之,Kafka的幂等性实现主要集中在生产者和消费者两个角度。生产者通过序列号、幂等性配置和确认消息等机制,确保相同消息的多次发送不会引发重复消息写入。消费者通过幂等性处理逻辑、消息去重和幂等性数据库操作等方法,确保相同消息的多次处理不会产生不一致的结果。这些机制共同确保了Kafka中消息的幂等性,防止重复处理和数据不一致。

 

Kafka 的分区策略在生产者和消费者之间起着重要的作用,它决定了消息在分区之间的分配方式。分区策略可以影响消息的负载均衡、并行处理和数据分布等方面。下面我们将分别讨论生产者和消费者的分区策略:

生产者的分区策略:

生产者在发送消息到 Kafka 主题时,可以通过配置分区策略来确定消息被写入哪个分区。Kafka 提供了几种默认的分区策略,也允许自定义分区策略。

  1. 轮询策略(Round Robin)

  2. 随机策略

  3. 按键分配策略

  4. 自定义分区策略

消费者的分区策略:

消费者在订阅 Kafka 主题时,会根据消费者组(Consumer Group)的配置来确定如何分配分区给不同的消费者。

  1. Range 范围分配策略

  2. Round Robin 轮询分配策略

  3. Sticky 粘性分配策略

 

它的数据流转处理包括数据的生产、传输、存储和消费等环节

流处理是一种数据处理模型,专注于实时数据流的连续处理和分析。与传统的批处理不同,流处理不需要等待一段时间来收集足够的数据,而是实时地处理数据流中的每个事件。这种实时性和即时性使得流处理适用于需要快速响应和实时决策的场景。

以下是流处理的一些关键概念和特点:

  1. 实时性:流处理强调实时性,意味着数据在进入系统后立即被处理,而不是等待一段时间的聚合和处理。这样可以实现低延迟的数据分析和反馈。

  2. 无限数据流:流处理处理的是无限的数据流,而不是有限的数据集。数据不断地到达,系统需要持续地处理和分析。、

总之,流处理通过处理实时数据流,使得实时分析、实时决策和实时反馈成为可能,适用于多种需要快速、敏捷和即时性的应用场景。

 

批处理需要等待一段时间来收集足够的数据量,然后将这些数据一次性地加载到处理系统中,进行大规模的计算和分析。

传统批处理的主要特点包括:

  1. 周期性处理

  2. 离线分析

  3. 高吞吐量

  4. 数据存储和加载

  5. 延迟

传统批处理适用于以下场景:

  • 离线分析

  • 大规模计算

  • 历史数据处理

 

数据可靠性: 只有当 Leader 和 Followers 都确认收到消息时,生产者才会认为消息发送成功。即使 Leader 发生故障,Followers 中的一个可以被提升为新的 Leader,从而保证消息不会丢失。

一致性和 ISR 机制: ISR(In-Sync Replicas)是 Kafka 中一致性和可用性之间的权衡机制。当生产者成功将消息发送给 Leader 后,Leader 需要等待 ISR 中的所有 Followers 都成功复制消息后,才会发送确认给生产者,表示消息已经安全复制。这就是所谓的 ISR 机制。

ISR 机制的作用在于,在发生故障时,Kafka 可以确保消息的一致性。如果一个 Follower 落后于 Leader 太多,Kafka 将会将该 Follower 从 ISR 中移除,不再等待它的确认。这可以防止消息的积压,保证消息传递的可靠性。一旦 Follower 追赶上来,再重新加入 ISR。

总之,Kafka 的数据可靠性通过副本机制和一致性的 ISR 机制来实现。这些机制确保消息不会丢失,并且在发生故障时能够保持数据的一致性。这使得 Kafka 成为处理大规模数据流的可靠解决方案。

public class KafkaConsumer
  {
      public static void Consumer()
      {
          var config = new ConsumerConfig
          {
              GroupId = "test-consumer-group",
              BootstrapServers = "localhost:9092",
              AutoOffsetReset = AutoOffsetReset.Earliest,
          };
          new TaskFactory().StartNew(async () => {
              using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
              {
                  consumer.Subscribe("topic1");
                  while (true)
                  {
                      var consume = consumer.Consume();
                      string receiveMsg = consume.Value;
                  }
              }
          });
      }
  }
static ProducerConfig config = new ProducerConfig()
      {
          BootstrapServers = "172.31.231.72:9092",
          Acks = Acks.Leader
      };
static IProducer<Null, string> producer = new ProducerBuilder<Null, string>(config).Build();
producer.Produce("document", new Message<Null, String>()
{
Value = JsonSerializer.Serialize(kafkamodel)
});
using (var streams = new KafkaStreams(streamConfig, builder.Build()))
          {
              streams.Start();

              // Produce a message to the input topic
              using (var producer = new ProducerBuilder<Null, string>(new ProducerConfig
              {
                  BootstrapServers = "localhost:9092"
              }).Build())
              {
                  var message = new Message<Null, string>
                  {
                      Value = "Hello, Kafka Streams!"
                  };
                  await producer.ProduceAsync("input-topic", message);
              }

              // Wait for a few seconds to allow processing
              await Task.Delay(5000);

              streams.Close();
          }
 

标签:处理,分区,Kafka,生产者,消息,new,KAFKA
From: https://www.cnblogs.com/cooooooooookie/p/17679924.html

相关文章

  • kafka的幂等性
    什么是幂等性:无论发送多少次相同的请求,最终的结果都是一致。 问:那他又是如何保证消息不会被重复发送的?答:Kafka通过ProducerId(生产者标识符)和SequenceNumber(序列号)来保证消息不会被重复发送。以下是Kafka如何实现这一点的工作原理:ProducerId(PID):每个Kafka生产......
  • kafka集群安装(CentOS7 + kafka 2.7.1)
    Linux系统-部署-运维系列导航 kafka介绍Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源......
  • 【Kafka系列】(一)Kafka入门
    有的时候博客内容会有变动,首发博客是最新的,其他博客地址可能会未同步,认准https://blog.zysicyj.top首发博客地址系列文章地址Kafka是什么?一句话概括:ApacheKafka是一款开源的消息引擎系统什么是消息引擎系统?消息引擎系统(MessageBrokerSystem)是一种中间件软件或服务,用......
  • Go语言实现Kafka消费者的示例代码
    Kafka是一种分布式流处理平台,由Facebook于2011年推出,现在已经成为Apache项目的一部分。Kafka提供了高可用性、可扩展性和低延迟的消息传递服务,适用于处理实时和离线数据。Kafka的主要功能包括生产者-消费者通信、批处理和实时数据流处理。Kafka基于发布/订阅模型,允许消息发布者将数......
  • kafka原理与应用
    架构图BrokerKafka集群包含多个服务器,服务器节点称为BrokerBroker存储Topic数据如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个br......
  • 我的 Kafka 旅程 - 基于账号密码的 SASL+PLAIN 认证授权 · 配置 · 创建账号 · 用户
    本文基于Kafka3.0+的KRaft模式来阐述默认的Kafka不受认证约束,可不用账号就可以连接到服务,也就是默认的PLAIN方式,不需要认证;配置了SASL认证之后,连接Kafka只能用凭证连接登录。SASL支持的认证方式有多种:GSSAPI,PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,OAUTHBEARERGSSAPI......
  • 使用Nginx做页面采集, Kafka收集到对应Topic_6XwWe5qWHGM2PojVPUSejM
    使用Nginx做页面采集,Kafka收集到对应Topic_6XwWe5qWHGM2PojVPUSejM使用Nginx做页面采集,Kafka收集到对应Topic0.架构简介模拟线上的实时流,比如用户的操作日志,采集到数据后,进行处理,暂时只考虑数据的采集,使用Html+Jquery+Nginx+Ngx_kafka_module+Kafka来实现,其中Ngx​kafka​m......
  • 使用Nginx做页面采集, Kafka收集到对应Topic_6XwWe5qWHGM2PojVPUSejM
    使用Nginx做页面采集,Kafka收集到对应Topic_6XwWe5qWHGM2PojVPUSejM使用Nginx做页面采集,Kafka收集到对应Topic0.架构简介模拟线上的实时流,比如用户的操作日志,采集到数据后,进行处理,暂时只考虑数据的采集,使用Html+Jquery+Nginx+Ngx_kafka_module+Kafka来实现,其中Ngx​kafka​m......
  • 使用Nginx做页面采集, Kafka收集到对应Topic_6XwWe5qWHGM2PojVPUSejM
    使用Nginx做页面采集,Kafka收集到对应Topic_6XwWe5qWHGM2PojVPUSejM使用Nginx做页面采集,Kafka收集到对应Topic0.架构简介模拟线上的实时流,比如用户的操作日志,采集到数据后,进行处理,暂时只考虑数据的采集,使用Html+Jquery+Nginx+Ngx_kafka_module+Kafka来实现,其中Ngx​kafka​m......
  • kafka在工作中的使用
    @KafkaListener(topics={KafkaInitialConfig.TOPIC_RECHARGE_YTK},groupId=KafkaInitialConfig.GROUP_ID_STORE_BFF,containerFactory="kafkaListenerContainerFactory",autoStartup="${kafka.listener.autoStartup}")publicvoidre......