首页 > 其他分享 >spark的共享变量

spark的共享变量

时间:2024-12-13 18:59:59浏览次数:5  
标签:__ 变量 共享 environ conf sc spark os line

因为RDD在spark中是分布式存储

1、python中定义的变量仅仅在driver中运行,在excutor中是获取不到值的——广播变量

2、若定义了一个变量进行累加,先分别在driver和excutor中进行累加,但是结果是不会主动返回给driver的——累加器

Broadcast Variables广播变量

  • driver中存放python变量广播到别的excutor中

  • 若不使用,就会每个task存放一个

  • 不能修改,只能读

  • 通过value使用该变量

if __name__ == '__main__':
        # 配置环境
        os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
        # 配置Hadoop的路径,就是前面解压的那个路径
        os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
        # 配置base环境Python解析器的路径
        os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
        os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

        # 获取 conf 对象
        # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
        #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
        #  appName 任务的名字
        conf = SparkConf().setMaster("local[*]").setAppName("第一个Spark程序")
        # 假如我想设置压缩
        # conf.set("spark.eventLog.compression.codec","snappy")
        # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
        sc = SparkContext(conf=conf)

        fileRdd = sc.textFile("../datas/user.tsv",2)
        city_dict = {
                1: "北京",
                2: "上海",
                3: "广州",
                4: "深圳",
                5: "苏州",
                6: "无锡",
                7: "重庆",
                8: "厦门",
                9: "大理",
                10: "成都"
        }
       
        # 将一个变量广播出去,广播到executor中,不是task中
        city_dict_broad = sc.broadcast(city_dict)
        
        # 广播变量
        # <class 'pyspark.broadcast.Broadcast'>
        print(type(city_dict_broad ))
        # <class 'dict'>
        print(type(city_dict_broad.value))
        
        def getLine(line):
                list01 = line.split(" ")
                #cityName = city_dict.get(int(list01[3]))
                # 使用广播变量的变量获取数据
                cityName = city_dict_broad.value.get(int(list01[3]))
                # print(cityName)
                return line + " " + cityName
        mapRdd = fileRdd.map(getLine)
        mapRdd.foreach(print)

        # 释放广播变量
        city_dict_broad.unpersist()
        # 使用完后,记得关闭
        sc.stop()

 累加器

将所有的excutor中的变量返回到driver中,进行汇总。

否则变量是放在excutor中的,而打印的是driver中,变量值不会改变。

用于修改——汇总

import os
import re

import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel

"""
------------------------------------------
  Description : TODO:
  SourceFile : _06SouGou案例
  Author  : yange
  Date  : 2024/10/31 星期四
-------------------------------------------
"""
if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)

    mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
     .filter(lambda line:len(re.split("\s+",line)) == 6) \
     .map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)

    # 统计一天每小时点击量并按照点击量降序排序
    _sum = 0
    def sumTotalLine(tuple1):
        global _sum # 把_sum 设置为全局变量
        timeStr = tuple1[0] # 10:19:18
        if timeStr[0:2] == '10':
            _sum += 1

    mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
    print(_sum) # 结果是0


    # 使用完后,记得关闭
    sc.stop()

上面程序最终结果是:0,因为 sum=0 是在 Driver 端的内存中的,executor 中程序再累加也是无法改变 Driver 端的结果的。下面的则为正确的

import os
import re

import jieba
# 导入pyspark模块
from pyspark import SparkContext, SparkConf
from pyspark.storagelevel import StorageLevel

"""
------------------------------------------
  Description : TODO:
  SourceFile : _06SouGou案例
  Author  : yange
  Date  : 2024/10/31 星期四
-------------------------------------------
"""
if __name__ == '__main__':
    # 配置环境
    os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk1.8.0_241'
    # 配置Hadoop的路径,就是前面解压的那个路径
    os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1'
    # 配置base环境Python解析器的路径
    os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
    os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

    # 获取 conf 对象
    # setMaster  按照什么模式运行,local  bigdata01:7077  yarn
    #  local[2]  使用2核CPU   * 你本地资源有多少核就用多少核
    #  appName 任务的名字
    conf = SparkConf().setMaster("local[*]").setAppName("搜索热词案例")
    # 假如我想设置压缩
    # conf.set("spark.eventLog.compression.codec","snappy")
    # 根据配置文件,得到一个SC对象,第一个conf 是 形参的名字,第二个conf 是实参的名字
    sc = SparkContext(conf=conf)
    accCounter = sc.accumulator(0)

    mapRdd = sc.textFile("../../datas/zuoye/sogou.tsv",minPartitions=8) \
     .filter(lambda line:len(re.split("\s+",line)) == 6) \
     .map(lambda line:(re.split("\s+",line)[0],re.split("\s+",line)[1],re.split("\s+",line)[2][1:-1])).persist(StorageLevel.MEMORY_AND_DISK_2)

    # 统计一天每小时点击量并按照点击量降序排序
    #_sum = 0
    def sumTotalLine(tuple1):
        #global _sum # 把_sum 设置为全局变量
        timeStr = tuple1[0] # 10:19:18
        if timeStr[0:2] == '10':
            accCounter.add(1)

    mapRdd.foreach(lambda tuple1:sumTotalLine(tuple1))
    print(accCounter.value) # 104694

    # 假如我不知道累加器这个操作,这个题目怎么做?
    print(mapRdd.filter(lambda tuple1: tuple1[0][0:2] == '10').count())


    # 使用完后,记得关闭
    sc.stop()

 

标签:__,变量,共享,environ,conf,sc,spark,os,line
From: https://blog.csdn.net/weixin_52642840/article/details/144436920

相关文章

  • 明明正确配置了export PATH=XXX环境变量,但是source ~/.bashrc和/etc/profile之后还是
    问题:明明正确配置了exportPATH="/***:$PATH",比如:exportPATH="/usr/bin:/bin:$PATH"exportPATH=$PATH:/home/jie/myapp/dealii-9.2.0但是source~/.bashrc之后还是不生效然后在/etc/profile配置了之后也是不自动生效。问题根源:后面有一个PATH配置错了没有加后缀:$......
  • JAVA中 普通方法/变量,静态方法/变量,之间的相互调用
    publicclassMyapp{//普通变量inta=1;//静态变量staticStringstr="字符串";//普通方法publicvoidMethod1(){//普通方法中可以使用普通属性/方法,静态属性/静态方法均是直接调用即可System.out.println(a);S......
  • 鸿蒙Next状态变量Watch使用方法总结
    一、@Watch装饰器概述@Watch装饰器用于监听状态变量的变化,当被装饰的状态变量发生改变时,会触发对应的回调函数执行。其在ArkUI框架内部基于严格相等(===)来判断数值是否更新,仅当严格相等判断为false时,才会触发回调。这为开发者提供了一种有效的方式来响应状态变量的变化,从而实现应......
  • Apache Spark 的基本概念和在大数据分析中的应用。
    ApacheSpark是一个开源大数据处理框架,被广泛应用于大规模数据分析、机器学习和图形处理等领域。它具有以下几个基本概念:RDD(ResilientDistributedDataset):RDD是Spark中最基本的数据抽象概念,代表了一个分布式的不可变的数据集合。RDD可以从外部数据源创建,也可以通过转换操作(......
  • Go语言变量 (值类型和引用类型)
    所有像int、float、bool和string这些基本类型都属于值类型,使用这些类型的变量直接指向内存中的值当使用等号“=”将一个变量赋值给另一个变量时,如:j=i,实际上是在内存中将i的值进行了拷贝你可以通过&var来获取变量var的地址值类型变量通常储存在栈中,尤其当它们时......
  • Springboot基于SpringBoot共享单车管理信息平台m28dx(程序+源码+数据库+调试部署+开发
    本系统(程序+源码+数据库+调试部署+开发环境)带论文文档1万字以上,文末可获取,系统界面在最后面。系统程序文件列表用户,共享单车,单车类型,租赁信息,交易信息,在线充值,充值信息,余额信息开题报告内容一、研究背景与意义随着城市化进程的加快和人们对环保出行方式的追求,共享......
  • 判断变量类型
    在JavaScript中,判断变量的类型有多种方式。以下是一些常用的方法:typeof操作符:typeof是JavaScript中用于判断变量类型的基本操作符。它可以返回以下字符串之一:"number"、"string"、"boolean"、"object"、"function"、"undefined"和"symbol"(ES6新增)。letnum......
  • var声明变量
    在JavaScript中,使用var关键字声明的变量具有一些特定的行为和特性,尤其是与对象(object)相关的声明。以下是一些关键点,帮助你理解var声明对象的行为:函数作用域(FunctionScope):使用var声明的变量是函数作用域的,这意味着它们在整个函数内部都可以访问,但在函数外部无法访问。......
  • Shell浅浅谈(三)Shell 变量与操作详解
    1.变量基础1.1定义变量语法:variable_name=value注意事项:变量名和等号之间不能有空格变量名规则:只能包含字母、数字和下划线_,例如hello_world="helloword"。不能以数字开头,例如1aaa则不行。避免使用Shell关键字,这是所有语言都需要注意的。1.2使用变量使......
  • Linux如何挂载windows共享文件夹(包含Linux报错解决)
     目录前言windows如何共享文件夹windows如何查看共享Linux端挂载共享文件夹关于挂载共享文件夹的报错解决1.关闭windows防火墙2.确认windows的CIFS文件共享功能是否开启3.检查源路径与挂载路径是否正确4.检查用户名及密码是否正确5.检查共享文件夹权限6.......