安装kafka
- 首先安装jdk,zookeeper,kafka
将压缩包放进linux目录/opt/mySoftware
下,并依次执行下面的命令
tar -zxvf jdk-8u361-linux-x64.tar.gz
tar -zxvf zookeeper-3.4.12.tar.gz
tar -zxvf kafka_2.11-2.0.0.tgz
解压后会生成文件夹jdk1.8.0_361``kafka_2.11-2.0.0
zookeeper-3.4.12
- 配置环境变量
vim /etc/profile
然后到文件最下方,添加下面的内容
# jdk
export JAVA_HOME=/opt/mySoftware/jdk1.8.0_361
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
# zookeeper
export ZOOKEEPER_HOME=/opt/mySoftware/zookeeper-3.4.12
export PATH=$PATH:$ZOOKEEPER_HOME/bin
# kafka
export KAFKA_HOME=/opt/mySoftware/kafka_2.11-2.0.0
export PATH=$PATH:$KAFKA_HOME/bin
添加完成后保存并退出,然后执行命令使配置生效
source /etc/profile
验证是否成功
java -version
切换目录,创建
tmp
目录,打开配置目录,修改zookeeper配置文件名字
cd /opt/mySoftware/zookeeper-3.4.12
mkdir tmp
cd conf
cp zoo_sample.cfg zoo.cfg
打开zoo.cfg
vim zoo.cfg
修改/添加以下内容后,保存退出
# 数据目录
dataDir=/opt/mySoftware/zookeeper-3.4.12/tmp/data
# 日志目录
dataLogDir=/opt/mySoftware/zookeeper-3.4.12/tmp/log
创建zookeeper服务器编号
cd /opt/mySoftware/zookeeper-3.4.12/tmp
mkdir data
touch /opt/mySoftware/zookeeper-3.4.12/tmp/data/myid
vim /opt/mySoftware/zookeeper-3.4.12/tmp/data/myid
在myid文件中添加一个数字0
启动zookeeper
zkServer.sh start 启动服务
zkServer.sh status 查看服务状态
zkServer.sh stop 停止服务
zkCli.sh 客户端连接
然后修改kafka的server.properties
cd /opt/mySoftware/kafka_2.11-2.0.0/config
vim server.properties
修改这些内容
# broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
broker.id=0
# broker对外提供的服务入口地址,下面的ip是虚拟机的ip
listeners=PLAINTEXT://192.168.162.124:9092
# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
# Kafka所需的ZooKeeper集群地址,为了方便演示,我们假设Kafka和ZooKeeper都安装在本机
zookeeper.connect=localhost:2181/kafka
启动kafka
在/opt/mySoftware/kafka_2.11-2.0.0
目录下执行下面的命令
# 前台启动
bin/kafka-server-start.sh config/server.properties
# 后台启动
bin/kafka-server-start.sh config/server.properties &
//关闭后台运行
bin/kafka-server-stop.sh
基本概念
生产者:Producer,发送消息的一方,将消息发送给kafka
broker:翻译过来是经纪人,代理人,通常情况下将它看作kafka实例
消费者:Consumer,接收消息的一方,从kafka上拉取消息
主题:Topic,kafka中,消息依据主题进行分类,生产者将消息发送到特定的主题,消费者从特定的主题接收消息
分区:一个主题可以分为多个分区,一个分区只能属于一个主题。同一个主题下不同分区的消息是不同的,分区在存储层面可以看作一个可追加的日志,消息在追加到分区时会分配一个偏移量offset,offset是消息在分区中的唯一标识,kafka根据offset来保证消息在分区内的顺序性。offset不跨越分区,也就是说消息在分区内是有序的而不是主题有序的
分区多副本机制:在分区中,有多个副本,保存的是相同的消息,副本之间是一主多从的关系,leader副本负责处理读写请求,follower副本只负责和leader保持同步,副本处于不同的broker,当一个leader出现故障,会从follower中重新选出leader对外提供服务,这样就保证了在某个broker失效时仍然能保证服务可用
不仅broker会失效,consumer也会发生故障宕机,消费端也是具有一定的容灾能力的,当consummer拉取消息时,会保存拉取消息的位置(从哪里拉取的消息),当重新恢复后,会根据之前保存的位置重新拉取进行消费
AR,ISR,OSR:包括leader和follower在内的所有副本叫做AR(all),所有和leader保持一定程度同步的副本(包含leader)叫做ISR(in),与leader同步滞后过多的副本组成OSR(out),AR=ISR+OSR,正常情况下AR=ISR,即所有副本的同步程度都比较好,leader负责跟踪和维护这两个集合,如果ISR中的副本滞后过多就移除,如果OSR同步追上则加入到ISR,当leader故障重选leader时,只能在ISR中选
LSO,HW,LEO:HW是高水位,在这个标志之前的消息是可以被消费者拉取的,HW及之后的消息对消费者不可见
几个例子:
kafka的复制机制是介于同步复制和异步复制之间的一种均衡的状态,如果是同步复制,只有所有的follower都拷贝了leader的消息副本,这条消息才算发送成功,这种情况,效率很低;如果是单纯的异步复制,则当leader写入了消息就认为发送成功,如果follower还没来得及拷贝,leader就宕机,就会造成数据丢失
fdsf