各位请指教!本人非专业,业余水平。
我最近在做一个定时获取商品价格,然后处理商品价格的小程序,用的是Apscheduler库,BlockingScheduler调度器,多进程计算处理(商品比较多,数据计算工作量大,需要并发运行)。
程序的思路是:定时运行fill_ts函数,获取价格,修改共享内存;再定时运行计算价格程序,为方便描述,这里改成了dayin打印函数。
问题是:我这个程序运算结束后,需要自动退出,再启动运行其他程序。之前用的sys.exit(),但是用BlockingScheduler调度器后,Scheduler.start()之后的程序好像都不会再运行,用sys.exit()没反应。有没有好的办法,使我这个程序运算完成后,自动退出,关闭窗口。
简要程序如下:
import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
import multiprocessing as mp
def fill_ts(shangpin, cishu1, lock):
with lock:
li0 = shangpin[0]
li1 = shangpin[1]
li2 = shangpin[2]
del shangpin[:]
li0['SE700']['jiage'] = 116
li0['SE702']['kucun'].append(188)
shangpin.append(li0)
shangpin.append(li1)
shangpin.append(li2)
jishu = cishu1['cishu']
jishu += 1
cishu1['cishu'] = jishu
print('这是第' + str(cishu1['cishu']) + '次计算')
def dayin(jiage, cishu1, lock):
with lock:
print('这是第'+ str(cishu1['cishu']) + '次打印')
print(jiage)
time.sleep(2)
ligx0 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
ligx1 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
ligx2 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
if __name__ == '__main__':
#配置调度器线程数、进程数
scheduler_settings = {
"executors" : {
#"default": ThreadPoolExecutor(5),
"processpool": ProcessPoolExecutor(8)
},
"job_defaults": {
"coalesce": False,
"misfire_grace_time": 10,
"replace_existing": True,
"max_instances": 1
},
}
list0 = [ligx0, ligx1, ligx2]
cishu = {'cishu':0}
manager = mp.Manager()
lock = mp.Manager().Lock()
d = manager.list(list0)
d2 = manager.dict(cishu)
scheduler1 = BlockingScheduler(**scheduler_settings)
scheduler1.add_executor("processpool")
scheduler1.add_job(fill_ts, 'interval', seconds=10, args=[d, d2, lock])
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
#开始任务计划
scheduler1.start()
import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor
import multiprocessing as mp
# 更新商品价格函数
def fill_ts(shangpin, cishu1, lock):
with lock:
li0 = shangpin[0]
li1 = shangpin[1]
li2 = shangpin[2]
del shangpin[:]
li0['SE700']['jiage'] = 116
li0['SE702']['kucun'].append(188)
shangpin.append(li0)
shangpin.append(li1)
shangpin.append(li2)
jishu = cishu1['cishu']
jishu += 1
cishu1['cishu'] = jishu
print('这是第' + str(cishu1['cishu']) + '次计算')
# 打印商品价格函数
def dayin(jiage, cishu1, lock):
with lock:
print('这是第'+ str(cishu1['cishu']) + '次打印')
print(jiage)
time.sleep(2)
if __name__ == '__main__':
# 配置调度器线程数、进程数
scheduler_settings = {
"executors" : {
"processpool": ProcessPoolExecutor(8)
},
"job_defaults": {
"coalesce": False,
"misfire_grace_time": 10,
"replace_existing": True,
"max_instances": 1
},
}
# 初始化商品信息和计算次数
list0 = [
{'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]}},
{'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]}},
{'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]}}
]
cishu = {'cishu':0}
manager = mp.Manager()
lock = mp.Manager().Lock()
d = manager.list(list0)
d2 = manager.dict(cishu)
# 使用后台调度器
scheduler1 = BackgroundScheduler(**scheduler_settings)
scheduler1.add_executor("processpool")
# 设置最大计算次数
MAX_COUNT = 5
# 添加任务
scheduler1.add_job(fill_ts, 'interval', seconds=10, args=[d, d2, lock])
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
# 开始调度
scheduler1.start()
# 检查计算次数,当达到最大次数时关闭调度器
while True:
if d2['cishu'] >= MAX_COUNT:
scheduler1.shutdown()
print("调度器已关闭,程序将退出.")
break
time.sleep(1)
print("程序退出...")
可以使用atexit模块注册一个退出函数,在主程序退出时执行。在函数中,你可以将计算进程杀死,关闭窗口等
基于new bing部分指引作答:
要实现程序运算结束后自动退出和关闭窗口,可以使用以下方法:以下是在你提供的代码基础上进行了修改的版本,以实现在程序运算结束后自动退出和关闭窗口:
import sys
import signal
import atexit
from apscheduler.schedulers.blocking import BlockingScheduler
# ...
def exit_handler():
print("Exiting...")
# 这里可以添加需要在程序退出时执行的清理操作
sys.exit()
def fill_ts(shangpin, cishu1, lock):
with lock:
li0 = shangpin[0]
li1 = shangpin[1]
li2 = shangpin[2]
del shangpin[:]
li0['SE700']['jiage'] = 116
li0['SE702']['kucun'].append(188)
shangpin.append(li0)
shangpin.append(li1)
shangpin.append(li2)
jishu = cishu1['cishu']
jishu += 1
cishu1['cishu'] = jishu
print('这是第' + str(cishu1['cishu']) + '次计算')
# 运算结束后自动退出
exit_handler()
def dayin(jiage, cishu1, lock):
with lock:
print('这是第'+ str(cishu1['cishu']) + '次打印')
print(jiage)
time.sleep(2)
# 运算结束后自动退出
exit_handler()
ligx0 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
ligx1 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
ligx2 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
if __name__ == '__main__':
# 配置调度器线程数、进程数
scheduler_settings = {
"executors" : {
#"default": ThreadPoolExecutor(5),
"processpool": ProcessPoolExecutor(8)
},
"job_defaults": {
"coalesce": False,
"misfire_grace_time": 10,
"replace_existing": True,
"max_instances": 1
},
}
list0 = [ligx0, ligx1, ligx2]
cishu = {'cishu':0}
scheduler1 = BlockingScheduler(**scheduler_settings)
scheduler1.add_executor("processpool")
lock = mp.Manager().Lock()
d = manager.list(list0)
d2 = manager.dict(cishu)
scheduler1.add_job(fill_ts, 'interval', seconds=10, args=[d, d2, lock])
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
# 注册退出函数和信号处理
atexit.register(exit_handler)
signal.signal(signal.SIGINT, exit_handler)
signal.signal(signal.SIGTERM, exit_handler)
# 开始任务计划
scheduler1.start()
在上述代码中,我们添加了 exit_handler() 函数来处理程序退出时的清理操作,并在 fill_ts() 和 dayin() 函数末尾调用 exit_handler() 来实现运算结束后自动退出。
此外,我们还使用 atexit.register() 函数注册了 exit_handler() 作为退出函数,并使用 signal.signal() 函数注册了 exit_handler() 来处理 SIGINT 和 SIGTERM 信号。
使用 atexit 模块注册一个退出函数,在该函数中执行关闭窗口的操作。
import sys
import atexit
from apscheduler.schedulers.blocking import BlockingScheduler
# ...
def exit_handler():
# 执行关闭窗口的操作,例如关闭GUI窗口或终止命令行进程
print("Exiting...")
# 这里替换为关闭窗口的具体代码
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(fill_ts, 'interval', minutes=5)
scheduler.add_job(dayin, 'interval', minutes=10)
atexit.register(exit_handler)
scheduler.start()
在上述代码中,我们使用 atexit.register() 函数将 exit_handler() 注册为程序退出时的处理函数。在 exit_handler() 中,你可以编写关闭窗口的逻辑代码。
注意:如果你使用的是 GUI 库(如Tkinter、PyQt等),则需要使用该库提供的关闭窗口的方法来替换 exit_handler() 中的占位符代码。
另一种方法是使用信号处理来捕获系统的退出信号,例如 SIGINT(Ctrl+C)或 SIGTERM(kill命令)。
import sys
import signal
from apscheduler.schedulers.blocking import BlockingScheduler
# ...
def exit_handler(signum, frame):
# 执行关闭窗口的操作,例如关闭GUI窗口或终止命令行进程
print("Exiting...")
# 这里替换为关闭窗口的具体代码
sys.exit()
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_job(fill_ts, 'interval', minutes=5)
scheduler.add_job(dayin, 'interval', minutes=10)
signal.signal(signal.SIGINT, exit_handler)
signal.signal(signal.SIGTERM, exit_handler)
scheduler.start()
在上述代码中,我们使用 signal.signal() 函数将 exit_handler() 注册为 SIGINT 和 SIGTERM 信号的处理函数。在 exit_handler() 中,你可以编写关闭窗口的逻辑代码。
请注意,使用信号处理方式时,可能需要根据具体的应用场景和平台来选择适合的信号和信号处理函数。
import time
import datetime
import sys
import atexit
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
import multiprocessing as mp
def fill_ts(shangpin, cishu1, lock):
with lock:
li0 = shangpin[0]
li1 = shangpin[1]
li2 = shangpin[2]
del shangpin[:]
li0['SE700']['jiage'] = 116
li0['SE702']['kucun'].append(188)
shangpin.append(li0)
shangpin.append(li1)
shangpin.append(li2)
jishu = cishu1['cishu']
jishu += 1
cishu1['cishu'] = jishu
print('这是第' + str(cishu1['cishu']) + '次计算')
def dayin(jiage, cishu1, lock):
with lock:
print('这是第'+ str(cishu1['cishu']) + '次打印')
print(jiage)
time.sleep(2)
ligx0 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
ligx1 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
ligx2 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
def exit_program():
# 自动退出的逻辑
print("程序运行结束,自动退出")
sys.exit()
if __name__ == '__main__':
# 配置调度器线程数、进程数
scheduler_settings = {
"executors" : {
#"default": ThreadPoolExecutor(5),
"processpool": ProcessPoolExecutor(8)
},
"job_defaults": {
"coalesce": False,
"misfire_grace_time": 10,
"replace_existing": True,
"max_instances": 1
},
}
list0 = [ligx0, ligx1, ligx2]
cishu = {'cishu':0}
manager = mp.Manager()
lock = mp.Manager().Lock()
d = manager.list(list0)
d2 = manager.dict(cishu)
scheduler1 = BlockingScheduler(**scheduler_settings)
scheduler1.add_executor("processpool")
scheduler1.add_job(fill_ts, 'interval', seconds=10, args=[d, d2, lock])
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
# 注册退出函数
atexit.register(exit_program)
# 开始任务计划
scheduler1.start()
你已经使用了BlockingScheduler调度器,在你的代码中,调度器已经开始运行了,然后你想要在计算完成后退出程序并关闭窗口。我的建议是在程序尾部添加scheduler.shutdown()方法,这样可以保证在调度器执行任务完毕后完全停止。
调试修改后的代码如下所示:
import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler
import multiprocessing as mp
def fill_ts(shangpin, cishu1, lock):
with lock:
li0 = shangpin[0]
li1 = shangpin[1]
li2 = shangpin[2]
del shangpin[:]
li0['SE700']['jiage'] = 116
li0['SE702']['kucun'].append(188)
shangpin.append(li0)
shangpin.append(li1)
shangpin.append(li2)
jishu = cishu1['cishu']
jishu += 1
cishu1['cishu'] = jishu
print('这是第' + str(cishu1['cishu']) + '次计算')
def dayin(jiage, cishu1, lock):
with lock:
print('这是第'+ str(cishu1['cishu']) + '次打印')
print(jiage)
time.sleep(2)
ligx0 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
ligx1 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
ligx2 = {
'SE700':{'name':'####', 'jiage':1, 'kucun':[]},
'SE701':{'name':'####', 'jiage':1, 'kucun':[]},
'SE702':{'name':'####', 'jiage':1, 'kucun':[]},
}
if __name__ == '__main__':
#配置调度器线程数、进程数
scheduler_settings = {
"executors" : {
#"default": ThreadPoolExecutor(5),
"processpool": ProcessPoolExecutor(8)
},
"job_defaults": {
"coalesce": False,
"misfire_grace_time": 10,
"replace_existing": True,
"max_instances": 1
},
}
list0 = [ligx0, ligx1, ligx2]
cishu = {'cishu':0}
manager = mp.Manager()
lock = mp.Manager().Lock()
d = manager.list(list0)
d2 = manager.dict(cishu)
scheduler1 = BlockingScheduler(**scheduler_settings)
scheduler1.add_executor("processpool")
scheduler1.add_job(fill_ts, 'interval', seconds=10, args=[d, d2, lock])
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
scheduler1.add_job(dayin, 'interval', seconds=10, args=[d, d2, lock], jitter=0.1)
#开始任务计划
scheduler1.start()
# 停止调度器
scheduler1.shutdown()
这样程序运行的时候就可以执行完所有任务后退出程序并关闭窗口了。
答案参考ChatGPT Plus版,整理汇总。希望能帮助你解决问题 你可以使用以下方法在程序运算完成后自动退出并关闭窗口:
在程序运算完成后调用scheduler1.shutdown()
方法关闭调度器:在合适的位置,例如fill_ts
函数内部的适当位置,添加以下代码来手动关闭调度器。
scheduler1.shutdown()
这将停止调度器并等待当前运行的作业完成。
使用sys.exit()
退出程序:在调用scheduler1.shutdown()
之后,添加以下代码来退出程序。
import sys
sys.exit()
这将终止程序并关闭窗口。
请确保在程序的适当位置调用scheduler1.shutdown()
和sys.exit()
。例如,在fill_ts
函数的末尾调用这些方法,以便在数据处理完成后自动退出程序并关闭窗口。
注意:在多进程环境中,使用sys.exit()
只能终止当前进程,并不能完全退出整个程序。如果你的程序中涉及到多个子进程,你可能需要进一步处理子进程的退出。在这种情况下,你可以考虑使用multiprocessing.Pool
来管理子进程,并在必要时关闭子进程池。
可以使用SystemEvent事件来实现在程序运行结束时自动退出。
代码构思:使用atexit模块注册一个退出处理器,当程序运行结束时,exit_handler函数将被调用,程序将自动退出。
在exit_handler函数中,打印一条消息,并使用os._exit(0)退出程序。
注意,使用os._exit(0)可以直接退出程序,无需等待当前正在执行的任务完成。
同时,使用BlockingScheduler创建了一个调度器,并添加了两个作业,分别是fill_ts和dayin函数。调度器启动后,将按照设定的时间间隔执行这两个作业。
需要注意的是,如果程序中存在死循环等无法结束的作业,调度器将无法正常结束,从而导致程序无法退出
具体实现如下:
import atexit
import os
def exit_handler():
print("程序运行结束,退出...")
os._exit(0) # 退出程序
# 注册退出处理器
atexit.register(exit_handler)
# 调度器启动
scheduler = BlockingScheduler()
scheduler.add_job(fill_ts, 'interval', seconds=10, args=(shangpin, cishu1, lock))
scheduler.add_job(dayin, 'interval', seconds=15, args=(jiage, cishu1, lock))
scheduler.start()
#未完待续,如有帮助,恭请采纳
你好,我曾经在开发内部系统时,也遇到过定时调度的各种问题。考虑过django-scheduler,celery等等。
最后,我发现,最好用的最简单的是一款开源定时任务工具:xxl-job,它支持几乎所有的语言,支持单一脚本,支持api接口。部署方式和使用方式都很简单,有详细的文档。并且,具有很好的管理性,可以随时在控制台启动,暂停,停止,更改定时任务,不需要在服务器进行操作,也无需停止,重启python应用。
我强烈推荐你也去试用一下试试。https://www.xuxueli.com/xxl-job/
需要注意的是,如果使用接口调度,一定要配置token,或者使用内网环境,否则易遭受攻击,植入木马或病毒。配置token后,即使不小心暴露到公网,由于无正确的token,也无法植入病毒脚本。
time.sleep(86400)
except (KeyboardInterrupt, SystemExit):如果使用sys.exit()无法退出程序,那么可以尝试使用以下方法:
调用scheduler1.shutdown(wait=False)来停止调度器:
在你的代码中注册退出回调函数,在函数中添加scheduler1.shutdown(wait=False)来停止调度器。wait=False参数可以确保程序立即退出而不用等待所有任务完成。
python
def exit_handler():
# 这里可以执行一些清理工作
print("程序运行结束")
scheduler1.shutdown(wait=False)
sys.exit()
atexit.register(exit_handler)
使用os._exit(0)来强制退出程序:
替代sys.exit(),可以使用os._exit(0)来直接退出程序。注意,这种方式是强制终止程序,可能会导致一些资源没有被正确释放,请谨慎使用。
python
def exit_handler():
# 这里可以执行一些清理工作
print("程序运行结束")
os._exit(0)
atexit.register(exit_handler)
请尝试以上两种方法,看看是否可以满足你的需求。
线程池执行器默认为10,内存任务存储器为memoryjobstore,如果想自己配置的话可以执行以下操作
两个任务储存器分别搭配两个执行器;同时,还要修改任务的默认参数;最后还要改时区
名称为“mongo”的MongoDBJobStore
名称为“default”的SQLAlchemyJobStore
名称为“ThreadPoolExecutor ”的ThreadPoolExecutor,最大线程20个
名称“processpool”的ProcessPoolExecutor,最大进程5个
UTC时间作为调度器的时区
默认为新任务关闭合并模式()
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
jobstores = {
'mongo': MongoDBJobStore(),
'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')
}
executors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
'coalesce': False,
'max_instances': 3
}
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
设置新任务的默认最大实例数为3