首页 > 其他分享 >SPARK数据倾斜,随机数方式

SPARK数据倾斜,随机数方式

时间:2022-08-19 14:23:47浏览次数:97  
标签:聚合 task 倾斜 tuple Long 随机数 new SPARK 执行

1、现象
spark数据倾斜,有两种表现:
大部分的task,都执行的特别特别快,刷刷刷,就执行完了(你要用client模式,standalone client,yarn client,本地机器主要一执行spark-submit脚本,就会开始打印log),task175 finished;剩下几个task,执行的特别特别慢,前面的task,一般1s可以执行完5个;最后发现1000个task,998,999 task,要执行1个小时,2个小时才能执行完一个task。
出现数据倾斜了,还算好的,因为虽然老牛拉破车一样,非常慢,但是至少还能跑。
运行的时候,其他task都刷刷刷执行完了,也没什么特别的问题;但是有的task,就是会突然间,啪,报了一个OOM,JVM Out Of Memory,内存溢出了,task failed,task lost,resubmitting task。反复执行几次都到了某个task就是跑不通,最后就挂掉。
引用自大佬的原创: https://www.codetd.com/article/5201336
2、原因
基本只可能是因为发生了shuffle操作,在shuffle的过程中,出现了数据倾斜的问题。因为某个,或者某些key对应的数据,远远的高于其他的key。由于shuffle导致的,有几种reduceBykey(),groupByKey(),join等多种,因为数据不均衡
3、解决方式,随机数法
查看log日志,一般会报是在你的哪一行代码,导致了OOM异常;或者呢,看log,看看是执行到了第几个stage!!
第一次聚合(局部聚合):对每一个Key值加上一个随机数,执行第一次reduceByKey聚合操作。
第二次聚合(双重聚合):去掉Key值的前缀随机数,执行第二次reduceByKey聚合,最终得到全局聚合的结果。
大佬的博客: https://www.codetd.com/article/5727145
对groupByKey、reduceByKey造成的数据倾斜,有比较好的效果。


/**
		 * 第一步,给每个key打上一个随机数
		 */
		JavaPairRDD<String, Long> mappedClickCategoryIdRDD = clickCategoryIdRDD.mapToPair(

				new PairFunction<Tuple2<Long,Long>, String, Long>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<String, Long> call(Tuple2<Long, Long> tuple)
							throws Exception {
						Random random = new Random();
						int prefix = random.nextInt(10);
						return new Tuple2<String, Long>(prefix + "_" + tuple._1, tuple._2);
					}

				});

		/**
		 * 第二步,执行第一轮局部聚合
		 */
		JavaPairRDD<String, Long> firstAggrRDD = mappedClickCategoryIdRDD.reduceByKey(

				new Function2<Long, Long, Long>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Long call(Long v1, Long v2) throws Exception {
						return v1 + v2;
					}

				});

		/**
		 * 第三步,去除掉每个key的前缀
		 */
		JavaPairRDD<Long, Long> restoredRDD = firstAggrRDD.mapToPair(

				new PairFunction<Tuple2<String,Long>, Long, Long>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Tuple2<Long, Long> call(Tuple2<String, Long> tuple)
							throws Exception {
						long categoryId = Long.valueOf(tuple._1.split("_")[1]);  
						return new Tuple2<Long, Long>(categoryId, tuple._2);  
					}

				});

		/**
		 * 第四步,最第二轮全局的聚合
		 */
		JavaPairRDD<Long, Long> clickCategoryId2CountRDD = restoredRDD.reduceByKey(

				new Function2<Long, Long, Long>() {

					private static final long serialVersionUID = 1L;

					@Override
					public Long call(Long v1, Long v2) throws Exception {
						return v1 + v2;
					}

				});

```java

标签:聚合,task,倾斜,tuple,Long,随机数,new,SPARK,执行
From: https://www.cnblogs.com/hbym/p/16601816.html

相关文章