首页 > 其他分享 >05 Hadoop简单使用

05 Hadoop简单使用

时间:2024-06-05 15:29:17浏览次数:26  
标签:05 org hadoop yarn Hadoop 简单 apache import class

目录

一、hadoop安装配置

​ 1、下载解压hadoop-x.x.x.tar.gz

tar -xzvf hadoop-x.x.x.tar.gz

​ 2、下载解压jdk

tar -xzvf jdkx.x.x_xxx.tar.gz

3、配置环境变量

vi /etc/profile

export JAVA_HOME=/root/env/jdk1.8.0_301
export HADOOP_HOME=/root/env/hadoop-3.3.1
export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin

​ 3.1、hadoop3.x版本需要额外添加

export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root

​ 3.2、加载环境变量

source /etc/profile

4、配置主机名&主机映射

​ 4.1、主机名

vi /etc/hostname

​ 4.2、主机映射

vi /etc/hosts

172.16.162.71 namenode01
172.16.162.75 resourcemanager01
172.16.162.199 secondaryNN01
172.16.162.223 database

5、修改hadoop配置文件(路径:$HADOOP_HOME/etc/hadoop)

​ 5.1、core-site.xml

<configuration>
  <!-- 指定namenode地址 -->
  <property>
    <name>fs.defaultFS</name>
    <value>hdfs://namenode01:9820</value>
  </property>

  <!-- 指定hadoop数据的存储目录 -->
  <property>
    <name>hadoop.tmp.dir</name>
    <value>/root/env/hadoop-3.3.1/data</value>
  </property>

  <!-- namenode网页用户 -->
  <property>
    <name>hadoop.http.staticuser.user</name>
    <value>root</value>
  </property>
</configuration>

​ 5.2、hdfs-site.xml

<configuration>
  <!-- 指定Hadoop辅助名称节点主机配置 -->
  <property>
    <name>dfs.namenode.secondary.http-address</name>
    <value>secondaryNN01:9868</value>
  </property>
</configuration>

​ 5.3、mapred-site.xml

<configuration>
  <!-- 指定Mapreduce在yarn上运行 -->
  <property>
    <name>mapreduce.framework.name</name>
    <value>yarn</value>
  </property>

  <!-- hadoop3.x版本中需要添加classpath配置 -->
  <property>
    <name>mapreduce.application.classpath</name>
    <value>/root/env/hadoop-3.3.1/etc/hadoop:/root/env/hadoop-3.3.1/share/hadoop/common/lib/*:/root/env/hadoop-3.3.1/share/hadoop/common/*:/root/env/hadoop-3.3.1/share/hadoop/hdfs:/root/env/hadoop-3.3.1/share/hadoop/hdfs/lib/*:/root/env/hadoop-3.3.1/share/hadoop/hdfs/*:/root/env/hadoop-3.3.1/share/hadoop/mapreduce/*:/root/env/hadoop-3.3.1/share/hadoop/yarn:/root/env/hadoop-3.3.1/share/hadoop/yarn/lib/*:/root/env/hadoop-3.3.1/share/hadoop/yarn/*</value>
  </property>
</configuration>

​ 5.4、yarn-site.xml

<configuration>
  <!-- 指定MR走shuffle -->
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
  </property>

  <!-- 指定resourcemanager地址 -->
  <property>
    <name>yarn.resourcemanager.hostname</name>
    <value>resourcemanager01</value>
  </property>
</configuration>

​ 5.5、在workers文件中配置所有节点的ip或者主机名(hadoop2.x版本中是slaves)

​ 5.6、配置完后,分发配置

rsync [path] [user]@[ip地址]:[path]

二、运行hadoop

1、使用命令格式namenode

hadoop namenode -format

hdfs namenode -format

2、在namenode节点启动hdfs

start-dfs.sh

3、在resourcemanager节点启动yarn

start-yarn.sh

三、hadoop2.x和hadoop3.x变化

1、功能

功能hadoop2.xhadoop3.x
支持的最低Java版本Hadoop 2.x - java的最低支持版本是java 7Hadoop 3.x - java的最低支持版本是java 8
容错Hadoop 2.x - 可以通过复制(浪费空间)来处理容错。Hadoop 3.x - 可以通过Erasure编码处理容错。
数据平衡Hadoop 2.x - 对于数据,平衡使用HDFS平衡器。Hadoop 3.x - 对于数据,平衡使用Intra-data节点平衡器,该平衡器通过HDFS磁盘平衡器CLI调用。
存储SchemeHadoop 2.x - 使用3X副本SchemeHadoop 3.x - 支持HDFS中的擦除编码。
存储开销Hadoop 2.x - HDFS在存储空间中有200%的开销。Hadoop 3.x - 存储开销仅为50%。
存储开销示例Hadoop 2.x - 如果有6个块,那么由于副本方案(Scheme),将有18个块占用空间。Hadoop 3.x - 如果有6个块,那么将有9个块空间,6块block,3块用于奇偶校验。
YARN时间线服务Hadoop 2.x - 使用具有可伸缩性问题的旧时间轴服务。Hadoop 3.x - 改进时间线服务v2并提高时间线服务的可扩展性和可靠性。
兼容的文件系统Hadoop 2.x - HDFS(默认FS),FTP文件系统:它将所有数据存储在可远程访问的FTP服务器上。Amazon S3(简单存储服务)文件系统Windows Azure存储Blob(WASB)文件系统。Hadoop 3.x - 它支持所有前面以及Microsoft Azure Data Lake文件系统。
Datanode资源Hadoop 2.x - Datanode资源不专用于MapReduce,我们可以将它用于其他应用程序。Hadoop 3.x - 此处数据节点资源也可用于其他应用程序。
MR API兼容性Hadoop 2.x - 与Hadoop 1.x程序兼容的MR API,可在Hadoop 2.X上执行Hadoop 3.x - 此处,MR API与运行Hadoop 1.x程序兼容,以便在Hadoop 3.X上执行
HDFS联盟Hadoop 2.x - 在Hadoop 1.0中,只有一个NameNode来管理所有Namespace,但在Hadoop 2.0中,多个NameNode用于多个Namespace。Hadoop 3.x - Hadoop 3.x还有多个名称空间用于多个名称空间。
更快地访问数据Hadoop 2.x - 由于数据节点缓存,我们可以快速访问数据。Hadoop 3.x - 这里也通过Datanode缓存我们可以快速访问数据。
平台Hadoop 2.x - 可以作为各种数据分析的平台,可以运行事件处理,流媒体和实时操作。Hadoop 3.x - 这里也可以在YARN的顶部运行事件处理,流媒体和实时操作。

2、端口

应用Haddop 2.x portHaddop 3.x port
Namenode80209820
NN HTTP UI500709870
NN HTTPS UI504709871
SNN HTTP500919869
SNN HTTP UI500909868
DN IPC500209867
DN500109866
DN HTTP UI500759864
Datanode504759865

其余端口没有变化,reourcemanager网页端口 8088

四、HDFS常用命令

(此时操作的是hdfs的目录,并不是Linux的目录)

1、帮助命令

hdfs dfs --help

查看

Usage: hadoop fs [generic options]
	[-appendToFile <localsrc> ... <dst>]
	[-cat [-ignoreCrc] <src> ...]
	[-checksum [-v] <src> ...]
	[-chgrp [-R] GROUP PATH...]
	[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
	[-chown [-R] [OWNER][:[GROUP]] PATH...]
	[-concat <target path> <src path> <src path> ...]
	[-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]
	[-copyToLocal [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] [-s] <path> ...]
	[-cp [-f] [-p | -p[topax]] [-d] <src> ... <dst>]
	[-createSnapshot <snapshotDir> [<snapshotName>]]
	[-deleteSnapshot <snapshotDir> <snapshotName>]
	[-df [-h] [<path> ...]]
	[-du [-s] [-h] [-v] [-x] <path> ...]
	[-expunge [-immediate] [-fs <path>]]
	[-find <path> ... <expression> ...]
	[-get [-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>]
	[-getfacl [-R] <path>]
	[-getfattr [-R] {-n name | -d} [-e en] <path>]
	[-getmerge [-nl] [-skip-empty-file] <src> <localdst>]
	[-head <file>]
	[-help [cmd ...]]
	[-ls [-C] [-d] [-h] [-q] [-R] [-t] [-S] [-r] [-u] [-e] [<path> ...]]
	[-mkdir [-p] <path> ...]
	[-moveFromLocal [-f] [-p] [-l] [-d] <localsrc> ... <dst>]
	[-moveToLocal <src> <localdst>]
	[-mv <src> ... <dst>]
	[-put [-f] [-p] [-l] [-d] [-t <thread count>] <localsrc> ... <dst>]
	[-renameSnapshot <snapshotDir> <oldName> <newName>]
	[-rm [-f] [-r|-R] [-skipTrash] [-safely] <src> ...]
	[-rmdir [--ignore-fail-on-non-empty] <dir> ...]
	[-setfacl [-R] [{-b|-k} {-m|-x <acl_spec>} <path>]|[--set <acl_spec> <path>]]
	[-setfattr {-n name [-v value] | -x name} <path>]
	[-setrep [-R] [-w] <rep> <path> ...]
	[-stat [format] <path> ...]
	[-tail [-f] [-s <sleep interval>] <file>]
	[-test -[defswrz] <path>]
	[-text [-ignoreCrc] <src> ...]
	[-touch [-a] [-m] [-t TIMESTAMP (yyyyMMdd:HHmmss) ] [-c] <path> ...]
	[-touchz <path> ...]
	[-truncate [-w] <length> <path> ...]
	[-usage [cmd ...]]

Generic options supported are:
-conf <configuration file>        specify an application configuration file
-D <property=value>               define a value for a given property
-fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations.
-jt <local|resourcemanager:port>  specify a ResourceManager
-files <file1,...>                specify a comma-separated list of files to be copied to the map reduce cluster
-libjars <jar1,...>               specify a comma-separated list of jar files to be included in the classpath
-archives <archive1,...>          specify a comma-separated list of archives to be unarchived on the compute machines

The general command line syntax is:
command [genericOptions] [commandOptions]

整体命令类似于Linux

2、HDFS上传下载操作

​ 2.1、上传

hdfs dfs -put [src file] [HDFS path]

​ 2.2、下载

hdfs dfs -get [HDFS path] [local path]

3、查看文件系统健康状态

hdfs dfsadmin -report

4、安全模式

安全模式是hadoop的一种保护机制,用于保证集群中的数据块的安全性。当hdfs进入安全模式时不允许客户端进行任何修改文件的操作,包括上传文件,删除文件,重命名,创建文件夹等操作。

当集群启动的时候,会首先进入安全模式。当系统处于安全模式时会检查数据块的完整性。假设我们设置的副本数(即参数dfs.replication)是5,那么在datanode上就应该有5个副本存在,假设只存在3个副本,那么比例就是3/5=0.6。通过配置dfs.safemode.threshold.pct定义最小的副本率,默认为0.999。

​ 4.1、查看安全模式

hdfs dfsadmin -safemode get

​ 4.2、强制进入安全模式

hdfs dfsadmin -safemode enter

​ 4.3、强制离开安全模式

hdfs dfsadmin -safemode leave

五、Java操作HDFS

1、pom导入依赖

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.10.1</version>
</dependency>

2、编码(使用模板方法设计模式)

​ 2.1、Template接口

import org.apache.hadoop.fs.FileSystem;

public interface Template {

    public void template(FileSystem fileSystem) throws Exception;

}

​ 2.2、Template实现类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.net.URI;

public class HadoopTemplate {

    public static void exec(URI uri, Configuration configuration, String user, Template template)throws Exception{
        //1.获取到客户端对象
        FileSystem fileSystem = FileSystem.get(uri, configuration, user);

        //2.执行操作
        template.template(fileSystem);

        //3.关闭资源
        fileSystem.close();
    }
}

​ 2.3、调用类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

public class Test01 {

    public static void main(String[] args)throws Exception {
        String uri = "hdfs://172.16.106.56:9000";
        String user = "root";
        String path = "/test/input/a.txt";

        HadoopTemplate.exec(new URI(uri), new Configuration(), user, new Template() {
            @Override
            public void template(FileSystem fileSystem) throws Exception {
                //执行操作
                FSDataInputStream stream = fileSystem.open(new Path(path));
                byte[] bytes = new byte[1024];
                int length = 0;
                while ((length = stream.read(bytes)) != -1){
                    System.out.println(new String(bytes, 0, length));
                }
                stream.close();
            }
        });

    }
}

可对HDFS上的文件及目录进行增删改查操作

六、MapReduce

使用新API写法

1、MR程序

import lombok.extern.java.Log;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

public class WordCount extends Configured implements Tool {
    /** 一次读取一行数据
	泛型LongWritable, Text, Text, IntWritable
    LongWritable为偏移量,就是每行数据开始的下标
    Text为当前行
    Text, IntWritable Map阶段输出的k,v类型(自定义)
    */
    public static class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
		
        private Text outKey = new Text();
        private IntWritable outValue = new IntWritable(1);

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 指定行内每个字段的分割符
            String[] words = value.toString().split(" ");
            for (String word : words) {
                outKey.set(word);
                // 循环写出给reduce处理
                context.write(outKey, outValue);
            }
        }
    }

    // Text, IntWritable 对应Map阶段的输出k,v
    // Text, IntWritable reduce阶段输出的k,v类型
    public static class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
		/** 
		传进来的数据类似于
		key:hello
		values:(1,1,1)
		key为map阶段设置的key,values为map阶段输出的所有同一个key的value组成的可迭代对象
		*/
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(key, new IntWritable(sum));
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        // 获取job
        Job job = Job.getInstance(this.getConf());

        //设置jar包路径
        job.setJarByClass(this.getClass());

        //关联mapper和reducer
        job.setMapperClass(WCMapper.class);
        job.setReducerClass(WCReducer.class);

        //设置map输出的k,v类型(对应mapper的输出类型)
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //设置最终输出的k,v类型(对应reducer输出类型)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //设置输入、输出路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //提交job
        boolean result = job.waitForCompletion(true);
        return result ? 0 : 1;
    }
	
    // args参数为命令行下输入的运行时参数
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        Path output = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(configuration);

        //判断HDFS上是否存在输出路径,有则删除
        if(fileSystem.exists(output)){
            System.out.println("==========目录已存在,执行删除==========");
            fileSystem.delete(output, true);
        }

        //运行
        int status = ToolRunner.run(configuration, new WordCount(), args);
        System.exit(status);
    }
}

2、自定义对象作为key或者value

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// lombok自动生成get/set方法
@Data
// lombok自动生成无参构造器
@NoArgsConstructor
// lombok自动生成全参构造器
@AllArgsConstructor
/**
bean对象需要实现hadoop序列化接口
作为map/reduce的value只需要实现 Writable 接口,实现write和readFields方法
作为map/reduce的key需要再实现Comparable<T>比较接口,可直接实现WritableComparable<T>达到效果
*/
public class User implements WritableComparable<FlowBean> {

    private String name;
    private Integer age;

    // write和readFields属性顺序必须一致
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.name);
        dataOutput.writeInt(this.age);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.name = dataInput.readUTF();
        this.age = dataInput.readInt;
    }
	
    // 重写object的toString方法,不然MR输出的对象为内存地址
    @Override
    public String toString() {
        return this.name + "\t" + this.age;
    }
	
    // Comparable<T>的比较方法,用于key排序
    @Override
    public int compareTo(FlowBean o) {
        return totalFlow.compareTo(o.getTotalFlow());
    }
}

3、MR其余操作

​ 3.1、多个小文件

job.setInputFormatClass(CombineTextInputFormat.class);

​ 通过设置InpuFormat类型处理,不设置则会几个文件启动几个maptask 即使该文件没有达到切片大小

​ 3.2、分区,reduce之后输出的文件

/**
Text, FlowBean为map阶段输出的k,v
通过方法return数字决定输出文件的分区,从0开始
*/
public static class FlowPartitioner extends Partitioner<Text, FlowBean>{
        @Override
        public int getPartition(Text text, FlowBean flowBean, int i) {
            return 0;
        }
}

​ 在run方法中设置

// 有几个分区就设置几个reduceTask
job.setNumReduceTasks(1);
// 分区的实现类
job.setPartitionerClass(FlowPartitioner.class);

​ 3.3、reduce之前预聚合,MR优化(全局排序等不能使用)

// 可直接把自定义的redue类的class放入
job.setCombinerClass(mrEnum.getReducerClass());

4、输出到database

import mapreducer.DBWordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import entity.DBWCBean;

public class File2DB extends Configured implements Tool {
    
    public static class DBMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        private Text outKey = new Text();
        private IntWritable outValue = new IntWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            for (String word : words) {
                outKey.set(word);
                context.write(outKey, outValue);
            }
        }
    }

    public static class DBReducer extends Reducer<Text, IntWritable, DBWCBean, NullWritable>{
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Reducer<Text, IntWritable, DBWCBean, NullWritable>.Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            context.write(new DBWCBean(key.toString(), sum), NullWritable.get());
        }
    }
    
    @Override
    public int run(String[] args) throws Exception {
        DBConfiguration.configureDB(this.getConf(), "com.mysql.cj.jdbc.Driver", "jdbc:mysql://database:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC", "root", "123456");

        Job job = Job.getInstance(this.getConf());
        job.setJarByClass(this.getClass());
        job.setMapperClass(DBMapper.class);
        job.setReducerClass(DBReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(DBWCBean.class);
        job.setOutputValueClass(NullWritable.class);

        job.setOutputFormatClass(DBOutputFormat.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        DBOutputFormat.setOutput(job, "wc", "word", "count");

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        System.exit(ToolRunner.run(configuration, new File2DB(), args));
    }
}

5、可以自定义OutputFormat

​ 5.1、继承FileOutputFormat<K,V>,实现getRecordWriter

​ 5.2、继承RecordWriter<K,V>,并对其按业务需求进行实现

不做演示

6、MR实现Join操作

​ 6.1、reduce端实现

​ 实例文件

​ order.xt

01 1001 1
02 1002 2
03 1003 3
01 1004 4
02 1005 5
03 1006 6

​ pd.txt

01 小米
02 华为
03 苹果

​ 自定义bean对象(包含所有值)

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TableJoinBean implements WritableComparable<TableJoinBean> {

    private String orderId = "";
    private String productName = "";
    private int sum;
    private boolean flag;

    public TableJoinBean(String orderId, int sum){
        this.orderId = orderId;
        this.sum = sum;
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(this.orderId);
        dataOutput.writeUTF(this.productName);
        dataOutput.writeInt(this.sum);
        dataOutput.writeBoolean(this.flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.orderId = dataInput.readUTF();
        this.productName = dataInput.readUTF();
        this.sum = dataInput.readInt();
        this.flag = dataInput.readBoolean();
    }

    @Override
    public int compareTo(TableJoinBean o) {
        return this.orderId.compareTo(o.getOrderId());
    }

    @Override
    public String toString() {
        return orderId + '\t' + productName + '\t' + sum;
    }
}

​ Reduce&Mapper

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;
import java.util.ArrayList;

public class TableJoin {

    public static class TJMapper extends Mapper<LongWritable, Text, Text, TableJoinBean>{
        private String fileName;
        private Text k = new Text();
        private TableJoinBean v = new TableJoinBean();

        // map方法执行前,并且只执行一下   map方法每来一行数据执行一次
        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            FileSplit inputSplit = (FileSplit) context.getInputSplit();
            // 获取当前文件的名称
            fileName = inputSplit.getPath().getName();
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] strings = value.toString().split(" ");

            k.set(strings[0]);
            // 对象文件名称进行判断,并给对象对应的属性赋值
            v.setFlag(fileName.contains("order"));
            if(v.isFlag()){
                v.setOrderId(strings[1]);
                v.setSum(Integer.parseInt(strings[2]));
            }else {
                v.setProductName(strings[1]);
            }

            context.write(k, v);
        }
    }

    public static class TJReducer extends Reducer<Text, TableJoinBean, Text, TableJoinBean>{
        @Override
        protected void reduce(Text key, Iterable<TableJoinBean> values, Context context) throws IOException, InterruptedException {
            String productName = "";
            ArrayList<TableJoinBean> tableJoinBeans = new ArrayList<>();
			
            // 对同一key的对象属性进行聚合操作
            for (TableJoinBean value : values) {
                if(value.isFlag()){
                    // 因为Hadoop重写了Iterable,复用对象以达到优化的效果  所有得每次得重新创建对象
                    tableJoinBeans.add(new TableJoinBean(value.getOrderId(), value.getSum()));
                }else {
                    productName = value.getProductName();
                }
            }

            for (TableJoinBean tableJoinBean : tableJoinBeans) {
                tableJoinBean.setProductName(productName);
                context.write(key, tableJoinBean);
            }
        }
    }

}

​ 6.2、Map端实现(效率会比reduce端快,mapTask数量多余reduceTask)

​ 思路:

​ 不需要reduce,设置reduceTask为0

job.setNumReduceTasks(0);

​ 在map端把较小的一张表加载为缓存,并把k,v用map存起来

job.addCacheFile(new URL("HDFS上文件的路径,或者本地文件的路径"));

setup()方法中:

  1. 获取缓存的文件

    URL[] cacheFiles = context.getCacheFiles();
    FileSystem fs = FileSystem.get(context.getConfiguration());
    FSDataInputStream fis = fs.open(new Path(cacheFiles[0]));
    
  2. 循环读取文件的一行

    BufferdReader reader = new BufferedReader(new InputStreamReader(fis, "UTF-8"));
    
    // java io流读取操作
    while(...){
        ...
    }
    
  3. 切割

  4. 缓存数据到HashMap

  5. 关闭流

map()方法中:

  1. 获取一行
  2. 截取
  3. 获取对应HashMap的key
  4. 获取其余值
  5. 拼接
  6. 写出

七、压缩

1、MR支持的压缩编码

压缩格式Hadoop自带算法文件扩展名是否支持切片是否修改程序
DEFAULTDEFAULT.default
GzipDEFAULT.gz
bzip2bzip2.bz2
LZO否(需要安装)LOZ.loz需要创建索引,指定输入格式
SnappySnappy`.snappy

2、输入端

​ 2.1、数据量小于块大小,优先考虑压缩速度 LZO/Snappy

​ 2.2、数据量大,优先考虑支持切片 Bzip/LZO

3、mapper输出端

​ 重点考虑压缩/解压速度 LZO/Snappy

4、Reduce输出端

​ 4.1、如果数据永久保存,考虑压缩率较高的 Bzip2/Gzip

​ 4.2、如果传给下一个MR做处理需考虑数据量和是否支持切片

5、实际配置

参数默认值阶段
io.compression.codecs(core-site.xml)输入压缩
mapreduce.map.output.compress(mapred-site.xml)falsemapper输出
mapreduce.map.output.compress.codec(mapred-site.xml)org.apache.hadoop.io.compress.DefaultCodecmapper输出
mapreduce.output.fileoutputformat.compress(mapred-site.xml)falsereduce输出
mapreduce.output.fileoutputformat.compress.codec(mapred-site.xml)org.apache.hadoop.io.compress.DefaultCodecreduce输出

6、在Java代码中配置

...
Configuration conf = new Configuration();

// map输出端开启压缩
/**
conf.setBoolean("mapreduce.map.output.compress", true);
conf.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);
*/

// reduce输出端开启压缩
/**
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, Bzip2Codec.class);
*/

八、yarn常用命令

1、yarn application 查看任务

​ 1.1、列出所有任务

yarn application -list

​ 1.2、根据 application 状态过滤

​ 所有状态:ALL、NEW、NEW_SAVING、SUBMITTED、ACCEPTED、RUNNING、FINISHED、FAILED、KILLED

yarn application -list -appStates [状态]

​ 1.3、kill 掉 application

yarn application -kill [applicaionId]

​ 1.4、查询 application 日志

yarn logs -applicationId [applicationId]

​ 1.5、查看 container日志

yarn logs -applicationId [applicationId] -containerId [containerId]

​ 1.6、列出 application 尝试的列表

yarn applicationattempt -list [applicationId]

​ 1.7、打印 applicationAttemp状态

yarn applicationattempt -status [applicationAttemptId]

​ 1.8、列出所有容器

yarn container -list [applicationAttemptId]

​ 1.9、打印容器状态

yarn container -status [containerId]

​ 1.10、查看node状态

yarn node -list -all

​ 1.11、加载队列配置

yarn remadmin -reefreshQueues

​ 1.12、查看队列

yarn queue -status [queueName]

九、yarn生产环境核心参数配置

1、resourceManager相关

参数作用
yarn.resourcemanager.scheduler.class配置调度器
yarn.resourcemanager.scheduler.client.thread.countresourcemanager处理的线程数,默认50

2、nodeManager相关(每个节点单独配置)

参数作用
yarn.nodemanager.resource.detect-hardware-capabilitiesyarn自己检查硬件进行配置,默认false
yarn.nodemanager.resource.count-logical-processors-as-cores是否将虚拟核数当中cpu核数,默认false
yarn.nodemanager.resource.pcores-vcores-multiplier虚拟核数与物理核数的比值,默认1.0
yarn.nodemanager.resource.memory-mbnodeManager使用内存,默认8G
yarn.nodemanager.resource.system-reserved-memore-mbnodeManager为系统保留多少内存
yarn.nodemanager.resource.cpu-vcoresnodeManager使用CPU核数,默认8
yarn.nodemanager.pmem-check-enabled是否开启物理内存检查限制container,默认true
yarn.nodemanager.vmem-check-enabled是否开启虚拟内存检查限制container,默认true
yarn.nodemanager.vmem-pmem-ratio虚拟内存物理内存比例,默认2.1

3、container相关

参数作用
yarn.scheduler.minimum-allocation-mb容器最小内存,默认1G
yarn.scheduler.maximum-allocation-mb容器最大内存,默认8G
yarn.scheduler.minimum-allocation-vcores容器最小cpu核数,默认1个
yarn.scheduler.maximum-allocation-vcores容器最大cpu核数,默认4个

es-vcores-multiplier | 虚拟核数与物理核数的比值,默认1.0 |
| yarn.nodemanager.resource.memory-mb | nodeManager使用内存,默认8G |
| yarn.nodemanager.resource.system-reserved-memore-mb | nodeManager为系统保留多少内存 |
| yarn.nodemanager.resource.cpu-vcores | nodeManager使用CPU核数,默认8 |
| yarn.nodemanager.pmem-check-enabled | 是否开启物理内存检查限制container,默认true |
| yarn.nodemanager.vmem-check-enabled | 是否开启虚拟内存检查限制container,默认true |
| yarn.nodemanager.vmem-pmem-ratio | 虚拟内存物理内存比例,默认2.1 |

3、container相关

参数作用
yarn.scheduler.minimum-allocation-mb容器最小内存,默认1G
yarn.scheduler.maximum-allocation-mb容器最大内存,默认8G
yarn.scheduler.minimum-allocation-vcores容器最小cpu核数,默认1个
yarn.scheduler.maximum-allocation-vcores容器最大cpu核数,默认4个

标签:05,org,hadoop,yarn,Hadoop,简单,apache,import,class
From: https://blog.csdn.net/BIN_2011464841/article/details/139471944

相关文章

  • nuxt简单入门安装
    参考:https://www.jianshu.com/p/fd99718a63e9@目录概要具体流程小结概要听说直接使用vue写前端对百度的seo不够友好,于是便考虑使用nuxt生成静态化来处理具体流程首先你的本机环境要有npm,如下图然后可以使用npx安装nuxt,npx是npm5点几就支持的了,但是我这一开始还不行,还需要手......
  • 有什么方法可以批量把图片转BMP格式?几个操作起来简单且效果好的方法
    图片转BMP怎么弄?在数字图像处理中,不同的文件格式具有不同的特点和适用场景。BMP是一种常用的图像文件格式,以其高质量的图像表现和简单的结构而广受欢迎。有时,我们可能需要将其他格式的图片转换为BMP格式以满足特定需求。本文将为大家详细介绍几个方法轻松实现图片转BMP的操作,感......
  • 【JS】JavaScript编程语言-谷歌浏览器调试之前端代码(2024-06-05)
    1、在浏览器中调试调试是指在一个脚本中找出并修复错误的过程。所有的现代浏览器和大多数其他环境都支持调试工具——开发者工具中的一个令调试更加容易的特殊用户界面。它也可以让我们一步步地跟踪代码以查看当前实际运行情况。在这里我们将会使用Chrome(谷歌浏览器)。2......
  • 操作简单中医电子处方中药划价系统软件视频教程,佳易王诊所电子处方管理系统软件
    操作简单中医电子处方中药划价系统软件视频教程,佳易王诊所电子处方管理系统软件一、前言以下软件操作教程以,佳易王中西医诊所电子处方软件为例说明软件文件下载可以点击最下方官网卡片——软件下载——试用版软件下载 1、软件支持中医和西医处方开单2、系统自动计算费用......
  • AI推介-多模态视觉语言模型VLMs论文速览(arXiv方向):2024.05.25-2024.05.31
    文章目录~1.EmpoweringVisualCreativity:AVision-LanguageAssistanttoImageEditingRecommendations2.Bootstrap3D:Improving3DContentCreationwithSyntheticData3.Video-MME:TheFirst-EverComprehensiveEvaluationBenchmarkofMulti-modalLLMsin......
  • perfers-color-scheme 使用简单介绍
    perfers-color-scheme简介prefers-color-scheme媒体查询属性用于检测用户操作系统是否使用深色模式。属性值dark表示用户操作系统使用深色模式light表示用户操作系统使用浅色模式no-preference表示用户操作系统没有偏好,或者操作系统不支持该属性示例@media(prefers......
  • 百万级表格实测,excel是时侯使用XLOOKUP了,比VLOOKUP简单、不出错、快
    使用场景:用“证件号码”查找“Sheet4表”(272181行)的“姓名”列,填充到“Sheet1”表的空白列中。为了演示传说中的二进制查找,我们预先对“Sheet4”进行了按“证件号码”排序。1、VLOOKUP:我们第一次使用公式为:=VLOOKUP(B2,Sheet4!$A$2:$B$272181,1,FALSE),这个公式并没有找到数据......
  • Hadoop完全分布式安装
    Hadoop完全分布式安装一.集群搭建前期准备1.三台机器防火墙都是关闭的2.确保三台机器网络配置畅通3.三台机器确保/etc/hosts⽂件配置了ip和hostname的映射关系4.确保三台机器配置了ssh免密登录认证二.前期环境搭建免密登录1.修改主机名为server1,配置hosts文件vi/e......
  • 简单4步,带你用华为云MetaStudio制作数字人短片
    本文分享自华为云社区《使用MetaStudio生产线四步制作数字人视频》,作者:yd_298097624。随着AIGC新技术尤其是大模型技术的发展,音视频行业、数字内容生产行业正在经历这从生产方式和生产效率上的一个巨大变化。预测到2030年有AI大模型生成的数字内容比例将高达90%,包括通过AIGC来生......
  • 带DSP音效处理D类数字功放TAS5805M中文资料
    国产替代D类数字功放中文资料访问下方链接ACM86282×41W立体声1×82W单通道数字功放中文寄存器表内置DSP多种音频处理效果ACM8628M-2×41W立体声或1×82W单通道数字功放1特性具有增强处理能力和低功率损耗的TAS5805M23W、无电感器、数字输入、立体声、闭环D类音频......