How to run multiple processes with the Pool process pool from Python's multiprocessing library and wait for all of them to finish

Problem description: I am using Python to run shell commands that call a domain-specific analysis program to do a large number of analyses. After each command finishes (i.e., one analysis is complete), the program automatically generates some files, which I then need to read. Because the program has to be called so many times and a single analysis takes ten-plus seconds, I want to run the calls in parallel.

Goal: I need to run on the order of a hundred shell commands at the same time and, only after all of them have completely finished, read the result files generated by the program. I have tried two approaches, described below.

Approach 1: use only the subprocess library. The code is as follows:

import subprocess as sbp

processes = []
for ii in range(10):
    newfolder = 'test' + str(ii + 1)
    cc = sbp.Popen(cmd, cwd=newfolder, shell=True)
    processes.append(cc)

for jj in range(10):
    processes[jj].wait()

When this code finishes, several files should have been generated, and the subsequent code reads them. With only two or three commands running at once there is no problem, but when I try to run 10 commands at the same time it only occasionally works; most of the time it fails with an error saying some file was not found and could not be opened. From this I conclude that the cmd command in some of the folders has not actually finished. How should I handle this?
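One way to narrow this down (a sketch with a placeholder command, not the real analysis command from the question) is to check each process's return code after wait(), so that a command that fails or exits early is reported instead of silently leaving its output files missing:

import subprocess as sbp

cmd = 'echo analysis'                 # placeholder for the real analysis command

processes = []
for ii in range(10):
    processes.append(sbp.Popen(cmd, cwd='test' + str(ii + 1), shell=True))

for ii, p in enumerate(processes):
    p.wait()                          # block until this command exits
    if p.returncode != 0:             # a non-zero exit code means the command failed
        print('command in test' + str(ii + 1) + ' exited with code', p.returncode)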

Approach 2: use the subprocess library together with the multiprocessing library. The code is as follows:

import subprocess as sbp
import multiprocessing as mlp

def run_shell(number):
    sbp.Popen(cmd, cwd='test' + str(number), shell=True)

numlist = [ii + 1 for ii in range(100)]
pool = mlp.Pool(100)
pool.map(run_shell, numlist)
pool.close()
pool.join()

Similarly, execution breaks off because some files have not been generated and cannot be opened. From what I have read, the number of CPUs is a limiting factor: with 4 CPUs, for example, 4 processes are created and the 5th command only starts once one of them has finished. How can I guarantee that all 100 processes have finished running before the subsequent code executes?

In addition, when I time both approaches with the time module, the time reported for Approach 1 includes the time the analysis program itself takes, while the time reported for Approach 2 seems to cover only the time needed to create the processes. How can I make Approach 2 also include the time spent calling the program and running the analysis?
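(A likely root cause, noted here as an assumption: run_shell in Approach 2 starts the command with Popen but returns immediately without waiting for it, so pool.map is done as soon as all the processes have been spawned. A minimal sketch of a worker that blocks until the command exits, with cmd as a placeholder:)

import subprocess as sbp
import multiprocessing as mlp

cmd = 'echo analysis'                # placeholder for the real analysis command

def run_shell(number):
    p = sbp.Popen(cmd, cwd='test' + str(number), shell=True)
    p.wait()                         # block until the command exits

if __name__ == '__main__':
    with mlp.Pool(10) as pool:       # 10 workers at a time (a choice, not a requirement)
        pool.map(run_shell, [ii + 1 for ii in range(100)])
    # pool.map only returns after every command has finished, so a timer around it
    # includes the analysis time, and the result files can safely be read here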

Look at the example below: use a queue to track when each child process has finished, and put an entry into the queue once the child process completes:
import subprocess as sp
import multiprocessing as mp
import time

def run_exe(q, code):
    s1 = time.time()
    # on Windows with shell=True this relies on the .py file association;
    # 'python test.py ...' would be more portable
    p = sp.Popen('test.py ' + str(code), shell=True)
    while p.poll() is None:          # None means the child is still running
        time.sleep(0.1)              # avoid a busy loop
    q.put((p.pid, code, time.time() - s1))

if __name__ == "__main__":
    jobs = []
    qq = mp.Queue()
    for i in range(20):
        pp = mp.Process(target=run_exe, args=(qq, i))
        jobs.append(pp)
        pp.start()

    for s in jobs:
        s.join()

    for _ in jobs:
        cur = qq.get()
        print(cur)
        # each entry gives the child process id, the argument that was passed in, and the runtime
--result
(6784, 2, 6.593695163726807)
(14944, 0, 8.390507698059082)
(2144, 1, 9.3748459815979)
(11824, 5, 10.656054019927979)
(4360, 11, 9.8748300075531)
(13596, 4, 11.034750699996948)
(15308, 6, 12.576527833938599)
(13056, 8, 12.155471563339233)
(13684, 3, 11.481432437896729)
(15328, 14, 12.865367650985718)
(15092, 7, 13.527697086334229)
(5640, 9, 13.480824947357178)
(6776, 12, 13.168334484100342)
(15508, 17, 13.965184450149536)
(6420, 15, 13.152710437774658)
(14260, 10, 13.605820655822754)
(15896, 16, 12.652727127075195)
(3400, 13, 12.980840921401978)
(8112, 18, 12.730849266052246)
(13944, 19, 13.215219497680664)



# my test.py, the program being called in this example
import sys

dirname = r'C:\Users\Administrator\Desktop\stss'

code = int(sys.argv[1])
# do some busy work proportional to the argument, then write the result to a file
a = []
for i in range(100):
    for j in range(code * 1000):
        a.append(i * j)
with open(dirname + '/tt' + str(code) + '.txt', mode='w', encoding='utf-8') as f:
    print(a, file=f)
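Adapting the queue-based pattern above to the question's per-folder commands might look roughly like this (a sketch; cmd is a placeholder for the analysis command, and the testN folders are assumed to exist):

import subprocess as sp
import multiprocessing as mp
import time

cmd = 'echo analysis'                    # placeholder for the real analysis command

def run_cmd(q, number):
    s1 = time.time()
    p = sp.Popen(cmd, cwd='test' + str(number), shell=True)
    p.wait()                             # block until this analysis finishes
    q.put((number, p.returncode, time.time() - s1))

if __name__ == '__main__':
    qq = mp.Queue()
    jobs = [mp.Process(target=run_cmd, args=(qq, n + 1)) for n in range(100)]
    for job in jobs:
        job.start()
    for job in jobs:
        job.join()                       # wait for every child process to finish
    for _ in jobs:
        print(qq.get())                  # (folder number, exit code, elapsed seconds)
    # all commands are done at this point, so the result files can be read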

from time import sleep
from random import random
from multiprocessing import Pool
 
# task to execute in a new process
def task(value):
    # generate a random value
    random_value = random()
    # block for moment
    sleep(random_value)
    # return a value
    return (value, random_value)
 
# protect the entry point
if __name__ == '__main__':
    # create the process pool
    with Pool() as pool:
        # issue tasks and process results
        for result in pool.map(task, range(10)):
            print(f'>got {result}')

Similar to this example, map() applies the task function to each argument in the range 0 to 9.

When it runs, all tasks are submitted to the process pool and completed, and once all results are available the main process iterates over the return values.
You can put all of your shell commands into a list and run them through pool.map, as sketched below.
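A rough sketch of that idea (the command strings and pool size are placeholders, not the asker's real setup): each worker runs one command with subprocess.run, which blocks until the command exits, and returns the exit code and elapsed time:

import subprocess
import time
from multiprocessing import Pool

# placeholder commands; replace with the real analysis commands
commands = ['echo job ' + str(i) for i in range(10)]

def run_command(cmd):
    start = time.time()
    result = subprocess.run(cmd, shell=True)   # blocks until the command finishes
    return cmd, result.returncode, time.time() - start

if __name__ == '__main__':
    with Pool(4) as pool:                      # 4 worker processes (an assumption)
        for cmd, rc, elapsed in pool.map(run_command, commands):
            print(cmd, 'exit code', rc, 'took', round(elapsed, 2), 's')
    # pool.map only returns once every command has finished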

Reference link, hope it helps:

Python多进程库multiprocessing创建进程以及进程池Pool类的使用 (weixin_39824529's CSDN blog): https://blog.csdn.net/weixin_39824529/article/details/111063155


 