首页 > 其他分享 >spark context stop use with as

spark context stop use with as

时间:2023-05-30 22:06:21浏览次数:75  
标签:__ use exc self stop exit path spark def

调用方法:

with session.SparkStreamingSession('CC_Traffic_Realtime', ssc_time_windown) as ss_session:
        
        kafkaStreams = ss_session.get_direct_stream(TOPICNAME)
        kafkaStreams.transform(xxxx)...
        ss_session.ready_to_go()

  

实现方法:

import pickle
from io import BytesIO
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


class SparkgSession(object):
    __APP_PATH = "hdfs://hacluster/tmp"

    def __init__(self, app_name):
        self._sc = SparkContext(appName=app_name)

    def __enter__(self):
        return self

    def __exit__(self, e_type, e_value, e_traceback):
        self._sc.stop()

    def _get_full_path(self, file_path):
        return "{}/{}".format(self.__APP_PATH, file_path)

    def load_model(self, pkl_file_path):
        full_path = self._get_full_path(pkl_file_path)
        rdd = self._sc.binaryFiles(full_path)
        return rdd.values().map(lambda p: pickle.load(BytesIO(p))).collect()[0]

    def load_text_file(self, file_path):
        # TODO, sec problem
        full_path = self._get_full_path(file_path)
        return self._sc.textFile(full_path).collect()


class SparkStreamingSession(SparkgSession):
    def __init__(self, app_name, ssc_time_window):
        super(SparkStreamingSession, self).__init__(app_name)
        self._ssc = StreamingContext(self._sc, ssc_time_window)

    def get_direct_stream(self, topic, brokers):
        # todo, REMOVE
        return KafkaUtils.createDirectStream(self._ssc, [topic], kafkaParams={"bootstrap.servers": brokers})

    def ready_to_go(self):
        self._ssc.start()
        self._ssc.awaitTermination()

    def __enter__(self):
        return self

    def __exit__(self, e_type, e_value, e_traceback):
        # TODO, sdk support
        print("Detect Error!Error: {}".format(e_type.__name__))
        print("Message: {}".format(e_value.message))
        print("File name: {}".format(e_traceback.tb_frame.f_code.co_filename))
        print("line number: {}".format(e_traceback.tb_lineno))
        self._ssc.stop()
        super(SparkStreamingSession, self).__exit__(e_type, e_value, e_traceback)

 

补充:

前言

with 语句适用于对资源进行访问的场合,确保不管使用过程中是否发生异常都会执行必要的“清理”操作,释放资源,比如文件使用后自动关闭/线程中锁的自动获取和释放等。

问题引出

如下代码:

file = open("1.txt")
data = file.read()
file.close()

上面代码存在2个问题: 
(1)文件读取发生异常,但没有进行任何处理; 
(2)可能忘记关闭文件句柄;

改进

try:
    f = open('xxx')
except:
    print('fail to open')
    exit(-1)
try:
    do something
except:
    do something
finally:
    f.close()

虽然这段代码运行良好,但比较冗长。 
而使用with的话,能够减少冗长,还能自动处理上下文环境产生的异常。如下面代码:

with open("1.txt") as file:
    data = file.read()

with 工作原理

(1)紧跟with后面的语句被求值后,返回对象的“–enter–()”方法被调用,这个方法的返回值将被赋值给as后面的变量; 
(2)当with后面的代码块全部被执行完之后,将调用前面返回对象的“–exit–()”方法。 
with工作原理代码示例:

class Sample:
    def __enter__(self):
        print "in __enter__"
        return "Foo"
    def __exit__(self, exc_type, exc_val, exc_tb):
        print "in __exit__"
def get_sample():
    return Sample()
with get_sample() as sample:
    print "Sample: ", sample

 

代码的运行结果如下:

in __enter__
Sample:  Foo
in __exit__

可以看到,整个运行过程如下: 
(1)enter()方法被执行; 
(2)enter()方法的返回值,在这个例子中是”Foo”,赋值给变量sample; 
(3)执行代码块,打印sample变量的值为”Foo”; 
(4)exit()方法被调用;

【注:】exit()方法中有3个参数, exc_type, exc_val, exc_tb,这些参数在异常处理中相当有用。 
exc_type: 错误的类型 
exc_val: 错误类型对应的值 
exc_tb: 代码中错误发生的位置 
示例代码:

class Sample():
    def __enter__(self):
        print('in enter')
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        print "type: ", exc_type
        print "val: ", exc_val
        print "tb: ", exc_tb
    def do_something(self):
        bar = 1 / 0
        return bar + 10
with Sample() as sample:
    sample.do_something()

 

程序输出结果:

in enter
Traceback (most recent call last):
type:  <type 'exceptions.ZeroDivisionError'>
val:  integer division or modulo by zero
  File "/home/user/cltdevelop/Code/TF_Practice_2017_06_06/with_test.py", line 36, in <module>
tb:  <traceback object at 0x7f9e13fc6050>
    sample.do_something()
  File "/home/user/cltdevelop/Code/TF_Practice_2017_06_06/with_test.py", line 32, in do_something
    bar = 1 / 0
ZeroDivisionError: integer division or modulo by zero

Process finished with exit code 1

总结

实际上,在with后面的代码块抛出异常时,exit()方法被执行。开发库时,清理资源,关闭文件等操作,都可以放在exit()方法中。 
总之,with-as表达式极大的简化了每次写finally的工作,这对代码的优雅性是有极大帮助的。 
如果有多项,可以这样写:

With open('1.txt') as f1, open('2.txt') as  f2:
    do something

参考网址

http://blog.kissdata.com/2014/05/23/python-with.html

  

标签:__,use,exc,self,stop,exit,path,spark,def
From: https://blog.51cto.com/u_11908275/6382263

相关文章

  • spark rdd median 中位数求解
    lookup(key)ReturnthelistofvaluesintheRDDforkey key.ThisoperationisdoneefficientlyiftheRDDhasaknownpartitionerbyonlysearchingthepartitionthatthekeymapsto.>>>l=range(1000)>>>rdd=sc.parallelize(zip(l,l......
  • python spark 求解最大 最小 平均 中位数
    rating_data_raw=sc.textFile("%s/ml-100k/u.data"%PATH)printrating_data_raw.first()num_ratings=rating_data_raw.count()print"Ratings:%d"%num_ratings#In[35]:rating_data=rating_data_raw.map(lambdaline:line.split(&quo......
  • How to use the shell command to get the version of Linux Distributions All In On
    HowtousetheshellcommandtogettheversionofLinuxDistributionsAllInOne如何使用shell命令获取Linux发行版的版本hostnamectlcat/etc/os-releaselsb_release-aLinuxDistributionsDebianUbuntuRaspberryPiOShttps://en.wikipedia.org/wiki/L......
  • pyspark
    一、pyspark为了让Spark支持Python,ApacheSpark社区发布了一个工具PySpark。使用PySpark,我们可以使用Python编程语言处理RDD。这一切是由一个名为Py4j的库达到的。其架构如下所示。PySpark的优势之一是在开发中允许你直接调用Python的内置库和第三方库如果Spark是本地模式,可以直接......
  • 【Oracle】Clean all objects belong to particular the user but not using drop use
      #--WX:DBAJOE399--DEST_SCHEMA=Expected_user_namesqlplus/assysdba<<!EOFsetserveroutputonsetechooffsetfeedbackoffWHENEVERSQLERROREXIT1WHENEVEROSEEROREXIT1altersessionsetcurrent_schema=${DEST_SCHEMA};purgedba......
  • 【Oracle】Check size of datafiles and tempfile tablespaces used in CDB and PDB
       --WX:DBAJOE399--setline200pages999columnnamefora10columntablespace_namefora15column"MAXSIZE(GB)"format9,999,990.00column"ALLOC(GB)"format9,999,990.00column"USED(GB)"format9,999,990.00selec......
  • clickhouse学习资源
    ClickHouse是一个开源的列式数据库管理系统,最初由俄罗斯搜索引擎Yandex开发。它专为OLAP(联机分析处理)场景设计,可以快速处理大量数据。以下是一些ClickHouse学习资源:ClickHouse官方文档:https://clickhouse.tech/docs/zh/ClickHouse中文文档:https://clickhouse-docs-cn.......
  • nacos服务下线操作时报错:The Raft Group [naming_instance_metadata] did not find th
    【问题描述】caused:errCode:500,errMsg:dometadataoperationfailed;caused:com.alibaba.nacos.consistency.exception.ConsistencyException:TheRaftGroup[naming_instance_metadata]didnotfindtheLeadernode;caused:TheRaftGroup[naming_instance_metad......
  • flutter开发Nuget.exe not found, trying to download or use cached version解决方法
    问题:Nuget.exenotfound,tryingtodownloadorusecachedversion解决方法:首先确保VisualStudio安装,这个是flutter构建Window应用必须的,并且安装了对应的WindowsSDK,通过VisualStudioInstaller安装管理员身份运行cmd窗口,然后执行wingetinstallMicrosoft.NuGet安装NuG......
  • ByConity与主流开源OLAP引擎(Clickhouse、Doris、Presto)性能对比分析
    引言:随着数据量和数据复杂性的不断增加,越来越多的企业开始使用OLAP(联机分析处理)引擎来处理大规模数据并提供即时分析结果。在选择OLAP引擎时,性能是一个非常重要的因素。因此,本文将使用TPC-DS基准测试的99个查询语句来对比开源的ClickHouse、Doris、Presto以及ByConity这4个OLAP......