Kafka消息队列的两种模型:
-
点对点模式:
-
在点对点模式中,有一个消息生产者(Producer)将消息发送到特定的消息队列(Queue),然后消息消费者(Consumer)从该队列中获取消息。每个消息只能被一个消费者接收,即使有多个消费者监听同一个队列,每条消息也只会被其中一个消费者消费。这种模式适用于一对一的通信场景。
-
-
发布/订阅模式:
Kafka基本概念:
-
Producer(生产者)
-
Consumer(消费者)
-
Consumer Group(消费者组)
-
Broker(代理服务器)
-
Topic(主题)
-
Partition(分区)
-
Replica(副本)
-
Leader 和 Follower(领导者和从属): 副本中的一个负责处理读写请求,其他副本从中同步数据。
-
生产者角度的幂等性
如果在Kafka中不设置幂等性,可能会在以下情况下出现消息重复消费:
生产者角度的幂等性实现:
-
幂等性配置: Kafka生产者可以通过设置
enable.idempotence
属性来启用幂等性。启用后,生产者会自动重试发送失败的消息,确保消息最终发送成功。 -
序列号(Sequence Number): 生产者在发送每条消息时都会分配一个唯一的序列号(Sequence Number)。
-
确认消息: 当生产者发送消息成功后,Kafka broker会返回一个确认消息(acknowledgment)。如果生产者重试发送相同消息,Kafka broker会识别重复的序列号并返回相同的确认消息,而不会产生新的消息。
消费者角度的幂等性实现:
-
幂等性处理逻辑: 消费者在处理消息时,可以采用幂等的处理逻辑。这意味着相同消息的多次处理不会引发不一致的结果。例如,在更新数据库记录时,确保相同的更新操作不会产生不同的影响。
-
消息去重: 消费者可以使用消息的唯一标识符(如消息ID)来判断是否已经处理过相同的消息。如果已经处理过,可以选择忽略重复消息,以实现消息去重。
总之,Kafka的幂等性实现主要集中在生产者和消费者两个角度。生产者通过序列号、幂等性配置和确认消息等机制,确保相同消息的多次发送不会引发重复消息写入。消费者通过幂等性处理逻辑、消息去重和幂等性数据库操作等方法,确保相同消息的多次处理不会产生不一致的结果。这些机制共同确保了Kafka中消息的幂等性,防止重复处理和数据不一致。
Kafka 的分区策略在生产者和消费者之间起着重要的作用,它决定了消息在分区之间的分配方式。分区策略可以影响消息的负载均衡、并行处理和数据分布等方面。下面我们将分别讨论生产者和消费者的分区策略:
生产者的分区策略:
生产者在发送消息到 Kafka 主题时,可以通过配置分区策略来确定消息被写入哪个分区。Kafka 提供了几种默认的分区策略,也允许自定义分区策略。
-
轮询策略(Round Robin)
-
随机策略
-
按键分配策略
-
自定义分区策略
消费者的分区策略:
消费者在订阅 Kafka 主题时,会根据消费者组(Consumer Group)的配置来确定如何分配分区给不同的消费者。
-
Range 范围分配策略
-
Round Robin 轮询分配策略
-
Sticky 粘性分配策略
它的数据流转处理包括数据的生产、传输、存储和消费等环节
流处理是一种数据处理模型,专注于实时数据流的连续处理和分析。与传统的批处理不同,流处理不需要等待一段时间来收集足够的数据,而是实时地处理数据流中的每个事件。这种实时性和即时性使得流处理适用于需要快速响应和实时决策的场景。
以下是流处理的一些关键概念和特点:
-
实时性:流处理强调实时性,意味着数据在进入系统后立即被处理,而不是等待一段时间的聚合和处理。这样可以实现低延迟的数据分析和反馈。
-
无限数据流:流处理处理的是无限的数据流,而不是有限的数据集。数据不断地到达,系统需要持续地处理和分析。、
总之,流处理通过处理实时数据流,使得实时分析、实时决策和实时反馈成为可能,适用于多种需要快速、敏捷和即时性的应用场景。
批处理需要等待一段时间来收集足够的数据量,然后将这些数据一次性地加载到处理系统中,进行大规模的计算和分析。
传统批处理的主要特点包括:
-
周期性处理
-
离线分析
-
高吞吐量
-
数据存储和加载
-
延迟
传统批处理适用于以下场景:
-
离线分析
-
大规模计算
-
历史数据处理
数据可靠性: 只有当 Leader 和 Followers 都确认收到消息时,生产者才会认为消息发送成功。即使 Leader 发生故障,Followers 中的一个可以被提升为新的 Leader,从而保证消息不会丢失。
一致性和 ISR 机制: ISR(In-Sync Replicas)是 Kafka 中一致性和可用性之间的权衡机制。当生产者成功将消息发送给 Leader 后,Leader 需要等待 ISR 中的所有 Followers 都成功复制消息后,才会发送确认给生产者,表示消息已经安全复制。这就是所谓的 ISR 机制。
ISR 机制的作用在于,在发生故障时,Kafka 可以确保消息的一致性。如果一个 Follower 落后于 Leader 太多,Kafka 将会将该 Follower 从 ISR 中移除,不再等待它的确认。这可以防止消息的积压,保证消息传递的可靠性。一旦 Follower 追赶上来,再重新加入 ISR。
总之,Kafka 的数据可靠性通过副本机制和一致性的 ISR 机制来实现。这些机制确保消息不会丢失,并且在发生故障时能够保持数据的一致性。这使得 Kafka 成为处理大规模数据流的可靠解决方案。
public class KafkaConsumer
{
public static void Consumer()
{
var config = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "localhost:9092",
AutoOffsetReset = AutoOffsetReset.Earliest,
};
new TaskFactory().StartNew(async () => {
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("topic1");
while (true)
{
var consume = consumer.Consume();
string receiveMsg = consume.Value;
}
}
});
}
}
static ProducerConfig config = new ProducerConfig()
{
BootstrapServers = "172.31.231.72:9092",
Acks = Acks.Leader
};
static IProducer<Null, string> producer = new ProducerBuilder<Null, string>(config).Build();
producer.Produce("document", new Message<Null, String>()
{
Value = JsonSerializer.Serialize(kafkamodel)
});
using (var streams = new KafkaStreams(streamConfig, builder.Build()))标签:处理,分区,Kafka,生产者,消息,new,KAFKA From: https://www.cnblogs.com/cooooooooookie/p/17679924.html
{
streams.Start();
// Produce a message to the input topic
using (var producer = new ProducerBuilder<Null, string>(new ProducerConfig
{
BootstrapServers = "localhost:9092"
}).Build())
{
var message = new Message<Null, string>
{
Value = "Hello, Kafka Streams!"
};
await producer.ProduceAsync("input-topic", message);
}
// Wait for a few seconds to allow processing
await Task.Delay(5000);
streams.Close();
}