首页 > 数据库 >SparkSQL

SparkSQL

时间:2022-10-27 20:44:47浏览次数:74  
标签:PYSPARK df DataFrame PYTHON SparkSQL sql spark

DataFrame

创建DataFrame

1.转换为DataFrame方式1

将RDD[元组或列表] 转换为DataFrame
  • 定义RDD,每个元素是Row类型

  • 将上面的RDD[Row]转换成DataFrame,df=spark.createDataFrame(row_rdd)

  • 代码

    # -*- coding:utf-8 -*-
    # Desc:This is Code Desc
    import os
    # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
    from pyspark import Row
    from pyspark.sql import SparkSession
    
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    
    #方式1-通过RDD<Row>,转换成DataFrame
    if __name__ == '__main__':
        #1-创建上下文对象
        #第二代的上下文对象SparkSession,第一代是SparkContext。
        #SparkSession功能更强大,兼容SparkContext,
        spark=SparkSession\
            .builder\
            .appName('test')\
            .master('local[*]')\
            .getOrCreate()
        #可以从SparkSession获取SparkContext
        sc=spark.sparkContext
        #2-加载文本文件形成RDD,每个元素是string
        rdd1=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
        rdd1.foreach(lambda x:print(x))
        #3-将RDD的每个元素转换成Row对象
        #比如将 ‘Justin, 19’ 转换成 Row(name='Justin',age=19)
        rdd2=rdd1.map(lambda x:x.split(','))
        rdd2.foreach(lambda x: print(x))
        rdd3=rdd2.map(lambda x:Row(name=x[0],age=int(x[1].strip())))
        rdd3.foreach(lambda x: print(x))
        #4-将RDD转换成DataFrame
        df=spark.createDataFrame(rdd3)
        #5-打印DataFrame的schema
        df.printSchema()
        #6-打印DataFrame的数据
        df.show()
        #7-用sparkSQL筛选年龄介于13~19岁的人员
        #将DataFrame注册成临时表
        df.createOrReplaceTempView('people')
        #查询临时表
        df2=spark.sql('select * from people where age>=13 and age<=19')
        #打印结果
        df2.show()
    

2.转换为DataFrame方式2

RDD[元组或列表]+自定义Schema信息 -->DataFrame
  • 核心步骤

    • 1、RDD的每个元素转换为【元组或列表】
    • 2、依据元组的值自定义schema
    • 3、spark.createDataFrame(rdd,schema)
  • 代码

    直接复制上面的文件稍加修改,不用从头写

# -*- coding:utf-8 -*-
# Desc:This is Code Desc
import os
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
from pyspark import Row
from pyspark.sql import SparkSession


os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

#2-RDD<元组或列表> -> 定义schema  -> spark.createDataFrame(RDD,schema)
if __name__ == '__main__':
    #1-创建上下文对象
    spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    sc=spark.sparkContext
    #2-加载文本文件形成RDD
    #仍然需要通过SparkContext对象来创建RDD
    #SparkSession对象无法直接加载数据得到RDD
    rdd1=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')

    #3-将RDD的每个元素转换成元组
    rdd2=rdd1.map(lambda x: (x.split(',')[0],int(x.split(',')[1].strip()) ))
    #4-为上面的元组,自定义schema
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    schema = StructType([
        StructField('name', StringType(), True),
        StructField('age', IntegerType(), True)
    ])
    #5-将RDD转换成DataFrame
    df=spark.createDataFrame(rdd2 , schema)
    #6-打印DataFrame的schema
    df.printSchema()
    #7-打印DataFrame的数据
    df.show()


3.转换为DataFrame方式3

RDD[容器]+toDF(指定列名)
  • 步骤:

    • RDD的每个元素转换为【元组或列表】。
    • 再加上toDF(指定多个列名)
  • 代码

    直接复制上面的文件稍加修改,不用从头写

# -*- coding:utf-8 -*-
# Desc:This is Code Desc
import os
# 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
from pyspark import Row
from pyspark.sql import SparkSession


os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

#3-RDD<元组或列表>  -> RDD.toDF([字段名1,字段名2])
if __name__ == '__main__':
    #1-创建上下文对象
    spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    sc=spark.sparkContext
    #2-加载文本文件形成RDD
    #仍然需要通过SparkContext对象来创建RDD
    #SparkSession对象无法直接加载数据得到RDD
    rdd1=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')

    #3-将RDD的每个元素转换成元组
    rdd2=rdd1.map(lambda x: (x.split(',')[0],int(x.split(',')[1].strip()) ))

    #4-将RDD转换成DataFrame
    df=rdd2.toDF(['name','age'])
    #5-打印DataFrame的schema
    df.printSchema()
    #6-打印DataFrame的数据
    df.show()

4.转换为DataFrame方式4(了解)

【了解】Pandas构建DataFrame
  • 语法:Spark的DataFrame=spark.createDataFrame(pandas的DataFrame)

  • 注意:虽然都叫Dataframe,但是pandas的Dataframe是单机版的,Spark的Dataframe是多机器分布式的。

  • 代码

    # -*- coding:utf-8 -*-
    # Desc:This is Code Desc
    import os
    # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
    from pyspark import Row
    from pyspark.sql import SparkSession
    
    
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    
    #3-RDD<元组或列表>  -> RDD.toDF([字段名1,字段名2])
    if __name__ == '__main__':
        #1-创建上下文对象
        spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
        #2-创建一个pandas的DataFrame
        import pandas as pd
        pdf = pd.DataFrame({
            "id": [1, 2, 3],
            "name": ["张大仙", '王晓晓', '王大锤'],
            "age": [11, 11, 11]
        })
        print(pdf)
    
        #3-将其转换成spark的DataFrame
        df=spark.createDataFrame(pdf)
        #4-打印schema
        df.printSchema()
        #5-打印数据
        df.show()
    

二.加载外部数据(文件)转化为DataFrame

  • 用【spark.read】统一加载各种格式的数据文件
import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StringType, IntegerType

os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
os.environ['SPARK_HOME'] = '/export/server/spark'
PYSPARK_PYTHON = "/root/anaconda3/bin/python"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON

if __name__ == '__main__':
    #1创建上下文对象
    spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
    #2-加载txt文件形成DataFrame
    sc=spark.sparkContext
    #都是加载同样的txt文件,但是sc返回的是RDD,SparkSession返回的是DataFrame
    rdd=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
    df1=spark.read.format('text').load('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
    #简化写法2
    df2=spark.read.text('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
    # 普通文本文件加载后,只有一个value字段,值就是每行的字符串
    df1.printSchema()
    df1.show()

    #3加载json文件形成DataFrame
    df3=spark.read.json('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.json')
    df3.printSchema()
    df3.show()
    #4加载,csv文件形成DataFrame
    #sep,指定分隔符,csv文件默认分隔符为,号
    #header,是否把第一行识别成字段名
    #inferSchema,是否自动推断字段类型,比如将20识别成整数而不是string的'20'
    df4=spark.read\
        .option('sep',';')\
        .option('header','true')\
        .option('inferSchema','true')\   # inferSchema=True 表示开启自动推断字段的类型
        .csv('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.csv')
    df4.printSchema()
    df4.show()

    #5加载parquet文件形成DataFrame
    #parquet是列式存储文件,无法用文本打开
    df5=spark.read.parquet('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/users.parquet')
    df5.show()
    df5.printSchema()
    spark.stop()

DataFrame常用操作

DSL风格

  • 简单来说DSL风格就是,调用dataframe的api名,他们与sql语句的关键字同名。

    • #比如
      df.select()
      .where()
      .groupby()
      .orderby()
      .limit()
      
    • 其他更复杂的api,可以借助pyspark.sql.functions包下的API

SQL 风格

  • 2个步骤

      1. 将DataFrame注册成一个【视图(可以认为是表)】
      df.createOrReplaceTempView('表名')
      
    • 2.查询上面的视图(表)

      df2=spark.sql('sql 语句')
      

案例一,花式查询

  • 需求:

    • 读取people.txt并分析下面

    • DSL分析
      1.查看DataFrame中的内容,通过调用show方法
      2.查看DataFrame的Scheme信息
      3.第一种方式查看name字段数据
      4.第二种方式查看name字段数据
      5.第三种方式查看name和age字段数据
      6.每个年龄加1
      7.过滤出年龄大于21的数据
      8.统计每个年龄有多少人
      SQL分析
      1 查看DataFrame中的内容
      2 查看DataFrame的Scheme信息
      3 查看name字段数据
      3 根据age排序的前两个人员信息
      4 查询年龄大于25的人的信息
      
      
  • 代码

    import os
    
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    
    os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    
    if __name__ == '__main__':
        #1创建上下文对象
        spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
        sc=spark.sparkContext
        #2-加载people.txt形成DataFrame
        #最好先加载成RDDF,再转化成DataFrame
        rdd=sc.textFile('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/people.txt')
        df=rdd.map(lambda x:x.split(',')).map(lambda l:(l[0],int(l[1].strip()))).toDF(['name','age'])
    
        #DSL分析
        # 1.查看DataFrame中的内容,通过调用show方法
        df.show()
        # 2.查看DataFrame的Scheme信息
        df.printSchema()
        # 3.第一种方式查看name字段数据
        df2=df.select('name','age')
        df2.show()
        df2.printSchema()
        # 4.第二种方式查看name字段数据
        df.select(['name','age']).show()
        # 5.第三种方式查看name和age字段数据
        df.select(df['name']).show()
        # 6.每个年龄加1
        #下面是错误写法
        #df.select('age+1').show()
        #正确写法
        df.select(df['age']+1).show()
        df.select((df['age']+1).alias('new_age')).show()
        # 7.过滤出年龄大于21的数据
        df.where('age>21').show()
        df.filter('age>21').show()
        # 8.统计每个年龄有多少人
        df.groupby('age').count().show()
        # SQL分析
        #将DataFrame映射为一个表名
        df.createOrReplaceTempView('people')
        # 1 查看DataFrame中的内容
        df3=spark.sql('select * from people')
    
        df3.show()
        # 2 查看DataFrame的Scheme信息
        df3.printSchema()
        # 3 查看name字段数据
        spark.sql('select name from people').show()
        # 3 根据age排序的前两个人员信息
        spark.sql('select * from people order by age desc limit 2').show()
        # 4 查询年龄大于25的人的信息
        spark.sql('select * from people where age>25').show()
    
    

案例二,wordcount

  • 需求,用SQL和DSL 2种编程方式来做wordcount,并对词频倒序排序

  • DSL风格做wordcount

    • 对DataFrame调用API
    • 调用的顺序【split】->【explode】->【groupby】+【count】+【orderBy】
  • SQL风格做wordcount

    • SQL分析2个步骤
      • 注册临时视图
      • 在spark.sql()中使用上面的视图名
    • 做wordcount的步骤: split->explode->group by+count+order by
    • 最好把每一步的中间结果打印出来,做到心中有数。
  • 代码

    • 最好先写【SQL】风格,可以推断出【DSL】函数的调用顺序。
    import os
    
    from pyspark.sql import SparkSession
    from pyspark.sql.types import StructType, StringType, IntegerType
    
    os.environ['JAVA_HOME'] = '/export/server/jdk1.8.0_241'
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    
    if __name__ == '__main__':
        #1创建上下文对象
        spark=SparkSession.builder.appName('test').master('local[*]').getOrCreate()
        sc=spark.sparkContext
        #加载文件形成DataFrame
        df=spark.read.text('file:///export/pyworkspace/pyspark_dev/spark_sql/data/sql/words.txt')
        df.printSchema()
        df.show()
    
        #用SQL风格做wordcount
        #映射成临时表名
        df.createOrReplaceTempView('words_t')
        spark.sql('''
            select word,
                   count(1) as cnt
             from 
                 (select explode(split(value , ' ')) as word  from words_t ) t
             group by word
             order by cnt desc
        ''').show()
        #用DSL风格做wordcount
        from pyspark.sql import functions as F
        df2=df.select(F.split('value',' ').alias('arr'))
        df3=df2.select(F.explode('arr').alias('word'))
        #下面调用count()后,会自动拼接一列名叫count
        df4=df3.groupby('word').count()
        df5=df4.orderBy('count',ascending=False)
        df5.show()
    
        spark.stop()
    
    
    

【重要】案例三 电影评分数据

  • 需求:对电影评分数据进行统计分析,获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)。

  • 注意:

    • 最好先写【SQL】风格,可以推断出【DSL】函数的调用顺序。

    • 对shuffle调优,因为SQL语句中的【group by】、【order by】,【distinct】关键字都会触发shuffle,那么就会触发shuffle的并行度参数【spark.sql.shuffle.partitions】起作用,他默认是【200】,可以适时降低。可以为申请核数的2~3倍。

    • 查看官网参数 http://spark.apache.org/docs/latest/configuration.html

    • 对shuffle的调优设置,有2种方式

      • 既可以在【上下文对象】中配置,

            spark=SparkSession.builder\
                .appName('test')\
                .master('local[*]')\
                .config('spark.sql.shuffle.partitions',4)\
                .getOrCreate()
        
        
      • 也可以在【sql会话】中设置。

        spark.sql('set spark.sql.shuffle.partitions=4')
        
        
  • 代码

    # -*- coding:utf-8 -*-
    # Desc:This is Code Desc
    import os
    # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
    import time
    
    from pyspark import Row, StorageLevel
    from pyspark.sql import SparkSession
    
    
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    
    if __name__ == '__main__':
        #1-创建上下文对象
        spark=SparkSession.builder\
            .appName('test')\
            .master('local[*]')\
            .config('spark.sql.shuffle.partitions','4')\
            .getOrCreate()
        #设置方式2
        spark.sql('set spark.sql.shuffle.partitions=4')
        #2-加载电影文件形成DataFrame
        df=spark.read.format('csv')\
            .option('sep','::') \
            .schema('user_id string, movie_id string, rating float, timestamp long') \
            .load('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/sql/ratings.dat')
        df.persist(StorageLevel.MEMORY_AND_DISK_2)
        df.printSchema()
        df.show()
    
        #获取Top10电影(电影评分平均值最高,并且每个电影被评分的次数大于2000)
        #3-先用SQL风格做一遍
        df.createOrReplaceTempView('movie_t')
        result_df = spark.sql('''
            select movie_id,
                   round(avg(rating) , 2) as avg_rate,--平均分
                   count(1) as cnt
            from movie_t 
            group by movie_id
            having cnt>2000
            order by avg_rate desc
            limit 10
        ''')
        #优化:由于result_df被复用了多次,所以可以做持久化
        result_df.persist(StorageLevel.MEMORY_AND_DISK_2)
        #对于DataFrame来说,他的show,write等操作,跟RDD的action操作是一样的都是触发执行。
        result_df.show()
        #4-再用DSL风格做一遍
        from pyspark.sql import functions as F
        df.groupby('movie_id').agg(
            F.round(F.avg('rating'), 2).alias('avg_rate'),
            F.count('user_id').alias('cnt')
        ).where('cnt>2000') \
            .orderBy('avg_rate', ascending=False) \
            .limit(10)\
            .show()
    
        #5-将结果输出成csv文件
        result_df.write.format('csv') \
            .option('header', True) \
            .save('file:///export/pyworkspace/pyspark_dev/spark_sql/data/out/movie')
    	#6-将结果输出到MySQL.
        #TODO 1-需要事先创建数据库名,比如叫bigdata
        #TODO 2-需要将MySQL的驱动jar包复制到spark/jars目录中
        #TODO 3-无需创建表,Spark会自动在MySQL中建表。
            result_df.write.format('jdbc')\
                .mode('overwrite')\
                .option('url','jdbc:mysql://node1:3306/bigdata')\
                .option('dbtable','tb_movie_top10')\
                .option('user','root')\
                .option('password','123456')\
                .save()
    
        time.sleep(10*60)
    
    
    
  • 由webui页面可知,【4040首页jobs标签页,点击对应的jobid,跳转到job详情页,显示DAG和拆分的各stage运行情况。】shuffle调优之后,shuffle的耗时从【9s降到了1s】。shuffle的Task数从【200降到了4】

  • 不管是SQL还是DSL 2种风格,他们的【DAG】执行过程都是一样的。因为底层都是【catalyst】引擎。

SparkSQL的外部数据源

  • Spark的统一的读取数据的入口

    • spark.read.format(【保存格式】).load(【文件所在的路径】)
    • 简写: spark.read.格式的名称('文件的路径')
      • 例如: spark.read.option('key','value').json('文件路径')
  • Spark的统一的数据的写出的出口

    • DataFrame.write.format(【格式】).mode('保存模式').save(【保存的文件路径】)
    • 简写: DataFrame.write.保存的格式(保存到哪个路径);
      • 例如: df.write.mode('保存模式').option('key','value').json('保存的路径')
  • Spark的保存模式有4种方式append、【overwrite】、ignore、errorifexists、

  • 保存的综合案例。

    # -*- coding:utf-8 -*-
    # Desc:This is Code Desc
    import os
    # 这里可以选择本地PySpark环境执行Spark代码,也可以使用虚拟机中PySpark环境,通过os可以配置
    import time
    
    from pyspark import Row, StorageLevel
    from pyspark.sql import SparkSession
    
    os.environ['SPARK_HOME'] = '/export/server/spark'
    PYSPARK_PYTHON = "/root/anaconda3/bin/python"
    # 当存在多个版本时,不指定很可能会导致出错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
    
    if __name__ == '__main__':
        #1-创建上下文对象
        spark=SparkSession.builder\
            .appName('test')\
            .master('local[*]')\
            .getOrCreate()
        #2-通过jdbc读取mysql的数据
        df=spark.read.format('jdbc')\
            .option('url','jdbc:mysql://node1:3306/bigdata') \
            .option('dbtable','tb_movie_top10') \
            .option('user','root') \
            .option('password','123456') \
            .load()
        df.printSchema()
        df.show()
        #3-将上面的DataFrame保存为text
        #注意保存为text纯文本时,要求DataFrame的列只能有1列,不能有多列
        #所以折中方式是利用concat_ws函数将多列拼接为1列。
        from pyspark.sql import functions as F
        df.select(F.concat_ws('_','movie_id','avg_rate','cnt'))\
            .write\
            .format('text')\
            .mode('overwrite')\
            .save('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/text')
        #4-将上面的DataFrame保存为csv
        df.write.format('csv').mode('overwrite')\
            .option('header',True) \
            .option('sep','\t') \
            .save('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/csv')
        #简写方式
        df.write.mode('overwrite')\
            .option('header',True) \
            .option('sep','\t') \
            .csv('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/csv2')
    
        #5-将上面的DataFrame保存为json
        df.write.mode('overwrite').json('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/json')
        #6-将上面的DataFrame保存为parquet
        df.write.mode('overwrite').parquet('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/parquet')
        #spark的默认的读写格式就是parquet,所以此时parquet可以默认不写
        df.write.mode('overwrite').save('file:///export/pyworkspace/pyspark_sz29/spark_sql/data/out/parquet2')
        #7-将上面的DataFrame保存到mysql
    	#格式1
        df.write.mode('overwrite') \
        .format('jdbc')\
        .option('url', 'jdbc:mysql://node1:3306/test') \
        .option('dbtable', 'tb_movie_top10_2') \
        .option('user', 'root') \
        .option('password', '123456')\
        .save()
        #格式2
        properties={'user':'root','password':'123456'}
        df.write.mode('overwrite')\
            .jdbc('jdbc:mysql://node1:3306/bigdata','tb_movie_top10_2','overwrite',properties)
    
    
    

标签:PYSPARK,df,DataFrame,PYTHON,SparkSQL,sql,spark
From: https://www.cnblogs.com/nanguyhz/p/16833675.html

相关文章

  • SparkSQL
    DataFrameDataFrame是一种以RDD为基础的分布式数据集,类似于二维表格。与RDD的区别在于,前者带有schema元信息,即DataFrame。DataFrame也是懒执行的,但性能上比......
  • SparkSQL参数
    SparkSQL参数<1>表分区类参数--是否允许动态生成分区sethive.exec.dynamic.partition=true;--是否容忍指定分区全部动态生成sethive.exec.dynamic.partition.mode=......
  • SparkSQL on K8s 在网易传媒的落地实践
    随着云原生技术的发展和成熟,大数据基础设施积极拥抱云原生是业内发展的一大趋势。网易传媒在2021年成功将SparkSQL部署到了K8s集群,并实现与部分在线业务的混合部署,......
  • (4)SparkSQL中如何定义UDF和使用UDF
    SparkSQL中用户自定义函数,用法和SparkSQL中的内置函数类似;是saprkSQL中内置函数无法满足要求,用户根据业务需求自定义的函数。首先定义一个UDF函数:packagecom.udf;import......
  • 关于sparksql调优的一些操作
    1、查看执行计划 1、直接sql查看:explainselect...from... 2、ds.explain()2、执行计划的处理流程 sql代码->未决断的逻辑执行计划->根据元数据生成已决断的逻辑......
  • sparksql 优化
    最近把spark文档里面配置那一页看了一下,在这记录一些可用的配置,免得后续再去查文档地址:https://spark.apache.org/docs/3.0.1/configuration.htmlSpark文档运行环境......
  • sparksql 函数大全
    数学函数函数简介用法acosh反双曲余弦值SELECTacosh(0.5);0.9624236501192069SELECTacosh(3.5);1.9248473002384139asinh反双曲正弦SELECTasinh(1.45);......
  • sparksql概念补充
    Spark-sql概念补充基本概念        SparkSQL是基于RDD的,可以通过Schema信息来访问其中某个字段        RDD处理的不是结构化数据,所以不能进行类似HIve......
  • 创建SparkSQL的项目
    创建项目方式和前面一样pom依赖不一样无需导入spark_core包,因为spark_sql中包含了spark_corepom.xml文件<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="h......
  • SparkSQL支持的数据源
    1.SparkSQL支持的数据源HiveScala内存中数据--集合支持从RDD读取数据作SQL操作支持从外部存储文件读取数据json,csv,普通结构文本文件支持从关系型数据库读取数据处理......