科学计算, 生活

使用Python脚本文件实现半手动的并行计算

这是之前的两篇:

其中一篇是使用 sh 脚本来实现这个功能,但大部分同学(包括自己)对 sh 脚本并不熟悉,本篇给出 Python 脚本实现版手动的并行计算例子。虽然在 Python 中有 multiprocessing、threading 等软件包可以实现多进程多线程并行计算,但由于是自动分配任务,使用起来总感觉不是特别放心,且在原有代码的基础上需要增加的代码比较多,可能不小心会破坏原有代码产生 bug。因此,个人比较倾向于半手动的并行计算。任务提交的命令参考这篇:超算中作业管理系统PBS/LSF/Slurm的常用命令

一、使用 Guan 软件包

Guan 软件包的网址:https://py.guanjihuan.com

安装方法:

pip install --upgrade guan

需要说明的是:这里最好大致看下调用的函数源码,然后再使用,可避免不必要的错误。

1. PBS 作业管理系统

并行提交任务(qsub_parallel_with_guan):

import guan

parameter_array = [1, 2, 3, 4]

guan.make_sh_file_for_qsub(sh_filename='a', command_line='python a.py', cpu_num=1, task_name='task', cd_dir=0)

guan.copy_py_sh_file_and_qsub_task(parameter_array=parameter_array, py_filename='a', old_str_in_py='parameter=0', new_str_in_py='parameter=', sh_filename='a', qsub_task_name='task')

代码文件a.py示例:

parameter=0
print(f'hello world {parameter}')

2. LSF 作业管理系统

并行提交任务(bsub_parallel_with_guan):

import guan

parameter_array = [1, 2, 3, 4]

guan.make_sh_file_for_bsub(sh_filename='a', command_line='python a.py', cpu_num=1, task_name='task', queue_name='score', cd_dir=0)

guan.copy_py_sh_file_and_bsub_task(parameter_array, py_filename='a', old_str_in_py='parameter=0', new_str_in_py='parameter=', sh_filename='a', bsub_task_name='task')

代码文件a.py示例:

parameter=0
print(f'hello world {parameter}')

二、自定义代码

这里以 PBS 作业管理系统为例。

通过运行以下Python脚本文件直接提交任务(python parallel.py),可进行自定义修改:

"""
This code is supported by the website: https://www.guanjihuan.com
The newest version of this code is on the web page: https://www.guanjihuan.com/archives/29200
"""

import os

parameter_str_array = ['np.arange(1, 11, 1)', 'np.arange(11, 21, 1)']

index = 0
for parameter_str in parameter_str_array:
    index += 1

    # 以下处理代码文件
    old_file = 'a.py'
    new_file = 'a'+str(index)+'.py'

    # 说明:linux系统下复制用cp,windows系统下复制用copy
    os.system('cp '+old_file+' '+new_file)  # 复制python代码文件
    with open(new_file, 'r') as f:  # 读取
        content  = f.read()
    
    old_str = 'parameter_array_labeled_for_replacement = []'
    new_str = 'parameter_array_labeled_for_replacement = ' + parameter_str
    content = content.replace(old_str, new_str)

    # 如果程序需要将数据写入文件,除了需要替代参数,还需要替代文件名,方法和以上相同

    with open('a'+str(index)+'.py', 'w') as f: # 写入
        f.write(content)



    # 以下处理任务上传文件
    old_file = 'a.sh'
    new_file = 'a'+str(index)+'.sh'

    os.system('cp '+old_file+' '+new_file)  # 复制任务调度系统的sh上传文件
    with open(new_file, 'r') as f:  # 读取
        content  = f.read()
    
    old_str = 'python a.py'
    new_str = 'python a'+str(index)+'.py'
    content = content.replace(old_str, new_str)

    old_str = 'task'
    new_str = 'task_'+str(index)
    content = content.replace(old_str, new_str)

    with open('a'+str(index)+'.sh', 'w') as f: # 写入
        f.write(content)



    # 提交任务
    os.system('qsub '+new_file)

代码文件a.py示例:

def run(parameter_array):
    for parameter in parameter_array:
        print('hello world'+' '+str(parameter))

parameter_array_labeled_for_replacement = []
run(parameter_array_labeled_for_replacement)

任务上传文件a.sh示例:

#!/bin/sh
#PBS -N task
#PBS -l nodes=1:ppn=1
python a.py

三、批量取消任务

如果提交 N 个任务后发现需要修改,要取消任务,那么可以使用以下这个命令, 对连续的任务号进行批量取消(例子中的任务号是从100到130,总共31个任务):

qdel $(seq 100 130)

另外,关于 CPU 数量的选取可以阅读这篇:超算提交任务时CPU数量的选取

525 次浏览

【说明:本站主要是个人的一些笔记和代码分享,内容可能会不定期修改。为了使全网显示的始终是最新版本,这里的文章未经同意请勿转载。引用请注明出处:https://www.guanjihuan.com

发表评论

您的邮箱地址不会被公开。 必填项已用 * 标注

Captcha Code