文章目录
- 1.JMS+AMQP核心知识
- 1.1.什么是MQ中间件
- 1.2.使用场景
- 1.3.JMS消息服务和和常见核心概念
- 2.分布式流处平台Kafka核心概念
- 2.1.Kafka核心概念
- 2.2.特点总结
- 2.3.基于消费者组可以实现
- 3.Linux环境下Zookeeper和Kafka安装
- 3.1.安装启动Zookeeper
- 3.2.安装启动Kafka
- 4.生产者发送消息和消费者消费消息
- 4.1.创建topic
- 4.2.查看topic
- 4.3.生产者发送消息
- 4.4.消费者消费消息
- 4.5.删除topic
- 4.6.查看broker节点topic状态信息
- 5.Kafka点对点模型和发布订阅模型
- 5.1.JMS规范支持两种消息模型
- 5.2.消费者点对点消费模型
- 5.3.消费者发布订阅消息模型
- 6.Kafka数据存储流程和原理概述
- 6.1.Partition
- 6.2.LEO(LogEndOffset)
- 6.3.HW(HighWatermark)
- 6.4.offset
- 6.5.Segment
- 6.6.Kafka高效文件存储设计特点
- 7.SpringBoot2.x项目整合Kafka
- 7.1.引入kafka-clients依赖
- 7.2.配置客户端
- 7.3.创建topic
- 7.4.删除topic
- 7.5.列举topic-list
- 7.6.增加分区数量
- 7.7.查看topic详情
- 8.producer发送到Broker分区策略
- 8.1.生产者发送消息到broker的策略
- 8.2.生产者到broker发送流程
- 8.3.生产者常见配置
- 9.producer API讲解
- 9.1.封装配置属性
- 9.2.生产者投递消息同步发送
- 9.3.异步发送配置回调函数
- 9.4.producer发送消息到指定分区
- 10.ProducerRecord介绍
- 10.1.ProducerRecord(简称PR)
- 11.生产者自定义partition分区规则
- 11.1.源码解读默认分区器
- 11.2.自定义Partitioner类实现Partitioner接口
- 11.3.配置自定义的Partitioner生效
- 12.Consumer消费者机制和分区策略
- 12.1.消费者拉取数据机制
- 12.2.消费者从那个分区进行消费
- 12.3.消费者消费的分区策略
- 12.4.什么是Rebalance操作
- 12.5.Rebalance触发的机制
- 12.6.容灾消费机制
- 13.Consumer配置和Kafka调试日志配置
- 13.1.配置日志级别
- 13.2.消费者配置
- 13.3.Kafka消费者Consumer消息配置实战
- 13.4.Consumer手工提交offset配置和从头消费配置
- 14.kafka数据存储流程
- 14.1.kafka数据存储流程
- 14.2.自定义的索引分片
- 14.3.文件分段的规则
- 15.分布式系统CAP定理
- 16.Kafka数据可靠性投递
- 16.1.生产者发送消息确认机制
- 16.2.副本数据同步机制
- 16.3.副本数据同步策略
- 17.Kafka数据可靠性保障原理之ISR机制
- 18.Kafka的HighWatermark的作用
- 19.kafka高可用和高性能
- 19.1.Kafka-zookeeper集群搭建
- 19.2.Kafka中的日志数据清理
- 19.3.Kafka的高性能原理分析-ZeroCopy
- 20.SpringBoot项目整合Spring-kafka
- 20.1.Springboot项目整合spring-kafka依赖发送消息
1.JMS+AMQP核心知识
1.1.什么是MQ中间件
- 全称MessageQueue,主要是用于程序和程序之间的通信,异步+解耦
1.2.使用场景
- 核心应用:
- 解耦:订单系统->物流系统
- 异步:用户注册->发送右键,初始化信息
- 削峰:秒杀、日志处理
- 跨平台、多语言
- 分布式事务、最终一致性
- RPC调用上下游对接,数据源变动->通知下属
1.3.JMS消息服务和和常见核心概念
(1)什么是JMS
- java消息服务(java message service) ,Java平台中关于面向消息中间件的接口
(2)特性
- 面向java平台的标准消息传递API
- 在Java或JVM语言比如Scala、Groovy中具有互用性
- 无需担心底层协议
- 有queues和topics两种消息传递模型
- 支持事务、能够定义消息格式(消息头、属性和内容)
(3)常见概念
- JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等等
- JMS生产者:生产消息的服务
- JMS消费者:消费消息的服务
2.分布式流处平台Kafka核心概念
2.1.Kafka核心概念
Broker
kafka的服务端程序,可以认为一个mq节点就是一个broker,broker存储topic的数据
Producer生产者
创建消息Message,然后发布到MQ中,该角色将消息发布到Kafka的topic中
Consumer消费者
消费队列里的消息
ConsumerGroup消费者组
同个topic,广播发送给不同的group,一个group中只能又一个consumer可以消费此消息
Topic主题
每条发布到kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思
Partition分区
kafka数据存储的基本单元,topic中的数据分割称一个或者多个partition,每个topic至少又一个partition,有序的
一个Topic的多个partitions,被分布在kafka集群中的多个server上
消费者数量<=partition数量
Replication副本
同个partition会有多个副本replication,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务
默认每个topic的副本都是1(默认没有副本的)。可以在创建topic的时候指定
如果当前kafka集群只有3个broker节点,则replication-factor的最大数就是3.
ReplicationLeader
Partition有多个副本,但只有一个replicationleader负载该partition和生产者消费者交互
ReplicationFollower
ReplicationFollower只做备份,从ReplicationLeader进行同步
ReplicationManager
负责Broker所有分区副本信息,Replication副本状态切换
offset
每个consumer示例需要为他消费的partition维护一个记录自己消费道哪里的便宜offset
kafka把offset保存在消费端的消费者组里
2.2.特点总结
- 多订阅者
- 一个topic可以有一个或者多个订阅者
- 每个订阅者都要有一个partition,所以订阅者数量要少于等于partition
- 高吞吐量、低延迟:每秒可以处理几十万条消息
- 高并发:几千个客户端同时读写
- 容错性:多副本、多分区允许集群中节点失败。
- 扩展性强:支持热扩展
2.3.基于消费者组可以实现
- 基于队列的模型:所有消费者都在同一消费者组里,每条消息只会被一个消费者处理
- 基于发布订阅模型:消费者属于不同的消费者组,假如每个消费者都有自己的消费者组,这样kafka消息就能广播到所有消费者实例上
3.Linux环境下Zookeeper和Kafka安装
注意:提前安装好jdk1.8
#编辑群居配置文件
vim /etc/profile
#在最下面,按i进入insert模式,添加一下内容
JAVA_HOME=jdk路径
export JAVA_HOME
CLASSPATH=.:$JAVA_HOME/lib
export CLASSPATH
PATH=$PATH:$JAVA_HOME/bin:$CLASSPATH
export PATH
#重新加载配置
source /etc/profile
3.1.安装启动Zookeeper
(1)解压zookeeper,重命名
tar -xvf apache-zookeeper-3.7.0-bin.tar.gz
mv apache-zookeeper-3.7.0-bin zookeeper
(2)复制默认的配置文件zoo.cfg
(3)启动zookeeper,默认2181端口
/usr/local/software/zookeeper/bin/zkServer.sh start
3.2.安装启动Kafka
(1)解压Kafka,重命名
tar -xvf kafka_2.13-2.8.0.tgz
mv kafka_2.13-2.8.0 kafka
(2)修改配置文件config目录下的server.properties
#标识broker编号,集群中有多个broker,则每个broker的编号需要设置不同
broker.id=0
#修改listeners(内网ip)和advertised.listeners(公网ip)配置,不能一样的ip否则启动报错
listeners=PLAINTEXT://(内网ip):端口号
advertised.listeners=PLAINTEXT://(公网ip):端口号
#修改zk地址,默认为localhost,注意:zk在那个机器上配置那个机器的公网ip地址
zookeeper.connection=localhost:2181
(3)bin目录启动,默认9092端口
#启动
./kafka-server-start.sh ../config/server.properties &
#停止
kafka-server-stop.sh
(4)创建topic
./kafka-topics.sh --create --zookeeper 8.140.116.67:2181 --replication-factor 1 --partitions 2 --topic xd888-topic
(5)查看topic
./kafka-topics.sh --list --zookeeper 8.140.116.67:2181
(6)kafka如果直接启动信息会打印在控制台,如果关闭窗口,kafka随之关闭,用守护线程的方式启动
./kafka-server-start.sh -daemon ../config/server.properties &
4.生产者发送消息和消费者消费消息
4.1.创建topic
./kafka-topics.sh --create --zookeeper 8.140.116.67:2181 --replication-factor 1 --partitions 2 --topic topic-test
参数:
kafka-topics.sh脚本启动
--zookeeper ip:端口 #zookeeper在那台机器就写那个IP端口
--replication-factor 1 #指定副本的数量为1
--partitions 2 #指定分区数为2
--topic t1 #指定topic的名称
4.2.查看topic
./kafka-topics.sh --list --zookeeper localhost:2181
参数:
kafka-topics.sh脚本启动
--list #查询列表
--zookeeper ip:端口 #zookeeper在那台机器就写那个IP端口
4.3.生产者发送消息
./kafka-console-producer.sh --broker-list 8.140.116.67:9092 --topic topic-test
参数:
kafka-console-producer.sh脚本启动
--broker-list ip:端口号 #指定kafka节点的ip端口号(注意这块一定要写配置文件里配置的ip)
--topic t1 #指定是那个topic
4.4.消费者消费消息
./kafka-console-consumer.sh --bootstrap-server 8.140.116.67:9092 --from-beginning --topic topic-test
参数:
--bootstrap-server ip:端口号 #指定kafka节点的ip端口号(注意这块一定要写配置文件里配置的ip)
--from-begnning #指当客户端退出时,重新连接消费者,那么他会将之前的消息重新消费一遍
--topic t1 #指定是那个topic
4.5.删除topic
./kafka-topics.sh --zookeeper 8.140.116.67:2181 --delete --topic topic-test
4.6.查看broker节点topic状态信息
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic xdclass-topic
注意:添加、删除、查看topic都用的./kafka-topics.sh脚本
5.Kafka点对点模型和发布订阅模型
5.1.JMS规范支持两种消息模型
点对点(point to point)
- 消息生产者生产消息发送到queue中,然后消息消费者从queue中取出并且消费消息
- 消息被消费之后,queue中不在存储,所以消息消费者不能消费到已经被消费的消息。Queue支持存在多个消费者,但对于一个消息而言,只会又一个消费者可以消费
发布订阅(publish/subscribe)
- 消息生产者(发布)将消息发布到topic中,同事有多个消息消费者(订阅)消费该消息
- 和点对点方式不同,发布到topic的消息会被所有订阅者消费
5.2.消费者点对点消费模型
- 编辑消费者配置(确保同个名称group.id一样)config/consumer.properties
- 创建topic,1个分区
./kafka-topics.sh --create --zookeeper 112.74.55.160:2181 --replication-factor 1 --partitions 1 --topic t1
- 指定配置文件启动两个消费者
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer.properties
参数:
--consumer.config #指定配置文件启动
- 现象
只有一个消费者可以消费到数据,一个分区只能被同个消费者组的某个消费者进行消费
5.3.消费者发布订阅消息模型
- 编辑消费者配置,使其groud.id不一样
config/consumer-1.properties
config/consumer-2.properties
- 指定配置文件启动两个消费者
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-1.properties
./kafka-console-consumer.sh --bootstrap-server 112.74.55.160:9092 --from-beginning --topic t1 --consumer.config ../config/consumer-2.properties
- 现象
两个消费者同时能够消费到消息
6.Kafka数据存储流程和原理概述
6.1.Partition
- topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
- 是以文件夹的形式存储在具体Broker本机上
6.2.LEO(LogEndOffset)
- 表示每个partition的log最后一条Message的位置
6.3.HW(HighWatermark)
- 表示partition各个replicas数据键同步且一致的offset位置,即表示allreplicas已经commit的位置
- HW之前的数据才是commit后的,对消费者才可见
- ISR集合里面最小leo
6.4.offset
- 每个partition都由一些列有序的、不可变的消息组成,这些消息被连续的追加到partition中
- partition中的每个消息都有一个连续的序号叫做offset,用于partition唯一标识一条信息
- 可以认为offset是partition中Message的id
6.5.Segment
- 每个partition又由多个segment file组成
- segment file 由2部分组成,分别为index file 和 data file(log file)
- 两个文件一一对应,后缀“.index”和".log"分别标识索引文件和数据文件
- 命名规则:partition的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset+1
6.6.Kafka高效文件存储设计特点
- kafka把topic中一个partition大文件分成多个小文件段,通过多个小文件段,就容易定期清除或删除已经消费完的文件,减少磁盘占用。
- 通过索引信息可以快速定位Message
- producer生产数据,要写入到log文件中,写的过程中一直追加到文件末尾,为顺序写,官网数据表明,同样的磁盘,顺序写能到600M/S,而随机写只是100K/S
7.SpringBoot2.x项目整合Kafka
7.1.引入kafka-clients依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>
7.2.配置客户端
AdminClientConfig.BOOTSTARP_SERVERS_CONFIG,"ip:端口"
AdminClient.create(配置信息);
/**
* 设置admin客户端
* @return
*/
public static AdminClient initAdminClient(){
Properties properties = new Properties();
properties.setProperties(AdminClientConfig.BOOTSTARP_SERVERS_CONFIG,"112.74.55.160:9092");
AdminClient adminClient = AdminClient.create(properties);
return adminClient;
}
7.3.创建topic
NewTopic newTopic = new NewTopic(topic名称,分区数量,副本数量);
adminClient.createTopics(Arrays.asList(newTopic)); //返回一个CreateTopicsResult
createTopicsResult.all().get(); //异常处理
@Test
public void createTopic(){
AdminClient adminClient = initAdminClient();
//2个分区,1个副本
NewTopic newTopic = new NewTopic(TOPIC_NAME,2,(short)1);
CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
try {
//future等待创建,成功不会有任何报错,失败或者超时会报错
createTopicsResult.all().get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("ڠୌෛጱtopic");
}
7.4.删除topic
adminClient.deleteTopics(topic名称的list集合); //返回一个DeleteTopicsResult
deleteTopicsResult.all().get();
@Test
public void delTopicTest(){
AdminClient adminClient = initAdminClient();
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclass-test-topic"));
try {
deleteTopicsResult.all().get();
} catch (Exception e) {
e.printStackTrace();
}
}
7.5.列举topic-list
adminClient.listTopics();
adminClient.listTopics(options); //返回一个ListTopicsResult
Set<String> topics = listTopics.names().get(); //得到一个set集合遍历
//是否查看内部topic,可以不用
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
@Test
public void listTopic(){
AdminClient adminClient = initAdminClient();
//是否查看内部topic,可以不用
ListTopicsOptions options = new ListTopicsOptions();
options.listInternal(true);
ListTopicsResult listTopics = adminClient.listTopics(options);
Set<String> topics = listTopics.names().get();
for (String topic : topics) {
System.err.println(topic);
}
}
7.6.增加分区数量
NewPartitions.increateTo(5);
infoMap.put(TOPIC_NAME, newPartitions);
adminClient.createPartitions(infoMap);
@Test
public void incrPartitionsTest() throws Exception{
Map<String, NewPartitions> infoMap = new HashMap<>();
NewPartitions newPartitions = NewPartitions.increaseTo(5);
AdminClient adminClient = initAdminClient();
infoMap.put(TOPIC_NAME, newPartitions);
CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
createPartitionsResult.all().get();
}
7.7.查看topic详情
adminClient.describeTopics(Arrays.asList(TOPIC_NAME))
@Test
public void getTopicInfo() throws Exception {
AdminClient adminClient = initAdminClient();
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
Map<String, TopicDescription> stringTopicDescriptionMap =
describeTopicsResult.all().get();
Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
entries.stream().forEach((entry)-> System.out.println("name :"+entry.getKey()+" , desc:"+ entry.getValue()));
}
8.producer发送到Broker分区策略
8.1.生产者发送消息到broker的策略
- 如果指定Partition ID,则PR(ProducerRecord)被发送到指定的Partition。
- 如果未指定Partition ID,但是指定了Key,PR就会按照Key的哈希取模发送到对应的Partition
- 如果未指定Partition ID,也未指定Key,PR会按照默认round-robin轮询模式发送到每个Partition
- 如果即制定了Partition ID,也指定了Key,PR会被发送到指定的Partition
注意:Partition有多个副本,但只有一个replicationLeader复制该Partition和生产者消费者交互,消费者默认的消费Partition是range模式。
8.2.生产者到broker发送流程
Kafka的客户端发送数据到服务器,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲区里,然后把很多消息收集到Batch里面,然后在通过Sender线程发送到Broker上面,这样才尽可能的提高性能。
8.3.生产者常见配置
#Kafka地址,即broker的ip地址
bootstrap.servers
#当producer向leader发送数据时,可以通过request.required.acks来设置数据可靠性的级别,分别是0,1,all
acks
#请求失败,生产者会自动重试,默认是0次,如果启动重试,则会有消息重复消费的可能性
retries
#每个分区未发送消息的总字节大小,超过的化就会提交到服务端broker,默认是16kb
batch.size
#默认是0,指消息立即发送,即便是batch.size缓冲区还没满,到达linger.ms设置的秒数,也会提交消息到服务器
linger.ms
#buffer.memory用来约束KafkaProducer能够使用的缓冲区大小,默认是32MB
#注意:buffer.memory不能设置的太小,否则一旦写满,就会阻塞用户线程,不能在向kafka里写消息了
#buffer.momery一定要比batch.size设置的大,否则会报申请内存不足的错误。
buffer.memory
#key的序列化器,将用户提供的 key和value对象ProducerRecord 进行序列化处理,key.serializer必须被设置,即使消息中没有指定key,序列化器必须是一个实现org.apache.kafka.common.serialization.Serializer接口的类,将key序列化成字节数组。
key.serializer
value.serializer
9.producer API讲解
9.1.封装配置属性
public static Properties getProperties(){
Properties props = new Properties();
//kafka服务器地址
props.put("bootstrap.servers", "112.74.55.160:9092");
//props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "112.74.55.160:9092");
//当producer向leader发下哦那个数据时,可以通过request.required.acks参数来设置数据可靠性级别,0,1,all
props.put("acks", "all");
//props.put(ProducerConfig.ACKS_CONFIG, "all");
//表示请求失败时,生产者会尝试自动重连,0表示不重连(默认),如果开启重连的话,可能会导致消息重复消费
props.put("retries", 0);
//props.put(ProducerConfig.RETRIES_CONFIG, 0);
//设置batch缓冲区的大小,表示当到达缓冲区的大小时,sender线程将拿取batch中的消息,默认是16kb
props.put("batch.size", 16384);
//表示缓冲区消息停留的时间,默认是0,立即发送,配置之后即使batch中的数据没有达到设定的值,到达时间后也会发送消息
props.put("linger.ms", 1);
//用来约束kafka Producer能够使用的内存缓冲区的大小,默认是32MB
props.put("buffer.memory", 33554432);
//序列化机制
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
return props;
}
9.2.生产者投递消息同步发送
send()方法是异步的,添加消息到缓冲区等待发送,并立即返回
生产者将单个消息批量在一起发送提高效率,即batch.size和linger.ms结合
实现同步发送:一条消息发送之后,会阻塞当前线程,直至返回ack
发送消息后返回一个Future对象,调用get即可
消息发送主要是两个线程:Main主线程,Sender线程
main线程发送消息到RecordAccumulator即返回
sender线程从RecordAccumulator拉取信息发送到broker
batch.size和linger.ms两个参数可以影响 sender 线程发送次数
@Test
public void testSend(){
Properites props = getProperties();
Producer<String,String> producer = new KafkaProducer<>(props);
for(int i = 0;i < 3;i++){
Future<RecordMetadata> future = producer.send(TOPIC_NAME,"xdclass-key"+i,"xdclass-value"+i);
try{
RecordMetadata recordMetadata = future.get(); //不关心结果的话,可以不用这部
System.out.println("发送状态:"+recordMetadata.toString());
}catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
//记得要关必
producer.close();
}
9.3.异步发送配置回调函数
发送消息配置回调函数即可,该回调方法会在Producer收到ack时被调用,为异步调用
回调函数有两个参数RecordMetadata和Exception,如果Exception是null,则发送消息成功,否则失败
@Test
public void testSendCallback(){
Properties properties = getProperties();
Producer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 3; i++) {
producer.send(new ProducerRecord<>(TOPIC_NAME, "xdclass-key" + i, "xdclass-value" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
System.out.println("发送状态:"+recordMetadata.toString());
}else{
e.printStackTrace();
}
}
});
}
producer.close();
}
9.4.producer发送消息到指定分区
发送到指定topic的第五个分区
@Test
public void testSendCallbackAndPartition(){
Properties properties = getProperties();
Producer<String, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i < 3; i++) {
//在ProducerRecord中配置,第二个参数
producer.send(new ProducerRecord<>(TOPIC_NAME, 4,"xdclass-key" + i, "xdclass-value" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e == null){
System.out.println("发送状态:"+recordMetadata.toString());
}else{
e.printStackTrace();
}
}
});
}
producer.close();
}
10.ProducerRecord介绍
10.1.ProducerRecord(简称PR)
发送给Kafka Broker的key/value键值对,封装基础数据信息
--Topic(topic名称)
--Partition ID(可选,分区的ID)
--Key(可选,指定的key)
--value(发送的消息value)
key默认是null,大多数应用程序会用到key
- 如果key为空,kafka会使用默认的partitioner,使用RoundRobin算法将消息均衡的分布在各个partition上
- 如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息改写到Topic的那个partition,拥有相同的key的消息会被写道同一个partition上,实现顺序消息
11.生产者自定义partition分区规则
11.1.源码解读默认分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
11.2.自定义Partitioner类实现Partitioner接口
public class MyPartitioner implements Partitioner {
//在partition方法里实现自定义的配置规则
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if("xdclass".equals(key)) {
return 0;
}
//使用hash值取模,确定分区(默认的也是这个方式)
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
11.3.配置自定义的Partitioner生效
在配置的对象里加上配置props.put(“partitioner.class”,“com.lixiang.config.MyPartitioner”); //自定义的配置路径
props.put("partitioner.class", "com.lixiang.config.MyPartitioner");
12.Consumer消费者机制和分区策略
12.1.消费者拉取数据机制
消费者为什么是从broker中pull数据,而不是broker主动push给消费者呢
- 消费者采用pull方式拉取,从broker的partition获取数据。
- pull模式则可以根据consumer的消费能力进行自行调节拉取消息的多少,不同的消费者性能不一样。
- 如果broker没有消息,consumer可以配置timeout时间,阻塞一段时间之后在返回。
- 如果是broker主动push的话,优点是可以快速处理,因为消费者pull模式,可能会造成生产者已经把消息投递到broker,但是消费者没有及时的取pull,但是push容易造成消费者处理不过来,消息堆积和延迟。
12.2.消费者从那个分区进行消费
一个topic有多个partition,一个消费者组里面又多个消费者,那是怎么分配的?
- 一个主题topic可以有多个消费者,因为里面有多个partition分区(leader分区)
- 一个partition Leader可以由一个消费者组里的一个消费者消费
12.3.消费者消费的分区策略
顶层接口
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
- round-robin(RoundRobinAssignor非默认策略)轮询策略
- 按照消费者组进行轮询分配,同个消费者组监听不同主题也是一样,是把所有的partition和所有consumer都列出来,所以消费者组里面订阅的主题是一样才行,主题不一样则会出现分配不均匀的问题。
- 例如7个分区,同组内2个消费者,你一个我一个分配
c-1:topic-0、topic-2、topic-4、topic-6
c-2:topic-1、topic-3、topic-5
弊端
如果同一消费者组内,所有订阅的消息是不同的,在执行分区分配的时候不是轮询分配,可能会导致分区配置不均匀
- range(RangeAssignor默认策略)范围策略
- 按照主题进行分配,如果不平均分配,则第一个消费者会分配比较多的分区,一个消费者监听不同主题也不影响
- 例如7个分区,同组内2个消费者,全部主题平均分配
c-1:topic-0、topic-1、topic-2、topic-3
c-2:topic-4、topic-5、topic-6
弊端
只针对一个topic而言,c-1多消费一个分区影响不大,但是如果N多个Topic,那么针对每个Topic,消费者c-1每次都要多分一个分区,那这会造成负载过大,影响性能
12.4.什么是Rebalance操作
- kafka怎么均匀的分配某个topic下的所有partition到各个消费者,从而使得消息的消费速度达到最快,就是平衡(balance),前面讲了Range范围分区和RoundRobin轮询策略,也支持自定义分区策略。
- 而rebalance(重平衡)其实就是重新进行partition的分配,从而使得partition的分配重新达到平衡状态
12.5.Rebalance触发的机制
- 当消费者组内消费者数量发生变化时(增加或者减少),就会产生重新分配partition
- 分区数量发生变化时(即topic分区的数量发生变化时)
12.6.容灾消费机制
当消费者在消费过程中突然宕机了,重新恢复服务时从那里消费的?
消费者会记录offset,故障恢复后会从这里继续消费,之前版本会存在zookeeper或者本地里,新版默认存在kafka内置的topic中,名称是_consumer_offsets。
- 该topic默认有50个Partition,每个Partition有三个副本,分区数量可以由offset.topic.num.partition进行配置
- 通过groupId的哈希值和该参数取模的方式来确定某个消费者组已消费的offset保存到_consumer_offsets主题的那个分区中
- 由消费者组名+主题+分区,确定唯一的offset的key,从而获取对应的key
- 三元组:group.id+topic+分区号,而value就是offset的值
13.Consumer配置和Kafka调试日志配置
13.1.配置日志级别
#yum配置文件修改
logging:
config: classpath:logback.log
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!--log日志的级别-->
<root level="info">
<appender-ref ref="STDOUT" />
</root>
</configuration>
13.2.消费者配置
#消费者组id,分组内的消费者只能消费该消息一次,不同的组可以重复此消息
group.id
#为true的时候自动提交偏移量
enable.auto.commit
#自动提交offset周期
auto.commit.interval.ms
#重置消费便宜量策略,消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(消费者长时间读取一个无效的偏移量记录)该怎么处理
#默认是latest,如果需要从头消费,则需要消费者组名变更才可以
auto.offset.reset
#序列化器,反序列化机制,生产者序列化,消费者反序列化
key.deserializer
13.3.Kafka消费者Consumer消息配置实战
配置
public static Properties getProperties() {
Properties props = new Properties();
//kafka服务器broker地址
props.put("bootstrap.servers", "8.140.116.67:9092");
//消费者组id,同组消费者只能一个消费者消费,不同组可以重复消费消息
props.put("group.id", "xdclass-g1");
//默认是latest,从最新的消息开始消费,earliest是从头开始消费,但是还要改消费者组
props.put("auto.offset.reset","earliest");
//是否自动提交偏移量,一般不自动
props.put("enable.auto.commit", "false");
//自动提交offset延迟时间
props.put("auto.commit.interval.ms", "1000");
//反序列化机制
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
消费者订阅
订阅主题api:subscribe(topic主题集合)
拉取消息api:poll(Duration.ofMillis(500))
@Test
public void simpleConsumerTest(){
Properties props = getProperties();
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//订阅topic主题
consumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));
while (true) {
//拉取时间控制,阻塞超时时间,当拉取topic里没有消息时,消费者阻塞500毫秒
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
for (ConsumerRecord<String, String> record : records) {
System.err.printf("topic = %s, offset = %d, key = %s, value = %s%n",record.topic(), record.offset(), record.key(), record.value());
}
}
}
13.4.Consumer手工提交offset配置和从头消费配置
配置从头消费partition消息
auto.offset.reset配置策略即可
默认是latest,需要改为earliest且消费者组名变更,即可实现从头消费
//默认是latest,如果需要从头消费partition消息,需要改为earliest,且变更消费者组名,才生效
props.put("auto.offset.reset","earliest");
配置手动提交
- 自动提交的问题
- 没法控制消息是否正常被消费
- 适合非常严谨的场景,比如日志收集发送
- 手工提交offset配置
- 同步commitSync阻塞当前线程(自动失败重试)
- 异步commitAsync不会阻塞当前线程(没有失败重试,回调callback函数获取提交信息,记录日志等)
@Test
public void simpleConsumerTest(){
Properties properties = getProperties();
KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<>(properties);
//订阅主题
kafkaConsumer.subscribe(Arrays.asList(KafkaProducerTest.TOPIC_NAME));
while(true){
ConsumerRecords<String,String> poll = kafkaConsumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : poll) {
System.out.println("topic:"+record.topic()+"--"+"key:"+record.key()+"--"+"offset:"+record.offset()+"--"+"value:"+record.value());
}
//同步阻塞提交
//kafkaConsumer.commitSync();
if(!poll.isEmpty()){
//异步提交offset
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if(exception == null){
System.out.println("手工提交offset成功");
}else{
System.out.println("手工提交offset失败");
exception.printStackTrace();
}
}
});
}
}
}
14.kafka数据存储流程
14.1.kafka数据存储流程
kafka采用了分片和索引机制,将每个partition分为多个segment,每个segment对应2个文件log和index
index文件中并没有为每一条message简历索引,采用了稀疏存储的方式
每隔一定字节的数据建立一条索引,避免了索引文件占用过多的空间和资源,从而可以将索引文件保留到内存中
缺点是没有建立索引的数据在查询过程中需要小范围的顺序扫描操作
14.2.自定义的索引分片
在server.properties中进行配置
#默认是1G,segment文件达到1G(index和log文件的总和),进行分片
log.segment.bytes=1073741824
14.3.文件分段的规则
#分段一
00000000000000000000.index 00000000000000000000.log
#分段二 数字1234指的是当前文件的最小偏移量offset,即上个文件的最后一个消息offset+1
00000000000000001234.index 00000000000000001234.log
#分段三
00000000000000088888.index 00000000000000088888.log
15.分布式系统CAP定理
CAP定理:指的是在一个分布式系统中,Consistency(一致性)、Availability(可用性)、Partition tolerance(分区容错性),三者不可同事获取
- 一致性(C):所有节点都可以访问待最新的数据,锁定其他节点,不一致之前不可读
- 可用性(A):每个请求都是可以得到响应的,不管请求是成功还是失败
- 分区容错性(P):除了全部整体网络故障,其他故障都不能导致系统不可用,节点间通信可能失败,无法避免
CAP理论就是说分布式存储系统中,最多只能实现上面两个点。而由于当前的网络硬件肯定会出现延迟丢包等问题,所以分区容忍性是我们必须要接受的。所以我么那只能在一致性和可用性之间进行权衡。
CA:如果不要求P(不允许分区),则C(强一致性)和A(可用性)是可以保证的。但放弃P的同时也就意味着放弃了系统的扩展性,也就是分布式节点受限,每办法部署子节点,违背分布式系统设计的初衷。
CP:如果不要求A(可用性),每个请求都需要在服务器之间保持强一致,而P(分区)会导致同步时间无限延长(也就是等待数据同步完成才可以正常访问),一旦发生网络故障或者消息丢是的情况,就要牺牲用户的体验,等待所有数据全部一致了在让去用户去访问
AP:要高可用并且允许分区,则需要放弃一致性。一旦分区发生,节点之间可能会失去联系,为了高可用,每个节点只能用本地数据提供服务,而这样会导致全局数据的不一致性。
结论:
分布式系统中P肯定要满足,所以只能CA中二选一
CP:适合支付、交易类、要求强一致性,宁可业务不可用,也不能出现脏数据
AP:信息流架构,不要求数据强一致性,更想要服务的可用
16.Kafka数据可靠性投递
16.1.生产者发送消息确认机制
保证producer发送到指定的topic,topic的每个partition收到producer发送的数据后,需要像producer发送一个ack确认收到,然后才会进入到下一轮的发送数据,否则就会重新发送数据。
16.2.副本数据同步机制
- 当producer向partition写数据时,根据ack机制,默认ack=1,只会向leader中写入数据。
- 然后leader中的数据会复制到其他的replication中,follower会周期性的从leader中pull数据
对于数据的读写操作都在leader replication中,follower之作为副本使用,假如follower在向leader pull数据的时候,还没同步完成,leader突然宕机这回怎末办?
16.3.副本数据同步策略
当producer向parition发送数据时,可以根据request.required.acks参数来时数据的可靠性级别。
ack=0
- producer发送一次消息到partition就不再发送了,不管是否发送成功,发出去的消息可能还在内存中,还没有写道磁盘中,Leader宕机了,producer也认为发送成功了,此时消息就达不到一致性。
ack=1(默认)
- 只要producer发送到partition中的数据写入到leader的磁盘中就认为发送成功了,返回给producer一个ack标识,不会管follower是否同步完成。
- 问题:如果leader刚刚接收到消息,还没来的及取同步follower宕机了,这回就会造成数据的丢失
ack=all(-1)
- producer发送数据到partition leader ,只有当leader的所有follower完全同步完成的时候才会返回ack标识,通知消息推送成功。
- 注意:当leader在同步follower的时候,某个follower因为网络的原因一直同步不上,这会kafka里有一个概念叫做isr集合,partition 副本的集合,leader也在里面,假如某个follower一致同步不上,isr就会把他给剔除集合,isr时一个可变的分区集合。
- 问题1:producer向partition发送数据,部分isr副本同步,leader此时挂掉,那么kafka会从follower中重新选择leader,假如某个follower已经同步完成数据了,当它被选上leader的时候,producer会重现向他发送数据,这会就会造成数据的重复发送。
- 问题2:假如partition 只有一个副本,那即使是all也会造成数据丢失,接受完消息后宕机,所以ack=all必须跟isr里面至少有两个副本的情况下使用。
- 在设置requsest.required.acks=-1的同时,也要min.insync.replicas这个参数设定isr中的最小副本为>=2,默认为1,如果isr中副本的数量少于min.insync.replicas的数量时,客户端会报异常。
17.Kafka数据可靠性保障原理之ISR机制
什么是ISR(in-sync replica set)
- leader会维持一个与其保持同步的replica集合,该集合就是ISR,每一个leader partition都有一个ISR,keader本身也在ISR集合中,leader动态维护,保证kafka消息不丢失,ISR中至少有一个存活,并且commit成功的。
- Partition leader保持同步的Partition Follower集合,当ISR中的Partition Follower完成数据的同步之后,就会给leader发送ack。
- 如果partition follower长时间未响应(replica.lag.time.max.ms),则partition follower就会被剔除ISR集合
- partition leader 发生故障后,就会从ISR中重新选出partition leader
OSR(out-of-sync-replca set)
- 与leader副本分区,同步滞后过多,被剔除ISR集合的副本
AR(Assign Replicas)
- 分区中所有的集合称为AR,即ISR+OSR
18.Kafka的HighWatermark的作用
ACK保障了【生产者】的投递可靠性
partition的多副本保证了【消息存储】的可靠性
hw的作用:保证消费数据的一致性和副本数据的一致性
Follower故障
Folllower发生故障后会被临时踢出ISR集合中,等该Follower恢复后,follower会读取本地磁盘上次记录的HW,并将该log文件高于HW的部分去掉,从HW开始向leader同步,等该follower的LEO大于等于该partition的hw,即follower追上leader后,重新假如ISR集合
Leader故障
Leader发生故障后,会从ISR中选出一个新的leader,为了保证多个副本之间的数据一致性,其余的follower会先将各自的log文件搞与hw的部分劫掉(新的leader不会劫掉),然后在跟新的leader进行同步
19.kafka高可用和高性能
19.1.Kafka-zookeeper集群搭建
(1)解压zookeeper压缩包,重命名为zk1,zk2,zk3
tar -xvf apache-zookeeper-3.7.0-bin.tar.gz
mv apache-zookeeper-3.7.0 zk1
cp -r zk1 zk2
cp -r zk1 zk3
(2)修改每个zookeeper的配置文件
端口:
- 2181
- 2182
- 2183
#客户端端口
clientPort=2181
#数据存储路径,以节点为命名区分
dataDir=/tmp/zookeeper/2181
#修改AdminServer端口
admin.serverPort=8888
(3)dataDir中创建myid文件,内容分别为1,2,3作为zk的唯一标识
cd /tmp/zookeeper/2181
echo 1 >myid
cd /tmp/zookeeper/2182
echo 2 >myid
cd /tmp/zookeeper/2183
echo 3 >myid
(4)配置集群
#各个配置文件zoo.cfg加入集群配置
#server.服务器id=服务器IP地址:服务器直接通信的端口:服务器之间选举投票的端口
server.1=127.0.0.1:2881:3881
server.2=127.0.0.1:2882:3882
server.3=127.0.0.1:2883:3883
(5)启动zk的各个节点
#启动zk1节点
cd /usr/local/software/zk1/bin
./zkServer.sh start
#启动zk2节点
cd /usr/local/software/zk2/bin
./zkServer.sh start
#启动zk3节点
cd /usr/local/software/zk3/bin
./zkServer.sh start
(6)查看各个几点的状态
./zkServer.sh status
(7)模拟leader副本宕机,follower重新选取leader,宕机后的leader并入ISR集群变为follower
(8)配置kafka集群
端口:
- 9092
- 9093
- 9094
配置
#内网中使用,内网部署kafka集群只须用到listeners,内网需要作为区分时,才需要用到advertised.listeners
listeners=PLAINTEXT://172.18.123.229:9092
advertised.listeners=PLAINTEXT://112.74.55.160:9092
#每个节点编号1、2、3
broker.id=1
#端口
port=9092
#配置3个
log.dirs=/tmp/kafka-logs/kafka1
#zk地址
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
(9)启动三个kafka实例
./kafka-server-start.sh -daemon ../config/server.properties & #守护线程
./kafka-server-start.sh ../config/server.properties &
./kafka-topics.sh --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 3 --partitions 6 --topic xdclass-cluster-topic
19.2.Kafka中的日志数据清理
kafka将数据持久化到硬盘上,为了控制磁盘容量,需要对过去的消息进行清理,kafka内置有个定时任务检测删除日志,默认是5分钟。
log.retention.check.interval.ms=30000
根据segment单位进行定期清理
启用cleaner
- log.cleaner.enable=true
- log.cleaner.threads=2(清理线程数配置)
支持配置策略对数据定期清理
日志删除
- log.cleanup.policy=delete
#清理超过指定时间的消息,默认是168小时,7天
#还有log.retention.ms,log.metention.minutes,log.retention.hours 优先级高到低
log.retention.hours=168
#超过指定大小后,删除旧的消息,下面是1G的字节数,-1就是没有限制
log.retention.bytes=1073741824
#还有基于日志起始位移(log start offset)
基于【时间删除】日志
每个日志文件都维护一个最大时间戳字段,每次日志写入新的消息时,都会更新该字段
一个日志段,segment写满了被切分之后,时间戳就不会在发生改变,这时kafka可以根据当前的时间去和各个日志文件的时间戳去进行比较来判断是否过期了
基于【大小超过阈值删除】日志
超过阈值的部分不许要大于一个日志字段的大小,这回kafka会把最早的一个日志文件删除
假设热值大小是500MB,当前分区共有四个日志文件,大小分别为500MB,500MB,500MB,10MB
假设阈值设置1500,那么kafka会计算当前文件的总值,500*3+10=1510MB,1510-1500=10 <500,所以kafka不会删除日志文件
假设阈值设置1000,1510-1000 = 510 >500,所以kafka会删除最早的一个日志文件
log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除
日志压缩
- log.cleanup.policy=compact启用压缩策略
- 按照key进行整理,有相同的key不同的value,值保留最后一个
19.3.Kafka的高性能原理分析-ZeroCopy
零拷贝ZeroCopy(SendFile)
案例:将一个File读取并发送出去(Linux有两个上下文,内核态、用户态)
传统linux上
- file文件经过了四次拷贝,调用read,将文件拷贝到了kernel内核态,CPU控制kernel态的数据copy到用户态,调用write时,user态下的内容会copy到内核态的socket的buffer中
- 最后将内核态的socket buffer的数据copy到网卡设备中传送
- 缺点:增加了上下文切换,浪费了两次无效拷贝
ZeroCopy
- 请求kernel直接把disk的data传输给socket,而不是通过应用程序传输,ZeroCopy大大提高了应用程序的性能,较少不必要的内核缓冲区很用户缓冲区间的拷贝,从而减少cpu的开销和减少了kernel和user模式的 上下文切换,达到性能的提升
- 对应零拷贝技术有mmap及sendfile
- mmap:小文件传输快
- sendfile:大文件传输必mmap快
kafka高性能
- 存储模型,topic多个分区,每个分区多个segment段
- index索引文件查找,利用分段和稀疏索引
- 磁盘顺序写入
- 异步操作少阻塞sender和main线程,批量操作
- 页缓存Page cache,没有利用jvm内存,因为容易GC影响性能
- 零拷贝ZeroCopy
20.SpringBoot项目整合Spring-kafka
20.1.Springboot项目整合spring-kafka依赖发送消息
- 添加pom文件
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
- 配置文件修改增加生产者信息
spring:
kafka:
bootstrap-servers: 192.168.111.31:9092,192.168.111.31:9093,192.168.111.31:9094
producer:
# 消息重发的次数。
retries: 0
#一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
- 发送消息
private static final String TOPIC_NAME = "user.register.topic";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 发送消息
* @param phone
*/
@GetMapping("/api/user/{phone}")
public void sendMessage1(@PathVariable("phone") String phone) {
kafkaTemplate.send(TOPIC_NAME, phone).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
ework.kafka
spring-kafka
- 配置文件修改增加生产者信息
```bash
spring:
kafka:
bootstrap-servers: 192.168.111.31:9092,192.168.111.31:9093,192.168.111.31:9094
producer:
# 消息重发的次数。
retries: 0
#一个批次可以使用的内存大小
batch-size: 16384
# 设置生产者内存缓冲区的大小。
buffer-memory: 33554432
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
acks: all
- 发送消息
private static final String TOPIC_NAME = "user.register.topic";
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 发送消息
* @param phone
*/
@GetMapping("/api/user/{phone}")
public void sendMessage1(@PathVariable("phone") String phone) {
kafkaTemplate.send(TOPIC_NAME, phone).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}