首页 > 其他分享 >spark dataset dataframe 动态添加列

spark dataset dataframe 动态添加列

时间:2023-07-31 17:34:48浏览次数:52  
标签:val dataframe dataset apache org spark Row row

需求

利用SparkSQL计算每一行数据的数据质量,如果数据不为NULL或者不为空字符串(或者符合正则表达式),那么该字段该行数据积一分

网上解决方案

https://blog.csdn.net/Code_LT/article/details/87719115
https://blog.csdn.net/LLJJYY001/article/details/88964961?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-1-88964961-blog-87719115.235^v38^pc_relevant_yljh&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-1-88964961-blog-87719115.235^v38^pc_relevant_yljh&utm_relevant_index=2

package com.emmm.test.scala

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object Emmm {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[*]")
    conf.setAppName(this.getClass.getSimpleName)
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.kryo.registrationRequired", "true")
    conf.registerKryoClasses(Array(
      Class.forName("scala.collection.mutable.WrappedArray$ofRef"),
      Class.forName("org.apache.spark.sql.types.StringType$"),
      classOf[TPerson],
      classOf[org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema],
      classOf[org.apache.spark.sql.types.StructType],
      classOf[org.apache.spark.sql.types.StructField],
      classOf[org.apache.spark.sql.types.Metadata],
      classOf[Array[TPerson]],
      classOf[Array[org.apache.spark.sql.Row]],
      classOf[Array[org.apache.spark.sql.types.StructField]],
      classOf[Array[Object]]
    ))
    val spark = SparkSession.builder()
      .config(conf)
      .getOrCreate()
    import spark.implicits._
    // 使用样例类创建RDD并转化成DF后又回到RDD
    spark.sparkContext.parallelize(Seq(TPerson("zs", "21"), TPerson("ls", "25"))).toDF().rdd
      .map(row => {
        // 打印schema
        println(row.schema)
        // 得到Row中的数据并往其中添加我们要新增的字段值
        val buffer = Row.unapplySeq(row).get.map(_.asInstanceOf[String]).toBuffer
        buffer.append("男") //增加一个性别
        buffer.append("北京") //增肌一个地址

        // 获取原来row中的schema,并在原来Row中的Schema上增加我们要增加的字段名以及类型.
        val schema: StructType = row.schema
          .add("gender", StringType)
          .add("address", StringType)
        // 使用Row的子类GenericRowWithSchema创建新的Row
        val newRow: Row = new GenericRowWithSchema(buffer.toArray, schema)
        // 使用新的Row替换成原来的Row
        newRow
      }).map(row => {
      // 打印新的schema
      println(row.schema)
      // 测试我们新增的字段
      val gender = row.getAs[String]("gender")
      // 获取原本就有的字段
      val name = row.getAs[String]("name")
      val age = row.getAs[String]("age")
      // 获取新的字段
      val address = row.getAs[String]("address")
      // 输出查看结果
      println(s"$name-$age-$gender-$address")
      row
    }).collect()
    spark.stop()
  }

  /**
    * 样例类
    *
    * @param name name属性
    * @param age  age属性
    */
  case class TPerson(name: String, age: String)

}


遇到问题

  • Row 行 Value列表 List 新增加一个元素,但是未生效,可能未考虑返回值 newList = List.append('new') 接收,详细查看一个scala List 集合返回值
  • freme.map(fun)(Encoder) 序列化问题,网上推荐一般使用 Encoders.kryo[] ,但是这个如果不传参数的话,默认返回值是一个BinaryType 而后row里面的返回值也变成byte二进制数组和实际需求渐行渐远,故不考虑

解决

部分核心代码,需要导入org.apache.spark.sql.catalyst.encoders.RowEncoder 序列化器,跟了一下SparkSQL的源码 ,底层使用的也是该类


    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    val fields: Array[StructField] = frame.schema.fields
    val newFields = fields :+ StructField("score", IntegerType)
    val scoreDataset: Dataset[Row] = frame
      .map(row => {
        var score = 0
        val map: Map[String, Nothing] = row.getValuesMap(columns)
        map.foreach(m => {
          if (m._2 != null && m._2.toString.trim.nonEmpty) {
            score = score + 1
          }
        })
        // row.toSeq: _* 将原来row内的元素展开存放到新数组里面 .++(Array(score)) 两个数组拼接形成新的数组
        val array: Array[Any] = Array(row.toSeq: _*).++(Array(score))
        //重新创建一个Row对象,将数组中的元素 展开然后形成新的行value
        val newRow = Row(array: _*)
        //将新的row返回
        newRow
        //使用到了Spark同款序列化器,传入新增字段的 score 类型
      })(RowEncoder(StructType(newFields)))
      

参考

标签:val,dataframe,dataset,apache,org,spark,Row,row
From: https://www.cnblogs.com/iullor/p/17588666.html

相关文章

  • 关于spark写clickhouse出现 too many parts(300)错误的最佳解决方式
    出现这个问题的根本原因是clickhouse插入速度超过clickhouse的文件合并速度(默认300)解决方式如下 觉得好用记得点个关注或者赞哈......
  • Spark入门
    一、Spark框架概述1.1spark是什么定义:ApacheSpark是用于大规模数据(large-scaladata)处理的统一(unified)分析引擎。弹性分布式数据集RDD:RDD是一种分布式内存抽象,其使得程序员能够在大规模集群中做内存运算,并且有一定的容错方式。而这也是整个Spark的核心数据结构,Spark整个......
  • CDP7环境下使用SparkSQL Shell方式
    相信很多在用CDP7的小伙伴都遇到了Spark里面不再支持spark-sql的问题这里给出两种解决方案:spark-submit与spark-shellcloudera官方给的解决方案https://docs.cloudera.com/cdp-private-cloud-base/7.1.5/developing-spark-applications/topics/spark-sql-example.html基于这个方案,......
  • dolphinscheduler 调度spark on k8s
    dolphinscheduler对于k8s的支持可以使用spark任务模式选择k8s配置,当然也可以直接通过k8s集成通过容器镜像模式运行,两种方式各有利弊,但是完全基于k8s模式会比较方便些集成玩法说明spark任务模式此模式我们需要配置SPARK_HOME给每个dolphinschedulerworker节点,同时对于......
  • PysparkNote006---pycharm加载spark环境
    pycharm配置pyspark环境,本地执行pyspark代码spark安装、添加环境变量不提了File-Settings-Project-ProjectStructure-addcontentroot添加如下两个路径D:\code\spark\python\lib\py4j-0.10.7-src.zipD:\code\spark\python\lib\pyspark.zip                ......
  • 安装spark local运行出现错误NoClassDefFoundError: org/slf4j/Logger 原来是要设置
    Error:Unabletoinitializemainclassorg.apache.spark.deploy.SparkSubmitCausedby:java.lang.NoClassDefFoundError:org/slf4j/Logger HowtoinstallsparklocallyConsideringsparkwithouthadoopbuilt-in.Downloadhadoopunpackto/opt/hadoop/Downloadsp......
  • 用concat比较两个dataframe
    因为equals会比对索引等,可能出现内容相同但是行序不同比对失败,可以采用concat,去除重复后如果为空则表示数据一致。1#比对两个DataFrame23ifdf1.equals(df2):4returnTrue,None5else:6diff_row=pd.concat([df1,df2]).drop_duplicates(keep=False)......
  • 大数据量时生成DataFrame避免使用效率低的append方法
    转载请注明出处:https://www.cnblogs.com/oceanicstar/p/10900332.html append方法可以很方便地拼接两个DataFramedf1.append(df2)>AB>1A1B1>2A2B2>3A3B3>4A4B4但数据量大时生成DataFrame,应避免使用append方法因为:    与python列表中的appe......
  • Hadoop vs Spark性能对比
    基于Spark-0.4和Hadoop-0.20.21.Kmeans数据:自己产生的三维数据,分别围绕正方形的8个顶点{0,0,0},{0,10,0},{0,0,10},{0,10,10},{10,0,0},{10,0,10},{10,10,0},{10,10,10}Pointnumber189,918,082(1亿9千万个三维点)Capacity10GBHDFSLocation/user/LijieXu/Km......
  • Pandas学习笔记之Dataframe
    一、Dataframe基本概念#二维数组"Dataframe:是一个表格型的数据结构,包含一组有序的列,其列的值类型可以是数值、字符串、布尔值等。data={'name':['Jack','Tom','Mary'],'age':[18,19,20],'gender':['m','m&#......