ZooKeeper的api的使用Hadoop的HA模式
初始化ZooKeeper客户端
与HDFS和Hive的链接类似,但是Zookeeper中引入了监听器的机制,关于监听器的使用,会在后面提到
// 初始化zk的客户端
@Before
public void init() throws IOException {
//zk集群的地址
String connectString = "bigdata04:2181,bigdata05:2181,bigdata06:2181";
//zk的会话超时时间(单位:毫秒)
int sessionTimeout = 2000;
//创建zk客户端对象
//使用匿名内部类实训Watcher接口
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
//重写process方法,该方法为监听器的回调方法
//当监听器被注册时 回调函数处于异步状态
//当监听器被触发时 回调函数开始执行
@Override
public void process(WatchedEvent watchedEvent) {
System.out.println("-------------获取监听数据--------------");
try {
List<String> childrenNodeList = zk.getChildren("/", true);
//进行遍历
for (String item :childrenNodeList){
System.out.println("监听器获取了"+item);
}
} catch (KeeperException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
});
}
创建节点
//创建节点
@Test
public void createNode() throws InterruptedException, KeeperException {
//节点的路径
String path = "/node1_api";
//节点中储存的数据
byte[] dataBytes="abc".getBytes();
//设置节点的访问权限列表
//OPEN_ACL_UNSAFE: 表示使用完全开放的ACL权限 允许客户端对节点进行读写操作
ArrayList<ACL> aclList = ZooDefs.Ids.OPEN_ACL_UNSAFE;
//创建持久非顺序节点
CreateMode createMode =CreateMode.PERSISTENT;
//创建持久非顺序节点
String node1 = zk.create(path, dataBytes, aclList, createMode);
System.out.println(node1);
// //创建持久顺序节点
// String node2 = zk.create("/node2_api", "DEF".getBytes(), aclList, CreateMode.PERSISTENT_SEQUENTIAL);
// System.out.println(node2);
// //临时节点只在当前会话有效,会随着java代码的结束会话而结束
// //创建临时非顺序节点
// String node3 = zk.create("/node3_api", "123".getBytes(), aclList, CreateMode.EPHEMERAL);
// System.out.println(node3);
// //创建临时顺序节点
// String node4 = zk.create("/node4_api", "123".getBytes(), aclList, CreateMode.EPHEMERAL_SEQUENTIAL);
// System.out.println(node4);
}
创建了持久非顺序节点
创建了持久顺序节点
idea命令行已经打印了node3临时非顺序节点
但是因为java代码的会话已经关闭,所以查不到临时节点
查询节点
不使用监听器
不使用监听器,效果与zookeeper命令行一致
ls -s /
//查询节点的子节点 不设置监听器
@Test
public void getChildNode() throws InterruptedException, KeeperException {
//设置查询的起始节点
String path="/";
//是否设置监听器
boolean watcher=false;
//查询根节点的所有子节点
List<String> childrenNodeList = zk.getChildren(path, watcher);
for (String item:childrenNodeList){
System.out.println(item);
}
}
使用监听器
使用监听器会在线程阻塞后自动保持监听器的监听状态,默认调用init中的监听器
//查询节点中的子节点,设置监听器
@Test
public void getChildNodeWithWatcher() throws InterruptedException, KeeperException {
List<String> childrenNodeList = zk.getChildren("/", true);
for (String item:childrenNodeList){
System.out.println(item);
}
//为了保持监听器的监听 需要阻塞当前的线程直到当前的监听器生效
Thread.sleep(Long.MAX_VALUE);
}
Hadoop的HA模式配置
安装配置
准备三台克隆的虚拟机,其中包含安装好的Hadoop与jdk
安装ZooKeeper
配置hosts文件与profile文件
配置三台虚拟机之间进行免密登录
部分内容参考:
Hadoop分布式安装
在正常的Hadoop安装中,需要将以下文件内容变更
core-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- hdfs地址,ha中是连接到nameservice -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://mycluster</value>
</property>
<!-- 指定hadoop数据的存储目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/softs/hadoop3.1.3/data</value>
</property>
<!-- 故障转移 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>bigdata07:2181,bigdata08:2181,bigdata09:2181</value>
</property>
<!-- 解决HDFS web页面上删除、创建文件权限不足的问题 -->
<property>
<name>hadoop.http.staticuser.user</name>
<value>root</value>
</property>
<property>
<name>hadoop.proxyuser.root.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.root.groups</name>
<value>*</value>
</property>
</configuration>
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 为namenode集群定义一个services name -->
<property>
<name>dfs.nameservices</name>
<value>mycluster</value>
</property>
<!-- nameservice包含哪些namenode,为各个namenode起名 -->
<property>
<name>dfs.ha.namenodes.mycluster</name>
<value>nn1,nn2</value>
</property>
<!-- 名称为nn1的namenode的rpc地址和端口号,rpc用来和datanode通讯 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn1</name>
<value>bigdata07:8020</value>
</property>
<!-- 名称为nn2的namenode的rpc地址和端口号,rpc用来和datanode通讯 -->
<property>
<name>dfs.namenode.rpc-address.mycluster.nn2</name>
<value>bigdata08:8020</value>
</property>
<!-- 名称为nn1的namenode的http地址和端口号,web客户端 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn1</name>
<value>bigdata07:50070</value>
</property>
<!-- 名称为nn2的namenode的http地址和端口号,web客户端 -->
<property>
<name>dfs.namenode.http-address.mycluster.nn2</name>
<value>bigdata08:50070</value>
</property>
<!-- namenode间用于共享编辑日志的journal节点列表 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://bigdata07:8485;bigdata08:8485;bigdata09:8485/mycluster</value>
</property>
<!-- journalnode 上用于存放edits日志的目录 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/opt/softs/hadoop3.1.3/data/dfs/jn</value>
</property>
<!-- 客户端连接可用状态的NameNode所用的代理类 -->
<property>
<name>dfs.client.failover.proxy.provider.ns1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!--sshfence:防止namenode脑裂,当脑裂时,会自动通过ssh到old-active将其杀掉,将standby切换为active -->
<property>
<name>dfs.ha.fencing.methods</name>
<value>sshfence</value>
</property>
<!--ssh密钥文件路径-->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<!-- 故障转移设置为ture -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
</configuration>
mapred-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!--指定mapreduce运行在yarn框架上-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<!--设置mapreduce的历史服务器安装在bigdata007节点上-->
<property>
<name>mapreduce.jobhistory.address</name>
<value>bigdata07:10020</value>
</property>
<!--设置历史服务器的web页面地址和端口号-->
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>bigdata07:19888</value>
</property>
</configuration>
yarn-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<!-- 配置yarn的默认混洗方式,选择为mapreduce的默认混洗算法 -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!-- 是否启用日志聚集功能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>
<!-- 是配置聚集的日志在HDFS上最多保存多长时间 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>106800</value>
</property>
<!-- 启用resourcemanager的ha功能 -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!-- 为resourcemanage ha集群起个id -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yarn-cluster</value>
</property>
<!-- 指定resourcemanger ha有哪些节点名 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!-- 指定第一个节点的所在节点 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>bigdata08</value>
</property>
<!-- 指定第二个节点所在机器 -->
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>bigdata09</value>
</property>
<!-- 指定resourcemanger ha所用的zookeeper节点 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>bigdata07:2181,bigdata08:2181,bigdata09:2181</value>
</property>
<!-- 开启Recovery后,ResourceManger会将应用的状态等信息保存到yarn.resourcemanager.store.class配置的存储介质中,重启后会load这些信息,并且NodeManger会将还在运行的container信息同步到ResourceManager,整个过程不影响作业的正常运行。 -->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<!-- 指定yarn.resourcemanager.store.class的存储介质(HA集群只支持ZKRMStateStore) -->
<property>
<name>yarn.resourcemanager.store.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
</configuration>
在安装流程中,再额外添加的profile配置项
export HADOOP_CONF_DIR=/opt/softs/hadoop3.1.3/etc/hadoop/
export HADOOP_CLASSPATH=`hadoop classpath`
初始化
启动ZooKeeper集群
在各个节点启动journalnode
hadoop-daemon.sh start journalnode
在bigdata07中初始化namenode节点
hdfs namenode -format
在bigdata07执行NameNode的启动
hadoop-daemon.sh start namenode
在另外一个NameNode节点(bigdata08)上同步元数据信息 然后再启动NameNode
同步元数据
hdfs namenode -bootstrapStandby
启动NameNode(在bigdata08中)
hadoop-daemon.sh start namenode
在bigdata07上执行zk的初始化
hdfs zkfc -formatZK
配置zookeeper的操作用户为root
并将新配置同步到其他虚拟机上
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
启动dfs与yarn
start-dfs.sh
start-yarn.sh
在外部Windows中添加主机配置
根据配置查找对应的网页服务