首页 > 编程语言 >大数据学习笔记hadoop、hive、java

大数据学习笔记hadoop、hive、java

时间:2023-06-09 21:00:34浏览次数:43  
标签:fs java hadoop hive org import txt root

HDFS伪分布式集群搭建

tar -zxvf hadoop-2.7.7.tar.gz
echo $JAVA_HOME
/root/software/jdk1.8.0_221

#查看loaclhost
vim /root/software/hadoop-2.7.7/etc/hadoop/hadoop-env.sh


vim /root/software/hadoop-2.7.7/etc/hadoop/core-site.xml


<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/root/hadoopData/temp</value>
</property>


vim /root/software/hadoop-2.7.7/etc/hadoop/hdfs-site.xml
<property>
<name>dfs.namenode.name.dir</name>
<value>/root/hadoopData/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>/root/hadoopData/data</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>


vim /etc/profile
export HADOOP_HOME=/root/software/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

source /etc/profile


检测 Hadoop 环境变量是否设置成功,使用如下命令查看 Hadoop 版本:
hadoop version


#### 格式化文件系统

  具体指令如下:
hdfs namenode -format
  
在本机上使用以下指令启动 NameNode 进程:
hadoop-daemon.sh start namenode  
 
 
在本机上使用以下指令启动 SecondaryNameNode 进程:


hadoop-daemon.sh start secondarynamenode


脚本一键启动和关闭
start-dfs.sh

 

 

YARN伪分布式集群搭建

#配置环境变量yarn-env.sh

vim /root/software/hadoop-2.7.7/etc/hadoop/yarn-env.sh

/root/software/jdk1.8.0_221

#配置计算框架mapred-site.xml
cp mapred-site.xml.template mapred-site.xml

#接着,打开 “mapred-site.xml” 文件进行修改:
vim /root/software/hadoop-2.7.7/etc/hadoop/mapred-site.xml


<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>


#配置YARN系统yarn-site.xml

vim /root/software/hadoop-2.7.7/etc/hadoop/yarn-site.xml


<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>


# 启动和关闭 YARN 集群

start-dfs.sh


#下面二选一

#单节点逐个启动和关闭
yarn-daemon.sh start resourcemanager

yarn-daemon.sh start nodemanager


#脚本一键启动和关闭
start-yarn.sh

#http://localhost:8088



 

 

HDFS的shell命令——增删改查

# 常用命令:
# (1)ls:查看目录下文件或文件夹。
hadoop fs -cmd < args >

# 查看根目录下的目录和文件
hadoop fs -ls -R /

#(2)put:本地上传文件到HDFS。
# -f:覆盖目标文件。
# -p:保留访问和修改时间、权限。
# 将本地根目录下a.txt上传到HDFS根目录下


hadoop fs -put /a.txt /

#上传多个文件
hadoop fs -mkdir /merge
hadoop fs -put 1.txt 2.txt 3.txt /merge



#(3)get:HDFS文件下载到本地。
#格式:
hadoop fs -get < hdfs file > < local file or dir>

# 将HDFS根目录下a.txt复制到本地/root/目录下
hadoop fs -get /a.txt /root/

#(4)rm:删除文件/空白文件夹。
#格式
hadoop fs -rm [-skipTrash] <路径>

# 删除HDFS根目录下的a.txt
hadoop fs -rm /a.txt

#(5)mkdir:创建空白文件夹。
#格式:

hadoop fs -mkdir <hdfs路径>


# 在HDFS根目录下创建a文件夹
hadoop fs -mkdir /a


#(6)cp:复制。
#格式:

hadoop fs -cp < hdfs file > < hdfs file >

# 将HDFS根目录下的a.txt复制到HDFS/root/目录下
hadoop fs -cp /a.txt /root/


#(7)mv:移动。
#格式:
hadoop fs -mv < hdfs file > < hdfs file >

# 将HDFS根目录下a.txt移动到HDFS /root/目录下
hadoop fs -mv /a.txt /root/


#查看、追加、合并文本

#1. cat:查看文本内容命令。
#命令:

hadoop fs -cat [-ignoreCrc] <src>



# 查看根目录下的hadoop文件
hadoop fs -cat /hadoo.txt


#2. appendToFile:追加一个或者多个文件到hdfs指定文件中。
#命令:

hadoop fs -appendToFile <localsrc> ... <dst>


# 将当前文件*.xml追加到HDFS文件big.xml中。
hadoop fs -appendToFile *.xml /big.xml


#3. getmerge:合并下载多个文件。
#参数: 加上nl后,合并到local file中的hdfs文件之间会空出一行。
#命令:

hadoop fs -getmerge -nl < hdfs dir > < local file >


# 将HDFS目录下的log.*文件合并下载到本地log.sum文件中
hadoop fs -getmerge /aaa/log.* ./log.sum


#创建文件名命令
hadoop fs -touchz /abc.txt

 

 

 

实训

 

#shell命令实操:
#(1)查看HDFS根目录下结构。
#提示:因为我们根目录下没有问价或文件夹,因此不显示内容。
hadoop fs -ls -R /

#(2)在HDFS根目录下创建root文件夹。
hadoop fs -mkdir /root

#(3)在本地/root目录下创建hadoop.txt文件,添加如下内容:
#hadoop hdfs yarn
#hello hadoop

vi hadoop.txt
hadoop hdfs yarn
hello hadoop

#(4)将本地hadoop.txt上传到HDFS目录/root/下。
hadoop fs -put hadoop.txt /root/

#(5)将HDFS目录文件/root/hadoop.txt复制到根目录下并查看内容。
hadoop fs -get /root/hadoop.txt /

#(6)删除HDFS目录文件/root/hadoop.txt。
hadoop fs -rm /root/hadoop.txt

#(7)将HDFS目录文件/hadoop.txt迁移到HDFS目录/root/下并查看迁移是否成功。
hadoop fs -put /hadoop.txt /root/

#(8)将HDFS目录文件/root/hadoop.txt复制到本地根目录下并查看。
hadoop fs -get /root/hadoop.txt /

cat /hadoop.txt



#appendToFile命令实操:
#(1)在本地当前目录(/headless)下创建a.txt,b.txt,c.txt文件。
cd /headless
echo 123 > a.txt && echo 456 > b.txt && echo 789 > c.txt


#(2)分别添加内容123,456,789。

#(3)在HDFS根目录下创建abc.txt文件并查看。
#提示:创建文件名命令hadoop fs -touchz /abc.txt
hadoop fs -touchz /abc.txt

#(4)将本地a.txt,b.txt,c.txt追加到abc.txt文件。
hadoop fs -appendToFile *.txt /abc.txt

#(5)查看abc.txt文件。
hadoop fs -cat /abc.txt

#5. getmerge命令实操:
#(1)将刚才创建的a.txt,b.txt,c.txt文件上传到HDFS根目录。
hadoop fs -put a.txt b.txt c.txt /

#(2)将HDFS根目录下*.txt文件下载到本地/root/sum.txt。
hadoop fs -getmerge /*.txt /root/sum.txt

 

 

 

 

 

Linux下Eclipse连接Hadoop

 

1.Linux下Eclipse连接Hadoop:
(1)已上传hadoop-eclipse-plugin-2.7.7.jar,路径已设置为plugins目录。
(2)启动Eclipse,选择默认路径即可。
(3)关闭Welcome页面。
(4)点击Window->Preferences,左侧对话框选择Hadoop Map/Reduce选项。
(5)设置hadoop-2.7.7的安装路径。(实验环境hadoop-2.7.7的安装目录为:/root/software/hadoop-2.7.7)。
(6)点击Apply and Close。
(7)点击Window->Show View->Other,弹出Show View对话框,选中MapReduce Tools下的Map/Reduce Locations,点击Open。
(8)Eclipse底部出现Map/Reduce Locations窗口,点击其右边的蓝色小象图标。
(9)弹出New Hadoop locaiton...对话框,将Location name命名为hadoop。
(10)Map/Reduce(V2)Master下将Host修改为YARN集群主节点的IP地址或主机名localhost。
(11)DFS Master下将Host修改为HDFS集群主节点的IP地址或主机名localhost,Port修改为9000,将User name设置为搭建集群用户root。
2.测试连接
(1)选择File->New->Project->Map/Reduce Project->Next,弹出new MapReduce Project Wizard对话框。
(2)为Project name起名为Test,点击Finsh。
(3)弹出Open Associated Perspective对话框选择No。
(4)在左侧Project Explorer下出现DFS Locations列表栏和Test。
至此,我们Hadoop已经可以连接上Eclipse。后续可进行程序开发。

 

 

 

 

 

1. 创建项目:
(1)选择File->New->Project->Map/Reduce Project->Next,弹出new MapReduce Project Wizard对话框。Project name命名为:getFS,点击Finsh
(2)点击左侧getFS,在src下创建包结构:com.hongya.getfs。

 

(3)在包下创建测试类Test1,Test2。
2. 通过配置获取FileSystem对象:
(1)在Test1下使用new构造Configuration对象,获取配置信息类。
(2)通过set方法设置配置项,指定文件系统为HDFS,即fs.defaultFS参数的值为hdfs://localhost:9000。
(3)获取文件系统对象并打印URI地址。


package com.hongya.getfs;


import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class Test1 {
public static void main(String[] args) throws IOException {
Configuration conf=new Configuration();
//如果我们没有给conf设置文件系统,那么fileSystem默认获取的是本地文件系统的一个实例
//若是我们设置了“fs.defaultFS”参数,这表示获取的是该 URI 的文件系统的实例,就是我们需要的 HDFS 集群的一个 fs 对象
conf.set("fs.defaultFS", "hdfs://localhost:9000");
//获取文件系统对象
FileSystem fs=FileSystem.get(conf);
//打印URI地址
System.out.println(fs.getUri());
}
}

#####

 

 

3. 直接获取FileSystem对象:
(1)在Test2中用new构造Configuration对象,获取配置信息类。
(2)通过get方法获取HDFS文件系统对象,设置URI为hdfs://localhost:9000,安装集群用户名为root。
(3)打印获取的文件系统URI。

 

 

 

package com.hongya.getfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;


public class Test2 {
public static void main(String[] args) throws InterruptedException, URISyntaxException, IOException {
       Configuration conf = new Configuration();
       // 不需要配置“fs.defaultFS”参数,直接传入URI和用户身份,最后一个参数是安装Hadoop集群的用户,我的是“root”
       FileSystem fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "root");
       System.out.println(fs.getUri());
  }
}

 

 

 

 

上传查看文件操作

 

1.创建项目:
(1)项目名:Api_hdfs。
(2)包名:com.hongya.api_hdfs 。
(3)类:
  • 创建目录类:MkDemo

  • 上传文件类:PutDemo

  • 遍历文件类:ListDemo

2. 代码逻辑:
(1)在MkDemo类中使用单元测试方法。
(2)直接创建FileSystem对象。
(3)使用mkdirs方法在根目录下创建test文件夹。
(4)关闭资源。
(5)运行程序并在Eclipse中的DFS Locations列表栏查看文件夹创建是否成功。

#####

 

创建目录类:MkDemo

package com.hongya.api_hdfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MkDemo {
FileSystem fs = null;

// 每次执行单元测试前都会执行该方法
@Before
public void setUp() throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
// 不需要配置“fs.defaultFS”参数,直接传入URI和用户身份,最后一个参数是安装Hadoop集群的用户,我的是“root”
fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "root");
}

// 单元方法:创建目录
@Test
public void mkdir() throws IllegalArgumentException, IOException {
boolean mkdirs = fs.mkdirs(new Path("/test"));
System.out.println(mkdirs);
}

// 每次执行单元测试后都会执行该方法,关闭资源
@After
public void tearDown() {
if (null != fs) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

 

 

1. 在本地/root目录下创建test.txt文件。添加如下内容:
HDFS_API
2. 代码逻辑:
(1)在PutDemo类中使用单元测试方法。
(2)创建FileSystem对象。
(3)使用copyFromLocalFile方法上传文件test.txt。
(4)关闭资源。
(5)运行程序并在Eclipse中的DFS Locations列表栏查看文件上传是否成功。

注:如果查看test文件夹没有文件,右击test选择Refresh刷新即可。

 

PutDemo类

package com.hongya.api_hdfs;


import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class PutDemo {
FileSystem fs = null;

// 每次执行单元测试前都会执行该方法
@Before
public void setUp() throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
// 不需要配置“fs.defaultFS”参数,直接传入URI和用户身份,最后一个参数是安装Hadoop集群的用户,我的是“root”
fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "root");
}

//单元方法:上传文件
@Test
public void addFileToHdfs() throws IOException{
/*
* src:要上传的文件所在的本地路径
* dst:要上传到HDFS的目标路径
*/
Path src=new Path("/root/test.txt");
Path dst=new Path("/test");
//默认不删除本地源文件,覆盖HDFS同名文件
fs.copyFromLocalFile(src, dst);
}

// 每次执行单元测试后都会执行该方法,关闭资源
@After
public void tearDown() {
if (null != fs) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

}

 

 

遍历文件

 

  任务二中我们已经将文件上传到HDFS,接下来我们演示通过代码来遍历出创建的文件夹以及上传的文件

遍历文件代码逻辑:
(1)在ListDemo类中使用单元测试方法。
(2)获取FileSystem对象。
(3)通过listFiles方法获取RemoteIterator得到所有的文件或者文件夹。
(4)递归遍历并打印文件夹或文件路径。
(5)关闭资源。
(6)运行程序查看控制台输出。

 

 

package com.hongya.api_hdfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.FileNotFoundException;
import org.apache.hadoop.fs.FileStatus;
import java.io.FileNotFoundException;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;

public class ListDemo {
FileSystem fs = null;

// 每次执行单元测试前都会执行该方法
@Before
public void setUp() throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
// 不需要配置“fs.defaultFS”参数,直接传入URI和用户身份,最后一个参数是安装Hadoop集群的用户,我的是“root”
fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "root");
}


//单元方法:查看目录信息,只显示该目录下的文件信息
   @Test
   public void listFiles() throws FileNotFoundException, IllegalArgumentException, IOException{
       //使用迭代器递归获取该目录下的所有文件
       RemoteIterator<LocatedFileStatus> listfile=fs.listFiles(new Path("/"), true);
       while(listfile.hasNext()){
           LocatedFileStatus fileStatus=listfile.next();
           System.out.println("文件路径:"+fileStatus.getPath());
           System.out.println("文件名称:"+fileStatus.getPath().getName());
           System.out.println("块的大小:"+fileStatus.getBlockSize());
           System.out.println("文件所有者:"+fileStatus.getOwner());
           System.out.println("文件所属组:"+fileStatus.getGroup());
           System.out.println("文件权限:"+fileStatus.getPermission());
           System.out.println("副本个数:"+fileStatus.getReplication());
           System.out.println("文件长度:"+fileStatus.getLen());
           System.out.println("-----块的信息-----");
           BlockLocation[] blockLocations=fileStatus.getBlockLocations();
           for(BlockLocation bLocation:blockLocations){
               System.out.println("块的长度:"+bLocation.getLength()+"\t块起始偏移量:"+bLocation.getOffset());
               //块所在的DataNode节点
               String[] hosts = bLocation.getHosts();
               System.out.print("DataNode: ");
               for(String str:hosts){
                   System.out.print(str+"\t");
              }
               System.out.println();
          }
           System.out.println("------------------------");
      }
  }


// 每次执行单元测试后都会执行该方法,关闭资源
@After
public void tearDown() {
if (null != fs) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

 

 

 

 

修改删除文件操作

 

 

1. 创建项目:
(1)项目名:Api_hdfs。
(2)包名:com.hongya.api_hdfs 。
(3)类:
  • 下载文件类:GetDemo

  • 重命名目录文件类:ReDemo

  • 删除文件目录类:RmDemo

2. 数据准备:
(1)在本地/目录下创建test.txt文件,添加内容如下:
copyTolocalfile()
rename()
delete()
(2)创建并上传test.txt文件到HDFS目录:/test
需求:将HDFS文件test.txt下载到本地/root目录。
3. 下载文件代码逻辑:
(1)在GetDemo类中使用单元测试方法。
(2)直接创建FileSystem对象。
(3)使用copyTolocalfile方法将test.txt文件下载到/root。
(4)关闭资源。

 

package com.hongya.api_hdfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class GetDemo {
FileSystem fs = null;

// 每次执行单元测试前都会执行该方法
@Before
public void setUp() throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
// 不需要配置“fs.defaultFS”参数,直接传入URI和用户身份,最后一个参数是安装Hadoop集群的用户,我的是“root”
fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "root");
}

// 单元方法:下载文件
@Test
public void downLoadFileToLocal() throws IOException {
/*
* src:要下载的文件所在的HDFS路径 dst:要下载到本地的目标路径
*/
Path src = new Path("/test/test.txt");
Path dst = new Path("/root/");
// 默认不删除HDFS源路径的文件,覆盖本地同名文件
fs.copyToLocalFile(src, dst);
}

// 每次执行单元测试后都会执行该方法,关闭资源
@After
public void tearDown() {
if (null != fs) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

 

 

本节任务针对任务一中的HDFS文件**/test/test.txt**进行操作。

需求1:将test.txt文件重命名为tmp.txt。
需求2:将test文件夹重命名为tmp。
代码逻辑:
(1)在ReDemo类中使用单元测试方法。
(2)直接创建FileSystem对象。
(3)使用rename方法重命名目录文件。
(4)关闭资源。
(5)运行程序并在Eclipse中的DFS Locations列表栏查看是否修改成功。

 

 

 

package com.hongya.api_hdfs;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class ReDemo {
FileSystem fs = null;

// 每次执行单元测试前都会执行该方法
@Before
public void setUp() throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
// 不需要配置“fs.defaultFS”参数,直接传入URI和用户身份,最后一个参数是安装Hadoop集群的用户,我的是“root”
fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "root");
}


// 单元方法:重命名文件或者文件夹
   @Test
   public void renameFileOrDir() throws IllegalArgumentException, IOException {
       //重命名文件
       fs.rename(new Path("/test/test.txt"), new Path("/test/tmp.txt"));
       //重命名文件夹
       fs.rename(new Path("/test"), new Path("/tmp"));
  }

// 每次执行单元测试后都会执行该方法,关闭资源
@After
public void tearDown() {
if (null != fs) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

 

 

 

 

需求1:删除任务二中修改后文件tmp.txt。
需求2:删除任务二中修改后文件夹tmp。
代码逻辑:
(1)在RmDemo类中使用单元测试方法。
(2)直接创建FileSystem对象。
(3)使用delete方法删除目录文件。
(4)关闭资源。
(5)运行程序并在Eclipse中的DFS Locations列表栏查看目录文件删除是否成功。

 

package com.hongya.api_hdfs;



import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class RmDemo {
FileSystem fs = null;

// 每次执行单元测试前都会执行该方法
@Before
public void setUp() throws IOException, InterruptedException, URISyntaxException {
Configuration conf = new Configuration();
// 不需要配置“fs.defaultFS”参数,直接传入URI和用户身份,最后一个参数是安装Hadoop集群的用户,我的是“root”
fs = FileSystem.get(new URI("hdfs://localhost:9000"), conf, "root");
}

   //单元方法:删除文件或者文件夹
   @Test
   public void deleteFileOrDir() throws IllegalArgumentException, IOException{
       //删除文件,第二参数:是否递归,若是文件或者空文件夹时可以为false,若是非空文件夹则需要为true
       fs.delete(new Path("/tmp/tmp.txt"),false);
       //删除文件夹
       fs.delete(new Path("/tmp"), true);
  }

// 每次执行单元测试后都会执行该方法,关闭资源
@After
public void tearDown() {
if (null != fs) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

 

 

 

 

小文件合并

 

 

1. 创建项目:
(1)项目名:Api_hdfs。
(2)包结构:com.hongya.api_hdfs。
(3)合并文件实现类:MergeDemo
2. 数据准备:
(1)在本地/root目录下创建目录input。
(2)在input目录下创建a.txt文件,添加如下内容:
123
(3)在input目录下创建b.txt文件,添加如下内容:
456
(4)在input目录下创建c.txt文件,添加如下内容:
789
3. 代码逻辑:
(1)获取HDFS文件系统对象FileSystem。
(2)在HDFS根目录创建merge.txt文件。
(3)获取本地文件系统。
(4)获取本地文件系统文件列表集合。
(5)迭代遍历文件列表获取数据并进行数据拷贝。
(6)关闭资源。

#####

 

package com.hongya.api_hdfs;




import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocalFileSystem;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.After;
import org.junit.Test;









public class MergeDemo {
FileSystem fs = null;

// 每次执行单元测试前都会执行该方法


// 单元方法:下载文件
@Test
public void mergeFile() throws  Exception{
   //?????????
   FileSystem fileSystem = FileSystem.get(new URI("hdfs://localhost:9000"), new Configuration(),"root");
   //????
   FSDataOutputStream outputStream = fileSystem.create(new Path("/merge.txt"));
   //????????
   LocalFileSystem local = FileSystem.getLocal(new Configuration());
   //????????????????
   FileStatus[] fileStatuses = local.listStatus(new Path("/root/input/"));
   //????????
   for (FileStatus fileStatus : fileStatuses) {
       //????
       FSDataInputStream inputStream = local.open(fileStatus.getPath());
//????
       IOUtils.copy(inputStream,outputStream);
//???????
       IOUtils.closeQuietly(inputStream);
  }
   //????
   IOUtils.closeQuietly(outputStream);
   local.close();
   fileSystem.close();
}


// 每次执行单元测试后都会执行该方法,关闭资源
@After
public void tearDown() {
if (null != fs) {
try {
fs.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

 

 

 

Map端程序编写

 

1. MapReduce项目创建:
(1)项目名:MRDemo。
(2)包结构:com.hongya.mrdemo。
(3)Map端程序业务类:WordCountMapper。
2. 需求介绍:

  按照给定的文本,内容如下所示,对其进行WordCount统计,完成Map端程序代码

hello,hadoop
hello,word
hello,hdfs
3. Map端编码步骤:
(1)定义的WordCountMapper类继承Mapper,设置泛型类型。
(2)重写map方法:
  • 获取每一行数据并转换成String。

  • 按逗号切分每行数据获取单词 。

  • 遍历数组,获取每个单词。

(3)将单词作为key,value记为1写入上下文。
4.代码样例:

 

package com.hongya.mrdemo;


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Mapper;


//(1)定义的WordCountMapper类继承Mapper,设置泛型类型。
public class WordCountMapper extends Mapper<LongWritable, Text, Text, Writable>{
//map方法的生命周期: 框架每传一行数据就被调用一次
//key : 这一行的起始点在文件中的偏移量
//value: 这一行的内容
/*
(2)重写map方法
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//拿到一行数据转换为string
String line = value.toString();
//将这一行切分出各个单词
String[] words = line.split(",");
//遍历数组,输出<单词,1>
for(String word:words){
//(3)将单词作为key,value记为1写入上下文。
context.write(new Text(word), new LongWritable (1));
}
}
}

 

 

Reduce端程序编写

 

 

回顾1.1 Map端程序编写例一,我们对文本数据做WordCount实现了Map端程序,接着针对它实现Reduce端程序业务处理。文本数据如下所示:

hello,hadoop
hello,word
hello,hdfs
1. MapReduce项目创建:
(1)项目名:MRDemo。
(2)包结构:com.hongya.mrdemo。
(3)Reduce端程序业务类:WordCountReducer。
2. Reduce端编程步骤:
(1)定义的WordCountReducer类继承Reducer,设置泛型类型。
(2)重写reduce方法,对每个单词数求和统计。
(3)收集统计结果kv写入上下文。

######

package com.hongya.mrdemo;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;






public class WordCountReducer extends Reducer<Text,LongWritable,Text,LongWritable> {
//生命周期:框架每传递进来一个kv 组,reduce方法被调用一次
@Override
protected void reduce(Text key, Iterable<LongWritable > values, Context context) throws IOException, InterruptedException {
//定义一个计数器
int count = 0;
//遍历这一组kv的所有v,累加到count中
for(LongWritable value:values){
count += value.get();
}
//(3)收集统计结果kv写入上下文
context.write(key, new LongWritable (count));
}
}

 

 

 

 

 

 

 

Driver端程序编写

 

 

1. MapReduce创建项目:
(1)项目名:MRDemo。
(2)包结构:com.hongya.mrdemo。
(3)Driver端程序业务类:WordCountDriver。
2. 准备工作:
(1)点击按钮上传前两节Map和Reduce端代码,上传路径为:/root/eclipse-workspace/MRDemo/src/com/hongya/。

上传代码

(2)将其进行解压删除压缩包。
(3)在/root目录下创建wordcount.txt文件,添加我们例一单词统计数据,如下所示:
hello,hadoop
hello,word
hello,hdfs
3. Driver端代码逻辑:
(1)在main方法中创建job对象。
(2)指定job所在的jar包。
(3)指定Mapper类为例一的WordCountMaaper,Reducer类为例一的WordCountReducer。
(4)指定MapTask和ReduceTask的输出key-value类型。若两者输出类型一致,MapTask输出类型可省略。
(5)设置输入路径为/root/wordcount.txt,输出路径为/root/output1。
(6)将job提交给yarn集群。
4. 代码示例:
package com.hongya.mrdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


public class WordCountDriver{

 

   public static void main(String[] args) throws Exception {
       //创建配置文件
Configuration conf = new Configuration();

//conf.set("mapreduce.framework.name", "local");// 指定MapReduce运行时框架为本地作业运行器
//conf.set("fs.defaultFS", "file:///"); // 获取本地文件系统实例

       // 1:创建job对象
Job job = Job.getInstance(conf);
//2:指定job所在的jar包。
job.setJarByClass(WordCountDriver.class);
//3:指定 mapper 类和 reducer 类
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
//4.1:指定 MapTask 的输出key-value类型(可以省略)
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//4.2:指定 ReduceTask 的输出key-value类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//5:设置输入输出路径
Path inPath=new Path("/root/wordcount.txt");
Path outpath=new Path("/root/output1");
FileSystem fs=FileSystem.get(conf);
//判断输出路径是否存在,存在将删除目录
if(fs.exists(outpath)){
fs.delete(outpath,true);
}
FileInputFormat.setInputPaths(job,inPath);
FileOutputFormat.setOutputPath(job, outpath);
//6:代码提交yarn集群,等待运行完反馈信息,客户端退出
boolean bl = job.waitForCompletion(true);
       System.exit(bl?0:1);
}
}

  至此,我们整个例一单词统计涉及到的程序已经编写完毕,接下来将运行程序查看我们处理结果。

 

 

 

 

程序打包

 

程序打包,如下示例所示。

右键项目MyMR,选择Export

img > 选择Java->JAR file,之后点击Next img

在弹出的对话框中,勾选MyMR项目中的src,然后在JAR file中选择保存的路径名(包含最终的jar包的名字),之后点击Finish

img

(3)jar包上传Linux,使用命令运行jar包。
hadoop jar WordCount.jar com.hongya.mrdemo.WordCountDriver
解释说明:
  • hadoop jar:用来执行一个Hadoop的jar包程序。

  • WordCount.jar:执行的jar包的名字。

  • com.hongyaa.mr.WordCount:包名.类名,此处的类名为我们主程序类的名字。

  •  

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

标签:fs,java,hadoop,hive,org,import,txt,root
From: https://www.cnblogs.com/Cuckoo123456/p/17470231.html

相关文章

  • Hive执行计划之什么是hiveSQL向量化模式及优化详解
    Hive开启向量化模式也是hiveSQL优化方法中的一种,可以提升hive查询速率,也叫hive矢量化。问题1:那么什么是hive向量化模式呢?问题2:hive向量化什么情况下可以被使用,或者说它有哪些使用场景呢?问题3:如何查看hive向量化使用的相关信息?1.什么是hive向量化模式hive向量化模式是hive的一......
  • java集成chatGpt完整案例代码(效果和官网一样逐字输出)
    背景要集成chatGpt参考我上一篇文章即可。但是,如果要实现官网一样的效果,逐字输出,难度就提升了不少了。经过在官网的研究发现它应该是采用了SSE技术,这是一种最新的HTTP交互技术。SSE(Server-SentEvents):通俗解释起来就是一种基于HTTP的,以流的形式由服务端持续向客户端发送数据的......
  • Java内存分析
    一、Java内存分区java内存分区****方法区主要用来存储已被虚拟机加载的类的信息、常量、静态变量和即时编译器编译后的代码等数据。****堆java堆是所有线程所共享的一块内存,在虚拟机启动时创建,几乎所有的对象实例都在这里创建,因此该区域经常发生垃圾回收操作。****虚拟机栈......
  • Java对象中转换空值的字段
    在后端返回数据给前端时,公司的框架会把值为null的数据去掉,方便前端取值。如:Person对象为:{name:"浩二",age:24,weight:null,height:114},那返回给前端的就为{name:"浩二",age:24,height:114}。如果这个时候有个需求: Integer类型的字段为null给-1 Long类型的字段为null......
  • 大家都说Java有三种创建线程的方式!并发编程中的惊天骗局!
    在Java中,创建线程是一项非常重要的任务。线程是一种轻量级的子进程,可以并行执行,使得程序的执行效率得到提高。Java提供了多种方式来创建线程,但许多人都认为Java有三种创建线程的方式,它们分别是继承Thread类、实现Runnable接口和使用线程池。但是,你们知道吗?其实在创建线程的过程中......
  • JavaScript 构造器模式:创建可重用的对象
    前言JavaScript是一种基于对象的语言,对象是JavaScript中最重要的概念之一。在JavaScript中,我们可以使用构造器模式来创建可重用的对象。本文将介绍JavaScript构造器模式的概念、用法和实例,并给出博客标题《JavaScript构造器模式:创建可重用的对象》。构造器模式构造器模......
  • Caused by: java.lang.ClassNotFoundException: com.alibaba.fastjson2.util.Wrap
    1.情景展示使用fastjson2,运行时报错:Causedby:java.lang.ClassNotFoundException:com.alibaba.fastjson2.util.Wrap2.具体分析出现这个问题,是因为pom.xml当中引用的有关fastjson的jar包冲突造成的。只要我们把冲突的jar包排除掉就可以了。3.解决方案在idea当中,使用插件......
  • Java 集合框架体系简介
    为什么要使用集合存储多个数据可以使用数组,但由于数组在内存中是连续存储的,所以会有一些限制。比如数组在创建时就要指定长度,即可以容纳的元素个数,且指定后无法更改;数组在创建时需要指定元素的类型,并且所有元素都必须是该类型或其子类;添加或删除数组中的元素需要创建一个新数组再......
  • java的接口和抽象类简单理解
    接口是定义了系统各模块应该遵守的标准。实现者对外提供哪些服务,调用者可以调用哪些服务以及如何调用服务。抽象类作为系统中各个子类的共同父类,所表现的是一种模板设计,只实现了最基础的共通功能,相当于一个中间产品,各个子类具体实现抽象方法。抽象类可以没有抽象方法,......
  • java8如何校验ssh-keygen生成的公私钥
    如果你的公私钥文件不是PEM格式的,而是其他格式,如OpenSSH格式(通常以`id_rsa`和`id_rsa.pub`命名),你可以使用Java的`JSch`库来验证它们的有效性。下面是一个示例代码,演示如何使用`JSch`库验证OpenSSH格式的公私钥对:首先,你需要在项目中引入JSch库的依赖。你可以使用Maven或手动下载并......