
16. MapReduce Basic Usage Examples: Custom Serialization, Sorting, Partitioning, Grouping, and Top N





Table of Contents

  • Hadoop series article index
  • I. pom.xml, test data description, and log configuration
  • 1. pom.xml
  • 2. Data field description
  • 3. Log configuration
  • II. Serialization
  • 1. Requirement
  • 2. Implementation notes
  • 3. Implementation
  • 1) Bean
  • 2) Mapper
  • 3) Reducer
  • 4) Driver
  • 4. Verification
  • III. Sorting
  • 1. Requirement
  • 2. Implementation notes
  • 3. Implementation
  • 1) Bean
  • 2) Mapper
  • 3) Reducer
  • 4) Driver
  • 4. Verification
  • IV. Partitioning
  • 1. Requirement
  • 2. Implementation notes
  • 3. Implementation
  • 1) Bean
  • 2) Mapper
  • 3) Reducer
  • 4) Partitioner
  • 5) Driver
  • 4. Verification
  • V. Grouping
  • 1. Requirement
  • 2. Implementation notes
  • 3. Implementation
  • 1) Bean
  • 2) Mapper
  • 3) Reducer
  • 4) Grouping comparator
  • 5) Driver
  • 4. Verification
  • VI. Top N
  • 1. Requirement
  • 2. Implementation notes
  • 3. Implementation
  • 1) Bean
  • 2) Mapper
  • 3) Reducer
  • 4) Grouping comparator
  • 5) Driver
  • 4. Verification


This article walks through common basic usages of MapReduce.
It assumes a working Hadoop environment.
The article is divided into five parts: custom serialization, sorting, partitioning, grouping, and top N.

I. pom.xml, test data description, and log configuration

1. pom.xml

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.22</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-client</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-hdfs</artifactId>
			<version>3.1.4</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-mapreduce-client-core</artifactId>
			<version>3.1.4</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.springframework/spring-core -->
		<dependency>
			<groupId>org.springframework</groupId>
			<artifactId>spring-core</artifactId>
			<version>2.5.6</version>
		</dependency>

2. Data field description

date (date), county (county), state (state), fips (county FIPS code), cases (cumulative confirmed cases), deaths (cumulative deaths)

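Two representative rows in this format (the same sample rows quoted in the Mapper comments later in this article) look like this:

2021-01-28,Autauga,Alabama,01001,5554,69
2021-01-28,Baldwin,Alabama,01003,17779,225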

3. Log configuration

Place a log4j.properties file under the resources directory, with the following content:

# Define some default values that can be overridden by system properties
hadoop.root.logger=INFO,console
hadoop.log.dir=.
hadoop.log.file=hadoop.log

# Define the root logger to the system property "hadoop.root.logger".
log4j.rootLogger=${hadoop.root.logger}, EventCounter

# Logging Threshold
log4j.threshold=ALL

# Null Appender
log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender

#
# Rolling File Appender - cap space usage at 5gb.
#
hadoop.log.maxfilesize=256MB
hadoop.log.maxbackupindex=20
log4j.appender.RFA=org.apache.log4j.RollingFileAppender
log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}

log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}

log4j.appender.RFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
# Debugging Pattern format

# Daily Rolling File Appender
#

log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}

# Rollover at midnight
log4j.appender.DRFA.DatePattern=.yyyy-MM-dd

log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout

# Pattern format: Date LogLevel LoggerName LogMessage
log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n

#
# TaskLog Appender
#
log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender

log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n

log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter

II. Serialization

1. Requirement

For the US data dated 2021-01-28, compute each state's cumulative confirmed cases and cumulative deaths.

2. Implementation notes

Define a custom object, CovidBean, to hold each state's confirmed case count and death count.
Use the state as the map-phase output key and the CovidBean as the value, so that all records belonging to the same state form one group in the reduce phase, where they are simply summed to get each state's cumulative totals.

3. Implementation

1) Bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

import lombok.Data;

@Data
public class CovidBean implements Writable {
	private String state;
	private long cases;
	private long deaths;

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(state);
		out.writeLong(cases);
		out.writeLong(deaths);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.state = in.readUTF();
		this.cases = in.readLong();
		this.deaths = in.readLong();
	}

	@Override
	public String toString() {
		// use a tab separator so the output of this job can be parsed by the sorting example in section III
		return this.cases + "\t" + this.deaths;
	}
}

2) Mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.hadoop.mr.covid.bean.CovidBean;

//KEYIN, VALUEIN, KEYOUT, VALUEOUT
public class EachStateMapper extends Mapper<LongWritable, Text, Text, CovidBean> {

	Text outKey = new Text();
	CovidBean outValue = new CovidBean();

//	2021-01-28,Autauga,Alabama,01001,5554,69
	/**
	 * LongWritable key: byte offset of the line
	 * Text value: the line content
	 * Context context: job context
	 */
	@Override
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		//split the line on the comma delimiter
		String values[] = value.toString().split(",");

		//assign the output key
		outKey.set(values[2]);

		//assign the output value
		outValue.setState(values[2]);
		outValue.setCases(Long.parseLong(values[values.length - 2]));
		outValue.setDeaths(Long.parseLong(values[values.length - 1]));

		//emit the key-value pair
		context.write(outKey, outValue);
	}
}

3) Reducer

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.hadoop.mr.covid.bean.CovidBean;

//KEYIN,VALUEIN,KEYOUT,VALUEOUT
public class EachStateReducer extends Reducer<Text, CovidBean, Text, CovidBean> {
	
	/**
	 * Text key: the map output key
	 * Iterable<CovidBean> values: the values grouped under this key; the type is the map output value type
	 * Context context: job context
	 */
	@Override
	protected void reduce(Text key, Iterable<CovidBean> values, Context context) throws IOException, InterruptedException {

		long cases = 0, deaths = 0;
		
		CovidBean outValue = new CovidBean();
		for (CovidBean cb : values) {
			cases += cb.getCases();
			deaths += cb.getDeaths();
		}
		
		outValue.setState(key.toString());
		outValue.setCases(cases);
		outValue.setDeaths(deaths);
		
		context.write(key, outValue);

	}
}

4) Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.hadoop.mr.covid.bean.CovidBean;

/**
 * @author alanchan 
 *  
 */
public class EachStateDriver extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/covid";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new EachStateDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), EachStateDriver.class.getSimpleName());

		job.setJarByClass(EachStateDriver.class);

		job.setMapperClass(EachStateMapper.class);
		job.setReducerClass(EachStateReducer.class);

		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(CovidBean.class);

		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(CovidBean.class);

		FileInputFormat.addInputPath(job, new Path(in));
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}
		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}

4. Verification

The output is as follows:

(screenshot: per-state cumulative confirmed cases and deaths)
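
The sorting example in the next section consumes this output directly; its Mapper comments show what the per-state lines look like, for example (tab-separated, per the toString above):

Alabama	452734	7340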


This completes the basic aggregation; the main point of this example is implementing serialization with a custom object.

III. Sorting

1. Requirement

Sort the US per-state confirmed case counts for 2021-01-28 in descending order.

2. Implementation notes

In MapReduce, keys are sorted by default (in ascending lexicographic order).

  • For ascending order on a built-in Hadoop type, nothing needs to change: use the Hadoop type directly as the key.
  • For descending order, or when the key is a custom object, you must define the ordering yourself: have the object implement the Comparable contract and override compareTo (for MapReduce keys, via WritableComparable).

    compareTo compares the current object with the method argument:
  • it returns a negative number if the current object is smaller than the argument;
  • it returns a positive number if the current object is larger, and 0 if they are equal.
    For example, in o1.compareTo(o2): a positive return value means the current object (o1, the caller of compareTo) sorts after the argument (o2); a negative return value means it sorts before it.
    This example needs to sort by confirmed case count in descending order, so the ordering must be implemented by hand; a minimal standalone sketch follows this list, and the real bean is shown in the implementation below.
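
As a compact illustration of the rule above, here is a throwaway class (a hypothetical sketch, not the CovidBean used in this example) that produces a descending order by reversing the arguments of Long.compare:

// Standalone illustration of the compareTo contract for descending order.
class CasesOnly implements Comparable<CasesOnly> {
	long cases;

	CasesOnly(long cases) {
		this.cases = cases;
	}

	@Override
	public int compareTo(CasesOnly o) {
		// Reversing the arguments of Long.compare yields descending order:
		// a negative value is returned when this.cases is larger, so larger counts sort first.
		// e.g. new CasesOnly(100).compareTo(new CasesOnly(50)) returns a negative number.
		return Long.compare(o.cases, this.cases);
	}
}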

3. Implementation

1) Bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.WritableComparable;
import lombok.Data;

@Data
public class CovidBean implements WritableComparable<CovidBean> {
	private String state;
	private long cases;
	private long deaths;

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(state);
		out.writeLong(cases);
		out.writeLong(deaths);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.state = in.readUTF();
		this.cases = in.readLong();
		this.deaths = in.readLong();
	}

	public String toString() {
		return this.state + "\t" + this.cases + "\t" + this.deaths;

	}

//	  /** Compares two LongWritables. */
//	  @Override
//	  public int compareTo(LongWritable o) {
//	    long thisValue = this.value;
//	    long thatValue = o.value;
//	    return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
//	  }
	@Override
	public int compareTo(CovidBean o) {
		long thisCases = this.cases;
		long thatCases = o.getCases();
		int result = 0;
		result = (thisCases > thatCases ? -1 : (thisCases == thatCases ? 0 : 1));
		return result;
	}
}

2) Mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EachStateDescSortMapper extends Mapper<LongWritable, Text, CovidBean, NullWritable> {
	CovidBean outKey = new CovidBean();

	// sample input (output of the serialization example in section II)
//	Alabama	452734	7340
//	Arizona	745976	12861
//	Arkansas	290856	4784
//	California	3272207	39521
	@Override
	public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		// split the tab-separated line
		String values[] = value.toString().split("\t");

		// assign the output key
		outKey.setState(values[0]);
		outKey.setCases(Long.parseLong(values[1]));
		outKey.setDeaths(Long.parseLong(values[2]));

		// emit the key-value pair
		context.write(outKey, NullWritable.get());
	}

}

3) Reducer

import java.io.IOException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EachStateDescSortReducer extends Reducer<CovidBean, NullWritable, CovidBean, NullWritable> {
	// sample input
//	Alabama	452734	7340
//	Arizona	745976	12861
//	Arkansas	290856	4784
//	California	3272207	39521
	@Override
	protected void reduce(CovidBean key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {
		// The map output carries everything in the key and the value is NullWritable;
		// the keys already arrive sorted by cases in descending order, so writing the key is enough.
		context.write(key, NullWritable.get());
	}
}

4) Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author alanchan
 * 
 * Sort each state's confirmed case count in descending order
 */
public class EachStateDescSortDriver extends Configured implements Tool {
	// This example builds on the serialization example (req1); its output files are the input here
	static String in = "D:/workspace/bigdata-component/hadoop/test/out/covid/req1";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/covid/descsort";

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new EachStateDescSortDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), EachStateDescSortDriver.class.getSimpleName());

		job.setJarByClass(EachStateDescSortDriver.class);

		job.setMapperClass(EachStateDescSortMapper.class);
		job.setReducerClass(EachStateDescSortReducer.class);

		// key-value types of the map output
		job.setMapOutputKeyClass(CovidBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		// key-value types of the reducer output
		job.setOutputKeyClass(CovidBean.class);
		job.setOutputValueClass(NullWritable.class);

		FileInputFormat.addInputPath(job, new Path(in));
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}

		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}

4. Verification

(screenshot: states sorted by confirmed cases in descending order)


This completes the descending sort.

IV. Partitioning

The number of partitions equals the number of reduce tasks, and each reduce task produces one output file. If you want records routed to different files according to some rule, you must set the number of reduce tasks accordingly; and if the default partitioning rule does not fit, you must define your own partitioning rule.

1. Requirement

Write the US COVID data for different states to different files, so that all counties belonging to the same state end up in the same result file.

2. Implementation notes

Hadoop's default partitioner implementation:

package org.apache.hadoop.mapreduce.lib.partition;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Partitioner;

/** Partition keys by their {@link Object#hashCode()}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HashPartitioner<K, V> extends Partitioner<K, V> {

  /** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,int numReduceTasks) {
      //& Integer.MAX_VALUE clears the sign bit, so a negative key.hashCode() cannot produce a negative partition
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }

}

This example meets the requirement with a custom partitioning rule; a small demo of the default behaviour is sketched below.
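
To see why the default HashPartitioner cannot guarantee one state per output file, the following standalone snippet (purely illustrative and not part of the article's job code; the class name is made up) prints which of 7 reduce tasks the default partitioner would choose for each of the six states used below:

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;

public class DefaultPartitionDemo {
	public static void main(String[] args) {
		HashPartitioner<Text, Text> partitioner = new HashPartitioner<>();
		String[] states = { "Alabama", "Arkansas", "California", "Florida", "Indiana", "Arizona" };
		for (String state : states) {
			// same formula as above: (hashCode & Integer.MAX_VALUE) % numReduceTasks
			int p = partitioner.getPartition(new Text(state), new Text(), 7);
			System.out.println(state + " -> partition " + p);
		}
	}
}

Two different states can easily hash to the same partition, and nothing ties a partition number to a particular state, which is why a custom Partitioner is used here.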

3. Implementation

1) Bean

For this example alone, no new Java bean is needed; the bean from the previous sections can be reused as-is.

2) Mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EachStateResultMapper extends Mapper<LongWritable, Text, Text, Text> {
	Text outKey = new Text();

//	data format
//	2021-01-28,Autauga,Alabama,01001,5554,69
//	2021-01-28,Baldwin,Alabama,01003,17779,225
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String[] line = value.toString().split(",");
		outKey.set(line[2]);
		context.write(outKey, value);
	}
}

3) Reducer

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EachStateResultReducer extends Reducer<Text, Text, NullWritable, Text> {

	protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
		for (Text value : values) {
			context.write(NullWritable.get(), value);
		}
	}
}

4) Partitioner

This partitioner is only illustrative: six states are listed explicitly, and any state not in the map is sent to a seventh partition (index 6).

import java.util.HashMap;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class EachStateResultPartition extends Partitioner<Text, Text> {
    
	public static HashMap<String, Integer> stateMap = new HashMap<String, Integer>();
	static {
		stateMap.put("Alabama", 0);
		stateMap.put("Arkansas", 1);
		stateMap.put("California", 2);
		stateMap.put("Florida", 3);
		stateMap.put("Indiana", 4);
		stateMap.put("Arizona", 5);
	}

	@Override
	public int getPartition(Text key, Text value, int numPartitions) {
		Integer code = stateMap.get(key.toString());
		if (code != null) {
               return code;
		}
		return 6;
	}
}

5) Driver

The driver explicitly sets the partitioner class and the number of reduce tasks:

// set the custom partitioner
job.setPartitionerClass(EachStateResultPartition.class);
// set the number of reduce tasks
job.setNumReduceTasks(7);

Note:
Number of partitions = number of reduce tasks: the program writes the expected result files, one per partition.
Number of partitions > number of reduce tasks: the job fails with an error and does not run to completion.
Number of partitions < number of reduce tasks: the job runs normally, but some result files are empty (size 0).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * @author alanchan 
 */
public class EachStateResultDriver extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/covid/result";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new EachStateResultDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), EachStateResultDriver.class.getSimpleName());

		job.setJarByClass(EachStateResultDriver.class);

		job.setMapperClass(EachStateResultMapper.class);
		job.setReducerClass(EachStateResultReducer.class);

		// key-value types of the map output
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(Text.class);

		// key-value types of the reducer output
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(Text.class);

		// set the custom partitioner
		job.setPartitionerClass(EachStateResultPartition.class);
		// set the number of reduce tasks
		job.setNumReduceTasks(7);

		FileInputFormat.addInputPath(job, new Path(in));
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}

		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}

4. Verification

One result file is generated per partition:

stateMap.put("Alabama", 0);
stateMap.put("Arkansas", 1);
stateMap.put("California", 2);
stateMap.put("Florida", 3);
stateMap.put("Indiana", 4);
stateMap.put("Arizona", 5);

(screenshots: the output directory with one part file per partition, and the contents of one state's file)

The .crc files visible in the listing can be ignored; they were left over from other examples run against the same directory.

This completes the partitioning example.

V. Grouping

  • Grouping happens in the reduce phase and decides which records end up in the same group, i.e. in the same call to the reduce method.
  • The default grouping rule is: records with equal keys form one group (adjacent keys are compared directly for equality).
  • Because the data has already been sorted before grouping in the reduce phase, sorting plus grouping guarantees that records with the same key land in the same group and are processed by a single reduce call.

1. Requirement

For the US data dated 2021-01-28, find the county with the most confirmed cases in each state.

2. Implementation notes

  • In the map phase, wrap "state, county, county confirmed cases" in a custom object and emit it as the key.
  • Override the object's sort order: sort by state in ascending order first, and for equal states sort by confirmed cases in descending order, then send the records to reduce.
  • On the reduce side, use a custom grouping rule to put records with the same state into one group; the first record in each group is then the maximum.
  • Write a class that extends WritableComparator and override its compare method. Whenever compare returns 0, the MapReduce framework treats the two adjacent keys as equal and puts them in the same group.
  • Register the grouping class on the job so it takes effect: job.setGroupingComparatorClass(xxxx.class)

3. Implementation

1) Bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import lombok.Data;

@Data
public class CovidBean implements WritableComparable<CovidBean> {
	private String state;
	private String country;
	private long cases;
	private long deaths;

	public String toString() {
		return this.state + "\t" + this.country + "\t" + this.cases + "\t" + this.deaths;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.state);
		out.writeUTF(this.country);
		out.writeLong(this.cases);
		out.writeLong(this.deaths);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.state = in.readUTF();
		this.country = in.readUTF();
		this.cases = in.readLong();
		this.deaths = in.readLong();
	}

	// Sort order: ascending by state; for equal states, descending by confirmed cases
	@Override
	public int compareTo(CovidBean o) {
		int result = 0;
		int i = state.compareTo(o.getState());
		if (i > 0) {
			result = 1;
		} else if (i < 0) {
			result = -1;
		} else {
			// 确诊病例数倒序排序
			result = cases > o.getCases() ? -1 : 1;
		}
		return result;
	}

}

2) Mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

//KEYIN, VALUEIN, KEYOUT, VALUEOUT
public class EachStateGroupingMapper extends Mapper<LongWritable, Text, CovidBean, NullWritable> {
	CovidBean outKey = new CovidBean();

//	2021-01-28,Autauga,Alabama,01001,5554,69
//	2021-01-28,Baldwin,Alabama,01003,17779,225
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		String[] line = value.toString().split(",");
		outKey.setState(line[2]);
		outKey.setCountry(line[1]);
		outKey.setCases(Long.parseLong(line[line.length - 2]));
		outKey.setDeaths(Long.parseLong(line[line.length - 1]));
		context.write(outKey, NullWritable.get());
	}
}

3) Reducer

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

//KEYIN, VALUEIN, KEYOUT, VALUEOUT
public class EachStateGroupingReducer extends Reducer<CovidBean, NullWritable, CovidBean, NullWritable> {

	protected void reduce(CovidBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
		context.write(key, NullWritable.get());
	}
}

4) Grouping comparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CovidBeanGroupingComparator extends WritableComparator {
	public CovidBeanGroupingComparator() {
		super(CovidBean.class, true);
	}

	public int compare(WritableComparable a, WritableComparable b) {
		CovidBean aBean = (CovidBean) a;
		CovidBean bBean = (CovidBean) b;
		return aBean.getState().compareTo(bBean.getState());
	}
}

5) Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class EachStateGroupingDriver extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/covid/grouping";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new EachStateGroupingDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), EachStateGroupingDriver.class.getSimpleName());

		job.setJarByClass(EachStateGroupingDriver.class);

		job.setMapperClass(EachStateGroupingMapper.class);
		job.setReducerClass(EachStateGroupingReducer.class);

		// key-value types of the map output
		job.setMapOutputKeyClass(CovidBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		// key-value types of the reducer output
		job.setOutputKeyClass(CovidBean.class);
		job.setOutputValueClass(NullWritable.class);

		// set the grouping comparator
		job.setGroupingComparatorClass(CovidBeanGroupingComparator.class);

		FileInputFormat.addInputPath(job, new Path(in));
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}

		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}

4. Verification

(screenshot: one output line per state, showing the county with the most confirmed cases)


This completes the grouping example.

VI. Top N

1. Requirement

For the US data dated 2021-01-28, find the three counties with the most confirmed cases in each state.

2. Implementation notes

  • In the map phase, wrap "state, county, county confirmed cases" in a custom object and emit it as the key.
  • Override the object's sort order: sort by state in ascending order first, and for equal states sort by confirmed cases in descending order, then send the records to reduce.
  • On the reduce side, use a custom grouping rule to put records with the same state into one group; the first N records in each group are then the top N.

3. Implementation

1) Bean

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
import lombok.Data;

@Data
public class CovidBean implements WritableComparable<CovidBean> {
	private String state;
	private String country;
	private long cases;
	private long deaths;

	public String toString() {
		return this.state + "\t" + this.country + "\t" + this.cases + "\t" + this.deaths;
	}

	@Override
	public void write(DataOutput out) throws IOException {
		out.writeUTF(this.state);
		out.writeUTF(this.country);
		out.writeLong(this.cases);
		out.writeLong(this.deaths);
	}

	@Override
	public void readFields(DataInput in) throws IOException {
		this.state = in.readUTF();
		this.country = in.readUTF();
		this.cases = in.readLong();
		this.deaths = in.readLong();
	}

	// Sort order: ascending by state; for equal states, descending by confirmed cases
	@Override
	public int compareTo(CovidBean o) {
		int result = 0;
		int i = state.compareTo(o.getState());
		if (i > 0) {
			result = 1;
		} else if (i < 0) {
			result = -1;
		} else {
			// 确诊病例数倒序排序
			result = cases > o.getCases() ? -1 : 1;
		}
		return result;
	}

}

2) Mapper

import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class EachStateTopNMapper extends Mapper<LongWritable, Text, CovidBean, NullWritable> {
	CovidBean outKey = new CovidBean();

//	2021-01-28,Autauga,Alabama,01001,5554,69
//	2021-01-28,Baldwin,Alabama,01003,17779,225
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

		String[] line = value.toString().split(",");
		outKey.setState(line[2]);
		outKey.setCountry(line[1]);
		outKey.setCases(Long.parseLong(line[line.length - 2]));
		outKey.setDeaths(Long.parseLong(line[line.length - 1]));

		context.write(outKey, NullWritable.get());

	}
}

3) Reducer

import java.io.IOException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class EachStateTopNReducer extends Reducer<CovidBean, NullWritable, CovidBean, NullWritable> {
	protected void reduce(CovidBean key, Iterable<NullWritable> values, Context context)
			throws IOException, InterruptedException {
		int topN = 0;
		// Within one group (one state) the records are already sorted by cases in descending order.
		// As the values iterator advances, the framework refills the key object with the fields of
		// the current record, so writing the key on each iteration emits a different county.
		for (NullWritable value : values) {
			if (topN < 3) { // emit the top 3 counties of each state
				context.write(key, NullWritable.get());
				topN++;
			} else {
				return;
			}
		}
	}
}

4) Grouping comparator

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class CovidBeanGroupingComparator extends WritableComparator {
	public CovidBeanGroupingComparator() {
		super(CovidBean.class, true);
	}

	public int compare(WritableComparable a, WritableComparable b) {
		CovidBean aBean = (CovidBean) a;
		CovidBean bBean = (CovidBean) b;
		return aBean.getState().compareTo(bBean.getState());
	}
}

5) Driver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class EachStateTopNDriver extends Configured implements Tool {
	static String in = "D:/workspace/bigdata-component/hadoop/test/in";
	static String out = "D:/workspace/bigdata-component/hadoop/test/out/covid/topn";

	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		int status = ToolRunner.run(conf, new EachStateTopNDriver(), args);
		System.exit(status);
	}

	@Override
	public int run(String[] args) throws Exception {
		Job job = Job.getInstance(getConf(), EachStateTopNDriver.class.getSimpleName());

		job.setJarByClass(EachStateTopNDriver.class);

		job.setMapperClass(EachStateTopNMapper.class);
		job.setReducerClass(EachStateTopNReducer.class);

		// key-value types of the map output
		job.setMapOutputKeyClass(CovidBean.class);
		job.setMapOutputValueClass(NullWritable.class);

		// key-value types of the reducer output
		job.setOutputKeyClass(CovidBean.class);
		job.setOutputValueClass(NullWritable.class);

		// set the grouping comparator
		job.setGroupingComparatorClass(CovidBeanGroupingComparator.class);

		FileInputFormat.addInputPath(job, new Path(in));
		FileSystem fs = FileSystem.get(getConf());
		if (fs.exists(new Path(out))) {
			fs.delete(new Path(out), true);
		}

		FileOutputFormat.setOutputPath(job, new Path(out));

		return job.waitForCompletion(true) ? 0 : 1;
	}

}

4. Verification

(screenshot: the top 3 counties by confirmed cases for each state)

This wraps up the basic usages of MapReduce; the sample data used in these examples was taken from a public dataset found online.


From: https://blog.51cto.com/alanchan2win/6280443
