zookeeper
所谓分布式系统就是在不同的地域分布的多个服务器,共同组成一个应用系统来为用户提供服务,在分布式系统中最重要的是进程调度。多个进程的应用需要竞争资源,此时需要一个协调器来让多个进程有序的访问这个资源。这个协调器就是分布式系统中经常使用的锁机制,例如进程1使用资源的时候,获得锁,保持对该资源独占,此时其他进程就无法访问该资源,而进程1在使用完该资源后将锁释放掉,以便要其他进程来获得锁。通过锁机制,可以保证分布式系统中多个进程能够有序的访问该共享资源。在分布式环境中这个锁就叫做分布式锁,它就是分布式协调技术实现的核心内容。
zookeeper是一种为分布式应用所设计的高可用,高性能开源协调服务,它提供了一项基本服务,分布式锁服务,同时也提供了数据维护和管理机制,如统一命名服务,状态同步服务,集群管理,分布式消息列队,分布式应用配置项的管理等等。
zookeeper的应用场景
在分布式锁中,有一种典型应用场景,就是通过对进群进行 master角色选举,来解决分布式系统中的单点问题。所谓单点故障,就是在一个主从的分布式系统中,主节点负责任务调度分发,从节点任务的处理,而当主节点发故障的时候,整个应用系统也就瘫痪了。
传统的解决单点问题的方式是采用一个备用节点,这个备用节点定期向主节点发送ping包,主节点收到ping包后向备节点回复ack信息,当备节点收到回复的时候认为当前主节点运行正常。当主节点故障的时候,备用节点就无法收到回复的信息了,此时备用节点认为主节点宕机,然后接替它成为信的主节点继续提供服务。
这种传统的解决单点故障的方法,虽然在一定程度上解决了问题,但是有一个问题,就是当网络故障的时候,主从都会认为对方挂掉,都会成为主节点,就是双master的情况,一旦出现该情况,就会导致整个系统混乱不可用。
zookeeper工作原理
master启动
分布式系统引入zookeeper后,就可以配置多个主节点。以配置两个节点为例,当他们都启动的时候,都会向zookeeper注册节点信息,注册完毕后,进行选举,选举由多种算法,例如以最小编号作为选举算法,那么编号小的就在在选举中获胜并获取锁成为主节点,通过这种方式完成主备节点的分配和协作。
master故障
如果主节点发生了故障,此时它在zookeeper中注册的节点信息会被删除,而zookeeper会自动感知节点的变化,发现故障后,再次发出选举,此时备节点选举获胜,这样就完成了主备节点的重新选举。
master恢复
如果主节点恢复了,它会再次向zookeeper注册自身的节点信息,此时它注册的节点信息会改名,并不是原来的信息。zookeeper会感知节点的变化从而再次发动选举
zookeeper就是通过这样协调,调度机制进行反复的对进行进行管理和状态同步。
zookeeper集群架构
zookeeper集群主要角色有server和client,其中server又分leader,follower和observer三个角色。每个角色含义如下:
- leader领导者角色 主要负责发起和决议,以及更新系统状态
- follower跟随者角色,用于接收客户端的请求并返回结果给客户端,在选举过程中参与投票
- observer观察者角色,用户接收客户端的请求,并将请求发送给leader,同时同步leader状态,但是不参与投票,observer的目的是扩展系统,提高伸性
- client 客户端角色,用于向zookeeper发起请求
zookeeper集群中每个server在内存中都存储了一份数据,在启动的时候,将从实例中选举一个server作为leader, leader负责处理更新等操作,当且大多数server在内存中成功修改数据,才认为数据修改成功。
zookeeper写流程,客户端clinet首先和一个server或者observe通信,发起写请求,然后server将请求转发给leader,leader再将请求转发给其他server,其他 server再接收到请求后写入数据并响应leader,leader再接收到大多数写成功回应后,认为数据写成功,最后响应client,完成一次写操作过程。
kafka基础
概念
kafka是一个高吞吐量的分布式发布订阅消息系统,它的最大特性就是可以实时处理大量数据满足各种需求场景,现在它已经被多家大型公司作为多种类型数据的管道和消息系统使用。
角色
- Broker:Kafka集群包含一个或多个服务器,每个服务器被称为broker。
- Topic:每条发布到Kafka集群的消息都有一个分类,这个类别被称为Topic(主题)。
- Producer:指消息的生产者,负责发布消息到Kafka broker。
- Consumer :指消息的消费者,从Kafka broker拉取数据,并消费这些已发布的消息。
- Partition:Parition是物理上的概念,每个Topic包含一个或多个Partition,每个partition都是一个有序的队列。partition中的每条消息都会被分配一个有序的id(称为offset)。
- Consumer:Group:消费者组,可以给每个Consumer指定消费者组,若不指定消费者组,则属于默认的group。
- Message:消息,通信的基本单位,每个producer可以向一个topic发布一些消息。
拓扑架构
一个kafka集群包含若干producer,若干broker,若干consumer group,以及一个zookeeper集群,它通过zookeeper管理集群配置,选举leader,以及再consumer group发生变化的时候进行rebalance。producer使用push模式将消息发布到borker,consumer使用pull模式从broker订阅并消费消息。
典型的消息系统有生产者(Producer),存储系统(broker)和消费者(Consumer)组成,Kafka作为分布式的消息系统支持多个生产者和多个消费者,生产者可以将消息分布到集群中不同节点的不同Partition上,消费者也可以消费集群中多个节点上的多个Partition。在写消息时允许多个生产者写到同一个Partition中,但是读消息时一个Partition只允许被一个消费组中的一个消费者所消费,而一个消费者可以消费多个Partition。也就是说同一个消费组下消费者对Partition是互斥的,而不同消费组之间是共享的。
kafka支持消息持久化存储,持久化数据保存在kafka的日志文件中,在生产者生产消息后,kafka不会直接把消息传递给消费者,而是先要在broker中进行存储,为了减少磁盘写入的次数,broker会将消息暂时缓存起来,当消息的个数或尺寸、大小达到一定阀值时,再统一写到磁盘上,这样不但提高了kafka的执行效率,也减少了磁盘IO调用次数。
kafka中每条消息写到partition中,是顺序写入磁盘的,这个很重要,因为在机械盘中如果是随机写入的话,效率将是很低的,但是如果是顺序写入,那么效率却是非常高,这种顺序写入磁盘机制是kafka高吞吐率的一个很重要的保证。
Topic与Partition
Kafka中的topic是以partition的形式存放的,每一个topic都可以设置它的partition数量,Partition的数量决定了组成topic的log的数量。推荐partition的数量一定要大于同时运行的consumer的数量。另外,建议partition的数量大于集群broker的数量,这样消息数据就可以均匀的分布在各个broker中。
那么,Topic为什么要设置多个Partition呢,这是因为kafka是基于文件存储的,通过配置多个partition可以将消息内容分散存储到多个broker上,这样可以避免文件尺寸达到单机磁盘的上限。同时,将一个topic切分成任意多个partitions,可以保证消息存储、消息消费的效率,因为越多的partitions可以容纳更多的consumer,可有效提升Kafka的吞吐率。因此,将Topic切分成多个partitions的好处是可以将大量的消息分成多批数据同时写到不同节点上,将写请求分担负载到各个集群节点。
在存储结构上,每个partition在物理上对应一个文件夹,该文件夹下存储这个partition的所有消息和索引文件。partiton命名规则为topic名称+序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
在每个partition(文件夹)中有多个大小相等的segment(段)数据文件,每个segment的大小是相同的,但是每条消息的大小可能不相同,因此segment 数据文件中消息数量不一定相等。segment数据文件有两个部分组成,分别为index file和data file,此两个文件是一一对应,成对出现,后缀.index和“.log”分别表示为segment索引文件和数据文件。
Producer生产机制
Producer是消息和数据的生产者,它发送消息到broker时,会根据Paritition机制选择将其存储到哪一个Partition。如果Partition机制设置的合理,所有消息都可以均匀分布到不同的Partition里,这样就实现了数据的负载均衡。如果一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不同的消息可以并行写入不同broker的不同Partition里,极大的提高了吞吐率。
Consumer消费机制
Kafka发布消息通常有两种模式:队列模式(queuing)和发布/订阅模式(publish-subscribe)。在队列模式下,只有一个消费组,而这个消费组有多个消费者,一条消息只能被这个消费组中的一个消费者所消费;而在发布/订阅模式下,可有多个消费组,每个消费组只有一个消费者,同一条消息可被多个消费组消费。
Kafka中的Producer和consumer采用的是push(推送)、pull(拉取)的模式,即Producer只是向broker push消息,consumer只是从broker pull消息,push和pull对于消息的生产和消费是异步进行的。pull模式的一个好处是Consumer可自主控制消费消息的速率,同时Consumer还可以自己控制消费消息的方式是批量的从broker拉取数据还是逐条消费数据。
zookeeper的配置文件解析
zookeeper配置文件zookeeper.properties
dataDir=/data/kafka/zookeeper
dataLogDir=/data/kafka/log/zookeeper
# the port at which the clients will connect
clientPort=2181
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=0
# Disable the adminserver by default to avoid port conflicts.
# Set the port to something non-conflicting if choosing to enable this
admin.enableServer=false
# admin.serverPort=8080
maxClientCnxns=100
tickTime=2000
initLimit=10
syncLimit=5
server.1=flow-sz-kafka-1:2888:3888
server.2=flow-sz-kafka-2:2888:3888
server.3=flow-sz-kafka-3:2888:3888
配置文件说明
- tickTime=2000 单位为毫秒,用来控制心跳和超时,zookeeper的基本时间度量单位
- initLimit=10 配置follower服务器初始化连接到leader,最长能忍受的多少个心跳时间间隔数,默认10个,既10*2000=20秒
- syncLimit=5 leader与follower之间发送消息,请求和应答时间长度不能超过多少个tickTime的时间长度,总长时间是5*2000=10秒
- maxClientCnxns=100 最大客户端连接数量
- dataDir=/data/kafka/zookeeper 必须配置。用于存储快照文件的目录
- dataLogDir=/data/kafka/log/zookeeper 日志存储路径
- clientPort=2181 进程监听端口,默认2181
- server.1=flow-sz-kafka-1:2888:3888 ,.1代表是第一个服务器,2888表示这个服务器与集群中的leader服务器通信的端口,3888表示如果集群中的leader服务器器宕机了,需要一个新端口来选举,来选举leader,该端口就是用来执行选举服务器互相通信的端口。
除了修改配置文件之外,进群模式下还要配置一个文件myid,这个文件要放在dataDir配置项指定的目录下。这个文件里中只有一个数字,例如1,代表第一个服务器,与server.1中对应,其他server2,也要写入myid中,写入2,其他一次类推。zookeeper会在启动的时候读取这个文件夹,得到的信息跟配置文件进行比较,从而判断每个zookeeper server的对应关系。
#
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
#-daemon 代表后台进程运行
kafka配置文件解析
kafka配置文件 server.properties
broker.id=1
listeners=PLAINTEXT://flow-sz-kafka-1:9092
# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3
# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=12
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1024000
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1024000
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=1048576000
# A comma separated list of directories under which to store log files
log.dirs=/data/kafka/kafka-logs
num.partitions=3
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
delete.topic.enable=true
log.retention.hours=24
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=flow-sz-kafka-1:2181,flow-sz-kafka-2:2181,flow-sz-kafka-3:2181
zookeeper.connection.timeout.ms=30000
group.initial.rebalance.delay.ms=0
必要参数配置
- broker.id 每个broker的在集群中的唯一标识,当该服务器的IP地址发生变化,broker.id没有变化,不会影响consumers的消息情况
- listeners=PLAINTEXT://flow-sz-kafka-1:9092 设置监听地址和端口,可以设置为主机名或者IP地址,如果设置主机名需要配置/etc/hosts
- log.dirs=/data/kafka/kafka-logs 配置保存kafka保存数据的位置,kafka中所有的数据都会存在这个目录下。可以使用逗号来指定多个路径。
- num.partitions=3 设置新的topic又多少分区,可以根据实际消费情况配置,配置过小影响性能
- log.retention.hours=24 配置消息保存时间
- log.segment.bytes=1073741824 配置parition中每个segment数据文件大小
- delete.topic.enable=true 配置是否可以删除topic,
- zookeeper.connect=flow-sz-kafka-1:2181, 指定zookeeper所在的地址。它存储了broker的元信息
#运行
bin/kafka-server-start.sh -daemon config/server.properties
kafka常用命令
- 创建topic
一般只要在一台上创建即可
bin/kafka-topics.sh --create --topic traffic-sz --partitions 3 --replication-factor 2 --zookeeper localhost:2181
#--create 创建
#--replication-factor: 副本数
#--partitions : 指定topic分区数,一般设置小于或者登陆kafa节点
#--topic 要创建的topic名称
#--zookeeper 指定zookeeper地址
- 列出所有topic
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
- 查看指定的topic
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --topic topic-test --describe
其中
leader表示现在正在读写的leader borker
replicas 表示当前分区所有副本的对应的broker列表
isr 表示处于活跃状态的broker
- 修改topic配置
bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name kscan-urls-original --alter --add-config retention.ms=86400000
- 查看consumer group,分为新版和旧版(kafka都支持)
- 新版信息保存在broker上,使用--new-consumer参数
bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --list
- 老版信息保存在zookeeper上
bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --list
- 查看特定consumer group 详情,使用--group与--describe参数
新版:bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server 127.0.0.1:9092 --group test --describe
老版:bin/kafka-consumer-groups.sh --zookeeper 127.0.0.1:2181 --group test --describe
- 删除topic
bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --delete --topic testtopic
生产消费消息
开启一个终端,生产消息
#老版
bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic topic-test
#新版
bin/kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic topic-test
开启另外一个终端,消费消息
#老版
bin/kafka-console-consumer.sh --broker-list 127.0.0.1:9092 --topic topic-test
#新版
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic-test