python+mysql——高效的数据处理方式
使用场景
大规模数据处理;
多个任务可以并发执行;
需要保存结果;
为实现以上三个要求,就需要充分利用服务器中的多核资源,让程序高效并发执行,并采用一定的方式保存结果。
用到的工具和方法
采用多进程 + mysql + BaseManager
-
由于pythonGIL全局锁,如果利用多线程无法充分利用多核资源,因此使用多进程进,充分利用多核资源,榨干机器的性能! 并且在实际应用中需要给每个进程传参,以处理对应的子任务,所以使用apply/apply_async这个多进程函数。 另外多进程可以将进程分步到多台机器上跑,但是数据处理任务应该用不到,hhh..
-
由于在开发中经常遇到需要任务返回结果的情况,所以需要对程序执行结果进行记录。为了提高记录的效率,采用mysql进行数据保存,构建属于自己的数据库。
-
由于每个进程有独立的资源,不同进程间共享资源非常麻烦,为了可以操作共享对象,进行数据库写入等一些操作,采用 BaseManager 进行共享对象的管理。
开启多进程
BaseManager管理器提供了一种创建共享数据的方法,可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理共享对象的服务,这个服务也是一个子进程。其他进程可以通过代理访问这些共享对象。
之所以选择BaseManager管理共享对象,因为这种方式可以很简单的将一些操作打包成一个类,然后将这个类作为对象进行共享。
from multiprocessing import Lock,cpu_count,Pool
from multiprocessing.managers import BaseManager
class cls_name_1():
...
class cls_name_2():
...
def fun(my_var_1,my_var_2,i1,i2,i3,..):
...
if __name__ == "__main__":
# register可以将一个类型或者可调用对象注册到管理器类。
# 第一个参数是 "类型标识符",用于唯一表示某种共享对象,必须是一个字符串。
# 第二个参数是一个可调用对象,用来为类型标识符创建对象。这个参数就是我们实际想共享的对象
BaseManager.register("name_1",cls_name_1)
BaseManager.register("name_2",cls_name_2)
# 创建一个 BaseManager 对象。
# 一旦创建,应该及时调用start()以确保管理器对象对应的管理进程已经启动。
m = BaseManager()
m.start()
my_var_1 = m.name_1() # 生成共享对象的实例
my_var_2 = m.name_2()
p = Pool(cpu_count()) # 返回系统的 CPU 数量
for i1,i2,i3... in params:
r = p.apply_async(fun,args=(my_var_1,my_var_2,i1,i2,i3,..))
p.close()
p.join()
定义好进程启动以及共享对象后,就可以将共享对象和执行每个子任务所需要的参数传递到执行函数fun
里面,开始之后便开始并发执行。
fun函数编写要点:
- fun里面的函数要正确编写,否则进程无法执行所有的代码,遇到出错的地方便停止执行,并没有任何错误输出;
- 可以添加
try..except..
捕获错误,然后将错误print到屏幕。但是有的机器会打印错误,有的则不会打印错误。也是很迷_ - 在对共享对象使用时要注意加锁;
- 当使用了无法共享的模型变量或者其他变量的时候,pool子进程不会执行,不会报错。但是可以使用apply和try..except进行强制报错。采用
multiprocessing.managers.BaseManager
共享模型变量。
mysql编写要点
在这里用的是pymysql,安装方式:pip install pymsql
。关于mysql我会在后面出一篇文章。现在主要说一下mysql编程时的一些要点,避免踩坑~
- 在使用%s填充sql语句时,对于表名,字段名,要加上 ` 这个符号,波浪线的英文键;
- 在用%s填充sql语句的某个字段的值的时候,要加单引号 ' 将 %s 包起来;
其他数据共享方式
共享列表字典变量,其操作跟正常的列表和字典的操作一样。
res_dict = multiprocessing.Manager().dict()
y_true = multiprocessing.Manager().list()
# 将这些变量通过传参方式传递给子进程
p.apply_async(fun,args=(res_dict,y_true)
变量共享
num=multiprocessing.Value("d",10.0)
# d表示double类型,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value)
# 子进程的使用方式
num.value=119 # 重新赋值为火警电话
以上就是在大规模处理数据时使用的一种快速处理数据的方法。
期待与各位交流~