1. 集群部署
1.1 集群安装
(1)集群规划
在 hadoop102、hadoop103 和 hadoop104 三个节点上都部署 Zookeeper。
(2)解压安装
在 hadoop102 解压 Zookeeper 安装包到 /opt/module/ 目录下,修改 apache-zookeeper-3.5.7-bin 名称为 zookeeper-3.5.7。
(3)配置服务器编号
- 在 /opt/module/zookeeper-3.5.7/ 这个目录下创建 data 目录;
- 在 /opt/module/zookeeper-3.5.7/data 目录下创建一个 myid 的文件;
- 在文件中添加与 server 对应的编号(注意:上下不要有空行,左右不要有空格);
(4)在 zoo.cfg 中修改数据存储路径配置
dataDir=/opt/module/zookeeper-3.5.7/data
(5)zoo.cfg 增加集群配置
# server.A=B:C:D
server.2=hadoop102:2888:3888
server.3=hadoop103:2888:3888
server.4=hadoop104:2888:3888
- A 是一个数字,表示这个是第几号服务器;集群模式下配置一个文件 myid,这个文件在 dataDir 目录下,这个文件里面有一个数据就是 A 的值,Zookeeper 启动时读取此文件,拿到里面的数据与 zoo.cfg 里面的配置信息比较从而判断到底是哪个 server;
- B 是这个服务器的地址;
- C 是这个服务器 Follower 与集群中的 Leader 服务器交换信息的端口;
- D 是万一集群中的 Leader 服务器挂了,需要一个端口来重新进行选举,选出一个新的 Leader,而这个端口就是用来执行选举时服务器相互通信的端口。
(6)拷贝配置好的 zookeeper 到其他机器上
使用自定义的命令:xsync zookeeper-3.5.7
,然后分别在 hadoop103、hadoop104 上修改 myid 文件中内容为 3、4。
(7)分别启动 ./bin/zkServer.sh start
1.2 选举机制
- SID:服务器 ID。用来唯一标识一台 ZooKeeper 集群中的机器,每台机器不能重复,和 myid 一致;
- ZXID:事务 ID。ZXID 是一个事务 ID,用来标识一次服务器状态的变更。在某一时刻,集群中的每台机器的 ZXID 值不一定完全一致,这和 ZooKeeper 服务器对于客户端“更新请求”的处理逻辑有关;
- Epoch:每个 Leader 任期的代号。没有 Leader 时同一轮投票过程中的逻辑时钟值是相同的。每投完一次票这个数据就会增加。
a. 第一次启动
- Server1 启动,发起一次选举。Server1 投自己一票。此时 Server1 票数一票,不够半数以上(3票),选举无法完成,Server1 状态保持为 LOOKING;
- Server2 启动,再发起一次选举。Server1 和 2 分别投自己一票并交换选票信息:此时 Server1 发现 Server2 的 myid 比自己目前投票推举的(Server1)大,更改选票为推举 Server2。此时 Server1 票数 0 票,Server2 票数 2 票,没有半数以上结果,选举无法完成,Server 1、2 状态保持 LOOKING;
- Server3 启动,发起一次选举。此时 Server1、2 都会更改选票为 Server3。此次投票结果:Server1 为 0 票, Server2 为 0 票, Server3 为 3 票。此时 Server3 的票数已经超过半数, Server3 当选 Leader。 Server1、2 更改状态为 FOLLOWING, Server3 更改状态为 LEADING;
- Server4 启动,发起一次选举。此时 Server1、2、3 已经不是 LOOKING 状态,不会更改选票信息。交换选票信息结果:Server3 为 3 票,Server4 为 1 票。此时 Server4 服从多数,更改选票信息为 Server3,并更改状态为 FOLLOWING;
- Server5 启动,同 4 一样当小弟。
b. 非第一次启动
(1)当 ZooKeeper 集群中的一台服务器出现以下两种情况之一时,就会开始进入 Leader 选举:
- 服务器初始化启动;
- 服务器运行期间无法和 Leader 保持连接;
(2)而当一台机器进入 Leader 选举流程时,当前集群也可能会处于以下两种状态:
- 集群中本来就已经存在一个 Leader;
- 集群中确实不存在 Leader
对于第一种已经存在 Leader 的情况,机器试图去选举 Leader 时,会被告知当前服务器的 Leader 信息,对于该机器来说,仅仅需要和 Leader 机器建立连接,并进行状态同步即可。
假设 ZooKeeper 由 5 台服务器组成,SID 分别为 1、2、3、4、5,ZXID 分别为 8、8、8、7、7,并且此时 SID 为 3 的服务器是 Leader。某一时刻,3 和 5 服务器出现故障,因此开始进行 Leader 选举。
Server | EPOCH | ZXID | SID |
---|---|---|---|
1 | 1 | 8 | 1 |
2 | 1 | 8 | 2 |
4 | 1 | 7 | 4 |
选举 Leader 规则: ① EPOCH 大的直接胜出;② EPOCH 相同,事务 id 大的胜出;③ 事务 id 相同,服务器 id 大的胜出。
1.3 集群停止、启动脚本
(1)vim /home/{xxx}/bin/zk.sh
#!/bin/bash
case $1 in
"start") {
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- Zookeeper-$i start ----------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
done
};;
"stop") {
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- Zookeeper-$i stop ----------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
done
};;
"status") {
for i in hadoop102 hadoop103 hadoop104
do
echo ---------- Zookeeper-$i status ----------
ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
done
};;
esac
(2)chmod u+x zk.sh
(3)分发到 hadoop103、hadoop104
(4)然后测试一下,是否 ok
2. 客户端命令行操作
2.1 命令行语法
(1)启动客户端:bin/zkCli.sh -server hadoop102:2181
(2)显示所有操作命令:help
2.2 节点数据信息
- cZxid:创建节点的事务 zxid。每次修改 ZooKeeper 状态都会产生一个 ZooKeeper 事务 ID。事务 ID 是 ZooKeeper 中所有修改总的次序。每次修改都有唯一的 zxid,如果 zxid1 小于 zxid2,那么 zxid1 在 zxid2 之前发生;
- ctime:znode 被创建的毫秒数(从 1970 年开始);
- mZxid:znode 最后更新的事务 zxid;
- mtime:znode 最后修改的毫秒数(从 1970 年开始);
- pZxid:znode 最后更新的子节点 zxid;
- cversion:znode 子节点变化号,znode 子节点修改次数;
- dataversion:znode 数据变化号;
- aclVersion:znode 访问控制列表的变化号;
- ephemeralOwner:如果是临时节点,这个是 znode 拥有者的 session id;如果不是临时节点则是 0;
- dataLength:znode 的数据长度;
- numChildren:znode 子节点数量。
2.3 节点类型
持久 | 短暂 | 有序号 | 无序号
- 持久(Persistent):客户端和服务器端断开连接后,创建的节点不删除;
- 短暂(Ephemeral):客户端和服务器端断开连接后,创建的节点自己删除
- 持久化目录节点:客户端与 Zookeeper 断开连接后,该节点依旧存在;
- 持久化顺序编号目录节点:客户端与 Zookeeper 断开连接后,该节点依旧存在,只是 Zookeeper 给该节点名称进行顺序编号;
- 临时目录节点:客户端与 Zookeeper 断开连接后,该节点被删除;
- 临时顺序编号目录节点:客户端与 Zookeeper 断开连接后 , 该节点被删除 , 只是 Zookeeper 给该节点名称进行顺序编号。
(1)分别创建 2 个普通节点(永久节点 + 不带序号)
# 创建节点时,要赋值
[zk: localhost:2181(CONNECTED) 3] create /sanguo "sanguo"
Created /sanguo
[zk: localhost:2181(CONNECTED) 4] create /sanguo/shuguo "liubei"
Created /sanguo/shuguo
(2)获得节点的值
[zk: localhost:2181(CONNECTED) 5] get -s /sanguo
sanguo
cZxid = 0x100000003
ctime = Wed Aug 29 00:03:23 CST 2018
mZxid = 0x100000003
mtime = Wed Aug 29 00:03:23 CST 2018
pZxid = 0x100000004
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 7
numChildren = 1
[zk: localhost:2181(CONNECTED) 6] get -s /sanguo/shuguo
liubei
cZxid = 0x100000004
ctime = Wed Aug 29 00:04:35 CST 2018
mZxid = 0x100000004
mtime = Wed Aug 29 00:04:35 CST 2018
pZxid = 0x100000004
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 6
numChildren = 0
(3)创建带序号的节点(永久节点 + 带序号)
- 先创建一个普通的根节点 /sanguo/weiguo
[zk: localhost:2181(CONNECTED) 1] create /sanguo/weiguo "caocao" Created /sanguo/weiguo
- 创建带序号的节点(如果原来没有序号节点,序号从 0 开始依次递增。如果原节点下已有 2 个节点,则再排序时从 2 开始,以此类推)
[zk: localhost:2181(CONNECTED) 2] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000000 [zk: localhost:2181(CONNECTED) 3] create -s /sanguo/weiguo/zhangliao "zhangliao" Created /sanguo/weiguo/zhangliao0000000001 [zk: localhost:2181(CONNECTED) 4] create -s /sanguo/weiguo/xuchu "xuchu" Created /sanguo/weiguo/xuchu0000000002
(4)创建短暂节点(短暂节点 + 不带序号 or 带序号)
- 创建短暂的不带序号的节点
[zk: localhost:2181(CONNECTED) 7] create -e /sanguo/wuguo "zhouyu" Created /sanguo/wuguo
- 创建短暂的带序号的节点
[zk: localhost:2181(CONNECTED) 2] create -e -s /sanguo/wuguo "zhouyu" Created /sanguo/wuguo0000000001
- 在当前客户端是能查看到的
[zk: localhost:2181(CONNECTED) 3] ls /sanguo [wuguo, wuguo0000000001, shuguo]
- 退出当前客户端然后再重启客户端
[zk: localhost:2181(CONNECTED) 12] quit [liujiaqi@hadoop104 zookeeper-3.5.7]$ bin/zkCli.sh
- 再次查看根目录下短暂节点已经删除
[zk: localhost:2181(CONNECTED) 0] ls /sanguo [shuguo]
(5)修改节点数据值
[zk: localhost:2181(CONNECTED) 6] set /sanguo/weiguo "simayi"
2.4 监听器原理
a. 流程
- 首先要有一个 main 线程;
- 在 main 线程中创建 Zookeeper 客户端,这时就会创建两个线程,一个负责网络连接通信(connet),一个负责监听(listener);
- 通过 connect 线程将注册的监听事件发送给 Zookeeper;
- 将注册的监听事件添加到 Zookeeper 的注册监听器列表中;
- Zookeeper 监听到有数据或路径变化,就会将这个消息发送给 listener 线程;
- listener 线程内部调用了 process() 方法。
b. 命令
- 监听节点数据的变化:
get path [watch]
- 监听子节点增减的变化:
ls path [watch]
(1)节点的值变化监听
- 在 hadoop104 主机上注册监听 /sanguo 节点数据变化
[zk: localhost:2181(CONNECTED) 26] get -w /sanguo
- 在 hadoop103 主机上修改 /sanguo 节点的数据
[zk: localhost:2181(CONNECTED) 1] set /sanguo "yanyi"
- 观察 hadoop104 主机收到数据变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeDataChanged path:/sanguo
【注】在 hadoop103 再多次修改 /sanguo 的值,hadoop104 上不会再收到监听。因为注册 1 次,只能监听 1 次。想再次监听,需要再次注册。
(2)节点的子节点变化监听(路径变化)
- 在 hadoop104 主机上注册监听 /sanguo 节点的子节点变化
[zk: localhost:2181(CONNECTED) 1] ls -w /sanguo [shuguo, weiguo]
- 在 hadoop103 主机 /sanguo 节点上创建子节点
[zk: localhost:2181(CONNECTED) 2] create /sanguo/jin "simayi" Created /sanguo/jin
- 观察 hadoop104 主机收到子节点变化的监听
WATCHER:: WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sanguo
【注】节点的路径变化,也是注册 1 次,生效 1 次。想多次生效,就需要多次注册。
2.5 节点删除与查看
- 查看节点状态
[zk: localhost:2181(CONNECTED) 17] stat /sanguo
- 删除节点
[zk: localhost:2181(CONNECTED) 4] delete /sanguo/jin
- 递归删除节点
[zk: localhost:2181(CONNECTED) 15] deleteall /sanguo/shuguo
3. 客户端 API 操作
3.1 Idea 环境搭建
pom.xml
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.7</version>
</dependency>
</dependencies>
需要在项目的 src/main/resources 目录下,新建一个文件,命名为“log4j.properties”,在文件中填入:
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/spring.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
3.2 连接 Zk
public class ZkClient {
/**
* 注意:逗号左右不能有空格
*/
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zkClient;
@Before
public void init() throws Exception {
zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// getChildrenTest的回调处理
try {
System.out.println("\r\n========= ↓ watch ↓ =========");
List<String> children = zkClient.getChildren("/", true);
System.out.println(children);
System.out.println("========= ↑ watch ↑ =========");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
@Test
public void createZNodeTest() throws KeeperException, InterruptedException {
String zNode = zkClient.create("/tree6x7", "liujiaqi".getBytes(CharsetUtil.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(zNode);
}
@Test
public void getChildrenTest() throws KeeperException, InterruptedException {
// 设置成true会使用 new ZooKeeper 时使用的那个 Watch (监听只能用一次, 在watch里再次注册)
zkClient.getChildren("/", true);
// 延时阻塞
Thread.sleep(Long.MAX_VALUE);
}
@Test
public void existTest() throws KeeperException, InterruptedException {
Stat exists = zkClient.exists("/testExist", false);
System.out.println(exists != null ? "Exist!" : "Null!");
}
}
getChildrenTest 控制台打印:
3.3 写数据流程
写流程之写入请求直接发送给 Leader 节点:
写流程之写入请求发送给 Follower 节点:
4. 案例:服务器动态上下线
4.1 需求分析
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
4.2 具体实现
a. Zk 创建根节点
b. 客户端代码
package io.tree6x7.zk.case1;
/**
* @author 6x7
* @Description 客户端监听/servers
* @createTime 2022年03月08日
*/
public class MyClient {
private String rootZNode = "/servers";
private String connectUrl = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) throws Exception {
MyClient myClient = new MyClient();
// 1. 获取 Zk 连接
myClient.connect();
// 2. 监听 /servers 下面子节点的增加和删除
myClient.getServerList();
// 3. 业务逻辑
myClient.work();
}
private void work() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private List<String> getServerList() throws KeeperException, InterruptedException {
return zk.getChildren(rootZNode, true);
}
private void connect() throws IOException {
zk = new ZooKeeper(connectUrl, sessionTimeout, watchEvent -> {
try {
System.out.println("\r\n========= ↓ watch ↓ =========");
List<String> serverList = getServerList();
List<String> retServerData = new ArrayList<>(serverList.size());
for (String server : serverList) {
byte[] data = zk.getData(rootZNode + "/" + server, false, null);
retServerData.add(new String(data));
}
System.out.println(retServerData);
System.out.println("========= ↑ watch ↑ =========");
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
c. 服务端代码
package io.tree6x7.zk.case1;
import io.netty.util.CharsetUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
/**
* @author 6x7
* @Description 服务器注册到/servers
* @createTime 2022年03月08日
*/
public class MyServer {
private String rootZNode = "/servers";
private String connectUrl = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int sessionTimeout = 2000;
private ZooKeeper zk;
public static void main(String[] args) throws Exception {
// 1. 获取 Zk 连接
MyServer myServer = new MyServer();
myServer.connect();
// 2. 向 Zk 注册(创建临时、有序的节点)
myServer.register(args[0]);
// 3. 启动业务逻辑
myServer.work();
}
private void work() throws InterruptedException {
Thread.sleep(Long.MAX_VALUE);
}
private void register(String hostname) throws KeeperException, InterruptedException {
String zNode = zk.create(rootZNode + "/" + hostname, hostname.getBytes(),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("[ZNode] " + zNode + " is online ...");
}
private void connect() throws IOException {
zk = new ZooKeeper(connectUrl, sessionTimeout, watchEvent -> {});
}
}
d. 运行、测试
- 先启动 MyClient,然后手动向 Zk 中注册两个节点查看是否能正常监听;
- 在 MyServer 的 Run/Debug Configurations 中勾选 Allow parallel run,然后在 Program arguments 中写入参数,每启动一个 MyServer 就换一个数值。
5. 案例:分布式锁
5.1 需求分析
什么叫做分布式锁呢?
比如说“进程1”在使用该资源的时候,会先去获得锁,“进程 1”获得锁以后会对该资源保持独占,这样其他进程就无法访问该资源,“进程1”用完该资源以后就将锁释放掉,让其他进程来获得锁,那么通过这个锁机制,我们就能保证了分布式系统中多个进程能够有序的访问该临界资源。那么我们把这个分布式环境下的这个锁叫作「分布式锁」。
5.2 代码实现
a. 锁实现
public class DistributedLock {
private String rootZNode = "/locks";
private String rootZNodeName = "locks";
private String lockPrefix = "seq-";
private String connectUrl = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private String waitNodePath;
private int sessionTimeout = 2000;
private ZooKeeper zk;
private CountDownLatch connectLatch = new CountDownLatch(1);
private CountDownLatch waitPreNodeReleaseLatch = new CountDownLatch(1);
private String zNode;
public DistributedLock() throws Exception {
// 1. 获取连接
zk = new ZooKeeper(connectUrl, sessionTimeout, watchEvent -> {
if (watchEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
connectLatch.countDown();
}
if (watchEvent.getType() == Watcher.Event.EventType.NodeDeleted
&& watchEvent.getPath().equals(waitNodePath)) {
waitPreNodeReleaseLatch.countDown();
}
});
// --- 等待 Zk 正常连接后,程序才往下走 ---
connectLatch.await();
// 2. 判断根节点 locks 是否存在
Stat stat = zk.exists(rootZNode, false);
if (stat == null) {
zk.create(rootZNode, rootZNodeName.getBytes(CharsetUtil.UTF_8),
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* 对 Zk 加锁
*/
public void lock() throws KeeperException, InterruptedException {
// 1. 创建临时、带序号节点
zNode = zk.create(rootZNode + "/" + lockPrefix, null,
ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 2. 判断创建的节点是否是最小的序号节点
List<String> children = zk.getChildren(rootZNode, false);
if (children.size() == 1) {
// --- 是,获取到锁
return;
} else {
// --- 否,监听更小序号的节点
Collections.sort(children);
// seq-00000000
String currentNode = zNode.substring((rootZNode + "/").length());
int index = children.indexOf(currentNode);
if (index == -1) {
throw new RuntimeException("数据异常");
} else if (index == 0) {
// 第一个位置,直接获取锁
return;
} else {
// 监听前一个节点
waitNodePath = rootZNode + "/" + children.get(index - 1);
zk.getData(waitNodePath, true, null);
waitPreNodeReleaseLatch.await();
return;
}
}
}
/**
* 对 Zk 解锁
*/
public void release() throws KeeperException, InterruptedException {
// 1. 删除节点
zk.delete(zNode, -1);
}
}
b. 测试代码
public class DistributedLockTest {
public static void main(String[] args) throws Exception {
DistributedLock lock1 = new DistributedLock();
DistributedLock lock2 = new DistributedLock();
new Thread(() -> {
try {
lock1.lock();
System.out.println("[lock1] 获取锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("[lock1] 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
lock2.lock();
System.out.println("[lock2] 获取锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("[lock2] 释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
5.3 Curator 框架实现分布式锁
a. 引入依赖
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
<version>4.3.0</version>
</dependency>
b. 测试代码
public class CuratorTest {
private String rootNode = "/locks";
private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181";
private int connectionTimeout = 2000;
private int sessionTimeout = 2000;
public static void main(String[] args) {
new CuratorTest().test();
}
private void test() {
InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
new Thread(() -> {
try {
lock1.acquire();
System.out.println("Thread-1 获取锁");
// 测试锁重入
lock1.acquire();
System.out.println("Thread-1 再次获取锁");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("Thread-1 释放锁");
lock1.release();
System.out.println("Thread-1 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
lock2.acquire();
System.out.println("Thread-2 获取锁");
// 测试锁重入
lock2.acquire();
System.out.println("Thread-2 再次获取锁");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("Thread-2 释放锁");
lock2.release();
System.out.println("Thread-2 再次释放锁");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
/**
* 分布式锁初始化
* @return
*/
public CuratorFramework getCuratorFramework() {
// 重试策略,初试时间 3 秒,重试 3 次
RetryPolicy policy = new ExponentialBackoffRetry(3000, 3);
// 通过工厂创建 Curator
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString)
.connectionTimeoutMs(connectionTimeout)
.sessionTimeoutMs(sessionTimeout)
.retryPolicy(policy).build();
// 开启连接
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}
6. Questions
6.1 选举机制
半数机制,超过半数的投票通过,即通过。
- 第一次启动选举规则:投票过半数时,服务器 id 大的胜出
- 第二次启动选举规则:
- EPOCH 大的直接胜出
- EPOCH 相同,事务 id 大的胜出
- 事务 id 相同,服务器 id 大的胜出
6.2 部署数目
生产集群安装多少 zk 合适?安装奇数台。
生产经验:
- 10 台服务器:3 台 zk;
- 20 台服务器:5 台 zk;
- 100 台服务器:11 台 zk;
- 200 台服务器:11 台 zk。
服务器数多:
- 好处,提高可靠性;
- 坏处:提高通信延时。
6.3 常用命令
ls、get、create、delete
标签:14,zk,Zookeeper,private,2181,集群,sanguo,new,节点 From: https://www.cnblogs.com/liujiaqi1101/p/17107408.html