python 类内多进程写入到成员变量的问题


import multiprocessing as mp
import time

class MyClass:
    def __init__(self):
        self.data = {}
        self.data_queue = mp.Queue()

    def write(self):
        while True:
            data = self.data_queue.get()
            if data is None:
                break
            # 写入 self.data
            self.data[data[0]] = data[1]
            print(f"Writing data: {data}")
            # print(self.data)
  
    
    def quick_test(self):
        # 启动写入进程
        processes = []
        for _ in range(2):
            p = mp.Process(target=self.write)
            p.start()
            processes.append(p)

        # 向队列中添加数据
        data = [("key1", "value1"), ("key2", "value2"),  ("key3", "value3"),None,None,]
        for d in data:
            self.data_queue.put(d)

        # 等待写入进程结束
        for p in processes:
            p.join()

        print(self.data)

my_obj = MyClass()
my_obj.quick_test()
print(my_obj.data)

诚心问各位,为什么上面的两个print都输出空?而若在write里面进行print却非空?

引用 皆我百晓生 小程序回复内容作答:

在多进程中,每个进程都有自己独立的内存空间,因此在子进程中修改的self.data不会反映到主进程的self.data上。

为了解决这个问题,可以使用Manager类来创建一个可以在多个进程之间共享的字典。修改代码如下:

import multiprocessing as mp
import time

class MyClass:
    def __init__(self):
        self.manager = mp.Manager()
        self.data = self.manager.dict()
        self.data_queue = self.manager.Queue()

    def write(self):
        while True:
            data = self.data_queue.get()
            if data is None:
                break
            # 写入 self.data
            self.data[data[0]] = data[1]
            print(f"Writing data: {data}")

    def quick_test(self):
        # 启动写入进程
        processes = []
        for _ in range(2):
            p = mp.Process(target=self.write)
            p.start()
            processes.append(p)

        # 向队列中添加数据
        data = [("key1", "value1"), ("key2", "value2"),  ("key3", "value3"),None,None,]
        for d in data:
            self.data_queue.put(d)

        # 等待写入进程结束
        for p in processes:
            p.join()

        print(self.data)

my_obj = MyClass()
my_obj.quick_test()
print(my_obj.data)

通过使用self.manager.dict()来创建一个可以在多个进程之间共享的字典对象。这样,子进程中的修改操作就会同步到主进程中的self.data上。

注意:在使用Manager类时,创建的共享对象会自动使用IPC(Inter-process Communication)来实现进程间的通信,因此会产生一些额外的性能开销。如果数据量较小,可以考虑使用multiprocessing.Valuemultiprocessing.Array来共享数据,这样可以避免一些额外的开销。

【以下回答由 GPT 生成】

问题的输出结果是空的,是因为多进程是并行执行的,每个进程都有自己的内存空间。在这种情况下,每个进程都有自己的self.data对象,而不是共用同一个对象。所以,当一个进程修改了self.data后,其它进程并不会感知到这个修改。

quick_test方法中,启动了两个写入进程,这两个进程同时对self.data进行写操作。因为是并行执行的,所以两个进程可能会同时修改self.data,这个时候就会发生冲突。如果两个进程同时修改同一个键的值,那么最后的结果将是不确定的。

当你在write方法内部进行打印输出时,因为这些打印语句是在单个进程内部执行的,所以它们会正确地输出修改后的self.data

解决这个问题的一种方式是使用进程锁来确保多个进程之间的操作是互斥的。下面是一个修改后的代码示例:

import multiprocessing as mp
import time

class MyClass:
    def __init__(self):
        self.data = {}
        self.data_queue = mp.Queue()
        self.lock = mp.Lock()

    def write(self):
        while True:
            data = self.data_queue.get()
            if data is None:
                break
            # 使用锁来确保写操作的互斥
            with self.lock:
                self.data[data[0]] = data[1]
            print(f"Writing data: {data}")

    def quick_test(self):
        # 启动写入进程
        processes = []
        for _ in range(2):
            p = mp.Process(target=self.write)
            p.start()
            processes.append(p)

        # 向队列中添加数据
        data = [("key1", "value1"), ("key2", "value2"),  ("key3", "value3"), None, None]
        for d in data:
            self.data_queue.put(d)

        # 等待写入进程结束
        for p in processes:
            p.join()

        print(self.data)

my_obj = MyClass()
my_obj.quick_test()
print(my_obj.data)

在修改后的代码中,我们添加了一个self.lock对象,使用with self.lock语句来确保每次写操作的互斥。这样,多个写入进程就可以正确地修改self.data了。最后的输出结果应该是正确的。

请注意,为了确保正确性,在多进程编程中操作共享数据时,通常需要使用进程锁或其他同步机制来确保数据的一致性和避免竞争条件。



【相关推荐】



如果你已经解决了该问题, 非常希望你能够分享一下解决方案, 写成博客, 将相关链接放在评论区, 以帮助更多的人 ^-^