前言
我的电脑内存只有8G,搭建的集群虚拟机配置如下,本案例也是可以跑的,学习视频为尚硅谷的Zookeeper教程:
https://www.bilibili.com/video/BV1to4y1C7gw?p=1&vd_source=c85b4a015a69e82ad4f202bd9b87697f
需求分析
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线服务器注册
首先开启集群zk服务,再在集群上创建/servers节点
[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
服务器端向 Zookeeper 注册代码
1 package com.atguigu.case1; 2 3 import org.apache.zookeeper.*; 4 5 import java.io.IOException; 6 7 /** 8 * @author 疯了疯了要疯了 9 * @create 2022-08-14-14:28 10 **/ 11 public class DistributeServer { 12 13 private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; 14 private int sessionTimeout = 90000; 15 private ZooKeeper zk; 16 17 public static void main(String[] args) throws IOException, KeeperException, InterruptedException { 18 19 DistributeServer server = new DistributeServer(); 20 21 //1. 获取zk连接 22 server.getConnect(); 23 //2. 注册服务器到zk集群,其实就是创建节点(临时,带序号) 24 server.regist(args[0]); 25 //3. 启动业务逻辑(睡觉) 26 server.business(); 27 28 29 } 30 // 业务功能 31 private void business() throws InterruptedException { 32 Thread.sleep(Long.MAX_VALUE); 33 } 34 // 注册服务器 35 private void regist(String hostname) throws KeeperException, InterruptedException { 36 String create = zk.create("/servers"+hostname, hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); 37 38 System.out.println(hostname + "is online" + create); 39 } 40 // 创建到 zk 的客户端连接 41 private void getConnect() throws IOException { 42 43 zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { 44 45 @Override 46 public void process(WatchedEvent event) { 47 48 } 49 }); 50 } 51 }
注意如果程序报错KeeperErrorCode = ConnectionLoss的话,可以将sessionTimeout的时间设置长些。
服务器监听
客户端代码
1 package com.atguigu.case1; 2 3 import org.apache.zookeeper.KeeperException; 4 import org.apache.zookeeper.WatchedEvent; 5 import org.apache.zookeeper.Watcher; 6 import org.apache.zookeeper.ZooKeeper; 7 8 import java.io.IOException; 9 import java.util.ArrayList; 10 import java.util.List; 11 12 /** 13 * @author 疯了疯了要疯了 14 * @create 2022-08-14-14:45 15 **/ 16 public class DistributeClient { 17 18 private String connectString = "hadoop102:2181,hadoop103:2181,hadoop104:2181"; 19 private int sessionTimeout = 90000; 20 private ZooKeeper zk; 21 22 public static void main(String[] args) throws IOException, KeeperException, InterruptedException { 23 DistributeClient client = new DistributeClient(); 24 25 //1. 获取zk连接 26 client.getConnect(); 27 //2. 监听servers下面子节点的增加和删除 28 client.getServerList(); 29 //3. 业务逻辑(睡觉) 30 client.business(); 31 32 } 33 34 private void business() throws InterruptedException { 35 Thread.sleep(Long.MAX_VALUE); 36 } 37 //1.获取服务器子节点信息,并且对父节点进行监听 38 private void getServerList() throws KeeperException, InterruptedException { 39 List<String> children = zk.getChildren("/servers", true); 40 // 2 存储服务器信息列表 41 ArrayList<String> servers = new ArrayList<>(); 42 // 3 遍历所有节点,获取节点中的主机名称信息 43 for (String child : children) { 44 byte[] data = zk.getData("/servers/" + child, false, null); 45 46 servers.add(new String(data)); 47 } 48 49 // 4 打印服务器列表信息 50 System.out.println(servers); 51 } 52 //创建到zk的客户端连接 53 private void getConnect() throws IOException { 54 zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() { 55 56 @Override 57 public void process(WatchedEvent event) { 58 try { 59 //再次启动监听 60 getServerList(); 61 } catch (KeeperException e) { 62 e.printStackTrace(); 63 } catch (InterruptedException e) { 64 e.printStackTrace(); 65 } 66 } 67 }); 68 } 69 }
测试
1)在 Linux 命令行上操作增加,减少服务器
(1)启动 DistributeClient 客户端
(2)在 hadoop102 上 zk 的客户端/servers 目录上创建临时带序号节点
[zk: localhost:2181(CONNECTED) 1] create -e -s /servers/hadoop101 "hadoop101" [zk: localhost:2181(CONNECTED) 2] create -e -s /servers/hadoop101 "hadoop102"
(3)观察 Idea 控制台变化
(4)执行删除操作
[zk: localhost:2181(CONNECTED) 8] delete /servers/hadoop1010000000004