基本概念
Broker
每个Broker相当于一个服务器,多个Broker构成了一个kafka集群
Topic
主题做消息分类,一个Broker可以包含多个Topic
Partition
分区,一个Topic包含多个分区,分区有leader和follower的区分,由于follower一般起到备份的作用,所以leader和follower一般不在同一台服务器上
kafka架构
0.9之前,offset消费偏移量存储在zk中,0.9之后存储在本地
三个特征
1、一个分区只能被一个消费值组内的一个消费者消费,如果一个消费者组中有消费者消费了该分区,那么同一消费者组中的其他消费者不可以再消费该分区(基于此,消费者组中的消费者个数不应大于分区数)
2、消费者消费消息是以分区为单位的,一次消费一个分区
3、同一个消费者组内,同一时刻只能有一个消费者消费消息
kafka消息写入
写入消息时,有key、partition、value三个参数,
如果只指定value,消息会轮询写入分区,
如果指定key,则会根据key值哈希写入对应的分区
写入流程
分区副本
配置:default.replication.factor=N
此配置可以指定分区的副本(follower)的数量,producer和consumer只与leader分区进行交互
api
生产者
public class KafkaProducerTest {
public static void main(String[] args) {
Properties properties = new Properties();
//kafka服务端的主机名和端口号
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//等待所有副本节点应答
properties.put(ProducerConfig.ACKS_CONFIG, "all");
//发送消息重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, "0");
//一批消息处理大小
properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
//请求延时
properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
//发送缓存区内存大小
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
//序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer kafkaProducer = new KafkaProducer(properties);
// public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers)
ProducerRecord record =
new ProducerRecord<String, Object>("testInfoTopic", null, System.currentTimeMillis(), null,
"value");
kafkaProducer.send(record);
/**
* 回调发送
*/
kafkaProducer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.err.println(metadata.partition() + " : " + metadata.offset());
}
});
kafkaProducer.close();
}
}
消费者
public class KafkaConsumerTest {
public static void main(String[] args) {
Properties properties = new Properties();
//kafka服务端的主机名和端口号
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//消费者组
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
//是否自动确认offset
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
//自动确认offset的时间间隔
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
//序列化器
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringDeserializer");
//定义consumer
KafkaConsumer<Object, Object> kafkaConsumer = new KafkaConsumer<>(properties);
//订阅topic
kafkaConsumer.subscribe(Arrays.asList("testInfoTopic"));
while (true) {
//每隔100ms读取一次数据
ConsumerRecords<Object, Object> poll = kafkaConsumer.poll(100);
//读取的是一批数据,需要遍历
for (ConsumerRecord<Object, Object> record : poll) {
System.err.println(record.offset() + "--" + record.key() + "--" + record.value());
}
}
}
}
标签:ProducerConfig,分区,笔记,kafka,put,CONFIG,properties
From: https://www.cnblogs.com/MorningBell/p/16909999.html