首页 > 其他分享 >spark基础(1)

spark基础(1)

时间:2023-02-13 22:45:35浏览次数:55  
标签:count val sum 基础 cost 分组 spark total

将相同国家进行分组,然后将count相加sum(count), 对sum(count)进行排序,输出top5

    val path="/Volumes/Data/BigData_code/data/flight-data/csv/2015-summary.csv"
    val data = spark.read.option("inferSchema", "true").option("header", "true").csv(path)
    //查询前5个count max 的国家
    data.groupBy("DEST_COUNTRY_NAME").sum("count")
      .withColumnRenamed("sum(count)", "destination_total")
      .sort(desc("destination_total"))
      .limit(5).show()

代码的执行如图:

查看用户在一天内进行采集所用费用最多的日期:
下面是表格的格式:

    //添加一个列用于统计总费用,并查看用户话费最多的是哪个日期
    val selectData = staticData.selectExpr("CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate")
    selectData.show()
    //进行分组:分组的标准:客户的ID 和 购买的时间(一天内为相同标准), 并对分组的内容进行统计
    val groupData = selectData.groupBy(
      col("CustomerId"), window(col("InvoiceDate"), "1 day")
    ).sum("total_cost")
    groupData.show(5)

window函数:https://blog.csdn.net/weixin_38653290/article/details/83962789

使用流处理实现相同功能

    //进行分组:分组的标准:客户的ID 和 购买的时间(一天内为相同标准), 并对分组的内容进行统计
    val streamData = spark.readStream.schema(staticSchema) //设置分区
      .option("maxFilesPerTrigger", 1) //设置一次读入的文件个数
      .format("csv")
      .option("header", "true")
      .load(path)
    //执行相同的逻辑操作
    val streamGroupData = streamData.selectExpr(
      "CustomerId", "(UnitPrice * Quantity) as total_cost", "InvoiceDate"
    ).groupBy(
      $"CustomerId", window($"InvoiceDate", "1 day")
    ).sum("total_cost")

注意由于流处理和静态处理不一样,所以无法使用静态处理中的动作操作。流处理是将流处理的结果放入内存的一个表中。每一次处理完,不断的更新这个表即可

    //将结果存入内存中
    streamGroupData.writeStream.format("memory")    //表示存入内存中
      .queryName("streamGroupData")     //表示存入内存的表的名字
      .outputMode("complete")     //complete表示表中所有记录
      .start()

然后查询

    //对流处理后的结果进行查询
    spark.sql(
      """
        |select *
        |from streamGroupData
        |order by 'sum(total_cost)' desc
        |""".stripMargin).show(5)

标签:count,val,sum,基础,cost,分组,spark,total
From: https://www.cnblogs.com/ALINGMAOMAO/p/17118123.html

相关文章

  • spark中的聚合操作和分组操作
    聚合操作注意:任何的聚合操作都有默认的分组,聚合是在分组的基础上进行的。比如,对整体进行求和,那么分组就是整体。所以,在做聚合操作之前,一定要明确是在哪个分组上进行聚合操......
  • spark读写文件
    valpath="/Volumes/Data/BigData_code/data/retail-data/by-day/2010-12-01.csv"spark.read.format("csv").option("header","true").option("inferSchema","t......
  • spark的DataFrame的schema模式:读时模式, 指定模式
    读时模式valpath="/Volumes/Data/BigData_code/data/"//读取json生成dataframevaldf=spark.read.format("json").load(path+"flight-data/json/2015-......
  • spark DataFrame聚合操作
    在聚合操作中,需要指定键或分组方式,以及指定如何转换一列或多列数据的聚合函数。s除了处理任意类型的值之外,Spark还可以创建以下分组类型:最简单的分组通过在select语句......
  • MySQL基础
    基础篇通用语法及分类DDL:数据定义语言,用来定义数据库对象(数据库、表、字段)DML:数据操作语言,用来对数据库表中的数据进行增删改DQL:数据查询语言,用来查询数据库中......
  • vue基础:前端发展历史、Vue的介绍的基本使用
    目录一、前端发展历史二、Vue的介绍的基本使用1、Vue简介2、Vue特点3、M-V-VM思想4、组件化开发、单页面开发5、版本选择6、引入方式7、补充8、简单使用一、前端发展历史......
  • ideal的基础使用2022版本,黑马程序员的基础使用
    1.    2.配xml    <dependencies>    <dependency>        <groupId>javax.servlet</groupId>        <artifactId>javax.servl......
  • 基础-Linux网络
    查看路由表[root@localhost~]#route-nDestinationGatewayGenmaskFlagsMetricRefUseIface0.0.0.0172.18.4.2540.0.0.0......
  • 2.13python基础知识
      编程语言的发展史1.机器语言:内部用0和1表示2.汇编语言:简单的字母表示二进制3.高级语言:人类可以理解的1、执行效率:机器语言>汇编语言>高级语言(编译型>解释型)2......
  • python基础学习第一天
    目录一、typora软件及使用1、typora下载官方网站下载:https://www.typoraio.cn/windos系统下,下载和安装都不要在c盘,c盘是系统盘2、typora的使用书写标题警号+空......