一、概述
- kafka是一个分布式的基于发布/定义的消息队列(Message Queue)
- 通信处理
- 同步处理 客户端->数据库->发送短信->响应客户端
- 异步处理 客户端->数据库->发送短信放入MQ(直接响应客户端)
- 消息队列的优势
- 解耦: 允许独立的处理两边处理过程,遵循接口约束即可
- 可恢复性:当某一边出现异常,也可保证信息在队列中不会丢失
- 缓冲:控制生产和消费速度不一致
- 灵活性&峰值处理:环境并发压力,动态增删服务器
- 异步通讯
- 消息队列的两种模式
- 点对点模式(一对一模式,消费者主动拉取消息并消除消息)
- 发布/订阅模式(一对多模式消费者消费数据后不会清除)
二、基础架构
- producer:消息生产者,发布消息到 kafka 集群的终端或服务。
- broker:kafka 集群中包含的服务器。
- topic:每条发布到 kafka 集群的消息属于的类别,即 kafka 是面向 topic 的。
- partition:partition 是物理上的概念,每个 topic 包含一个或多个 partition。kafka 分配的单位是 partition。
- consumer:从 kafka 集群中消费消息的终端或服务。
- Consumer group: high-level consumer API 中,每个 consumer 都属于一个 consumer group,每条消息只能被 consumer group 中的一个 Consumer 消费,但可以被多个 consumer group 消费。
- replica:partition 的副本,保障 partition 的高可用。
- leader: replica 中的一个角色, producer 和 consumer 只跟 leader 交互。controller通过算法选出
- follower:replica 中的一个角色,从 leader 中复制数据。
- controller:kafka 集群中的其中一个服务器,用来进行 leader election 以及 各种 failover。通过竞争抢
- zookeeper:kafka 通过 zookeeper 来存储集群的 meta 信息。
三、下载及安装
-
下载并解压
- 下载地址 http://kafka.apache.org/downloads
- 安装目录到opt/kafka并解压 tar -zxvf kafka_2.11-2.4.0.tar.gz
-
配置文件
-
进入解压后的文件config文件修改server.properties文件
# broker的id,必须是唯一的整数 broker.id=0 #是否允许删除topic delete.topic.enable=true #内网服务地址 listeners=PLAINTEXT://0.0.0.0:9092 #外网服务地址 advertised.listeners=PLAINTEXT://119.3.221.118:9092 #临时的数据目录 log.dirs=/tmp/kafka-logs #每个topic默认的partition个数 num.partitions=1 #每个topic默认的副本个数 offsets.topic.replication.factor=1 #临时数据保存的时间 log.retention.hours=168 #最大的数据大小 log.segment.bytes=1073741824 #启动用幂等性没保证数据唯一性 enable.idompotence=true #zookeeper的集群连接地址 zookeeper.connect=127.0.0.1:8600,192.168.156:8601,192.168.157:8602
-
-
配置环境变量
export KAFKA_HOME=/opt/kafka export PATH=$KAFKA_HOME/bin/:$PATH
- 刷新环境变量:source /etc/profile
-
启动服务
-
启动zokeeper
cd /opt/kafka; ./bin/zookeeper-server-start.sh -daemon /mnt/project/kafka/config/zookeeper.properties
-
启动kafka (进入kafka目录下,守护进程方式执行启动脚本并指定配置文件)
cd /opt/kafka; /bin/kafka-server-start.sh -daemon /mnt/project/kafka/config/server.properties
-
-
创建kafka的启动和停止脚本(kafka.sh)(可选)
```bash #!/bin/bash case $1 in "start" ){ for i in 127.0.0.1 192.168.1.119 192.168.1.118 do echo "开始启动$i" ssh $i "/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties" done };; "stop" ){ for i in 127.0.0.1 192.168.1.119 192.168.1.118 do echo "开始关闭$i" ssh $i "/mnt/project//kafka/bin/kafka-server-stop.sh" done };; esac ```
- 复制到kafka目录下
- 赋予权限: chmod 777 kafka.sh
- 启动kafka: kafka.sh start
- 关闭kafka: kafka.sh stop
四、Kafka基础操作
4.1 Kafka命令行操作(最后的的地址为zookeeper的服务器和启动的端口号)
- 查看topic:bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181
- 删除topic:bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic topicName
- 创建topic:bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --topic topicName --partitions 1 --replication-factor 1
- topic 主题名称
- partitions 分区数量:推荐为消费者的整数倍
- replication-factor 副本数量:小于等于broker(kafka的集群数量)
- 查看单个topic:bin/kafka-topics.sh --describe --topic topicName --zookeeper 127.0.0.1:2181
- 启动生产者控制台:bin/kafka-console-producer.sh --topic zzx --broker-list 127.0.0.1:9092
- 启动消费者控制台:bin/kafka-console-consumer.sh --topic zzx --bootstrap-server 127.0.0.1:9092 (127.0.0.1是生产者的ip)
- 启动消费者控制台:bin/kafka-console-consumer.sh --topic zzx --bootstrap-server 127.0.0.1:9092 --from-beginning (离线状态从头接收所有消息)
- 注意:
- 0.9版本后的kafka使用bootstrap-server,相关的数据信息都存在kafka本地
- leader和follower本质上都是副本并且leader和follower不可能存在同一台机器上
- 查看消费者组:bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list
4.2 创建错误
- The Cluster ID doesn’t match stored clusterId Some(VGdS0qUyTjivSazKdPs2Gg) in meta.properties.
- 修改logs文件中的clusterId
五、Kafka架构深入
5.1 kafka工作流及文件存储机制
- 每一个broker都一个偏移量,偏移量记录消费者当前消费的位置
- kafka为了避免文件过大导致搜索效率;采取了分片和索引规则;将每一个partition分为多个segment,每个segment包含.log和.index文件。
- 命名规则
- 文件夹命名:topic的名字±分区号;例如:topic的名字是zzx;当前分区是0;所以真正的文件夹名字为 zzx-0
- 文件的命名:当前segment第一条数据的偏移量命名;当数据文件的大于log.segment.bytes的配置,会创建第二个文件,以此类推;
- 例如当偏移量110的时候超过了文件大小,则会创建新的文件00000000000000000111.log
- kafka的logs文件中存储了kafka的相关记录数据
- 00000000000000000000.log、00000000000000000111.log
- 记录的是真实的生产数据
- 00000000000000000000.index、00000000000000000111.index
- 记录的是偏移量和当前偏移量的数据大小
- 记录的是偏移量和当前偏移量的数据大小
- 00000000000000000000.log、00000000000000000111.log
5.2 kafka生产者
- 分区策略
- 分区的原因
- 方便集群的扩展,提高负载能力
- 提高并发,以分区(partition)为一个读写单位
- 分区原则
- 我们需要将producer生产的数据封装成一个ProducerRecord对象,ProducerRecord提供了多个重载方法
- 可以指明要发送到的分区(partition)
- 未指明分区但是包含key,会根据key的hash与topic的partition数进行取余得到分区
- 如都没有,会轮询分区
- 我们需要将producer生产的数据封装成一个ProducerRecord对象,ProducerRecord提供了多个重载方法
- 分区的原因
- 数据可靠性保证
- 当副本全部都接收并同步数据(leador和replica)会返回ack,如果producer接收到ack则代表数据接收正常,否则会重新发送。
- 若其中一个副本宕机,将导致永远不会发送ack,因此推出了Isr(同步副本)的概念
- Isr(同步副本)
- leador会同步Isr中的follower,如果follower长时间(replica.lag.time.max.ms=10000)(毫秒)未同步数据,会被踢出Isr
- ack(同步响应机制)三种响应级别
- acks = 0 不等待响应
- acks = 1 只等待leador接收到数据就响应
- acks = -1 等待副本同步
- 消费数据一致性保证
- LEO: 每一个副本的最大offset,当offset和leador的offset相同则会有机会进入ISR
- 是否能够进入分区则会按照HW是否大于ISR中的HW
- HW: 消费者所能看到的最大offset,ISR队列中数据量最小的LEO的数量
- 当leador宕机后,会重新选取leador,其他得follower会截取掉HW之后的数据,并重新同步新的leador数据
- 当follower宕机后恢复,会重新读取本地数据,并截取掉HW之后的数据,重新同步leador数据后,重新进入ISR
- LEO: 每一个副本的最大offset,当offset和leador的offset相同则会有机会进入ISR
- 数据唯一性(数据区重复)
- 添加配置 enable.idompotence=true
- At Least Once + 幂等性 = Exactly Once
- 当副本全部都接收并同步数据(leador和replica)会返回ack,如果producer接收到ack则代表数据接收正常,否则会重新发送。
5.3 kafka消费者
- 消费方式
- 拉取模式(pull) 不足:如果没有数据,会频繁的拉取空数据;引入了timeout参数;
- 消费者消费的时候会传递过来timeout,当没有数据的时候,会在timeout之后才会返回数据
- 推送模式(push)主动推送,不能保证消费者的消费能力,会造成阻塞
- 拉取模式(pull) 不足:如果没有数据,会频繁的拉取空数据;引入了timeout参数;
- 分区分配策略
- 一个主题可以有多个分区;一个消费者组可以有多个消费者;一个消费者组可以消费同一个主题,但是不能消费同一个分区,因此出现了分区策略
- 配置分区策略 partition.assignment.strategy=Rang
- 轮询策略 按照组划分,轮询进入消费组中的所有消费者(即便未订阅)
- 可以避免分配不均匀;
- 范围策略(默认) 按照主题划分,按照订阅的人均分进入消费组中的消费者
- 每一个主题按照分区均分消费者,当除不尽的情况,第一个消费者会比其他的多一个分区,当主题越多,第一个消费者就会消费越多,出现分配不均
- 当消费者数量发生变化的时候就会自动进行分区分配策略
- 消费数据顺序性(消费者组和主题联合判定的offset)
- 当消费者组消费某一个主题数据后,会记录一个offset,记录当前消费的数据偏移量
- 当同一个消费者组重新开始消费数据的时候,会从当前的偏移量开始消费数据
- 0.9版本之后,offset保存在kafka内置的主题中(_consumer_offsets)
- consumer中配置group-id=xxx,代表当前消费者的组
5.4 kafka高效读写数据
-
顺序写入磁盘数据,避免的磁盘寻址
-
零拷贝技术(Nio技术)
# java实现demo FileChannel srcChannel = new FileInputStream(new File()).getChannel(); FileChannel destChannel = new FileInputStream(new File()).getChannel() srcChannel.transferTo(0, srcChannel.size(), destChannel);
5.5 Zookeeper在kafka中的作用
- 使用Zookeeper的分布式锁以及分布式统一协调方案
- 通过zk的临时节点来选举controller
- 通过zk的节点version实现全局唯一的brokerID
- 存储broker的状态数据(元数据)
5.6 Kafka事务
- Producer事务
- 为了实现跨会话(多分区)之间的数据一致性,引入了TransationID,并将ProducerID和TransationID进行绑定,Producer可以根据当前事务的TransationID获取到ProducerID
- 为了管理TransationID,Kafka引入了事务管理器(Trasation Coordinator),Producer通过此获取到事务ID的状态;除此之外还会将事务写进内部topic,即使宕机,也可以实现数据恢复
- Consumer事务
- 在Kafka中,可以使用事务来保证从producer到consumer的消息处理的原子性。这意味着要么所有的操作都成功,要么所有的操作都不做。
六、 Kafka API(Spring、SpringBoot)
6.1 Spring、SpringBoot依赖
<!-- Spring 依赖 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<!-- SpringBoot 依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
6.2 Producer API
6.2.1 功能简介
- 消息发送流程:主线程(main)生产消息;异步线程(sender)发送
- 主线程生产消息(Producer)->拦截器拦截消息(Intercepter)->序列化(serializer)->分区策略(Partitioner)
- 所有的消息,都会进入共享变量线程(RecordAccumulator),main线程添加信息,sender从变量线程消费消息
6.2.2 JavaDemo
- Producer API
Properties properties = new Properties();
properties.put("bootstrap.servers", "119.3.221.118:9092");
properties.put("buffer.memory", 33554432);
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionalID");//加入事务ID
properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);//开启幂等性
properties.put("partitioner.class", "com.zcgk.xmdj.web.config.kafka.MyPartitioner"); //自定义分区策略,有默认
List<String> interceptions = Arrays.asList("com.zcgk.xmdj.web.config.kafka.MyPartitioner", "otherInterception");
properties.put("interceptor.classes", interceptions);//自定义拦截器,有默认
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
producer.initTransactions(); //初始化事务
producer.beginTransaction();//开启事务
try{
Future future = producer.send(new ProducerRecord<>("zzx", "love111"),
(recordMetadata, e) ->
System.out.println("#####" +recordMetadata.offset() + " " + recordMetadata.partition()));
try {
future.get();
} catch (Exception e) {
e.printStackTrace();
}
producer.commitTransaction();//只有提交事务,才会发送
} finally {
producer.close();// 1.关闭生产者资源2.调用拦截器的close
}
- Consumer API
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "119.3.221.118:9092");
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");//offset提交延迟时间
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//开启自动提交偏移量
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "newGroup");//消费者组
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");//重置消费者offset;生效满足条件:组没有被初始化(新组)或者偏移的数据被删除
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
List<String> topics = Arrays.asList("zzx", "love");
consumer.subscribe(topics);//订阅主题
Set<TopicPartition> topicPartitions = consumer.assignment();
for (TopicPartition topicPartition : topicPartitions) {
consumer.seek(topicPartition, 10);//从第十个偏移开始消费
}
while (true){
ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);//延迟1s拉取数据
consumerRecords.forEach(consumerRecord -> System.out.println("!!!!!!!!!" + consumerRecord.key() + " : "+ consumerRecord.value()));
}
6.3 消费者offset问题
- properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);//开启自动提交偏移量
- 消费者只有在初始化启动的时候,会从硬盘中读取一次offset,其余情况都是读取缓存中的临时偏移量
- 自动提交可以保证提交,但是当提交过程中,消费者宕机,将会导致部分数据不会被消费(偏移量已经修改);
- 手动提交
- 同步提交:consumer.commitSync(); 会阻塞直到提交成功
- 异步提交:consumer.commitAsync(new OffsetCommitCallback());
- 如果为了保证数据绝不能丢失,可以实现
- consumer.subscribe(topics, new ConsumerRebalanceListener()});//自定义Rebalance;
- 当有消费者发生变化的时候,会调用Rebalance进行重新分区;提供了onPartitionsRevoked(Rebalance之前)和onPartitionsAssigned(Rebalance之后)
6.4 自定义生产者分区器(实现Partitioner)
public class MyPartitioner implements Partitioner {
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) { //自定义逻辑返回分区位置
return 0;
}
}
6.5 自定义生产者拦截器(实现ProducerInterceptor)
public class KafkaInterception implements ProducerInterceptor {
private int successCount = 0;
@Override
public void configure(Map<String, ?> map) {//生产者配置信息
}
@Override
public ProducerRecord onSend(ProducerRecord producerRecord) {
return new ProducerRecord(producerRecord.topic(), producerRecord.key(),
producerRecord.timestamp() + ": " +producerRecord.value());
}
@Override
public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) { //消息发送回调
if(recordMetadata != null) successCount++;
}
@Override
public void close() {//生产结束后关闭
System.out.println(successCount);
}
}
七、Kafka 监控
-
修改kafka启动命令:修改kafka-server-start.sh文件
- 将中间内容(export )替换如下
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70" export JMX_PORT="9999" #export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
-
下载解压安装包 kafka-eagle-bin-xxx.tar.gz
- 解压后还有一个真正的eagle压缩包,再次解压解压后的安装包
-
配置环境变量
export KE_HOME=/usr/local/eagle export PATH=$PATH:$KE_HOME/bin
-
刷新环境变量 source /etc/profile
-
配置edage
#zookeeper和kafka集群配置,多个用逗号隔开
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.1.40:2181,192.168.1.41:2181,192.168.1.41:2182
#web页面访问端口号
kafka.eagle.webui.port=8048
# 设置Zookeeper线程数
kafka.zk.limit.size=25
#offset存储地址kafka
cluster1.kafka.eagle.offset.storage=kafka
#开启图表
kafka.eagle.metrics.charts=true
#在使用Kafka SQL查询主题时,如果遇到错误,可以尝试开启这个属性
kafka.eagle.sql.fix.error=true
#错误通知邮箱配置
kafka.eagle.mail.enable=true
kafka.eagle.mail.sa=17862851291
kafka.eagle.mail.username=17862851291@163.com
kafka.eagle.mail.password=zzx19951126
kafka.eagle.mail.server.host=smtp.163.com
kafka.eagle.mail.server.port=25
#kafka存放部分元数据存储方式
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.1.40:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
kafka.eagle.username=root
kafka.eagle.password=root
- 启动edage: 进入安装目录bin目录下 执行启动脚本 ./ke.sh start
- 进入eagle后台管理端 119.3.221.118:8048/ke
- topic 主题
- Consumers 消费者组
- Metrics 监控
八、connect文件系统
-
kafka connect两大核心 worker和task
- 连接器:实现connect API,决定运行的任务,从work进程获取任务配置并传递
- 任务:负责将数据移入或移除Kafka
- work进程:或则管理连接器的配置、启动和连接任务
- 转化器:connect和其他系统的数据转化
-
文件系统
- FileStreamSource: 读取并发布内容到broker
- FileStreamSink:读取broker并提供给其他存储系统
- 修改配置文件 /config/connect-standalone.properties
- 修改配置文件 /config/connect-file-source.properties
- 修改资源文件路径 file=/temp/source.txt
- 修改端口
- 修改配置文件 /config/connect-file-sink.properties
- 修改被写入的文件路径file=/temp/sink.txt
- 修改端口 rest.port=8083
- 通过standalone启动 source和sink
- bin/connect-standalone.sh config/connect-file-source.properties
- bin/connect-standalone.sh config/connect-file-sink.properties