1. Introduction
This article explains how to run multiple tasks in parallel in Python using multiple processes. In Python, the multiprocessing module provides a Process class that represents a process object, whereas Fortran's OpenMP achieves parallelism by launching threads. Note: Python also has the threading module for thread-based parallelism, which is commonly used as well; see the article Commonly Used Python Packages (常用的Python软件包).
multiprocessing documentation: https://docs.python.org/3/library/multiprocessing.html
Process-based parallelism and thread-based parallelism are the two common ways to run multiple tasks at the same time. If you open the Task Manager (right-click the taskbar at the bottom of the screen), you will find the list of processes next to the applications. When the example in this article runs, Task Manager shows that Python's multiprocessing spawns several separate processes, one per task, which is how the tasks run in parallel.
When the Fortran OpenMP example runs instead, a single process uses close to 100% CPU (with four cores, each core corresponds to 25%). In other words, OpenMP creates several threads within one process and distributes them over the available cores to run the tasks in parallel.
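For comparison, below is a minimal thread-based sketch using the standard threading module (the task names are made up for illustration). All four threads report the same process id, and because of Python's global interpreter lock this approach mainly helps with I/O-bound tasks rather than CPU-bound computation:
import threading
import os
import time

def run_task(name):  # the task each thread executes
    time.sleep(2)
    print('Thread %s running in process %s' % (name, os.getpid()))

if __name__ == '__main__':
    threads = [threading.Thread(target=run_task, args=('job'+str(i),)) for i in range(1, 5)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()  # wait for all threads to finish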
2. A test example
Below is a test example of multi-task parallelism in Python using the Process class of the multiprocessing module:
"""
This code is supported by the website: https://www.guanjihuan.com
The newest version of this code is on the web page: https://www.guanjihuan.com/archives/4536
"""
import multiprocessing
import os
import time
def run_proc(name):  # the task to be executed
    start_time = time.perf_counter()
    time.sleep(2)
    end_time = time.perf_counter()
    print('Process id running on %s = %s' % (name, os.getpid()), '; running time = %s' % (end_time-start_time))

if __name__ == '__main__':
    # serial
    print('Serial run')
    print('Process id = %s.' % os.getpid())
    start_time = time.perf_counter()
    run_proc('job1')
    run_proc('job2')
    run_proc('job3')
    run_proc('job4')
    end_time = time.perf_counter()
    print('CPU execution time (s) =', (end_time-start_time), '\n')

    # parallel
    print('Parallel run')
    print('Process id = %s.' % os.getpid())
    start_time = time.perf_counter()
    p1 = multiprocessing.Process(target=run_proc, args=('job1',))
    p2 = multiprocessing.Process(target=run_proc, args=('job2',))
    p3 = multiprocessing.Process(target=run_proc, args=('job3',))
    p4 = multiprocessing.Process(target=run_proc, args=('job4',))
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()  # join() makes the main process wait until the child process has finished
    p2.join()
    p3.join()
    p4.join()
    end_time = time.perf_counter()
    print('Running time (s) =', (end_time-start_time))
Output:
Serial run
Process id = 37156.
Process id running on job1 = 37156 ; running time = 2.0097683
Process id running on job2 = 37156 ; running time = 2.0110964
Process id running on job3 = 37156 ; running time = 2.0105541000000002
Process id running on job4 = 37156 ; running time = 2.0046176000000004
CPU execution time (s) = 8.0375171
Parallel run
Process id = 37156.
Process id running on job4 = 38820 ; running time = 2.0051369
Process id running on job3 = 21208 ; running time = 2.0048537
Process id running on job1 = 29340 ; running time = 2.0072364
Process id running on job2 = 34028 ; running time = 2.0072584
Running time (s) = 2.1242947
Python runs more slowly than Fortran, but once you are familiar with it, it is quite convenient to write. In scientific computing, if you want to code in Python and still keep up the pace, you can use the parallel approach above in addition to launching jobs by hand. This is especially important on a supercomputer: if a job does not actually use multi-task parallelism, requesting many cores runs at roughly the same speed as requesting a single core, so asking for many cores (for example 12) in that situation is very likely to waste computing resources and research funding.
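A related practical note, following reference [3] (this is an assumption about your environment, not something verified in this post): numerical libraries such as NumPy may start their own internal threads, so combining them with multiprocessing can oversubscribe the CPU. A common remedy is to limit each process to a single internal thread before importing NumPy, for example:
import os

# Limit the internal thread pools of common numerical backends (OpenMP/MKL/OpenBLAS)
# so that the number of busy cores matches the number of processes being started.
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"

import numpy as np  # import NumPy only after setting the environment variables
Which of these variables actually matters depends on the BLAS backend that your NumPy build uses.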
3. Passing arguments with a dictionary
Passing the arguments through a dictionary of keyword arguments is more flexible: they do not have to be given in positional order, and parameters that have default values can simply be omitted.
Example code:
"""
This code is supported by the website: https://www.guanjihuan.com
The newest version of this code is on the web page: https://www.guanjihuan.com/archives/4536
"""
from multiprocessing import Process
import os
import time
def run_proc(name, a=0, b=-1):  # the task to be executed
    start_time = time.perf_counter()
    time.sleep(2)
    end_time = time.perf_counter()
    print('Process id running on %s = %s' % (name, os.getpid()), f'; Values: a={a}, b={b}', '; running time = %s' % (end_time-start_time))

if __name__ == '__main__':
    print('Parallel run')
    print('Process id = %s.' % os.getpid())
    start_time = time.perf_counter()
    p1 = Process(target=run_proc, kwargs={'name': 'job1', 'a': 10, 'b': 100})
    p2 = Process(target=run_proc, kwargs={'name': 'job2', 'a': 20})
    p3 = Process(target=run_proc, kwargs={'name': 'job3', 'b': 300})
    p4 = Process(target=run_proc, kwargs={'name': 'job4'})
    p1.start()
    p2.start()
    p3.start()
    p4.start()
    p1.join()  # join() makes the main process wait until the child process has finished
    p2.join()
    p3.join()
    p4.join()
    end_time = time.perf_counter()
    print('Running time (s) =', (end_time-start_time))
Output:
Parallel run
Process id = 1440.
Process id running on job1 = 40524 ; Values: a=10, b=100 ; running time = 2.0056711000000003
Process id running on job2 = 16872 ; Values: a=20, b=-1 ; running time = 2.0023163999999998
Process id running on job3 = 38816 ; Values: a=0, b=300 ; running time = 2.0142491000000002
Process id running on job4 = 396 ; Values: a=0, b=-1 ; running time = 2.0081636
Running time (s) = 2.1090617000000003
4. Returning a value from the function
If you want the function to hand a result back, you can do it through shared memory, using Value or Array; see reference [1]. Here is an example:
"""
This code is supported by the website: https://www.guanjihuan.com
The newest version of this code is on the web page: https://www.guanjihuan.com/archives/4536
"""
import multiprocessing
def run_proc(name, a, num):  # the task to be executed
    num.value = a

if __name__ == '__main__':
    num1 = multiprocessing.Value('d', 0.0)  # shared memory
    num2 = multiprocessing.Value('d', 0.0)  # shared memory
    p1 = multiprocessing.Process(target=run_proc, args=('job1', 100, num1))
    p2 = multiprocessing.Process(target=run_proc, args=('job2', 200, num2))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(num1.value)
    print(num2.value)
Output:
100.0
200.0
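Value holds a single number; when each process needs to return several numbers, the shared-memory Array mentioned above can be used in the same way. A minimal sketch (the task function and the values are made up for illustration):
import multiprocessing

def run_proc(offset, arr):  # write the results into the shared array
    for i in range(len(arr)):
        arr[i] = offset + i

if __name__ == '__main__':
    arr1 = multiprocessing.Array('d', 3)  # shared array of three doubles, initialized to 0.0
    arr2 = multiprocessing.Array('d', 3)
    p1 = multiprocessing.Process(target=run_proc, args=(100, arr1))
    p2 = multiprocessing.Process(target=run_proc, args=(200, arr2))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(list(arr1))  # [100.0, 101.0, 102.0]
    print(list(arr2))  # [200.0, 201.0, 202.0]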
5. An application example
Here is an application example of multiprocessing (it uses the open-source package: https://py.guanjihuan.com):
"""
This code is supported by the website: https://www.guanjihuan.com
The newest version of this code is on the web page: https://www.guanjihuan.com/archives/4536
"""
import multiprocessing
import os
import time
import numpy as np
import guan
def main(parameter_array, task_index):  # the task each process executes
    print('Process id = %s' % (os.getpid()))
    result_array = []
    for parameter in parameter_array:
        result = parameter*2
        result_array.append(result)
    time.sleep(np.random.uniform(1, 10))  # simulate a computation that takes some time
    guan.write_one_dimensional_data(parameter_array, result_array, filename='task_index='+str(task_index))

if __name__ == '__main__':
    task_num = 4
    parameter_array_all = np.arange(0, 17, 1)
    start_time = time.perf_counter()
    process_array = []
    for task_index in range(task_num):
        parameter_array = guan.preprocess_for_parallel_calculations(parameter_array_all, task_num, task_index)
        process_array.append(multiprocessing.Process(target=main, args=(parameter_array, task_index)))
    for process in process_array:  # start the child processes
        process.start()
    for process in process_array:  # wait for the child processes to finish
        process.join()
    end_time = time.perf_counter()
    print('Running time =', (end_time-start_time))

    # merge the data written by each task into one file
    f = open('result.txt', 'w')
    for task_index in range(task_num):
        with open('task_index='+str(task_index)+'.txt', 'r') as f0:
            text = f0.read()
        f.write(text)
    f.close()
Output:
Process id = 5356
Process id = 36908
Process id = 39844
Process id = 34404
Running time = 9.2735009
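If the guan package is not installed, the splitting step can be sketched with NumPy alone. The sketch assumes that guan.preprocess_for_parallel_calculations simply divides the full parameter array into task_num roughly equal chunks and returns the chunk belonging to task_index; it is a stand-in under that assumption, not the package's actual implementation:
import numpy as np

def split_parameters(parameter_array_all, task_num, task_index):
    # Hypothetical stand-in for guan.preprocess_for_parallel_calculations:
    # split the full parameter array into task_num nearly equal chunks.
    chunks = np.array_split(parameter_array_all, task_num)
    return chunks[task_index]

if __name__ == '__main__':
    parameter_array_all = np.arange(0, 17, 1)
    for task_index in range(4):
        print(task_index, split_parameters(parameter_array_all, 4, task_index))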
References:
[1] Multiprocessing (多进程)
[2] multiprocessing --- Process-based parallelism
[3] Python multiprocessing with OMP_NUM_THREADS=1
Hello, the function in your example prints its result directly. If my function needs to return a value, how can I get that return value when running in parallel?
This can be done with the shared-memory Value. I have updated the post with an example (see the section on returning a value from the function above). You can also look at reference [2] and search for "shared memory" on that page for the relevant part.
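Another common standard-library option for collecting results from child processes is multiprocessing.Queue. A minimal sketch (the task names and values are made up for illustration):
import multiprocessing

def run_proc(name, a, queue):  # put the result on the queue instead of printing it
    queue.put((name, a*2))

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=run_proc, args=('job1', 100, queue))
    p2 = multiprocessing.Process(target=run_proc, args=('job2', 200, queue))
    p1.start()
    p2.start()
    results = [queue.get() for _ in range(2)]  # one result per child process
    p1.join()
    p2.join()
    print(results)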