python apscheduler开多进程出问题

各位请指教!本人非专业,业余水平,写的不对的请不要嘲笑。
我编了一个定时获取商品价格,然后处理商品价格的小程序,用的是Apscheduler库。用BackgroundScheduler调度器,单线程运行时没有问题。但当用BlockingScheduler调度器,开多进程时(商品比较多,数据计算工作量大,需要并发运行),整个程序就乱套了。
程序的思路是:定义商品shangpin类,生成商品shangpin成员;然后定时运行fill_ts函数,获取价格,赋值给成员变量;再定时运行计算价格程序,为方便描述,这里改成了dayin打印函数。
开多进程后,获取的商品价格,有时与成员变量一致,有时不同,感觉乱套了。是不是进程不能获取成员变量,还是什么原因?

简要程序如下:

import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

liebiao = [1,2]
cishu = 1

class shangpin:
    def __init__(self, price):
        self.price = price

    def getshi(self,shuju1):
        self.price.append(shuju1)

    def print(self):
        print(self.price)

def fill_ts(jiage):
    se708.getshi(jiage)
    
def dayin():
    global cishu
    se708.print()
    print('这是第'+ str(cishu) + '次打印')
    cishu += 1
    time.sleep(6)
    
now66 = datetime.datetime.now()
date_str = now66.strftime('%Y-%m-%d')
timed31 = date_str + ' 12:34:00'    
timed32 = date_str + ' 12:34:05'  
timed33 = date_str + ' 12:34:10'  
timed40 = date_str + ' 12:59:20'  

se708 = shangpin([5,6])
fill_ts(8)
dayin()

if __name__ == '__main__':
 
    #配置调度器线程数、进程数
    scheduler_settings = {
        "executors" : {
            #"default": ThreadPoolExecutor(5),
            "processpool": ProcessPoolExecutor(8)
        },
        "job_defaults": {
             "coalesce": False,
            "misfire_grace_time": 10,
            "replace_existing": True,
            "max_instances": 1
        },
    }

    scheduler1 = BlockingScheduler(**scheduler_settings)
    scheduler1.add_executor("processpool")
    scheduler1.add_job(fill_ts, 'interval', seconds=20, start_date=timed31, end_date=timed40, args=[9])
    scheduler1.add_job(dayin, 'interval', seconds=20, start_date=timed32, end_date=timed40)
    scheduler1.add_job(dayin, 'interval', seconds=20, start_date=timed33, end_date=timed40)

    #开始任务计划
    scheduler1.start()

形成的结果如下:

img

我是没搞懂Apscheduler库多进程运行方式?还是我程序本身有问题?
有没有什么好的解决办法?请指教!

你好,第一次调度器运行后任务就乱了,可以假设某个进程将 self.price 的值修改了,但是其他进程不知道这个修改,所以就会出现价格不一致的情况。

你可以使用共享内存或者消息队列来解决这个问题。比如,在共享内存中定义一个数据结构用于存储商品价格信息和更新时间,然后在每个进程中访问共享内存获取价格信息和更新时间。在修改价格信息时,需要加锁避免写冲突。在更新时间时,可以使用原子操作或者信号量来避免并发冲突。

以下是基于 multiprocessing 模块和共享内存的一个示例代码:


python
import time
import datetime
import multiprocessing as mp
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

# 定义共享内存数据结构
class Shangpin:
    def __init__(self, price, update_time):
        self.price = price
        self.update_time = update_time

# 获取共享内存数据
def get_shangpin(shangpin):
    return shangpin.price, shangpin.update_time

# 更新共享内存数据
def set_shangpin(shangpin, price):
    with shangpin.get_lock():
        shangpin.price = price
        shangpin.update_time = datetime.datetime.now()

# 定时更新商品价格
def fill_ts(shangpin, jiage):
    set_shangpin(shangpin, jiage)

# 定时打印商品价格
def dayin(shangpin):
    price, update_time = get_shangpin(shangpin)
    print(price, update_time)

if __name__ == '__main__':

    # 初始化共享内存
    shangpin = mp.Manager().Value(Shangpin([5,6], datetime.datetime.now()), lock=True)

    # 配置调度器线程数、进程数
    scheduler_settings = {
        "executors" : {
            #"default": ThreadPoolExecutor(5),
            "processpool": ProcessPoolExecutor(8)
        },
        "job_defaults": {
             "coalesce": False,
            "misfire_grace_time": 10,
            "replace_existing": True,
            "max_instances": 1
        },
    }

    # 创建调度器
    scheduler1 = BlockingScheduler(**scheduler_settings)
    scheduler1.add_executor("processpool")
    scheduler1.add_job(fill_ts, 'interval', seconds=20, args=[shangpin, 9])
    scheduler1.add_job(dayin, 'interval', seconds=20, args=[shangpin])

    # 开始任务计划
    scheduler1.start()

在以上代码中,我们可以通过 mp.Manager().Value() 方法创建一个共享内存变量 shangpin,它的类型为 Shangpin 类。然后,在 fill_ts 和 dayin 函数中就可以通过 get_shangpin 和 set_shangpin 函数来访问共享内存中的数据。注意,在修改价格信息时,我们使用了共享内存锁(lock)来避免并发冲突。
有用请采纳,谢谢!

回答部分参考、引用ChatGpt以便为您提供更准确的答案:

根据您描述的情况,您使用的是 Apscheduler 库进行定时任务的调度。在您的程序中,您使用了两种调度器:BackgroundScheduler 和 BlockingScheduler,并尝试使用多进程进行并发运行。然而,您发现当使用 BlockingScheduler 和多进程时,程序出现了混乱的情况。

造成这种混乱的原因可能是多进程环境下的数据共享和同步问题。在多进程环境中,每个进程都有自己的内存空间,因此各个进程之间的数据是相互独立的。当您的程序在多个进程中并发运行时,每个进程会创建自己的 shangpin 对象实例,它们之间的成员变量是相互独立的,因此可能导致不同进程获取的商品价格与成员变量不一致。

为了解决这个问题,您可以考虑使用进程间通信(Inter-Process Communication,IPC)的方式来实现进程间的数据共享。在 Python 中,可以使用 multiprocessing 模块提供的共享内存和进程间队列等机制来实现数据共享。您可以将商品价格存储在共享内存中或使用进程间队列来传递数据,以确保各个进程之间的一致性。

另外,您还可以考虑使用其他的调度器库,如 Celery,它是一个分布式任务队列,可以方便地实现任务的并发执行和数据共享。

总结一下,多进程环境下的数据共享和同步问题可能导致您的程序出现混乱。您可以考虑使用进程间通信的方式来解决数据共享的问题,或者尝试其他的调度器库来实现并发执行和数据共享的需求。

多进程环境下,每个进程有独立的变量作用域。所以进程之间无法直接访问彼此的变量。

帮你搜了一下相关信息,整理如下:
在使用 Apscheduler 库时,需要注意多进程共享数据的问题。由于每个进程都有自己的内存空间,因此如果多个进程同时访问同一个变量,会导致数据不一致的问题。

在你的程序中,如果 fill_ts 函数和 dayin 函数在不同的进程中运行,就会出现多进程共享数据的问题。因为 fill_ts 函数修改了 shangpin 对象的成员变量,而 dayin 函数又要读取该成员变量进行打印。由于进程之间不共享内存空间,因此 dayin 函数可能会读取到 fill_ts 函数修改前的数据,导致打印结果不一致。

为了避免这种问题,可以考虑使用进程池来管理进程。具体地,可以将 fill_ts 和 dayin 函数都放入进程池中运行,此时它们就在同一个进程内,可以共享 shangpin 对象的成员变量,避免多进程共享数据的问题。

下面是修改后的代码示例:

import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

class shangpin:
    def __init__(self, price):
        self.price = price
    def getshi(self,shuju1):
        self.price.append(shuju1)
    def print(self):
        print(self.price)

def fill_ts(jiage):
    se708.getshi(jiage)

def dayin():
    global cishu
    se708.print()
    print('这是第'+ str(cishu) + '次打印')
    cishu += 1
    time.sleep(6)

now66 = datetime.datetime.now()
date_str = now66.strftime('%Y-%m-%d')
timed31 = date_str + ' 12:34:00'
timed32 = date_str + ' 12:34:05'
timed33 = date_str + ' 12:34:10'
timed40 = date_str + ' 12:59:20'

se708 = shangpin([5,6])
fill_ts(8)
dayin()

if __name__ == '__main__':
    # 配置调度器线程数、进程数
    scheduler_settings = {
        "executors": {
            #"default": ThreadPoolExecutor(5),
            "processpool": ProcessPoolExecutor(8)
        },
        "job_defaults": {
            "coalesce": False,
            "misfire_grace_time": 10,
            "replace_existing": True,
            "max_instances": 1
        },
    }
    scheduler1 = BlockingScheduler(**scheduler_settings)
    scheduler1.add_executor("processpool")
    scheduler1.add_job(fill_ts, 'interval', seconds=20, start_date=timed31, end_date=timed40, args=[9])
    scheduler1.add_job(dayin, 'interval', seconds=20, start_date=timed32, end_date=timed40)
    scheduler1.add_job(dayin, 'interval', seconds=20, start_date=timed33, end_date=timed40)

    # 开始任务计划
    scheduler1.start()

在上面的代码中,我们定义了一个进程池,并将其作为调度器的一部分。然后将 fill_ts 和 dayin 函数都添加到调度器中,并指定运行时间。此时,fill_ts 和 dayin 函数都在进程池中运行,在同一个进程内,可以共享 se708 对象的成员变量 price,避免多进程共享数据的问题。

来自百度GPT:
使用Apscheduler库时,当使用BlockingScheduler调度器并开启多进程时,您的程序就出现了奇怪的问题,商品价格的成员变量有时与实际价格不一致。这可能是由于多进程并发导致的竞态条件问题。在多进程环境中,多个进程同时访问和修改同一个共享变量时,可能会导致数据不一致的问题。为了避免这种情况,您可以考虑使用进程间通信(IPC)机制,如共享内存或消息队列,来在多个进程之间共享数据。另外,您还可以使用锁机制来保证同一时间内只有一个进程可以访问和修改共享变量。

关于Apscheduler库多进程运行方式,不同的调度器有不同的特点。BlockingScheduler是阻塞式调度器,它使用单个线程来运行所有的任务,因此不会出现多线程或多进程并发的情况。而BackgroundScheduler是非阻塞式调度器,它可以使用多个线程来运行任务,从而实现并发执行。当使用BackgroundScheduler调度器时,如果您开启了多个线程或进程来执行任务,那么就需要注意线程安全和并发问题,以避免出现数据不一致等问题。

当涉及到多进程环境下的数据共享时,可以使用 multiprocessing 模块提供的共享内存和进程间队列来实现数据共享。 multiprocessing.Value 和 multiprocessing.Array 可以用来创建共享内存对象,这些对象可以被多个进程访问。这些对象与 ctypes 模块中的对象类似,但是专门为多进程环境设计。 multiprocessing.Queue 可以用来创建队列,这些队列可以用来在进程之间传递数据。这是一种线程和进程安全的通信方式,可以用来实现生产者-消费者模式或其他形式的进程间通信。需要注意的是,在共享数据时,需要使用同步机制来确保数据以线程安全的方式访问。 multiprocessing.Lock 和 multiprocessing.RLock 可以用来在访问共享资源时强制执行互斥锁。总的来说, multiprocessing 模块提供了一组强大的工具,用于处理 Python 中的多进程和进程间通信。


# 初始化共享内存
    shangpin = mp.Manager().Value(Shangpin([5, 6], datetime.datetime.now()), lock=True)


```python

import time
import datetime
import multiprocessing as mp
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler


# 定义共享内存数据结构
class Shangpin:
    def __init__(self, price, update_time):
        self.price = price
        self.update_time = update_time


# 获取共享内存数据
def get_shangpin(shangpin):
    return shangpin.price, shangpin.update_time


# 更新共享内存数据
def set_shangpin(shangpin, price):
    with shangpin.get_lock():
        shangpin.price = price
        shangpin.update_time = datetime.datetime.now()


# 定时更新商品价格
def fill_ts(shangpin, jiage):
    set_shangpin(shangpin, jiage)


# 定时打印商品价格
def dayin(shangpin):
    price, update_time = get_shangpin(shangpin)
    print(price, update_time)


if __name__ == '__main__':
    # 初始化共享内存
    shangpin = mp.Manager().Value(Shangpin([5, 6], datetime.datetime.now()), lock=True)

    # 配置调度器线程数、进程数
    scheduler_settings = {
        "executors": {
            # "default": ThreadPoolExecutor(5),
            "processpool": ProcessPoolExecutor(8)
        },
        "job_defaults": {
            "coalesce": False,
            "misfire_grace_time": 10,
            "replace_existing": True,
            "max_instances": 1
        },
    }

    # 创建调度器
    scheduler1 = BlockingScheduler(**scheduler_settings)
    scheduler1.add_executor("processpool")
    scheduler1.add_job(fill_ts, 'interval', seconds=20, args=[shangpin, 9])
    scheduler1.add_job(dayin, 'interval', seconds=20, args=[shangpin])

    # 开始任务计划
    scheduler1.start()






进程异步导致的,需要加互斥锁。另外不知道你这个代码我无法运行。

img