python+mysql——高效的数据处理方式

使用场景

大规模数据处理;

多个任务可以并发执行;

需要保存结果;

为实现以上三个要求,就需要充分利用服务器中的多核资源,让程序高效并发执行,并采用一定的方式保存结果。

用到的工具和方法

采用多进程 + mysql + BaseManager

  1. 由于pythonGIL全局锁,如果利用多线程无法充分利用多核资源,因此使用多进程进,充分利用多核资源,榨干机器的性能! 并且在实际应用中需要给每个进程传参,以处理对应的子任务,所以使用apply/apply_async这个多进程函数。 另外多进程可以将进程分步到多台机器上跑,但是数据处理任务应该用不到,hhh..

  2. 由于在开发中经常遇到需要任务返回结果的情况,所以需要对程序执行结果进行记录。为了提高记录的效率,采用mysql进行数据保存,构建属于自己的数据库。

  3. 由于每个进程有独立的资源,不同进程间共享资源非常麻烦,为了可以操作共享对象,进行数据库写入等一些操作,采用 BaseManager 进行共享对象的管理。

开启多进程

BaseManager管理器提供了一种创建共享数据的方法,可以在不同进程中共享,甚至可以通过网络跨机器共享数据。管理器维护一个用于管理共享对象的服务,这个服务也是一个子进程。其他进程可以通过代理访问这些共享对象。

之所以选择BaseManager管理共享对象,因为这种方式可以很简单的将一些操作打包成一个类,然后将这个类作为对象进行共享。

from multiprocessing import Lock,cpu_count,Pool
from multiprocessing.managers import BaseManager

class cls_name_1():
    ...
class cls_name_2():
    ...

def fun(my_var_1,my_var_2,i1,i2,i3,..):
    ...


if __name__ == "__main__":
    # register可以将一个类型或者可调用对象注册到管理器类。
    # 第一个参数是 "类型标识符",用于唯一表示某种共享对象,必须是一个字符串。
    # 第二个参数是一个可调用对象,用来为类型标识符创建对象。这个参数就是我们实际想共享的对象
    BaseManager.register("name_1",cls_name_1)
    BaseManager.register("name_2",cls_name_2)
    # 创建一个 BaseManager 对象。
    # 一旦创建,应该及时调用start()以确保管理器对象对应的管理进程已经启动。
    m = BaseManager() 
    m.start()

    my_var_1 = m.name_1() # 生成共享对象的实例
    my_var_2 = m.name_2()

    p = Pool(cpu_count()) # 返回系统的 CPU 数量
    for i1,i2,i3... in params:
        r = p.apply_async(fun,args=(my_var_1,my_var_2,i1,i2,i3,..))
    p.close()
    p.join()

定义好进程启动以及共享对象后,就可以将共享对象和执行每个子任务所需要的参数传递到执行函数fun里面,开始之后便开始并发执行。

fun函数编写要点:

  1. fun里面的函数要正确编写,否则进程无法执行所有的代码,遇到出错的地方便停止执行,并没有任何错误输出;
  2. 可以添加try..except..捕获错误,然后将错误print到屏幕。但是有的机器会打印错误,有的则不会打印错误。也是很迷_
  3. 在对共享对象使用时要注意加锁;
  4. 当使用了无法共享的模型变量或者其他变量的时候,pool子进程不会执行,不会报错。但是可以使用apply和try..except进行强制报错。采用multiprocessing.managers.BaseManager共享模型变量。

mysql编写要点

在这里用的是pymysql,安装方式:pip install pymsql。关于mysql我会在后面出一篇文章。现在主要说一下mysql编程时的一些要点,避免踩坑~

  1. 在使用%s填充sql语句时,对于表名,字段名,要加上 ` 这个符号,波浪线的英文键;
  2. 在用%s填充sql语句的某个字段的值的时候,要加单引号 ' 将 %s 包起来;

其他数据共享方式

共享列表字典变量,其操作跟正常的列表和字典的操作一样。

res_dict = multiprocessing.Manager().dict()
y_true = multiprocessing.Manager().list()

# 将这些变量通过传参方式传递给子进程
p.apply_async(fun,args=(res_dict,y_true)

变量共享

 num=multiprocessing.Value("d",10.0) 
 # d表示double类型,主进程与子进程共享这个value。(主进程与子进程都是用的同一个value)  

 # 子进程的使用方式
  num.value=119 # 重新赋值为火警电话 

以上就是在大规模处理数据时使用的一种快速处理数据的方法。
期待与各位交流~