apscheduler_handler.py
import logging
from apscheduler.schedulers.background import BackgroundScheduler
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.jobstores import register_events
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, EVENT_SCHEDULER_SHUTDOWN
logger = logging.getLogger('django')
def run_scheduler():
scheduler = BackgroundScheduler({"apscheduler.timezone": "Asia/Shanghai"}, deamon=True)
scheduler.add_jobstore(DjangoJobStore(), 'default')
def job_listener(event):
if event.code == 2 ** 1:
logger.info('监听:调度器关闭')
else:
job_id = event.job_id
if event.scheduled_run_time:
scheduler_run_time = event.scheduled_run_time.strftime("%Y-%m-%d %H:%M:%S")
else:
scheduler_run_time = ''
if event.exception:
logger.error(f'监听->作业{job_id}在{scheduler_run_time}执行失败,原因:{event.exception.args[0]}')
else:
logger.info(f'监听->作业{job_id}在{scheduler_run_time}执行成功.')
scheduler.add_listener(job_listener, EVENT_JOB_ERROR | EVENT_JOB_EXECUTED | EVENT_SCHEDULER_SHUTDOWN)
register_events(scheduler)
scheduler.start()
logger.info(f"APScheduler运行状态:{scheduler.state}")
return scheduler
class APschedulerHandler:
def __init__(self, scheduler, job_fun=None):
self.scheduler = scheduler
self.job_fun = job_fun
def add_scheduler_task(self, task_type=None, task_id=None, func_kwargs=None, trigger_kwargs=None, run_time=None):
if task_type == 'circulation':
self.scheduler.add_job(func=self.job_fun, trigger='cron', id=task_id, replace_existing=True, kwargs=func_kwargs, **trigger_kwargs)
if task_type == 'timing':
try:
self.scheduler.add_job(func=self.job_fun, trigger='date', id=task_id, kwargs=func_kwargs, run_date=run_time, replace_existing=True)
except Exception as e:
print(e)
def del_scheduler_task(self, task_type, taskId):
self.scheduler.remove_job(taskId)
scheduler = APschedulerHandler(run_scheduler())
view.py
from api_automation_test.apscheduler_handler import scheduler
from api_test.common.apscheduler_handler import cases_run
scheduler.job_fun = cases_run
def add_job(request):
data = request.data
scheduler.add_scheduler_task(task_type=data['type'], task_id=data['taskId'], func_kwargs=fun_kwargs,
trigger_kwargs=trigger_kwargs)
python manger.py runserver后,前端操作添加corn的任务,比如10:30/35/40 执行,然后时间到10:30分的时候job并没有执行。但是关掉调式,重新runserver后,提示10:30分的任务已经miss了,然后35/40那两个时间点的任务会正常跑
我怀疑是不是添加完job,并没有被schedul, 然后我试着添加完job,reschedul_job 和 resume_job 貌似也不生效。
你这个是两种任务添加的方式在混合使用,你删除 register_events(scheduler)试试