
Running multiple tasks in parallel with multiprocessing in Python

1. Introduction


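This article explains how to run several tasks in parallel (with multiple processes) in Python. In Python, the multiprocessing module provides a Process class that represents a process object, whereas OpenMP in Fortran achieves parallelism by launching threads. Note: Python also has the threading module for multithreaded parallelism, which is widely used as well; see: Common Python packages. For CPU-bound pure-Python code, however, CPython's global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so separate processes are the usual way to use several cores.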

multiprocessing documentation: https://docs.python.org/3/library/multiprocessing.html

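Process-based and thread-based parallelism are the two common ways to run multiple tasks in parallel. Open Windows Task Manager (right-click the taskbar at the bottom of the screen); the "Processes" tab is next to "Applications". While the example program in this article runs, you can see that Python's multiprocessing creates several processes to run the tasks in parallel, as in the screenshot below: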

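By contrast, when the Fortran OpenMP example runs, a single process uses nearly 100% of the CPU (four cores, each core counting as 25%). In other words, OpenMP creates multiple threads inside one process and assigns them to the specified CPUs to run the tasks in parallel, as in the screenshot below: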
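The Task Manager observation can also be checked from Python itself. The sketch below is an illustration, not code from the original article (the function names pids_from_threads, report_pid and pids_from_processes are hypothetical): it starts several threads and several processes and collects the process id (os.getpid()) each one reports. All threads report the parent's pid, while every multiprocessing.Process runs as a separate operating-system process with its own pid.

```python
import multiprocessing
import os
import threading


def pids_from_threads(n):
    """Start n threads and return the set of process ids they report."""
    pids = []
    lock = threading.Lock()

    def worker():
        with lock:  # guard the shared list
            pids.append(os.getpid())

    threads = [threading.Thread(target=worker) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return set(pids)


def report_pid(queue):
    # Runs inside a child process: send this process's id back to the parent.
    queue.put(os.getpid())


def pids_from_processes(n):
    """Start n child processes and return the set of process ids they report."""
    queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=report_pid, args=(queue,)) for _ in range(n)]
    for p in procs:
        p.start()
    pids = {queue.get() for _ in range(n)}  # read the results before join()
    for p in procs:
        p.join()
    return pids


if __name__ == '__main__':
    print('parent pid:', os.getpid())
    print('threads report:', pids_from_threads(4))      # only the parent's pid
    print('processes report:', pids_from_processes(4))  # the children's pids, none equal to the parent's
```

Reading from the queue before calling join() follows the multiprocessing documentation's advice that a child which has put items on a queue may not exit until those items are consumed.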

2. Test example

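Below is a test example of multitask parallelism in Python (using the Process class of the multiprocessing module). It runs four 2-second jobs, first serially and then in parallel: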

"""
This code is supported by the website: https://www.guanjihuan.com
The newest version of this code is on the web page: https://www.guanjihuan.com/archives/4536
"""

import multiprocessing
import os
import time

def run_proc(name):  # the code each task executes
    start_time = time.perf_counter()
    time.sleep(2)  # stand-in for 2 s of real work
    end_time = time.perf_counter()
    print('Process id running on %s = %s' % (name, os.getpid()), '; running time = %s' % (end_time-start_time))


# The __main__ guard is required: with the spawn start method (the default on Windows and macOS),
# child processes re-import this module, and without the guard they would start new processes themselves.
if __name__ == '__main__':

    # Serial
    print('Serial program')
    print('Process id = %s.' % os.getpid())
    start_time = time.perf_counter()
    run_proc('job1')
    run_proc('job2')
    run_proc('job3')
    run_proc('job4')
    end_time = time.perf_counter()
    print('Elapsed time (s) =', (end_time-start_time), '\n')  # wall-clock time, not CPU time

    # Parallel
    print('Parallel program')
    print('Process id = %s.' % os.getpid())
    start_time = time.perf_counter()
    p1 = multiprocessing.Process(target=run_proc, args=('job1',))
    p2 = multiprocessing.Process(target=run_proc, args=('job2',))
    p3 = multiprocessing.Process(target=run_proc, args=('job3',))
    p4 = multiprocessing.Process(target=run_proc, args=('job4',))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()  # join() waits for the child process to finish before continuing
    p2.join()
    p3.join()
    p4.join()
    end_time = time.perf_counter()
    print('Elapsed time (s) =', (end_time-start_time))

Output:

Serial program
Process id = 37156.
Process id running on job1 = 37156 ; running time = 2.0097683
Process id running on job2 = 37156 ; running time = 2.0110964
Process id running on job3 = 37156 ; running time = 2.0105541000000002
Process id running on job4 = 37156 ; running time = 2.0046176000000004
Elapsed time (s) = 8.0375171

Parallel program
Process id = 37156.
Process id running on job4 = 38820 ; running time = 2.0051369
Process id running on job3 = 21208 ; running time = 2.0048537
Process id running on job1 = 29340 ; running time = 2.0072364
Process id running on job2 = 34028 ; running time = 2.0072584
Elapsed time (s) = 2.1242947

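In the parallel run, each job runs in its own process (note the different process ids), the jobs finish in an arbitrary order (job4 is printed first here), and the total time drops from about 8 s to about 2.1 s; the extra ~0.1 s over a single 2 s job is mostly the overhead of starting the processes. Python runs slower than Fortran, but once you are used to it, it is quite convenient to write. In scientific computing, if you want to write your code in Python and still finish on schedule, you can use this kind of parallelism in addition to manually launching several runs at once. This matters especially on a supercomputer: without multitask parallelism, requesting several cores runs at almost the same speed as requesting one, so requesting many cores (for example 12) anyway is likely to waste a great deal of supercomputing resources and research funding.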
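To avoid paying for idle cores, the number of processes can be matched to the cores the job is actually allowed to use. A minimal sketch, assuming that on Linux the CPU affinity mask reported by os.sched_getaffinity reflects the cores a job was given; whether a particular batch system restricts affinity depends on its configuration, so treat the result as an estimate. The helper name available_cores is hypothetical.

```python
import os


def available_cores():
    """Best-effort count of CPU cores this process is allowed to run on."""
    if hasattr(os, 'sched_getaffinity'):
        # Linux (and some other Unix systems): the CPUs in this process's affinity mask.
        return len(os.sched_getaffinity(0))
    # Fallback (e.g. Windows, macOS): total logical CPUs; os.cpu_count() may return None.
    return os.cpu_count() or 1


if __name__ == '__main__':
    task_num = available_cores()
    print('number of worker processes to start:', task_num)
```

The resulting value could be used in place of a hard-coded process count such as task_num = 4 in the application example below.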

3. Passing arguments with a dictionary

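Passing arguments through a dictionary (the kwargs parameter of Process) is more flexible: the arguments do not have to be given in positional order, and parameters that have default values can be skipped.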

Example code:

"""
This code is supported by the website: https://www.guanjihuan.com
The newest version of this code is on the web page: https://www.guanjihuan.com/archives/4536
"""

from multiprocessing import Process
import os
import time

def run_proc(name, a=0, b=-1):  # the code each task executes; a and b have default values
    start_time = time.perf_counter()
    time.sleep(2)
    end_time = time.perf_counter()
    print('Process id running on %s = %s' % (name, os.getpid()), f'; Values: a={a}, b={b}', '; running time = %s' % (end_time-start_time))


if __name__ == '__main__':

    print('Parallel program')
    print('Process id = %s.' % os.getpid())
    start_time = time.perf_counter()
    p1 = Process(target=run_proc, kwargs={'name': 'job1', 'a': 10, 'b': 100})
    p2 = Process(target=run_proc, kwargs={'name': 'job2', 'a': 20})   # b keeps its default -1
    p3 = Process(target=run_proc, kwargs={'name': 'job3', 'b': 300})  # a keeps its default 0
    p4 = Process(target=run_proc, kwargs={'name': 'job4'})            # both defaults
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()  # join() waits for the child process to finish before continuing
    p2.join()
    p3.join()
    p4.join()
    end_time = time.perf_counter()
    print('Elapsed time (s) =', (end_time-start_time))

Output:

Parallel program
Process id = 1440.
Process id running on job1 = 40524 ; Values: a=10, b=100 ; running time = 2.0056711000000003
Process id running on job2 = 16872 ; Values: a=20, b=-1 ; running time = 2.0023163999999998
Process id running on job3 = 38816 ; Values: a=0, b=300 ; running time = 2.0142491000000002
Process id running on job4 = 396 ; Values: a=0, b=-1 ; running time = 2.0081636
Elapsed time (s) = 2.1090617000000003
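With many tasks, writing p1, p2, ... by hand gets tedious. The sketch below is not from the original article (TASKS and describe are hypothetical names): it keeps each task's arguments in a list and builds the processes in a loop. It also shows that args and kwargs can be combined, as for job5, where the name is passed positionally and b by keyword.

```python
from multiprocessing import Process
import os


def describe(name, a=0, b=-1):
    """Format the arguments a task received (missing keys fall back to the defaults)."""
    return f'{name}: a={a}, b={b}'


def run_proc(name, a=0, b=-1):
    print(describe(name, a, b), '; pid =', os.getpid())


# One entry per task: (positional args tuple, keyword args dict).
TASKS = [
    ((), {'name': 'job1', 'a': 10, 'b': 100}),
    ((), {'name': 'job2', 'a': 20}),
    ((), {'name': 'job3', 'b': 300}),
    ((), {'name': 'job4'}),
    (('job5',), {'b': 500}),  # name passed positionally, b by keyword
]

if __name__ == '__main__':
    processes = [Process(target=run_proc, args=args, kwargs=kwargs) for args, kwargs in TASKS]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    # exitcode 0 means the child finished without raising an exception
    print([p.exitcode for p in processes])
```

Starting all processes first and joining them afterwards keeps the tasks running concurrently; calling join() right after each start() would make the loop serial again.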

4. Returning function values

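A function running in a child process cannot simply return its value to the parent process. One workaround is to have the function write its result into shared memory created with Value or Array; see reference [1]. Here is an example: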

"""
This code is supported by the website: https://www.guanjihuan.com
The newest version of this code is on the web page: https://www.guanjihuan.com/archives/4536
"""

import multiprocessing

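def run_proc(name, a, num):  # write the result into the shared Value instead of returning it
    num.value = a

if __name__ == '__main__':
    num1 = multiprocessing.Value('d', 0.0)  # shared memory: a C double ('d') initialized to 0.0
    num2 = multiprocessing.Value('d', 0.0)  # shared memory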
    p1 = multiprocessing.Process(target=run_proc, args=('job1', 100, num1))
    p2 = multiprocessing.Process(target=run_proc, args=('job2', 200, num2))
    p1.start()
    p2.start()
    p1.join()
    p2.join() 
    print(num1.value)
    print(num2.value)

Output:

100.0
200.0
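The text above also mentions Array, the shared-memory counterpart of Value for a fixed-length sequence. A minimal sketch (the function fill_squares and the two-way split are illustrative assumptions, not from the original article): two processes each fill their own half of one shared array, and the parent reads the combined result after join().

```python
import multiprocessing


def fill_squares(start, stop, shared):
    # Each process writes only indices in [start, stop), so the two halves never overlap.
    for i in range(start, stop):
        shared[i] = i * i


if __name__ == '__main__':
    n = 8
    shared = multiprocessing.Array('d', n)  # n C doubles in shared memory, initialized to 0.0
    half = n // 2
    p1 = multiprocessing.Process(target=fill_squares, args=(0, half, shared))
    p2 = multiprocessing.Process(target=fill_squares, args=(half, n, shared))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(shared[:])  # [0.0, 1.0, 4.0, 9.0, 16.0, 25.0, 36.0, 49.0]
```

Because the processes write disjoint index ranges, no extra locking is needed beyond the per-access lock that multiprocessing.Array already wraps around each read and write.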

5. An application example

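Below is an application of multiprocessing (using the open-source package guan: https://py.guanjihuan.com). The full parameter array is divided among task_num processes; each process handles its share and writes the results to its own file, and once all processes have finished, the main process merges those files into result.txt: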

"""
This code is supported by the website: https://www.guanjihuan.com
The newest version of this code is on the web page: https://www.guanjihuan.com/archives/4536
"""

import multiprocessing
import os
import time
import numpy as np
import guan

def main(parameter_array, task_index):
    print('Process id = %s' % (os.getpid()))
    result_array = []
    for parameter in parameter_array:
        result = parameter*2
        result_array.append(result)
    # Simulate tasks of different lengths (1 to 10 s). A fresh generator per call keeps child
    # processes created by fork from inheriting, and then repeating, the same global NumPy random state.
    rng = np.random.default_rng()
    time.sleep(rng.uniform(1, 10))
    # Each task writes its results to its own file (guan helper)
    guan.write_one_dimensional_data(parameter_array, result_array, filename='task_index='+str(task_index))

if __name__ == '__main__':
    task_num = 4
    parameter_array_all = np.arange(0, 17, 1)
    start_time = time.perf_counter()
    process_array = []
    for task_index in range(task_num):
        # Parameters assigned to this task (guan helper)
        parameter_array = guan.preprocess_for_parallel_calculations(parameter_array_all, task_num, task_index)
        process_array.append(multiprocessing.Process(target=main, args=(parameter_array, task_index)))
    for process in process_array:  # start the child processes
        process.start()
    for process in process_array:  # wait for the child processes to finish
        process.join()
    end_time = time.perf_counter()
    print('Running time (s) =', (end_time-start_time))
    # Merge the per-task data files into result.txt
    with open('result.txt', 'w') as f:
        for task_index in range(task_num):
            with open('task_index='+str(task_index)+'.txt', 'r') as f0:
                f.write(f0.read())

Output:

Process id = 5356
Process id = 36908
Process id = 39844
Process id = 34404
Running time (s) = 9.2735009
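If the guan package is not available, or you would rather collect results in memory than through per-task files, a similar workflow can be sketched with the standard library alone. Here split_evenly is a hypothetical helper that splits the parameters into contiguous chunks whose sizes differ by at most one; this is an assumption about what guan.preprocess_for_parallel_calculations does, not its actual implementation. The sketch swaps in multiprocessing.Pool, whose map() returns each chunk's result in the same order as the input chunks, so no temporary files are needed.

```python
import multiprocessing
import os


def split_evenly(items, n):
    """Split items into n contiguous chunks whose lengths differ by at most one."""
    size, remainder = divmod(len(items), n)
    chunks, start = [], 0
    for i in range(n):
        end = start + size + (1 if i < remainder else 0)
        chunks.append(items[start:end])
        start = end
    return chunks


def compute(parameter_chunk):
    # Same toy computation as the example above: double every parameter.
    print('Process id = %s' % os.getpid())
    return [parameter * 2 for parameter in parameter_chunk]


if __name__ == '__main__':
    task_num = 4
    parameter_array_all = list(range(0, 17))
    chunks = split_evenly(parameter_array_all, task_num)
    with multiprocessing.Pool(processes=task_num) as pool:
        # map() blocks until every chunk is done and keeps the input order
        results_per_task = pool.map(compute, chunks)
    # Merge in memory instead of reading back per-task files
    result_array = [r for results in results_per_task for r in results]
    for parameter, result in zip(parameter_array_all, result_array):
        print(parameter, result)
```

Unlike the Process version, a Pool does not guarantee one chunk per worker process, so the same process id may be printed more than once.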

References:

[1] Multiprocessing

[2] multiprocessing --- Process-based parallelism

[3] Using OMP_NUM_THREADS=1 for Python multiprocessing


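[Note: this site is mainly a collection of personal notes and code, and its content may be revised from time to time. So that the latest version is always the one shown online, please do not repost articles from this site without permission. When citing, please credit the source: https://www.guanjihuan.com]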
