首页 > 其他分享 >【大数据学习 | Spark-Core】广播变量和累加器

【大数据学习 | Spark-Core】广播变量和累加器

时间:2024-11-24 20:33:08浏览次数:13  
标签:Core 变量 val ip 累加器 conf executor Spark

1. 共享变量

Spark两种共享变量:广播变量(broadcast variable)与累加器(accumulator)。

累加器用来对信息进行聚合,相当于mapreduce中的counter;而广播变量用来高效分发较大的对象,相当于semijoin中的DistributedCache 。

共享变量出现的原因:

我们传递给Spark的函数,如map(),或者filter()的判断条件函数,能够利用定义在函数之外的变量,但是集群中的每一个task都会得到变量的一个副本,并且task在对变量进行的更新不会被返回给driver。

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}

object TestAcc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test acc")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)

    val rdd = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8, 9),3)

    val count = rdd.map(t=> 1).reduce(_+_)

    println(count)

//    val acc = sc.longAccumulator("count")
//
//    rdd.foreach(t=>{
//      acc.add(1)
//    })
//
//    println(acc.value)

//    println(rdd.count())
  }
}

原因总结:

对于executor端,driver端的变量是外部变量。

excutor端修改了变量count,根本不会让driver端跟着修改。如果想在driver端得到executor端修改的变量,需要用累加器实现。

当在Executor端用到了Driver变量,不使用广播变量,在每个Executor中有多少个task就有多少个Driver端变量副本。如果这个变量中的数据很大的话,会产生很高的传输负载,导致执行效率降低,也可能会造成内存溢出。使用广播变量以后,在每个Executor中只有一个Driver端变量副本,在一个executor中的并行执行的task任务会引用该一个变量副本即可,需要广播变量提高运行效率。

2. 累加器

累加器的执行流程:

通过SparkContext创建一个累加器并初始化。当driver端将任务分发给executor时,每个executor会接收一个任务和一个引用到该累加器的副本。每个executor上的任务可以调用累加器的add方法来增加累加器的值,这些操作是线程安全的,因为每个任务都会在自己的executor线程中执行。当每个任务完成,executor将累加器的更新值发送到driver端进行聚合过程,得到最终的聚合结果。

累加器可以很简便地对各个worker返回给driver的值进行聚合。累加器最常见的用途之一就是对一个job执行期间发生的事件进行计数。

用法:

var acc: LongAccumulator = sc.longAccumulator // 创建累加器

acc.add(1) // 累加器累加

acc.value // 获取累加器的值

累加器的简单使用

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCountWithAcc {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("test acc")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)
    val acc = sc.longAccumulator("bad word")
    sc.textFile("data/a.txt")
      .flatMap(_.split(" "))
      .filter(t=>{
        if(t.equals("shit")){
          acc.add(1)
          false
        }else
          true
      }).map((_,1))
      .reduceByKey(_+_)
      .foreach(println)

    println("invalid words:"+acc.value)
  }
}

3. 广播变量

ip转换工具

public class IpUtils {

    public static Long ip2Long(String ip) {
        String fragments[] = ip.split("[.]");
        Long ipNum = 0L;
        for(int i=0;i<fragments.length;i++) {
            ipNum = Long.parseLong(fragments[i]) | ipNum << 8L;
        }
        return ipNum;
    }
}

ip案例代码

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}

object IpTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("ip")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)

    val accessRDD = sc.textFile("data/access.log")
      .map(t=>{
        val strs = t.split("\\|")
        IpUtils.ip2Long(strs(1))
      })
    val ipArr:Array[(Long,Long,String)] = sc.textFile("data/ip.txt").map(t=>{
      val strs = t.split("\\|")
      (strs(2).toLong,strs(3).toLong,strs(6)+strs(7))
    }).collect()

//    accessRDD.map(ip=>{
//      ipRDD.filter(t=>{
//        ip>= t._1 && ip<= t._2
//      })
//    }).foreach(println)

    accessRDD.map(ip=>{
      ipArr.find(t=>{
        t._1<= ip && t._2>=ip
      }) match {
        case Some(v) => (v._3,1)
        case None => ("unknow",1)
      }
      //option
    }).reduceByKey(_+_)
      .foreach(println)
  }
}

使用广播变量可以使程序高效地将一个很大的只读数据发送到executor节点,会将广播变量放到executor的BlockManager中,而且对每个executor节点只需要传输一次,该executor节点的多个task可以共用这一个。

用法:

val broad: Broadcast[List[Int]] = sc.broadcast(list) // 把driver端的变量用广播变量包装

broad.value // 从广播变量获取包装的数据,用于计算

我们可能遇到这样的问题:如果我们需要广播的数据为100M,如果需要driver端亲自向每个executor端发送100M的数据,在工作中executor节点的个数可能是很多的,比如是200个,这意味着driver端要发送20G的数据,这对于driver端的压力太大了。所以要用到比特洪流技术。

就是说driver端不必向每个executor发送一份完整的广播变量的数据,而是将一份广播变量切分成200份,发送给两百个executor,然后200个executor间通过BlockManager中的组件transferService与其他executor通信,进行完整的数据。

这样driver端只需要发送一份广播变量的数据,压力就会小很多,而且其他executor也都拿到了这一份广播变量的数据 。

package com.hainiu.spark

import org.apache.spark.{SparkConf, SparkContext}

object IpTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("ip")
    conf.setMaster("local[*]")
    val sc = new SparkContext(conf)

    val accessRDD = sc.textFile("data/access.log")
      .map(t=>{
        val strs = t.split("\\|")
        IpUtils.ip2Long(strs(1))
      })
    val ipArr:Array[(Long,Long,String)] = sc.textFile("data/ip.txt").map(t=>{
      val strs = t.split("\\|")
      (strs(2).toLong,strs(3).toLong,strs(6)+strs(7))
    }).collect()

    val bs = sc.broadcast(ipArr)

    //    accessRDD.map(ip=>{
    //      ipRDD.filter(t=>{
    //        ip>= t._1 && ip<= t._2
    //      })
    //    }).foreach(println)

    accessRDD.map(ip=>{
      bs.value.find(t=>{
        t._1<= ip && t._2>=ip
      }) match {
        case Some(v) => (v._3,1)
        case None => ("unknow",1)
      }
      //option
    }).reduceByKey(_+_)
      .foreach(println)
  }
}

为了提高查找的效率,可以使用二分法查找代码。将时间复杂度由O(n)优化到了O(logn)。

      val start = System.currentTimeMillis()
      val res =  (binarySearch(ip,bs.value),1)
//      val res = bs.value.find(t=>{
//        t._1<= ip && t._2>=ip
//      }) match {
//        case Some(v) => (v._3,1)
//        case None => ("unknow",1)
//      }
      val end = System.currentTimeMillis()
      acc.add(end-start)

累加器实现运行时间的统计

标签:Core,变量,val,ip,累加器,conf,executor,Spark
From: https://blog.csdn.net/2301_80912559/article/details/144000328

相关文章

  • 【大数据学习 | Spark-Core】Spark的kryo序列化
    1.前言由于大多数Spark计算的内存性质,Spark程序可能会受到集群中任何资源(CPU,网络带宽或内存)的瓶颈。通常,如果内存资源足够,则瓶颈是网络带宽。数据序列化,这对于良好的网络性能至关重要。在Spark的架构中,在网络中传递的或者缓存在内存、硬盘中的对象需要进行序列化操作。比如......
  • 【大数据学习 | Spark-Core】RDD的五大特性(包含宽窄依赖)
    分析一下rdd的特性和执行流程Alistofpartitions存在一系列的分区列表Afunctionforcomputingeachsplit每个rdd上面都存在compute方法进行计算AlistofdependenciesonotherRDDs每个rdd上面都存在一系列的依赖关系Optionally,aPartitionerforkey-valueRDDs......
  • spark 写入mysql 中文数据 显示?? 或者 乱码
    目录前言Spark报错:解决办法:总结一下:报错:解决:前言用spark写入mysql中,查看中文数据显示??或者乱码Spark报错:SatNov2319:15:59CST2024WARN:EstablishingSSLconnectionwithoutserver'sidentityverificationisnotrecommended.AccordingtoMySQL5.......
  • 在 ASP.NET Core 中创建 gRPC 客户端和服务器
    前言gRPC是一种高性能、开源的远程过程调用(RPC)框架,它基于ProtocolBuffers(protobuf)定义服务,并使用HTTP/2协议进行通信。新建项目新建解决方案GrpcDemo新建webapi项目GrpcServer作为grpc服务端项目添加包<PackageReferenceInclude="Grpc.AspNetCore"Version="2.67.......
  • 代谢组数据分析(二十二):Zscore标准化后主成分分析(PCA)及热图展示
    禁止商业或二改转载,仅供自学使用,侵权必究,如需截取部分内容请后台联系作者!文章目录介绍Z-score标准化主成分分析(PrincipalComponentAnalysis,PCA)加载R包数据下载导入数据数据预处理填补缺失值Zscore标准化PCA分析热图展示代谢物聚类簇小提琴图聚类......
  • afcore.dll文件丢失影响Omega Strikers运行?Omega Strikers玩家必看afcore.dll文件丢失
    在沉浸于OmegaStrikers这款快节奏、竞技性强的游戏时,突然遭遇afcore.dll文件丢失的问题,无疑会给玩家带来不小的困扰。这一关键文件的缺失,可能导致游戏无法正常启动、运行卡顿,甚至频繁崩溃,严重影响游戏体验。然而,面对这一挑战,玩家无需过度焦虑,因为通过一系列自救措施,你完全有能......
  • 计算机毕业设计Python+Spark知识图谱课程推荐系统 课程用户画像系统 课程大数据 课程
    《Python+Spark知识图谱课程推荐系统》开题报告一、研究背景与意义随着互联网技术的快速发展,在线教育平台已成为人们获取知识、提升技能的重要途径。然而,面对海量的课程资源,用户往往难以快速找到符合自己兴趣和需求的课程。传统的课程推荐系统大多基于简单的规则或统计方法,难......
  • Spark实现PageRank算法
    详细步骤:1、创建Sparksql环境2、读取数据3、数据切分(分为page列,outLink列)形成表pageDF4、新增pr一列 (给定初始值)  形成表initPrDF5、新增avgPr一列(根据出链关系,求每个页面所分到的Pr)6、分组聚合(将outLink列explode炸开,在按照page分组,然后sum求和,这就是表......
  • .net core web api授权、鉴权、API保护
    前言本文整理asp.netcorewebAPI的授权、鉴权以及注册验证、API保护一系列常用技术手段。本文所有的实现代码可以参考:https://gitee.com/xiaoqingyao/web-app-identity.git用户管理授权和鉴权的前提是要有一个用户管理模块,.net提供一个现有的Identity组件,帮我们完成了大部......
  • .NET Core SqlSugar
    概念:1.官方文档:https://www.donet5.com/Home/Doc?typeId=11802.在vsstudio中导包SqlSugarCore创建模型类:1.vsstudio2022中选择项目2.选择6.03.projram.csusingSqlSugar;varbuilder=WebApplication.CreateBuilder(args);//Addservicestothecontain......