首页 > 其他分享 >HDFS与MAPREDUCE操作

HDFS与MAPREDUCE操作

时间:2023-11-21 13:22:24浏览次数:39  
标签:HDFS 文件 import hadoop MAPREDUCE job 操作 apache org

 

HDFS文件操作

在分布式文件系统上验证HDFS文件命令,如下。

hadoop fs [genericOpitions]

[-ls <path>]  //显示目标路径当前目录下的所有文件

[-lsr <path>]  //递归显示目标路径下的所有目录及文件(深度优先)

[-du <path>]  //以字节为单位显示目录中所有文件的大小,或该文件的大小(如果path为文件)

[-dus <paht>]  //以字节为单位显示目标文件大小(用于查看文件夹大小)

[-count [-q] <path>]  //将目录的大小、包含文件(包括文件)个数的信息输出到屏幕(标准stdout)

[-mv <src> <dst>]  //把文件或目录移动到目标路径,这个命令允许同时移动多个文件,但是只允许移动到一个目标路径中,参数中的最有一个文件夹即为目标路径

[-cp <src> <dst>]  //复制文件或目录到目标路径,这个命令允许同时复制多个文件,如果复制多个文件,目标路径必须是文件夹

[-rm [-skipTrash] <path>]  //删除文件,这个命令不能删除文件夹

[-rmr [-skipTrash] <path>]  //删除文件夹及其下的所有文件

[-expunge]

[-put <localsrc> ... <dst>]  //从本地文件系统上传文件到HDFS中

[-copyFromLocal <localsrc> ... <dst>]  //与put相同

[-moveFromLocal <localsrc> ... <dst>]  //与put相同,但是文件上传之后会从本地文件系统中移除

[-get [-ignoreCrc] [-crc] <src> <localdst>]  //复制文件到本地文件系统。这个命令可以选择是否忽视校验和,忽视校验和和下载主要用于挽救那些已经发生错误的文件

[-getmerge <src> <localdst> [addnl]]  //将源目录中的所有文件进行排序并写入目标文件中,文件之间以换行符分隔

[-cat <src>]  //在终端显示(标准输出stdout)文件中的内容,类似Linux系统中的cat

[-text <src>]

[-copyToLocal [-ignoreCrc] [-crc] <src> <localdst>] //与get相同

[-moveToLocal [-crc] <src> <localdst>]

[-mkidr <path>]  //创建文件夹

[-setrep [-R] [-w] <rep> <path/file>]  //改变一个文件的副本个数。参数-R可以递归地对该目录下的所有文件做统一操作

[-touchz <path>]  //类似Linux中的touch,创建一个空文件

[-test -[ezd] <path>]  //将源文件输出为文本格式显示到终端上,通过这个命令可以查看TextRecordInputStream(SequenceFile等)或zip文件

[-stat [format] <path>]  //以指定格式返回路径的信息

[-tail [-f] <file>]  //在终端上显示(标准输出stdout)文件的最后1kb内容。-f选项的行为与LInux中一致,会持续监测先添加到文件中的内容,这在查看日志文件时会显得非常方便。

[-chmod [-R] <MODE[,MODE]...| OCTALMODE> PATH...]  //改变文件的权限,只有文件的所有者或者是超级用户才能使用这个命令。-R可以递归地改变文件夹内的所有文件的权限

[-chown [-R] [OWNER] [:[GROUP] PATH...]]  //改变文件的拥有者,-R可以递归地改变文件夹内所有文件的拥有者。同样,这个命令只有超级用户才能使用

[-chgrp [-R] GROUP PATH...]  //改变文件所属的组,-R可以递归地改变文件夹内所有文件所属的组。这个命令必须是超级用户才能使用

[-help [cmd]]  //这是命令的帮助信息

 

[-ls <path>]  //显示目标路径当前目录下的所有文件

 

[-du <path>]  //以字节为单位显示目录中所有文件的大小,或该文件的大小(如果path为文件)

 

HDFS接口编程

调用HDFS文件接口实现对分布式文件系统中文件的访问,如创建、修改、删除等。

参考代码:

 

 

 

 

 

MAPREDUCE并行程序开发

3.1  求每年最高气温

原始数据如下:

2014010114

2014010216

2014010317

2014010410

2014010506

2012010609

2012010732

2012010812

2012010919

2012011023

2001010116

2001010212

2001010310

2001010411

2001010529

2013010619

2013010722

2013010812

2013010929

2013011023

2008010105

2008010216

2008010337

2008010414

2008010516

2007010619

2007010712

2007010812

2007010999

2007011023

2010010114

2010010216

2010010317

2010010410

2010010506

2015010649

2015010722

2015010812

2015010999

2015011023

 代码

import java.io.IOException;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Temperature {

    /**

     * 四个泛型类型分别代表:

     * KeyIn        Mapper的输入数据的Key,这里是每行文字的起始位置(0,11,...)

     * ValueIn      Mapper的输入数据的Value,这里是每行文字

     * KeyOut       Mapper的输出数据的Key,这里是每行文字中的“年份”

     * ValueOut     Mapper的输出数据的Value,这里是每行文字中的“气温”

     */

    static class TempMapper extends

            Mapper<LongWritable, Text, Text, IntWritable> {

        @Override

        public void map(LongWritable key, Text value, Context context)

                throws IOException, InterruptedException {

            // 打印样本: Before Mapper: 0, 2000010115

            System.out.print("Before Mapper: " + key + ", " + value);

            String line = value.toString();

            String year = line.substring(0, 4);

            int temperature = Integer.parseInt(line.substring(8));

            context.write(new Text(year), new IntWritable(temperature));

            // 打印样本: After Mapper:2000, 15

            System.out.println(

                    "======" +

                    "After Mapper:" + new Text(year) + ", " + new IntWritable(temperature));

        }

    }

 

    /**

     * 四个泛型类型分别代表:

     * KeyIn        Reducer的输入数据的Key,这里是每行文字中的“年份”

     * ValueIn      Reducer的输入数据的Value,这里是每行文字中的“气温”

     * KeyOut       Reducer的输出数据的Key,这里是不重复的“年份”

     * ValueOut     Reducer的输出数据的Value,这里是这一年中的“最高气温”

       static class TempReducer extends

            Reducer<Text, IntWritable, Text, IntWritable> {

        @Override

        public void reduce(Text key, Iterable<IntWritable> values,

                Context context) throws IOException, InterruptedException {

            int maxValue = Integer.MIN_VALUE;

            StringBuffer sb = new StringBuffer();

            //取values的最大值

            for (IntWritable value : values) {

                maxValue = Math.max(maxValue, value.get());

                sb.append(value).append(", ");

            }

            // 打印样本: Before Reduce: 2000, 15, 23, 99, 12, 22, 

            System.out.print("Before Reduce: " + key + ", " + sb.toString());

            context.write(key, new IntWritable(maxValue));

            // 打印样本: After Reduce: 2000, 99

            System.out.println(

                    "======" +

                    "After Reduce: " + key + ", " + maxValue);

        }

    }

 

    public static void main(String[] args) throws Exception {

        //输入路径

        String dst = "hdfs://localhost:9000/intput.txt";

        //输出路径,必须是不存在的,空文件加也不行。

        String dstOut = "hdfs://localhost:9000/output";

        Configuration hadoopConfig = new Configuration();

         

        hadoopConfig.set("fs.hdfs.impl", 

            org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()

        );

        hadoopConfig.set("fs.file.impl",

            org.apache.hadoop.fs.LocalFileSystem.class.getName()

        );

        Job job = new Job(hadoopConfig);

         

        //如果需要打成jar运行,需要下面这句

        //job.setJarByClass(NewMaxTemperature.class);

 

        //job执行作业时输入和输出文件的路径

        FileInputFormat.addInputPath(job, new Path(dst));

        FileOutputFormat.setOutputPath(job, new Path(dstOut));

 

        //指定自定义的Mapper和Reducer作为两个阶段的任务处理类

        job.setMapperClass(TempMapper.class);

        job.setReducerClass(TempReducer.class);

         

        //设置最后输出结果的Key和Value的类型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(IntWritable.class);       

        //执行job,直到完成

        job.waitForCompletion(true);

        System.out.println("Finished");

    }

}

将程序发布为jar包,并上传到hadoop平台运行。

 

 

 

 

3.2 词频统计

maven建立quick-start工程。

pom.xml 

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

  <modelVersion>4.0.0</modelVersion>

 

  <groupId>cn.edu.bupt.wcy</groupId>

  <artifactId>wordcount</artifactId>

  <version>0.0.1-SNAPSHOT</version>

  <packaging>jar</packaging>

 

  <name>wordcount</name>

  <url>http://maven.apache.org</url>

 

  <properties>

    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

  </properties>

 

  <dependencies>

    <dependency>

      <groupId>junit</groupId>

      <artifactId>junit</artifactId>

      <version>3.8.1</version>

      <scope>test</scope>

    </dependency>

    <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-common</artifactId>

            <version>2.7.1</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-hdfs</artifactId>

            <version>2.7.1</version>

        </dependency>

        <dependency>

            <groupId>org.apache.hadoop</groupId>

            <artifactId>hadoop-client</artifactId>

            <version>2.7.1</version>

        </dependency>

  </dependencies>

</project>

 

 

 

 

 

 

 

 

 

3个java代码,mapper、reducer、runner主类:

mapper:

package cn.edu.bupt.wcy.wordcount;

 

import java.io.IOException;

 

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Mapper;

 

 

public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable>{

 

@Override

protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)

throws IOException, InterruptedException {

// TODO Auto-generated method stub

//super.map(key, value, context);

//String[] words = StringUtils.split(value.toString());

  String[] words = StringUtils.split(value.toString(), " ");

for(String word:words)

{

  context.write(new Text(word), new LongWritable(1));

 

}

}

}

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

reducer:

package cn.edu.bupt.wcy.wordcount;

 

import java.io.IOException;

 

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

 

public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

 

@Override

protected void reduce(Text arg0, Iterable<LongWritable> arg1,

Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {

// TODO Auto-generated method stub

//super.reduce(arg0, arg1, arg2);

int sum=0;

for(LongWritable num:arg1)

{

sum += num.get();

 

}

context.write(arg0,new LongWritable(sum));

 

 

}

}

 

 

 

 

 

 

 

 

runner:

package cn.edu.bupt.wcy.wordcount;

 

import java.io.IOException;

 

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

 

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

 

public class WordCountRunner {

 

public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {

Configuration conf = new Configuration();  

    Job job = new Job(conf);  

    job.setJarByClass(WordCountRunner.class);  

    job.setJobName("wordcount");  

    job.setOutputKeyClass(Text.class);  

    job.setOutputValueClass(LongWritable.class);  

    job.setMapperClass(WordCountMapper.class);  

    job.setReducerClass(WordCountReducer.class);  

    job.setInputFormatClass(TextInputFormat.class);  

    job.setOutputFormatClass(TextOutputFormat.class);  

    FileInputFormat.addInputPath(job, new Path(args[1]));  

    FileOutputFormat.setOutputPath(job, new Path(args[2]));  

    job.waitForCompletion(true);  

}

 

}

 

 

 

 

 

 

 

 

 

 

 

打包成jar包后,放到集群上运行。先在集群上新建一个文件夹:

hdfs dfs -mkdir /input_wordcount 再放入单词文件,比如:

hello world 

I like playing basketball

hello java。。。

运行hadoop jar WordCount.jar(jar包) WordCountRunner(主类) /input_wordcount /output_wordcount

运行完成后,查看:

hdfs dfs -ls /output_wordcount。已经生成了结果,在cat一下查看内容即可。

 

 

 

 

 

标签:HDFS,文件,import,hadoop,MAPREDUCE,job,操作,apache,org
From: https://www.cnblogs.com/zhouzhengyang/p/17846383.html

相关文章

  • django连接mysql pycharm操作sqlite和mysql
    1如果项目使用sqlite,不需要额外配置,直接操作即可2django默认情况链接mysql,用的驱动是mysqldb模块,python3.x以后,这个模块用不了了,咱们用的全都是pymysql,需要做个替换3showmigrations:查看哪些记录更改了,但是没有同步到数据库中3如果使用mysql,需要配置如下: -1配置文件中配置......
  • 05-基础SQL-DDL(数据定义语言)-数据库操作
    DDL-数据库操作1.查询查询所有数据库SHOWDATABASES;查询当前数据库SELECTDATABASE();2.创建创建一个数据库CREATEDATABASE[IFNOTEXISTS]数据库名[DEFAULTCHARSET字符集][COLLATE排序规则];3.删除删除一个数据库DROPDATABASE[IFEXISTS]数据库名;......
  • JavaScript-触摸操作
    触摸操作概述浏览器的触摸API由三个部分组成。Touch:一个触摸点TouchList:多个触摸点的集合TouchEvent:触摸引发的事件实例Touch接口的实例对象用来表示触摸点(一根手指或者一根触摸笔),包括位置、大小、形状、压力、目标元素等属性。有时,触摸动作由多个触摸点(多根手指)组成,多个触摸点的......
  • kubectl 操作
    kubectl实现对pod的端口转发kubectl--namespacedrone-serviceport-forward$POD_NAME8080:$CONTAINER_PORT 列出pod使用的镜像:$kubectlgetpods--all-namespaces-ojsonpath="{.items[*].spec.containers[*].image}"|\tr-s'[[:space:]]''\n&......
  • Redis的其他操作、celery
    Redis的其他操作'''delete(*names)exists(name)keys(pattern='*')expire(name,time)rename(src,dst)move(name,db))randomkey()type(name)'''redis的key值,最大可以是多少? #最大不超过512M一般1KBredis的value值,最大可以是多少? #最大不超过51......
  • go使用context.withtimtout取消一个超时操作
    3 使用context.WithTimeout:package mainimport ("context""fmt""time")func main() {timeout := 5 * time.Secondctx, cancel := context.WithTimeout(context.Background(), timeout)defer cancel()done := make(chan bool)go func() {    // ......
  • Linux操作系统 I/O重定向读书笔记
    1.理解I/O重定向的基本概念1.1输入重定向在Linux系统中,输入重定向是指将命令的输入从键盘改变为来自文件或其他命令的输出。使用<符号可以实现输入重定向,例如:$command<input.txt这将使command命令从input.txt文件中读取输入而不是从键盘。1.2输出重定向输出重定......
  • 【Redis使用】一年多来redis使用笔记md文档,第(2)篇:命令和数据库操作
    Redis是一个高性能的key-value数据库。本文会让你知道:什么是nosql、Redis的特点、如何修改常用Redis配置、写出Redis中string类型数据的增删改查操作命令、写出Redis中hash类型数据的增删改查相关命令、说出Redis中list保存的数据类型、使用StrictRedis对象对string类型数据......
  • Netty-操作篇
    服务端创建步骤步骤一:创建ServerBootstrap实例。步骤二:设置并绑定Reactor线程池。步骤三:设置并绑定服务端Channel。步骤四:链路建立的时候创建并初始化ChannelPipeline(非必须)。用于处理网络事件:1.链路注册、激活、断开、发生异常2.接收到请求消息3.请求消息接收并处理完毕......
  • C#文件操作
    使用FIle的静态方法进行文件操作1//使用file的静态方法进行复制2File.Copy(path,destpath);3//使用File的静态方法删除路径下的一个文件4File.Delete(path);5//使用File的静态方法移动路径下的一个文件6File.Move(path,destpath);7File.ReadAllText(path);//打......