python3.10 linux 每个任务用这个multiprocessing.Process 起一个进程
class Process(multiprocessing.Process):
def __init__(self, *args, **kwargs):
multiprocessing.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = multiprocessing.Pipe()
self._exception = None
def run(self):
try:
multiprocessing.Process.run(self)
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
def execute_task(task_func, params, task_id, timeout=None):
"""
在新进程中执行任务
"""
try:
# parent_conn, child_conn = multiprocessing.Pipe(True)
child_process = Process(target=task_func, args=(params,))
child_process.start()
child_process.join(timeout)
if child_process.exception:
error, traceback = child_process.exception
msg = f'{task_id} {child_process.pid} error, info: {error}'
code = CallBackStatus.error
else:
if child_process.is_alive():
msg = f'{task_id} {child_process.pid} timeout !!!'
code = CallBackStatus.error
child_process.terminate()
else:
msg = f'{task_id} {child_process.pid} success'
code = CallBackStatus.finish
except Exception as e:
msg = f'{task_id} error, info: {e}'
code = CallBackStatus.error
return code, msg
进程内部用 ProcessPoolExecutor 进程池 forkserver 方式起多个进程干活
context = multiprocessing.get_context('forkserver')
with ProcessPoolExecutor(max_workers=max_workers, mp_context=context) as executor:
每次任务结束后会留下两个僵尸进程
在你的代码中,产生僵尸进程的问题很可能是因为在每次任务结束后,你没有及时对子进程进行适当的回收操作。为了解决这个问题,你可以在主进程中使用 multiprocessing.Process.join() 方法来等待子进程结束并回收资源。同时,使用 atexit 模块来确保在程序退出时,所有子进程都得到正确的回收。
以下是修改后的代码示例,针对每次任务结束进行子进程回收操作:
python
Copy code
import multiprocessing
import traceback
from concurrent.futures import ProcessPoolExecutor
class Process(multiprocessing.Process):
def __init__(self, *args, **kwargs):
multiprocessing.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = multiprocessing.Pipe()
self._exception = None
def run(self):
try:
multiprocessing.Process.run(self)
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
def execute_task(task_func, params, task_id, timeout=None):
try:
child_process = Process(target=task_func, args=(params,))
child_process.start()
child_process.join(timeout) import multiprocessing
import traceback
from concurrent.futures import ProcessPoolExecutor
class Process(multiprocessing.Process):
def __init__(self, *args, **kwargs):
multiprocessing.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = multiprocessing.Pipe()
self._exception = None
def run(self):
try:
multiprocessing.Process.run(self)
self._cconn.send(None)
except Exception as e:
tb = traceback.format_exc()
self._cconn.send((e, tb))
@property
def exception(self):
if self._pconn.poll():
self._exception = self._pconn.recv()
return self._exception
def execute_task(task_func, params, task_id, timeout=None):
try:
child
# 等待子进程结束
if child_process.exception:
error, traceback = child_process.exception
msg = f'{task_id} {child_process.pid} error, info: {error}'
code = CallBackStatus.error
else:
if child_process.is_alive():
msg = f'{task_id} {child_process.pid} timeout !!!'
code = CallBackStatus.error
child_process.terminate()
else:
msg = f'{task_id} {child_process.pid} success'
code = CallBackStatus.finish
except Exception as e:
msg = f'{task_id} error, info: {e}'
code = CallBackStatus.error
return code, msg
def main():
# ... 其他代码 ...
if __name__ == '__main__':
import atexit
# 在程序退出时回收子进程资源
atexit.register(multiprocessing.get_start_method().spawn.__self__.terminate)
main()
通过上述修改,你可以确保在每次任务结束后,使用 join() 方法等待子进程的结束,并在程序退出时通过 atexit 模块来进行资源的回收,从而避免产生僵尸进程。
如果超时发生,确保再次调用join()
来清理进程。
用os.waitpid()
来等待子进程并获取其退出状态。
atexit确保在程序退出时执行某些清理操作。
你的代码中每次任务结束后留下两个僵尸进程的问题可能是由于同时使用了multiprocessing.Process和ProcessPoolExecutor导致的。
当你使用multiprocessing.Process创建一个Process对象时,它会为该特定任务启动一个新的进程。然而,当你使用带有forkserver上下文的ProcessPoolExecutor时,它会创建一个处理多个任务的工作进程池。
在你的代码中,你为每个任务创建了一个Process对象并单独启动它,但同时又使用ProcessPoolExecutor来管理一个工作进程池。这可能会导致冲突和意外行为,包括僵尸进程的产生。
为了解决这个问题,你应该选择一种处理进程的方法:要么使用multiprocessing.Process,要么使用ProcessPoolExecutor。以下是修改代码以仅使用ProcessPoolExecutor的示例:
from concurrent.futures import ProcessPoolExecutor
def execute_task(task_func, params, task_id, timeout=None):
try:
with ProcessPoolExecutor() as executor:
future = executor.submit(task_func, params)
result = future.result(timeout=timeout)
msg = f'{task_id} success'
code = CallBackStatus.finish
except Exception as e:
msg = f'{task_id} error, info: {e}'
code = CallBackStatus.error
# Handle the result and return necessary information
通过仅使用ProcessPoolExecutor,你可以避免不同进程管理方法之间的冲突,并防止产生僵尸进程。
【以下回答由 GPT 生成】
这段代码中的问题是关于Python多进程产生僵尸进程的。在execute_task
函数中,以自定义的Process
类的形式创建了一个子进程,并通过管道进行通信。但是,由于没有调用child_process.join()
来等待子进程的结束,可能会导致子进程变成僵尸进程。
为了解决这个问题,可以在execute_task
函数的末尾添加child_process.join()
,以确保子进程的正常退出。具体代码修改如下:
def execute_task(task_func, params, task_id, timeout=None):
"""
Executes a task in a new process
"""
try:
child_process = Process(target=task_func, args=(params,))
child_process.start()
child_process.join(timeout)
if child_process.exception:
error, traceback = child_process.exception
msg = f'{task_id} {child_process.pid} error, info: {error}'
code = CallBackStatus.error
else:
if child_process.is_alive():
msg = f'{task_id} {child_process.pid} timeout !!!'
code = CallBackStatus.error
child_process.terminate()
else:
msg = f'{task_id} {child_process.pid} success'
code = CallBackStatus.finish
child_process.join() # 等待子进程结束
except Exception as e:
msg = f'{task_id} error, info: {e}'
code = CallBackStatus.error
return code, msg
这样修改之后,就能确保子进程在完成任务后正常退出,防止产生僵尸进程。
【相关推荐】
任务结束前查一下僵尸进程杀一下
参考结合GPT4.0、文心一言,如有帮助,恭请采纳。
你可以修改 execute_task 函数来处理这个信息,这样的话,如果子进程发送了异常信息,那么父进程就会抛出异常,这样就可以在调用 execute_task 函数的地方捕获这个异常并进行处理
def execute_task(task_func, params, task_id, timeout=None):
"""
在新进程中执行任务
"""
try:
child_process = Process(target=task_func, args=(params,))
child_process.start()
child_process.join(timeout)
if child_process.exception:
error, traceback = child_process.exception
raise Exception(f'{task_id} {child_process.pid} error: {error}')
else:
if child_process.is_alive():
child_process.terminate()
raise Exception(f'{task_id} {child_process.pid} timeout !!!')
else:
return f'{task_id} {child_process.pid} success'
except Exception as e:
return f'{task_id} error: {e}'
def execute_task(task_func, params, task_id, timeout=None):
"""
Executes a task in a new process
"""
try:
child_process = Process(target=task_func, args=(params,))
child_process.start()
child_process.join(timeout)
if child_process.exception:
error, traceback = child_process.exception
msg = f'{task_id} {child_process.pid} error, info: {error}'
code = CallBackStatus.error
else:
if child_process.is_alive():
msg = f'{task_id} {child_process.pid} timeout !!!'
code = CallBackStatus.error
child_process.terminate()
else:
msg = f'{task_id} {child_process.pid} success'
code = CallBackStatus.finish
child_process.join() # 等待子进程结束
except Exception as e:
msg = f'{task_id} error, info: {e}'
code = CallBackStatus.error
return code, msg
在每个任务结束后,你可以显式地调用join()方法来等待进程结束,并释放相关的资源。这可能会确保没有僵尸进程被留下。
child_process = Process(target=task_func, args=(params,))
child_process.start()
child_process.join(timeout)
child_process.terminate() # 显式地终止进程
每次任务结束后留下两个僵尸进程的问题可能是由于进程池的资源管理不当导致的。
forkserver是一种启动子进程的方式,它使用了一个专门的服务器进程来管理子进程的创建和销毁。当一个子进程完成任务后,它会被回收并返回给进程池以供下次使用。然而,有时候由于资源管理的问题,子进程可能没有被正确回收,从而导致僵尸进程的产生。
要解决这个问题,你可以尝试以下几个方法:
context = multiprocessing.get_context('spawn') # 或者使用 'fork'
with ProcessPoolExecutor(max_workers=max_workers, mp_context=context) as executor:
# 执行任务
在每次任务完成后,调用executor.shutdown(wait=True)来正确关闭进程池并等待所有子进程完成。这将确保所有子进程都被正确回收。
with ProcessPoolExecutor(max_workers=max_workers, mp_context=context) as executor:
# 执行任务
executor.shutdown(wait=True)
通过以上修改,你应该能够解决每次任务结束后留下僵尸进程的问题。请注意,spawn方式可能会导致一些额外的开销,因此你可以尝试使用fork方式来获得更好的性能,但这取决于你的具体需求和操作系统的支持情况。
问题点: 多进程出现僵尸进程.
分析思路:僵尸进程的出现原因和管道 multiprocessing.Pipe()有关.应该将管道变成全局变量进行使用,这样进程才能在完成后正常退出。
https://stackoverflow.com/questions/30506489/python-multiprocessing-leading-to-many-zombie-processes
多进程模块的 Process 类在进程结束时不会自动清理,这就是为什么在你的代码中会留下僵尸进程的原因。因此,可以在代码中使用完毕后手动清理掉。
python os.popen 出现僵尸进程 解决方法
可以参考下
在每次任务结束后调用child_process.terminate()来终止子进程,确保子进程正确结束并不会成为僵尸进程