天气案例
随机生成温度代码;并写入到文件中
需求:求每年2月份的最高温度
package utils; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class RandomWeather { public static void main(String[] args) throws ParseException, IOException { //创建日期格式 DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = sdf.parse("2000-01-01 00:00:00").getTime(); long end = sdf.parse("2022-12-31 00:00:00").getTime(); long difference = end - start; BufferedWriter bw=new BufferedWriter(new FileWriter("D:\\soft\\projects\\bigdata19-project\\
bigdata19-mapreduce\\data\\weather.txt")); for (int i = 0; i < 10000; i++) { //随机生成时间2000-2023 Date date = new Date(start + (long) (Math.random() * difference)); //随机生成一个温度 int temperature = -20 + (int) (Math.random() * 60); //打印生成的结果 // System.out.println(sdf.format(date) + "\t" + temperature); bw.write(sdf.format(date)+"\t"+temperature); bw.newLine(); bw.flush(); } } }
mapreduce代码
package com.shujia.weather; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /* 2014-07-08 14:04:18 -5 2000-10-03 14:28:54 32 2002-05-11 14:40:37 -18 2009-07-08 11:17:50 0 */ class WeatherMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split("\t"); String temp=split[1]; String[] ym = split[0].split("-"); if("02".equals(ym[1])) { String yam=ym[0]+"-"+ym[1]; context.write(new Text(yam), new LongWritable(Long.parseLong(temp))); } } } class WeatherReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException { long max=0; for (LongWritable value : values) { long temp = value.get(); if(temp>max){ max=temp; } } context.write(key,new LongWritable(max)); } } public class WeatherMax { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WeatherMax.class); job.setJobName("求每一年2月份的最高温度"); job.setNumReduceTasks(1); job.setMapperClass(WeatherMapper.class); job.setReducerClass(WeatherReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.waitForCompletion(true); } }
优化一:Combiner
使用combiner之前
使用combiner之后
减少的了reduce 从map拉取数据的过程,提高计算效率。
hadoop 的计算特点:将计算任务向数据靠拢,而不是将数据向计算靠拢。
特点:数据本地化,减少网络io。
首先需要知道,hadoop数据本地化是指的map任务,reduce任务并不具备数据本地化特征。
通常输入的数据首先在逻辑上(注意这里不是真正物理上划分)将会分片split,每个分片上构建一个map任务,由该任务执行执行用户自定义的map函数,从而处理分片中的每条记录。
那么切片的大小一般是趋向一个HDFS的block块的大小。为什么最佳的分片大小是趋向block块的大小呢?是因为这样能够确保单节点上最大输入块的大小,如果分片跨越两个数据块,没有一个block能够同时存储这两块数据,因此需要通过网络传输将部分数据传输到map任务节点上。这样明显比使用本地数据的map效率更低。
注意,map任务执行后的结果并没有写到HDFS中,而是作为中间结果存储到本地硬盘,那为什么没有存储到HDFS呢?因为,该中间结果会被reduce处理后产生最终结果后,该中间数据会被删除,如果存储到HDFS中,他会进行备份,这样明显没有意义。如果map将中间结果传输到reduce过程中出现了错误,Hadoop会在另一个节点上重新执行map产生中间结果。
那么为什么reduce没有数据本地化的特点呢?对于单个reduce任务来说,他的输入通常是所有mapper经过排序输出,这些输出通过网络传输到reduce节点,数据在reduce节点合并然后由reduce函数进行处理。最终结果输出到HDFS上。当多个有reduce任务的时候,map会针对输出进行分区partition,也就是为每个reduce构建一个分区,分区是由用户指定的partition函数,效率很高。
同时为了高效传输可以指定combiner函数,他的作用就是,减少网络传输和本地传输
注意:将reduce端的聚合操作,放到map 进行执行。适合求和,计数,等一些等幂操作。不适合求平均值,次幂等类似操作
优化二:Join(数据倾斜)
发生数据倾斜解决方法:1.可以设置多个reduce;2.把发生数据倾斜的key打上随机值分配到不同的reduce中;3.再写一个mapreduce,把随机值去掉再做一次聚合;
MapReduce中的join
其实就是类似于关系型数据库中的连接查询一样。需要计算的数据可能存储在不同的文件中或不同表中,两个文件又有一些相同的字段可以相互关联,这时候我们就可以通过这些关联字段将两个文件中的数据组合到一起进行计算了。
join的三种方式:Map join、SemiJoin、reduce join
Reduce Join:
思路:
分为两个阶段
(1)map函数主要是对不同文件中的数据打标签。
(2)reduce函数获取key相同的value list,进行笛卡尔积。
Map Join:
思路:
比如有两个表,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中保存一个hash map,将小表数据放入这个hash map中,key是小表与大表的内个连接字段,value是小表一条记录,然后只扫描大表:对于大表中的每一条记录key/value,在hash map中查找是否有相同的key的记录,如果有,则连接输出即可。
Semi Join :
Semi Join 这个SemiJoin其实就是对reduce join的一种优化。
就是在map端过滤掉不参加join操作的数据,则可以大大减少数据量,提高网络传输速度。
这三种join方式适用于不同的场景:
Reduce join要考虑数据量过大时的网络传输问题。
Map join和SemiJoin则要考虑数据量过大时的内存问题。 如果只考虑网络传输,忽略内存问题则。
Map join效率最高,其次是SemiJoin,最低的是reduce join。
DistributedCache(分布式缓存):
DistributedCache DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用。一般用户数据字典的分发,和map join使用。一般缓存的文件都是只读。
优化三:根据实际情况调整切片大小
为什么默认切片是128MB和block大小一致?(优化)
1 切片大小默认一致,是为了数据本地化,减少数据拉取消耗网络io
2 并不是越大越好,也不是越小越好。根据集群的资源情况而定。
当集群计算资源充足的情况下:将切片的大小调小,增加map数量,提高读取效率。
当集群计算资源紧张的情况下:将切片的大小调大,减少资源占用,让任务正常运转。
mapred.min.split.size、mapred.max.split.size、blockSize
优化四:可以设置yarn资源和队列
调整计算资源:参考博客:https://blog.csdn.net/qq_36753550/article/details/83065546
设置队列:参考博客:https://blog.51cto.com/u_13525470/4723358
标签:map,join,reduce,Hadoop,job,import,优化,class From: https://www.cnblogs.com/wqy1027/p/16642353.html