【博学谷IT技术支持】
一、介绍
MapReduce是将一个大的计算任务拆分成一个个小任务,让小任务在不同的计算机中进行处理,最后将任务的结果进行汇总的过程。
MR的工作流程可以分为三个阶段,分别是map、shuffle、reduce
二、编程
Mapper阶段
自定义一个类来集成Mapper类,重写map方法,将方法中的k1、v1转化为k2、v2。同时输入输出的数据类型以键值对的形式,如<key, value>
public class WordCountMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
/**
* Context context 整个MR的上下文对象
* */
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 根据字符串的不同来切开字符串,比如“1|2|3|4” 就切割|,如果是,就切割
// 例如数组是hello|age|world|age|age
// k1代表各行文本相较于文本开头的偏移量,v1代表这行数据
String[] wordArray = value.toString().split("|");
// 根据得到的数组进行处理,根据需求设置k2,v2,同时这里String的类型是Text,Int的类型是LongWritable
for (String k2 : wordArray) {
// 这里写入的就是k2,v2
context.write(new Text(k2), new LongWritable(1));
}
}
}
注意: 这里只有两个主要的位置,第一个是如何对进来的v1进去获取,第二点是设置k2,v2并使用context.write
写入
Shuffle阶段
map阶段要对数据进行了切片,为每个切片分配一个MapTask任务,在通过处理后得到键值对。此后进入shuffle阶段。
shuffle阶段又分四个阶段分别是分区、排序、规约、分组
分区
将不同键值对的数据,输入到不同的文件中,对数据进行拆分。自定义类,继承Partitioner
类,将该类中重写getPartition
方法,定义分区规则。getPartition
有三个参数,分别是k,v以及设置reduce任务的数量,默认是1
public class MyPartitioner extends Partitioner<Text, Text> {
@Override
public int getPartition(Text text, Text text2, int i) {
// 这里定义分区的规则
//比如说这里给不同的文件打标记
String newK2 = text.toString() + " " + text.toString().length();
return (newK2.hashCode() & 2147483647) % i;//这里用来返回分区的编号
}
}
排序
MR的排序只能根据K2进行排序,因此如果要排序,k2中应该包含关键字。
定义javabean类,同时需要实现WritableComparable接口。该类必须要满足能够序列化与反序列化,序列化与反序列化的过程字段的读写顺序要一致。这里的排序在compareTo
方法中实现。
public class Covidbean implements WritableComparable<Covidbean> {
private Integer cases;
private Integer deaths;
public Integer getCases() {
return cases;
}
public void setCases(Integer cases) {
this.cases = cases;
}
public Integer getDeaths() {
return deaths;
}
public void setDeaths(Integer deaths) {
this.deaths = deaths;
}
@Override
public String toString() {
return cases + '\t' + deaths;
}
@Override
public int compareTo(Covidbean o) {
int result = this.cases - o.cases;
if(result == 0) {
return this.deaths - o.deaths;
} else {
return result;
}
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(state);
dataOutput.writeUTF(country);
dataOutput.writeInt(cases);
dataOutput.writeInt(deaths);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.state = dataInput.readUTF();
this.country = dataInput.readUTF();
this.cases = dataInput.readInt();
this.deaths = dataInput.readInt();
}
}
规约
Combiner是MR的优化手段,将Map的数据进行提前聚合,减少Map端和Reduce端网络传输的数据量。combiner没有默认的实现,需要显式的设置在conf中才有作用。
public class MyCombiner extends Reducer<Text, LongWritable, Text, LongWritable> {
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
// 得到k2,v2,通过不同的逻辑处理,得到k3,v3
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
}
分组
分组的作用就是根据k2进行去重,将相同的k2分入同一组,相当于group by k2的作用。如果没有指定分组规则,系统会默人调用k2类中的compareTo方法
- 自定义分组类需要继承 WritableComparator 父类并重写 compare() 方法
public class GroupingComparator extends WritableComparator {
public GroupingComparator() {
super(CovidBean.class,true); //将k2类传给父类,并允许父类能够通过反射创建CovidBean对象
}
//3:重写compare方法,在该方法中指定分组的规则
@Override
public int compare(WritableComparable a, WritableComparable b) {
//分组规则:同一个州的数据分到一个组
// 根据不同的需求设置分组规则
CovidBean a1 = (CovidBean) a;
CovidBean b1 = (CovidBean) b;
//如果return返回的0,则就会将a1 和 b2分到同一组
return a1.getState().compareTo(b1.getState());
}
}
ReducerJoin
map端只完成文件合并 利用相同的关联条件id作为key输出到reducer端,reduce端完成join操作。一般是大表join大表
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1:确定读取的是哪个源数据文件
FileSplit fileSplit = (FileSplit) context.getInputSplit(); //获取文件切片
String fileName = fileSplit.getPath().getName(); //获取源文件的名字
String[] array = value.toString().split("\|");
//2:处理订单文件
if ("itheima_order_goods.txt".equals(fileName)) { //订单文件
// 1|107860|7191
//2.1:获取K2
String k2 = array[1];
//2.2:获取v2
String v2 = "o_"+array[0] + "\t" + array[2];
//2.3:将k2和v2写入上下文中
context.write(new Text(k2), new Text(v2));
}
//3:处理商品文件
if ("itheima_goods.txt".equals(fileName)) { //商品文件
// 107860|3786028|黑色硅胶腕带
//3.1 获取K2
String k2 = array[0];
String v2 = "g_"+array[0] + "\t" + array[2];
//3.2:将k2和v2写入上下文中
context.write(new Text(k2), new Text(v2));
}
}
}
MapJoin
在map端实现文件合并 重写 setup和map方法 没有reduce。
一般用于小表join大表,小表全部加载到内存中,在map阶段直接拿另外一个表的数据和内存中表数据做匹配,由于在map端是进行了join操作,省去了reduce 运行的时间
setup方法会在map方法执行之前先执行,而且只会执行一次,主要用来做初始化工作
public class MapJoin1Mapper extends Mapper<LongWritable, Text, Text, NullWritable> {
HashMap<String, String> goodsMap = new HashMap<>();
//将小表从分布式缓存中读取,存入Map集合
@Override
protected void setup(Context context) throws IOException {
//缓冲流 字符流 字节流
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream("itheima_goods.txt")));
String line = null;
while ((line = bufferedReader.readLine()) != null) {
String[] array = line.split("\|");
goodsMap.put(array[0], array[2]);
}
}
@Override
protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context) throws IOException, InterruptedException {
String[] array = value.toString().split("\|");
String k2 = array[1];
String v2 = array[0] + "\t" + array[2];
String mapValue = goodsMap.get(k2);
context.write(new Text(v2 + "\t"+ mapValue), NullWritable.get());
}
}
Reducer阶段
自定义类继承Reducer类,重写reduce方法,在该方法中将k2,v2转化为k3和v3
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long count = 0;
for (LongWritable value : values) {
count += value.get();
}
context.write(key, new LongWritable(count));
}
}
主方法的调用
public class WordCountDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
.......
}
}
封装job任务
创建一个job对象,设置主类的名称
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration, "WordCountBase");
job.setJarByClass(WordCountDriver.class);
设置文件的读取写入路径
FileInputFormat.addInputPath(job, new Path(args[0]));
Path outputPath = new Path(args[1]);
FileOutputFormat.setOutputPath(job, outputPath);
关联Mapper
设置三个参数,自定义类,k2,v2
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
关联Reducer
设置三个参数,自定义类,k3,v3
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
设置shuffle分区类以及分区个数
job.setPartitionerClass(MyPartitioner.class);
job.setNumReduceTasks(50);
设置Shuffle的Combiner
job.setCombinerClass(MyCombiner.class);
设置Shuffle的分组类
job.setGroupingComparatorClass(GroupingComparator.class);
文件多次执行的处理
FileSystem fileSystem = FileSystem.get(new URI("file:///"), new Configuration());
boolean is_exists = fileSystem.exists(outputPath);
if(is_exists == true) {
//如果目标文件存在,则删除
fileSystem.delete(outputPath, true);
yran执行
将job提交给yarn执行
```js
boolean bl = job.waitForCompletion(true);
System.exit(bl ? 0 : 1);
JobControl控制作业流顺序
ControlledJob cj1 = new ControlledJob(configuration);
cj1.setJob(job1);
ControlledJob cj2 = new ControlledJob(configuration);
cj2.setJob(job2);
//设置作业之间的依赖关系
cj2.addDependingJob(cj1);
// 创建主控制器
JobControl jc = new JobControl("myCtrl");
jc.addJob(cj1);
jc.addJob(cj2);
// 使用线程启动JobControl
Thread t = new Thread(jc);
t.start();
while (true) {
if (jc.allFinished()) {
jc.stop();
break;
}
}
标签:String,编程,MapReduec,class,k2,job,超强,new,public
From: https://www.cnblogs.com/neilniu/p/17448108.html