Python 多进程 产生僵尸进程问题

python3.10 linux 每个任务用这个multiprocessing.Process 起一个进程

class Process(multiprocessing.Process):
    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception


def execute_task(task_func, params, task_id, timeout=None):
    """
    在新进程中执行任务
    """
    try:
        # parent_conn, child_conn = multiprocessing.Pipe(True)
        child_process = Process(target=task_func, args=(params,))
        child_process.start()
        child_process.join(timeout)
        if child_process.exception:
            error, traceback = child_process.exception
            msg = f'{task_id} {child_process.pid} error, info: {error}'
            code = CallBackStatus.error
        else:
            if child_process.is_alive():
                msg = f'{task_id} {child_process.pid} timeout !!!'
                code = CallBackStatus.error
                child_process.terminate()
            else:
                msg = f'{task_id} {child_process.pid} success'
                code = CallBackStatus.finish
    except Exception as e:
        msg = f'{task_id} error, info: {e}'
        code = CallBackStatus.error
    return code, msg

进程内部用 ProcessPoolExecutor 进程池 forkserver 方式起多个进程干活

context = multiprocessing.get_context('forkserver')
with ProcessPoolExecutor(max_workers=max_workers, mp_context=context) as executor:

每次任务结束后会留下两个僵尸进程

img

img

在你的代码中,产生僵尸进程的问题很可能是因为在每次任务结束后,你没有及时对子进程进行适当的回收操作。为了解决这个问题,你可以在主进程中使用 multiprocessing.Process.join() 方法来等待子进程结束并回收资源。同时,使用 atexit 模块来确保在程序退出时,所有子进程都得到正确的回收。

以下是修改后的代码示例,针对每次任务结束进行子进程回收操作:

python
Copy code
import multiprocessing
import traceback
from concurrent.futures import ProcessPoolExecutor

class Process(multiprocessing.Process):
    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception

def execute_task(task_func, params, task_id, timeout=None):
    try:
        child_process = Process(target=task_func, args=(params,))
        child_process.start()
        child_process.join(timeout)    import multiprocessing
import traceback
from concurrent.futures import ProcessPoolExecutor

class Process(multiprocessing.Process):
    def __init__(self, *args, **kwargs):
        multiprocessing.Process.__init__(self, *args, **kwargs)
        self._pconn, self._cconn = multiprocessing.Pipe()
        self._exception = None

    def run(self):
        try:
            multiprocessing.Process.run(self)
            self._cconn.send(None)
        except Exception as e:
            tb = traceback.format_exc()
            self._cconn.send((e, tb))

    @property
    def exception(self):
        if self._pconn.poll():
            self._exception = self._pconn.recv()
        return self._exception

def execute_task(task_func, params, task_id, timeout=None):
    try:
        child
  # 等待子进程结束
        if child_process.exception:
            error, traceback = child_process.exception
            msg = f'{task_id} {child_process.pid} error, info: {error}'
            code = CallBackStatus.error
        else:
            if child_process.is_alive():
                msg = f'{task_id} {child_process.pid} timeout !!!'
                code = CallBackStatus.error
                child_process.terminate()
            else:
                msg = f'{task_id} {child_process.pid} success'
                code = CallBackStatus.finish
    except Exception as e:
        msg = f'{task_id} error, info: {e}'
        code = CallBackStatus.error
    return code, msg

def main():
    # ... 其他代码 ...

if __name__ == '__main__':
    import atexit
    
    # 在程序退出时回收子进程资源
    atexit.register(multiprocessing.get_start_method().spawn.__self__.terminate)
    
    main()

通过上述修改,你可以确保在每次任务结束后,使用 join() 方法等待子进程的结束,并在程序退出时通过 atexit 模块来进行资源的回收,从而避免产生僵尸进程。

如果超时发生,确保再次调用join()来清理进程。
os.waitpid()来等待子进程并获取其退出状态。
atexit确保在程序退出时执行某些清理操作。

你的代码中每次任务结束后留下两个僵尸进程的问题可能是由于同时使用了multiprocessing.Process和ProcessPoolExecutor导致的。

当你使用multiprocessing.Process创建一个Process对象时,它会为该特定任务启动一个新的进程。然而,当你使用带有forkserver上下文的ProcessPoolExecutor时,它会创建一个处理多个任务的工作进程池。

在你的代码中,你为每个任务创建了一个Process对象并单独启动它,但同时又使用ProcessPoolExecutor来管理一个工作进程池。这可能会导致冲突和意外行为,包括僵尸进程的产生。

为了解决这个问题,你应该选择一种处理进程的方法:要么使用multiprocessing.Process,要么使用ProcessPoolExecutor。以下是修改代码以仅使用ProcessPoolExecutor的示例:

from concurrent.futures import ProcessPoolExecutor

def execute_task(task_func, params, task_id, timeout=None):
    try:
        with ProcessPoolExecutor() as executor:
            future = executor.submit(task_func, params)
            result = future.result(timeout=timeout)
            msg = f'{task_id} success'
            code = CallBackStatus.finish
    except Exception as e:
        msg = f'{task_id} error, info: {e}'
        code = CallBackStatus.error

    # Handle the result and return necessary information



通过仅使用ProcessPoolExecutor,你可以避免不同进程管理方法之间的冲突,并防止产生僵尸进程。

【以下回答由 GPT 生成】

这段代码中的问题是关于Python多进程产生僵尸进程的。在execute_task函数中,以自定义的Process类的形式创建了一个子进程,并通过管道进行通信。但是,由于没有调用child_process.join()来等待子进程的结束,可能会导致子进程变成僵尸进程。

为了解决这个问题,可以在execute_task函数的末尾添加child_process.join(),以确保子进程的正常退出。具体代码修改如下:

def execute_task(task_func, params, task_id, timeout=None):
    """
    Executes a task in a new process
    """
    try:
        child_process = Process(target=task_func, args=(params,))
        child_process.start()
        child_process.join(timeout)

        if child_process.exception:
            error, traceback = child_process.exception
            msg = f'{task_id} {child_process.pid} error, info: {error}'
            code = CallBackStatus.error
        else:
            if child_process.is_alive():
                msg = f'{task_id} {child_process.pid} timeout !!!'
                code = CallBackStatus.error
                child_process.terminate()
            else:
                msg = f'{task_id} {child_process.pid} success'
                code = CallBackStatus.finish

        child_process.join()  # 等待子进程结束
    except Exception as e:
        msg = f'{task_id} error, info: {e}'
        code = CallBackStatus.error

    return code, msg

这样修改之后,就能确保子进程在完成任务后正常退出,防止产生僵尸进程。



【相关推荐】



如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^

任务结束前查一下僵尸进程杀一下

参考结合GPT4.0、文心一言,如有帮助,恭请采纳。
你可以修改 execute_task 函数来处理这个信息,这样的话,如果子进程发送了异常信息,那么父进程就会抛出异常,这样就可以在调用 execute_task 函数的地方捕获这个异常并进行处理

def execute_task(task_func, params, task_id, timeout=None):  
    """  
    在新进程中执行任务  
    """  
    try:  
        child_process = Process(target=task_func, args=(params,))  
        child_process.start()  
        child_process.join(timeout)  
        if child_process.exception:  
            error, traceback = child_process.exception  
            raise Exception(f'{task_id} {child_process.pid} error: {error}')  
        else:  
            if child_process.is_alive():  
                child_process.terminate()  
                raise Exception(f'{task_id} {child_process.pid} timeout !!!')  
            else:  
                return f'{task_id} {child_process.pid} success'  
    except Exception as e:  
        return f'{task_id} error: {e}'



def execute_task(task_func, params, task_id, timeout=None):
    """
    Executes a task in a new process
    """
    try:
        child_process = Process(target=task_func, args=(params,))
        child_process.start()
        child_process.join(timeout)
        if child_process.exception:
            error, traceback = child_process.exception
            msg = f'{task_id} {child_process.pid} error, info: {error}'
            code = CallBackStatus.error
        else:
            if child_process.is_alive():
                msg = f'{task_id} {child_process.pid} timeout !!!'
                code = CallBackStatus.error
                child_process.terminate()
            else:
                msg = f'{task_id} {child_process.pid} success'
                code = CallBackStatus.finish
        child_process.join()  # 等待子进程结束
    except Exception as e:
        msg = f'{task_id} error, info: {e}'
        code = CallBackStatus.error
    return code, msg

在每个任务结束后,你可以显式地调用join()方法来等待进程结束,并释放相关的资源。这可能会确保没有僵尸进程被留下。

child_process = Process(target=task_func, args=(params,))
child_process.start()
child_process.join(timeout)
child_process.terminate()  # 显式地终止进程


每次任务结束后留下两个僵尸进程的问题可能是由于进程池的资源管理不当导致的。
forkserver是一种启动子进程的方式,它使用了一个专门的服务器进程来管理子进程的创建和销毁。当一个子进程完成任务后,它会被回收并返回给进程池以供下次使用。然而,有时候由于资源管理的问题,子进程可能没有被正确回收,从而导致僵尸进程的产生。
要解决这个问题,你可以尝试以下几个方法:

  1. 确保在每次任务完成后,调用executor.shutdown()来正确关闭进程池。这将确保所有子进程都被正确回收。
  2. 在使用ProcessPoolExecutor时,尽量避免在任务中创建新的子进程。由于forkserver方式的限制,创建新的子进程可能会导致资源管理问题。
  3. 如果以上方法无效,你可以尝试使用其他的进程启动方式,如spawn或fork,看是否能够解决僵尸进程的问题。在使用ProcessPoolExecutor时,将进程启动方式改为spawn或fork,而不是forkserver。这样可以避免由于forkserver方式的限制导致的僵尸进程问题。
context = multiprocessing.get_context('spawn')  # 或者使用 'fork'
with ProcessPoolExecutor(max_workers=max_workers, mp_context=context) as executor:
    # 执行任务

在每次任务完成后,调用executor.shutdown(wait=True)来正确关闭进程池并等待所有子进程完成。这将确保所有子进程都被正确回收。

with ProcessPoolExecutor(max_workers=max_workers, mp_context=context) as executor:
    # 执行任务
    executor.shutdown(wait=True)

通过以上修改,你应该能够解决每次任务结束后留下僵尸进程的问题。请注意,spawn方式可能会导致一些额外的开销,因此你可以尝试使用fork方式来获得更好的性能,但这取决于你的具体需求和操作系统的支持情况。

问题点: 多进程出现僵尸进程.
分析思路:僵尸进程的出现原因和管道 multiprocessing.Pipe()有关.应该将管道变成全局变量进行使用,这样进程才能在完成后正常退出。

https://stackoverflow.com/questions/30506489/python-multiprocessing-leading-to-many-zombie-processes

多进程模块的 Process 类在进程结束时不会自动清理,这就是为什么在你的代码中会留下僵尸进程的原因。因此,可以在代码中使用完毕后手动清理掉。

python os.popen 出现僵尸进程 解决方法
可以参考下

python os.popen 出现僵尸进程 解决方法_python多进程 僵尸进程_whatday的博客-CSDN博客 调用os模块,使用popen方法执行一个命令,可以看到在进程中产生了一个僵尸进程。如果关掉这个进程,只需要在后面加入close()函数即可。对于第一点,如果我们只是调用一些简单的 shell 命令,比如。同理,类似 file 的操作,我们可以在。返回的 file-like object 封装了。之类的话,倒是一般不需要担心执行可能会失败;方法,我们借助 context 机制实现对。对于第二点,回收子进程的资源 调用。的接口,这也是我们可以对其调用。调用,完成对子进程资源回收。下面是产生僵尸进程的例子。_python多进程 僵尸进程 https://blog.csdn.net/whatday/article/details/131616237

在每次任务结束后调用child_process.terminate()来终止子进程,确保子进程正确结束并不会成为僵尸进程

python3 多进程 multiprocessing对僵尸进程的处理_whatday的博客-CSDN博客 之所以想写这么一篇博客,是在学TCP/IP网络编程时,讲到多进程服务器的构建时,父进程需要对子进程进行处理,不然的话会产生一堆的僵尸进程,最后会危害整个系统。然后想到我之前写的爬虫,Dwonloader使用的就是多进程的方式来并发下载数据。在Downloader中,我的做法是凑够一定数量的请求,然后开启一个进程,让这个进程去处理这堆请求。而我只开启了进程,并没有去管理这些进程,突然有点心慌慌,这个... https://blog.csdn.net/whatday/article/details/104375996/?ops_request_misc=&request_id=&biz_id=102&utm_term=Python%20%E5%A4%9A%E8%BF%9B%E7%A8%8B%20%E4%BA%A7%E7%94%9F%E5%83%B5%E5%B0%B8%E8%BF%9B%E7%A8%8B%E9%97%AE%E9%A2%98&utm_medium=distribute.pc_search_result.none-task-blog-2~all~sobaiduweb~default-0-104375996.142^v93^chatsearchT3_2&spm=1018.2226.3001.4187