Django 后端运行定时任务不断的通过淘宝api获取订单数据,但是运行过程中,容易报错,然后卡死不继续执行定时任务了
错误信息提示如下图:
订单任务的代码是:
import apscheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from django_apscheduler.jobstores import DjangoJobStore
import time
from shops.views import downloadOrder
scheduler = BackgroundScheduler(daemon=True)
def test_job():
try:
downloadOrder(1)
# print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
except:
print('--------------------------------------')
print('出现问题啦')
print('重新启动任务…………')
print('--------------------------------------')
scheduler.remove_job('test')
scheduler.start()
# print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
# 定时任务,打印当前的时间
scheduler.add_job(test_job, 'interval', minutes=2, id='test')
# @scheduler.scheduled_job(trigger='interval', seconds=1, id='test')
# scheduler.remove_job('test')
scheduler.start()
我希望忽略错误继续执行定时任务也行。但是很想搞清楚这个错误是怎么来的,怎么解决
看到这个错误,我觉得可能是两个问题导致的:
1、在执行任务的代码中可能存在未捕获的异常,导致任务执行失败并停止。
2、也可能是 downloadOrder 这个函数在执行过程中出现了错误,导致任务执行失败并停止。
为了解决这个问题,可以考虑以下几种方法:
1、在执行任务的代码中,使用 try-except 语句捕获异常,并在 except 块中处理错误。这样就可以在出现错误的情况下继续执行任务。
2、在调用 downloadOrder 函数的时候,使用 try-except 语句捕获异常,并在 except 块中处理错误。这样就可以在出现错误的情况下继续执行任务。
3、在执行任务的代码中,使用 try-except 语句捕获异常,并在 except 块中记录错误日志,方便后续排查问题。这样就可以在出现错误的情况下继续执行任务,并且可以通过日志记录来分析问题。
例如可以将代码修改为如下形式:
import logging
def test_job():
try:
downloadOrder(1)
except Exception as e:
logging.error(e)
print('--------------------------------------')
print('出现问题啦')
print('重新启动任务…………')
print('--------------------------------------')
scheduler.remove_job('test')
scheduler.start()
这样如果在执行任务的过程中出现了错误,就会记录下错误信息,并重新启动任务。
当然,为了更好地排查问题,还可以考虑使用 APScheduler 提供的事件机制,在任务执行成功或出错的时候触发相应的事件,从而可以记录下执行情况,方便后续排查问题。
例如可以在代码中添加如下事件处理函数:
def job_success_handler(event):
if event.exception:
print('任务执行失败')
else:
print('任务执行成功')
def job_error_handler(event):
print('任务执行出错')
scheduler.add_listener(job_success_handler, EVENT_JOB_EXECUTED)
scheduler.add_listener(job_error_handler, EVENT_JOB_ERROR)
这样在任务执行成功或出错的时候,就会触发对应的事件,从而可以记录下执行情况。
望采纳。
捕获错误对象,打印出来看下什么错误
def test_job():
try:
downloadOrder(1)
except Exception as e:
print('出现错误:', e)
望采纳!!点击该回答右侧的“采纳”按钮即可采纳!
我觉得是代码中有一些问题:
1.你可能需要在错误处理代码中加入scheduler.shutdown()函数,来停止定时任务的运行。
2.你的定时任务是每2分钟执行一次,所以你可能需要将scheduler.add_job()函数中的interval参数改为minutes=2。
3.你在scheduler.start()函数中没有加入任何参数,这可能会导致定时任务启动失败。
可以尝试在scheduler.start()函数中加入参数,例如scheduler.start(paused=True)或scheduler.start(blocking=True)。
修改后的代码:
import apscheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from django_apscheduler.jobstores import DjangoJobStore
import time
from shops.views import downloadOrder
scheduler = BackgroundScheduler(daemon=True)
def test_job():
try:
downloadOrder(1)
# print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
except:
print('--------------------------------------')
print('出现问题啦')
print('重新启动任务…………')
print('--------------------------------------')
scheduler.remove_job('test')
scheduler.add_job(test_job, 'interval', minutes=2, id='test')
scheduler.start(blocking=True)
# print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
#定时任务,打印当前的时间
scheduler.add_job(test_job, 'interval', minutes=2, id='test')
scheduler.remove_job('test')
scheduler.start(blocking=True)
提供参考实例【Django定时任务四种实现方法总结】,链接:https://blog.csdn.net/weixin_42782150/article/details/123212604
win系统不支持的 不要费劲了,直接linux。我之前这问题纠结了好久,后来google才发现win系统不支持。
linux里在 Django 中使用 APScheduler 进行定时任务的调度。
安装 Django APScheduler
pip install django-apscheduler
在 Django 项目的 settings.py 文件中添加 django_apscheduler 到 INSTALLED_APPS 中,并配置 Django APScheduler 的参数:
INSTALLED_APPS = [
...
'django_apscheduler',
]
SCHEDULER_CONFIG = {
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///db.sqlite3'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'Asia/Shanghai',
}
在 Django 项目的 urls.py 文件中包含 Django APScheduler 的 URL:
from django.urls import include, path
urlpatterns = [
...
path('django_apscheduler/', include('django_apscheduler.urls')),
]
在 Django 项目的 settings.py 文件中添加 django_apscheduler.schedulers.SchedulerConfig 到 INSTALLED_APPS 中
INSTALLED_APPS = [
...
'django_apscheduler.schedulers.SchedulerConfig',
]
在 Django 项目的 settings.py 文件中添加 django_apscheduler.middleware.SchedulerMiddleware 到 MIDDLEWARE 中
MIDDLEWARE = [
...
'django_apscheduler.middleware
]
Django 应用中创建一个调度器,例如,在 scheduler.py 文件中
from apscheduler.schedulers.background import BackgroundScheduler
scheduler = BackgroundScheduler()
@scheduler.scheduled_job('interval', minutes=1)
def my_job():
print('This job is run every minute.')
scheduler.start()
在 Django 项目的 settings.py 文件中添加调度器的配置
SCHEDULER_CONFIG = {
'apscheduler.jobstores.default': {
'type': 'sqlalchemy',
'url': 'sqlite:///db.sqlite3'
},
'apscheduler.executors.default': {
'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
'max_workers': '20'
},
'apscheduler.job_defaults.coalesce': 'false',
'apscheduler.job_defaults.max_instances': '3',
'apscheduler.timezone': 'Asia/Shanghai',
}
在 Django 项目的 settings.py 文件中添加调度器的启动命令
def start_scheduler():
from .scheduler import scheduler
scheduler.start()
def stop_scheduler():
from .scheduler import scheduler
scheduler.shutdown()
在 Django 项目的 settings.py 文件中添加调度器的启动和停止命令
if not settings.DEBUG:
start_scheduler()
else:
stop_scheduler()
django定时任务django-apscheduler
借鉴下
https://blog.csdn.net/qq_37674086/article/details/107993824