首页 > 数据库 >SparkSql读取数据的方式

SparkSql读取数据的方式

时间:2024-11-06 19:16:08浏览次数:3  
标签:INSERT 读取数据 方式 INTO VALUES SparkSql spark NULL emp

一、读取普通文件 

方式一:给定读取数据源的类型和地址

spark.read.format("json").load(path)
spark.read.format("csv").load(path)
spark.read.format("parquet").load(path)

方式二:直接调用对应数据源类型的方法

spark.read.json(path)
spark.read.csv(path)
spark.read.parquet(path)

1、代码演示最普通的文件读取方式: 

from pyspark.sql import SparkSession
import os


if __name__ == '__main__':
	# 构建环境变量
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'

	# 获取sparkSession对象
	spark = SparkSession.builder.master("local[2]").appName("第一次构建SparkSession").config(
		"spark.sql.shuffle.partitions", 2).getOrCreate()

	df01 = spark.read.json("../../datas/resources/people.json")
	df01.printSchema()
	df02 = spark.read.format("json").load("../../datas/resources/people.json")
	df02.printSchema()
	df03 = spark.read.parquet("../../datas/resources/users.parquet")
	df03.printSchema()
	#spark.read.orc("")
	df04 = spark.read.format("orc").load("../../datas/resources/users.orc")
	df04.printSchema()
	df05 = spark.read.format("csv").option("sep",";").load("../../datas/resources/people.csv")
	df05.printSchema()

	df06 = spark.read.load(
		path="../../datas/resources/people.csv",
		format="csv",
		sep=";"
	)
	df06.printSchema()

	spark.stop()

二、 通过jdbc读取数据库数据

先在本地数据库或者linux数据库中插入一张表:

CREATE TABLE `emp`  (
  `empno` int(11) NULL DEFAULT NULL,
  `ename` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `job` varchar(50) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `mgr` int(11) NULL DEFAULT NULL,
  `hiredate` date NULL DEFAULT NULL,
  `sal` decimal(7, 2) NULL DEFAULT NULL,
  `comm` decimal(7, 2) NULL DEFAULT NULL,
  `deptno` int(11) NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of emp
-- ----------------------------
INSERT INTO `emp` VALUES (7369, 'SMITH', 'CLERK', 7902, '1980-12-17', 800.00, NULL, 20);
INSERT INTO `emp` VALUES (7499, 'ALLEN', 'SALESMAN', 7698, '1981-02-20', 1600.00, 300.00, 30);
INSERT INTO `emp` VALUES (7521, 'WARD', 'SALESMAN', 7698, '1981-02-22', 1250.00, 500.00, 30);
INSERT INTO `emp` VALUES (7566, 'JONES', 'MANAGER', 7839, '1981-04-02', 2975.00, NULL, 20);
INSERT INTO `emp` VALUES (7654, 'MARTIN', 'SALESMAN', 7698, '1981-09-28', 1250.00, 1400.00, 30);
INSERT INTO `emp` VALUES (7698, 'BLAKE', 'MANAGER', 7839, '1981-05-01', 2850.00, NULL, 30);
INSERT INTO `emp` VALUES (7782, 'CLARK', 'MANAGER', 7839, '1981-06-09', 2450.00, NULL, 10);
INSERT INTO `emp` VALUES (7788, 'SCOTT', 'ANALYST', 7566, '1987-04-19', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7839, 'KING', 'PRESIDENT', NULL, '1981-11-17', 5000.00, NULL, 10);
INSERT INTO `emp` VALUES (7844, 'TURNER', 'SALESMAN', 7698, '1981-09-08', 1500.00, 0.00, 30);
INSERT INTO `emp` VALUES (7876, 'ADAMS', 'CLERK', 7788, '1987-05-23', 1100.00, NULL, 20);
INSERT INTO `emp` VALUES (7900, 'JAMES', 'CLERK', 7698, '1981-12-03', 950.00, NULL, 30);
INSERT INTO `emp` VALUES (7902, 'FORD', 'ANALYST', 7566, '1981-12-03', 3000.00, NULL, 20);
INSERT INTO `emp` VALUES (7934, 'MILLER', 'CLERK', 7782, '1982-01-23', 1300.00, NULL, 10);

dept的数据:

CREATE TABLE `dept`  (
  `deptno` int(11) NULL DEFAULT NULL,
  `dname` varchar(14) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `loc` varchar(13) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of dept
-- ----------------------------
INSERT INTO `dept` VALUES (10, 'ACCOUNTING', 'NEW YORK');
INSERT INTO `dept` VALUES (20, 'RESEARCH', 'DALLAS');
INSERT INTO `dept` VALUES (30, 'SALES', 'CHICAGO');
INSERT INTO `dept` VALUES (40, 'OPERATIONS', 'BOSTON');

查询时会报如下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o67.load.
: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:46)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1(JDBCOptions.scala:102)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$1$adapted(JDBCOptions.scala:102)

接着放驱动程序: 

Python环境放入MySQL连接驱动

  • 找到工程中pyspark库包所在的环境,将驱动包放入环境所在的jars目录中
  • 如果是Linux上:注意集群模式所有节点都要放。

第一种情况:

假如你是windows环境:

我的最终的路径是在这里:

第二种情况:linux环境下,按照如下方式进行

# 进入目录
cd /opt/installs/anaconda3/lib/python3.8/site-packages/pyspark/jars

# 上传jar包:mysql-connector-java-5.1.32.jar

代码演示: 

import os

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType


if __name__ == '__main__':
	# 获取sparkSession对象
	# 设置 任务的环境变量
	os.environ['JAVA_HOME'] = r'C:\Program Files\Java\jdk1.8.0_77'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = r'C:\ProgramData\Miniconda3\python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
	# 得到sparkSession对象
	spark = SparkSession.builder.master("local[2]").appName("").config("spark.sql.shuffle.partitions", 2).getOrCreate()
	

	dict = {"user":"root","password":"root"}
	jdbcDf = spark.read.jdbc(url="jdbc:mysql://localhost:3306/spark",table="emp",properties=dict)
	jdbcDf.show()
	# jdbc的另一种写法
	jdbcDf2 = spark.read.format("jdbc") \
		.option("driver", "com.mysql.cj.jdbc.Driver") \
		.option("url", "jdbc:mysql://localhost:3306/spark") \
		.option("dbtable", "spark.dept") \
		.option("user", "root") \
		.option("password", "root").load()
	jdbcDf2.show()


	# 关闭
	spark.stop()

三、 读取table中的数据【hive】

  • 场景:Hive底层默认是MR引擎,计算性能特别差,一般用Hive作为数据仓库,使用SparkSQL对Hive中的数据进行计算
    • 存储:数据仓库:Hive:将HDFS文件映射成表
    • 计算:计算引擎:SparkSQL、Impala、Presto:对Hive中的数据表进行处理
  • 问题:SparkSQL怎么能访问到Hive中有哪些表,以及如何知道Hive中表对应的

1)集群环境操作hive 

需要启动的服务: 

先退出base环境:conda deactivate
启动服务:
启动hdfs:  start-dfs.sh  因为hive的数据在那里存储着
启动yarn:  start-yarn.sh 因为spark是根据yarn部署的,假如你的spark是standalone模式,不需要启动yarn.
日志服务也需要启动一下:
mapred --daemon start historyserver
# 启动Spark的HistoryServer:18080
/opt/installs/spark/sbin/start-history-server.sh
启动metastore服务: 因为sparkSQL需要知道表结构,和表数据的位置
hive-server-manager.sh start metastore
启动spark服务: 啥服务也没有了,已经启动完了。
查看metastore服务:
hive-server-manager.sh status metastore

修改配置: 

cd /opt/installs/spark/conf
新增:hive-site.xml
vi hive-site.xml

在这个文件中,编写如下配置:
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>hive.metastore.uris</name>
        <value>thrift://bigdata01:9083</value>
    </property>
</configuration>

接着将该文件进行分发:
xsync.sh hive-site.xml

操作sparkSQL: 

/opt/installs/spark/bin/pyspark --master local[2] --conf spark.sql.shuffle.partitions=2

此处的pyspark更像是一个客户端,里面可以通过python编写spark代码而已。而我们以前安装的pyspark更像是spark的python运行环境。

进入后,通过内置对象spark:

>>> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|  default|
|     yhdb|
+---------+

>>> spark.sql("select * from yhdb.student").show()
+---+------+                                                                    
|sid| sname|
+---+------+
|  1|laoyan|
|  1|廉德枫|
|  2|  刘浩|
|  3|  王鑫|
|  4|  司翔|
+---+------+

2)开发环境如何编写代码,操作hive:

代码实战:

from pyspark.sql import SparkSession
import os



if __name__ == '__main__':
	# 构建环境变量
	# 配置环境
	os.environ['JAVA_HOME'] = 'D:/Program Files/Java/jdk1.8.0_271'
	# 配置Hadoop的路径,就是前面解压的那个路径
	os.environ['HADOOP_HOME'] = 'D:/hadoop-3.3.1/hadoop-3.3.1'
	# 配置base环境Python解析器的路径
	os.environ['PYSPARK_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'  # 配置base环境Python解析器的路径
	os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/ProgramData/Miniconda3/python.exe'
	# 防止在本地操作hdfs的时候,出现权限问题
	os.environ['HADOOP_USER_NAME'] = 'root'

	# 获取sparkSession对象
	spark = SparkSession \
		.builder \
		.appName("HiveAPP") \
		.master("local[2]") \
		.config("spark.sql.warehouse.dir", 'hdfs://bigdata01:9820/user/hive/warehouse') \
		.config('hive.metastore.uris', 'thrift://bigdata01:9083') \
		.config("spark.sql.shuffle.partitions", 2) \
		.enableHiveSupport() \
		.getOrCreate()

	spark.sql("select * from yhdb.student").show()

    spark.read.table("yhdb.student").show()

	spark.stop()

不要在一个python 文件中,创建两个不同的sparkSession对象,否则对于sparksql获取hive的元数据,有影响。

标签:INSERT,读取数据,方式,INTO,VALUES,SparkSql,spark,NULL,emp
From: https://blog.csdn.net/weixin_64860388/article/details/143525425

相关文章

  • js内存泄漏几种方式及如何避免、内存溢出
    https://blog.csdn.net/Judy_qiudie/article/details/82845692 一、什么是内存泄漏(memoryleak)?参考阮一峰老师博客:http://www.ruanyifeng.com/blog/2017/04/memory-leak.html不再用到的内存,没有及时释放,就叫做内存泄漏(memoryleak)。程序的运行需要内存。只要程序提出要求,操......
  • 除了委托和事件,C#中还有其他实现回调机制的方式吗?
    除了委托和事件,C#中还可以通过以下几种方式实现回调机制:一、接口回调定义接口:首先定义一个包含回调方法签名的接口。例如,假设要实现一个数据处理完成后的回调通知,可定义如下接口:interfaceIDataProcessingCallback{voidOnDataProcessed(intresult);}-这......
  • STM32G4 双ADC模式之交替触发方式
    目录概述1认识双ADC模式2功能实现2.1原理介绍2.2实现方法 2.3应用范例概述本文主要介绍STM32G4双ADC模式之交替触发方式,包括ADC模块的功能介绍,实现框架结构,以及交替触发方式ADC的转换的实现原理。1认识双ADC模式双ADC模式可用于具有两个或更多ADC的器件。......
  • 又简单又快的方式,快来试试吧!
    手动添加好友不仅耗时耗力,而且效率极低。现在,有了微信管理系统的自动加好友神器,能让你加微信好友又快又简单。本文将详细介绍如何利用这一神器,实现微信好友的高效管理。01 统一管理,简化操作我们首先需要将所有微信号登录到微信管理系统中。这样,我们就无需在多个账号和设备......
  • 【C/C++】野指针概念以及避免方式
    C语言中的野指针详解野指针(WildPointer)是指向未定义或非法内存位置的指针。本博客讲解野指针的概念、产生原因、危害以及如何避免野指针的问题。1.什么是野指针野指针指的是未初始化或已经失效的指针变量。这些指针指向的内存位置不再有效,可能被系统回收或被其他变量使......
  • 【八百客CRM-注册安全分析报告-无验证方式导致安全隐患】
    前言由于网站注册入口容易被黑客攻击,存在如下安全问题:1.暴力破解密码,造成用户信息泄露2.短信盗刷的安全问题,影响业务及导致用户投诉3.带来经济损失,尤其是后付费客户,风险巨大,造成亏损无底洞所以大部分网站及App都采取图形验证码或滑动验证码等交互解决方案,但在机......
  • 【落羽的落羽 C语言篇】操作符、二进制·之其一:初识编码方式及位操作符
    文章目录一、操作符1.操作符的分类2.操作符的属性2.1优先级2.2结合性二、二进制1.原码、反码、补码2.位操作符2.1左移操作符<<2.2右移操作符>>2.3&|^~一、操作符在C语言中,操作符是用于执行各种操作的符号,它们是构成语法、表达式的基本元素1.操......
  • 在 Windows 中,使用 命令提示符(CMD) 移动文件与在 图形用户界面(GUI) 中使用剪切和粘贴功
    在Windows中,使用命令提示符(CMD)移动文件与在图形用户界面(GUI)中使用剪切和粘贴功能的速度差异,实际上取决于几个因素。这里是对这两种方式的比较:1. CMD移动文件(使用 move 命令):基本操作:CMD中的 move 命令是将文件从一个位置移动到另一个位置。如果目标文件夹和源文件......
  • PCIe系列专题之二:2.2 TLP事务处理方式解析
    一、故事前传之前我们讲了对PCIe的一些基础概念作了一个宏观的介绍,了解了PCIe是一种封装分层协议(packet-basedlayeredprotocol),主要包括事务层(Transactionlayer),数据链路层(Datalinklayer)和物理层(Physicallayer)。较为详细解释请见之前的文章:1.PCIe技术概述;2.0PCIe......
  • 探讨FANUC机床锁机功能实现方式
    fanuc机床因为比较开放,市面上的资料比较多,个人查阅实现方式分3种。1、用中继控制循环启动的电路。此种方式实施比较麻烦,且需要增加硬件成本。2、修改PMC程序,控制PMC中的G信号点,如G130.0,G7.2,G8.5等等。3、不修改PMC程序,查找PMC程序中控制G130.0,G7.2,G8.5等的R地址,控制R地......