首页 > 其他分享 >Hadoop优化

Hadoop优化

时间:2022-08-31 17:12:13浏览次数:70  
标签:map join reduce Hadoop job import 优化 class

天气案例

随机生成温度代码;并写入到文件中

需求:求每年2月份的最高温度

package utils;


import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class RandomWeather {
    public static void main(String[] args) throws ParseException, IOException {
        //创建日期格式
        DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        long start = sdf.parse("2000-01-01 00:00:00").getTime();
        long end = sdf.parse("2022-12-31 00:00:00").getTime();
        long difference = end - start;
        BufferedWriter bw=new BufferedWriter(new FileWriter("D:\\soft\\projects\\bigdata19-project\\
bigdata19-mapreduce\\data\\weather.txt")); for (int i = 0; i < 10000; i++) { //随机生成时间2000-2023 Date date = new Date(start + (long) (Math.random() * difference)); //随机生成一个温度 int temperature = -20 + (int) (Math.random() * 60); //打印生成的结果 // System.out.println(sdf.format(date) + "\t" + temperature); bw.write(sdf.format(date)+"\t"+temperature); bw.newLine(); bw.flush(); } } }

mapreduce代码

package com.shujia.weather;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/*
2014-07-08 14:04:18    -5
2000-10-03 14:28:54    32
2002-05-11 14:40:37    -18
2009-07-08 11:17:50    0
 */
class WeatherMapper extends Mapper<LongWritable,Text,Text,LongWritable>{
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context) 
throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split("\t"); String temp=split[1]; String[] ym = split[0].split("-"); if("02".equals(ym[1])) { String yam=ym[0]+"-"+ym[1]; context.write(new Text(yam), new LongWritable(Long.parseLong(temp))); } } } class WeatherReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException { long max=0; for (LongWritable value : values) { long temp = value.get(); if(temp>max){ max=temp; } } context.write(key,new LongWritable(max)); } } public class WeatherMax { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WeatherMax.class); job.setJobName("求每一年2月份的最高温度"); job.setNumReduceTasks(1); job.setMapperClass(WeatherMapper.class); job.setReducerClass(WeatherReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.waitForCompletion(true); } }

 优化一:Combiner

使用combiner之前

 

使用combiner之后

减少的了reduce 从map拉取数据的过程,提高计算效率。

 

hadoop 的计算特点:将计算任务向数据靠拢,而不是将数据向计算靠拢。

特点:数据本地化,减少网络io。

首先需要知道,hadoop数据本地化是指的map任务,reduce任务并不具备数据本地化特征。

通常输入的数据首先在逻辑上注意这里不是真正物理上划分)将会分片split,每个分片上构建一个map任务,由该任务执行执行用户自定义的map函数,从而处理分片中的每条记录。

那么切片的大小一般是趋向一个HDFS的block块的大小。为什么最佳的分片大小是趋向block块的大小呢?是因为这样能够确保单节点上最大输入块的大小,如果分片跨越两个数据块,没有一个block能够同时存储这两块数据,因此需要通过网络传输将部分数据传输到map任务节点上。这样明显比使用本地数据的map效率更低。

注意,map任务执行后的结果并没有写到HDFS中,而是作为中间结果存储到本地硬盘,那为什么没有存储到HDFS呢?因为,该中间结果会被reduce处理后产生最终结果后,该中间数据会被删除,如果存储到HDFS中,他会进行备份,这样明显没有意义。如果map将中间结果传输到reduce过程中出现了错误,Hadoop会在另一个节点上重新执行map产生中间结果。

那么为什么reduce没有数据本地化的特点呢?对于单个reduce任务来说,他的输入通常是所有mapper经过排序输出,这些输出通过网络传输到reduce节点,数据在reduce节点合并然后由reduce函数进行处理。最终结果输出到HDFS上。当多个有reduce任务的时候,map会针对输出进行分区partition,也就是为每个reduce构建一个分区,分区是由用户指定的partition函数,效率很高。

同时为了高效传输可以指定combiner函数,他的作用就是,减少网络传输和本地传输


注意:将reduce端的聚合操作,放到map 进行执行。适合求和,计数,等一些等幂操作。不适合求平均值,次幂等类似操作

优化二:Join(数据倾斜)

发生数据倾斜解决方法:1.可以设置多个reduce;2.把发生数据倾斜的key打上随机值分配到不同的reduce中;3.再写一个mapreduce,把随机值去掉再做一次聚合;

MapReduce中的join

  其实就是类似于关系型数据库中的连接查询一样。需要计算的数据可能存储在不同的文件中或不同表中,两个文件又有一些相同的字段可以相互关联,这时候我们就可以通过这些关联字段将两个文件中的数据组合到一起进行计算了。

join的三种方式:Map join、SemiJoin、reduce join

Reduce Join:

思路:

  分为两个阶段

   (1)map函数主要是对不同文件中的数据打标签。

  (2)reduce函数获取key相同的value list,进行笛卡尔积。

Map Join:

思路:

比如有两个表,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份,让每个map task内存中保存一个hash map,将小表数据放入这个hash map中,key是小表与大表的内个连接字段,value是小表一条记录,然后只扫描大表:对于大表中的每一条记录key/value,在hash map中查找是否有相同的key的记录,如果有,则连接输出即可。

Semi Join :

Semi Join 这个SemiJoin其实就是对reduce join的一种优化。

就是在map端过滤掉不参加join操作的数据,则可以大大减少数据量,提高网络传输速度。

这三种join方式适用于不同的场景:

       Reduce join要考虑数据量过大时的网络传输问题。

  Map join和SemiJoin则要考虑数据量过大时的内存问题。 如果只考虑网络传输,忽略内存问题则。

  Map join效率最高,其次是SemiJoin,最低的是reduce join。

DistributedCache(分布式缓存):

DistributedCache DistributedCache是Hadoop提供的文件缓存工具,它能够自动将指定的文件分发到各个节点上,缓存到本地,供用户程序读取使用。一般用户数据字典的分发,和map join使用。一般缓存的文件都是只读。

优化三:根据实际情况调整切片大小

为什么默认切片是128MB和block大小一致?(优化)

1 切片大小默认一致,是为了数据本地化,减少数据拉取消耗网络io

2 并不是越大越好,也不是越小越好。根据集群的资源情况而定。

当集群计算资源充足的情况下:将切片的大小调小,增加map数量,提高读取效率。

当集群计算资源紧张的情况下:将切片的大小调大,减少资源占用,让任务正常运转。

mapred.min.split.size、mapred.max.split.size、blockSize

优化四:可以设置yarn资源和队列

调整计算资源:参考博客:https://blog.csdn.net/qq_36753550/article/details/83065546

设置队列:参考博客:https://blog.51cto.com/u_13525470/4723358

 

标签:map,join,reduce,Hadoop,job,import,优化,class
From: https://www.cnblogs.com/wqy1027/p/16642353.html

相关文章

  • SpringBoot使用@Async和@Transactional注解优化接口
    1、业务背景:项目上有一个接口需要按照前端传递的时间段范围修改6个表的数据,接口V1版本开发完成是使用的同步方式全局@Transactional注解的方式去做的,但存在一个问题就......
  • hadoop小结
    Hadoop是一个适合海量数据的分布式存储和分布式计算的平台主要有以下功能:HadoopCommon:基础型功能HadoopDistributedFileSystem(HDFS™):一种分布式文件系统,可提供对......
  • 力扣 110. 平衡二叉树 [基础+优化]
    110.平衡二叉树给定一个二叉树,判断它是否是高度平衡的二叉树。本题中,一棵高度平衡二叉树定义为:一个二叉树每个节点 的左右两个子树的高度差的绝对值不超过1。......
  • vue-cli 配置优化
    本文整理了一些vue开发中常用vue-cli配置,使用的vue-cli版本为3.11.0,主要内容包括:移除preload与prefetch使用webpack-bundle-analyzer做打包分析使用ters......
  • hadoop配置的几个小问题
    1.ssh免密登陆配置也要给本机进行配置,否则后续会出现问题。2.hadoop配置文件中需要看好value值的最后不能有空格。在hdfs-site.xml中,登陆网址是否自己进行了配置。 ......
  • 1.MySQL优化
    MySQL中的索引管理​ 在MySQL中,对索引的查看和删除操作是所有索引类型通用的。6.1普通索引​ 这是最基本的索引,它没有任何限制MyIASM中默认的BTREE类型的索......
  • in notin exists not exists 性能优化算法总结
    innotinexistsnotexists性能优化算法总结1.1.in和exists区别1.2.notin能不能走索引1.3.notin和join的关系1.4.和notExists的关系1.5.in的实......
  • 题解 AT5635 Shortest Path on a Line(线段树优化建图)
    题解AT5635ShortestPathonaLineDescription题目传送门题面翻译有一张有\(N\)个点,编号为\(1-N\)的无向图。做\(M\)次操作,每次操作给出三个正整数\(L,R,......
  • HTTP/1.1 如何优化
    3种优化思路:尽量避免发送HTTP请求;在需要发送HTTP请求时,考虑如何减少请求次数;减少服务器的HTTP响应的数据大小;一、尽量避免发送HTTP请求实现方法:对于⼀些......
  • 前端性能优化(一)---时间角度优化:减少耗时
    一、为什么要进行性能优化对于一个产品来说,用户的体验是最重要的。当页面加载时间过长,交互操作不流畅时,会给用户带来很糟糕的体验,会导致用户流失。二、前端常见的性能优......