Spark has four deployment modes: Local, Standalone, Spark on Mesos, and Spark on YARN. The first is a single-machine mode; the other three are cluster modes.
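The deployment mode is chosen through the master URL passed to SparkConf (or to spark-submit via --master). A purely illustrative sketch of the standard master URL forms follows; the host names and ports are placeholders, not values from this tutorial:
from pyspark import SparkConf
# Local mode: everything runs in a single process; local[*] uses all available cores
conf_local = SparkConf().setMaster("local[*]").setAppName("demo")
# Standalone cluster: point at the Spark master (placeholder host/port)
conf_standalone = SparkConf().setMaster("spark://master-host:7077").setAppName("demo")
# Mesos cluster (placeholder host/port)
conf_mesos = SparkConf().setMaster("mesos://mesos-host:5050").setAppName("demo")
# YARN: the resource manager address comes from the Hadoop configuration
conf_yarn = SparkConf().setMaster("yarn").setAppName("demo")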
Spark's interactive shell supports both Python and Scala; Python is used here.
1. Start the pyspark environment
From the Spark installation directory, run:
./bin/pyspark
Once it starts, you land in the interactive PySpark shell.
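Inside the shell a SparkContext is already available as sc, so a quick sanity check (purely illustrative, not part of the original post) confirms the environment works:
# sc is created automatically by the pyspark shell
sc.version                         # prints the Spark version
sc.parallelize(range(100)).sum()   # should return 4950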
2. Write the program
Create a new source file WordCount.py and write the program:
touch WordCount.py
vim WordCount.py
from pyspark import SparkConf, SparkContext
# Run in local mode
conf = SparkConf().setMaster("local").setAppName("My App")
# Create a SparkContext from the configuration
sc = SparkContext(conf=conf)
# Path of the input file
logFile = "file:///opt/servers/spark/README.md"
# Read README.md into an RDD (2 partitions) and cache it
logData = sc.textFile(logFile, 2).cache()
# Count the lines containing the letter 'a' and the letter 'b', respectively
numAS = logData.filter(lambda line: 'a' in line).count()
numBs = logData.filter(lambda line: 'b' in line).count()
# Print the result
print('Lines with a: %s, Lines with b: %s' % (numAS, numBs))
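As a side note: despite the file name, the program above counts lines containing 'a' and 'b' rather than words. A true word count over the same README (same assumed path) takes only a few more lines; this is an illustrative sketch, not part of the original tutorial:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("Word Count")
sc = SparkContext(conf=conf)
# Split each line into words, map each word to (word, 1), then sum the counts per word
counts = (sc.textFile("file:///opt/servers/spark/README.md")
            .flatMap(lambda line: line.split())
            .map(lambda word: (word, 1))
            .reduceByKey(lambda a, b: a + b))
# Print the ten most frequent words
for word, count in counts.takeOrdered(10, key=lambda pair: -pair[1]):
    print(word, count)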
Run the code: python3 WordCount.py
If you get the following error:
python3 WordCount.py
Traceback (most recent call last):
File "WordCount.py", line 1, in <module>
from pyspark import SparkConf, SparkContext
ModuleNotFoundError: No module named 'pyspark'
then the pyspark module is not installed.
Install the pyspark package with pip (the original post runs pip from the Python installation's lib/site-packages directory, but any directory works); the Tsinghua University mirror is used here to speed up the download in China:
pip install pyspark -i http://pypi.tuna.tsinghua.edu.cn/simple/ --trusted-host pypi.tuna.tsinghua.edu.cn
Several problems came up during this download; I recorded them in a separate note.
After pyspark installs successfully, rerunning the code still fails:
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
/usr/local/python3/lib/python3.7/site-packages/pyspark/context.py:317: FutureWarning: Python 3.7 support is deprecated in Spark 3.4.
warnings.warn("Python 3.7 support is deprecated in Spark 3.4.", FutureWarning)
Traceback (most recent call last):
File "WordCount.py", line 11, in <module>
numAS = logData.filter(lambda line: 'a' in line).count()
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2297, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2273, in sum
0, operator.add
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 2025, in fold
vals = self.mapPartitions(func).collect()
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 1814, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 5442, in _jrdd
self.ctx, self.func, self._prev_jrdd_deserializer, self._jrdd_deserializer, profiler
File "/usr/local/python3/lib/python3.7/site-packages/pyspark/rdd.py", line 5250, in _wrap_function
sc._javaAccumulator,
TypeError: 'JavaPackage' object is not callable
The cause is a version mismatch: the pip-installed pyspark is newer than the local Spark installation (the FutureWarning above shows it is a Spark 3.4-series package), so the Python side cannot find the matching JVM classes and raises "'JavaPackage' object is not callable". Downgrading pyspark to 3.2.0 resolves it here:
pip3 install pyspark==3.2.0 -i http://pypi.tuna.tsinghua.edu.cn/simple/ --trusted-host pypi.tuna.tsinghua.edu.cn
Looking in indexes: http://pypi.tuna.tsinghua.edu.cn/simple/
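Before rerunning, it is worth confirming that the pip-installed pyspark version now lines up with the local Spark installation; a small, purely illustrative check:
import pyspark
# Version of the pip-installed pyspark package; ideally it lines up with the
# Spark installed under /opt/servers/spark (3.2.0 after the downgrade above)
print(pyspark.__version__)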
Run the code again; this time it completes and prints the "Lines with a / Lines with b" counts.
3. Run the program with spark-submit
Go into the bin directory under the Spark installation and run:
./spark-submit <absolute path to WordCount.py>
The <master-url> argument is omitted here, so spark-submit defaults to local mode.
The program runs and prints the same counts as before.
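One caveat worth noting (not covered in the original post): because WordCount.py hard-codes setMaster("local"), that setting takes precedence over any --master flag passed to spark-submit. If you later want to submit the same script to a cluster, a common pattern is to leave the master out of the code and supply it on the command line; a sketch under that assumption:
from pyspark import SparkConf, SparkContext
# No setMaster() here: the master URL is taken from spark-submit,
# e.g. ./spark-submit --master local[*] /path/to/WordCount.py
conf = SparkConf().setAppName("My App")
sc = SparkContext(conf=conf)
logData = sc.textFile("file:///opt/servers/spark/README.md", 2).cache()
numAS = logData.filter(lambda line: 'a' in line).count()
numBs = logData.filter(lambda line: 'b' in line).count()
print('Lines with a: %s, Lines with b: %s' % (numAS, numBs))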
The run also produces a lot of extra log output that clutters the result. You can get rid of the noise by lowering the log level in Spark's log4j configuration.
Go into the conf folder under the Spark installation directory and make a copy of the log template:
cp log4j2.properties.template log4j2.properties
I am on spark-3.3.3, where the template is named log4j2.properties.template; the file name differs between Spark versions (older releases ship log4j.properties.template), so copy whichever template your own version provides.
Edit the copied file:
vim log4j2.properties
Change the console (root) log level from info to error (in the log4j2 template this is the rootLogger.level line), then save and quit.
Run the Python file with spark-submit again. This time only the program's own output appears; the extra log messages are gone.
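As an alternative (suggested by Spark's own startup message quoted earlier), you can silence the noise per application without editing log4j2.properties by raising the log level right after the SparkContext is created; a minimal sketch:
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster("local").setAppName("My App")
sc = SparkContext(conf=conf)
# Only ERROR-level messages (and the program's own prints) reach the console
sc.setLogLevel("ERROR")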