Django apscheduler 定时任务执行的时候报错

Django 后端运行定时任务不断的通过淘宝api获取订单数据,但是运行过程中,容易报错,然后卡死不继续执行定时任务了

错误信息提示如下图:

img

img

订单任务的代码是:

import apscheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from django_apscheduler.jobstores import DjangoJobStore
import time
from shops.views import downloadOrder

scheduler = BackgroundScheduler(daemon=True)

def test_job():
    try:
        downloadOrder(1)
        # print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
    except:
        print('--------------------------------------')
        print('出现问题啦')
        print('重新启动任务…………')
        print('--------------------------------------')
        scheduler.remove_job('test')
        scheduler.start()
    # print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

# 定时任务,打印当前的时间
scheduler.add_job(test_job, 'interval', minutes=2, id='test')
# @scheduler.scheduled_job(trigger='interval', seconds=1, id='test')

# scheduler.remove_job('test')
scheduler.start()

我希望忽略错误继续执行定时任务也行。但是很想搞清楚这个错误是怎么来的,怎么解决

看到这个错误,我觉得可能是两个问题导致的:

1、在执行任务的代码中可能存在未捕获的异常,导致任务执行失败并停止。

2、也可能是 downloadOrder 这个函数在执行过程中出现了错误,导致任务执行失败并停止。

为了解决这个问题,可以考虑以下几种方法:

1、在执行任务的代码中,使用 try-except 语句捕获异常,并在 except 块中处理错误。这样就可以在出现错误的情况下继续执行任务。

2、在调用 downloadOrder 函数的时候,使用 try-except 语句捕获异常,并在 except 块中处理错误。这样就可以在出现错误的情况下继续执行任务。

3、在执行任务的代码中,使用 try-except 语句捕获异常,并在 except 块中记录错误日志,方便后续排查问题。这样就可以在出现错误的情况下继续执行任务,并且可以通过日志记录来分析问题。

例如可以将代码修改为如下形式:

import logging


def test_job():
    try:
        downloadOrder(1)
    except Exception as e:
        logging.error(e)
        print('--------------------------------------')
        print('出现问题啦')
        print('重新启动任务…………')
        print('--------------------------------------')
        scheduler.remove_job('test')
        scheduler.start()

这样如果在执行任务的过程中出现了错误,就会记录下错误信息,并重新启动任务。

当然,为了更好地排查问题,还可以考虑使用 APScheduler 提供的事件机制,在任务执行成功或出错的时候触发相应的事件,从而可以记录下执行情况,方便后续排查问题。

例如可以在代码中添加如下事件处理函数:

def job_success_handler(event):


if event.exception:
    print('任务执行失败')
else:
    print('任务执行成功')


def job_error_handler(event):


    print('任务执行出错')

    scheduler.add_listener(job_success_handler, EVENT_JOB_EXECUTED)
    scheduler.add_listener(job_error_handler, EVENT_JOB_ERROR)

这样在任务执行成功或出错的时候,就会触发对应的事件,从而可以记录下执行情况。
望采纳。

捕获错误对象,打印出来看下什么错误

def test_job():
    try:
        downloadOrder(1)
    except Exception as e:
        print('出现错误:', e)


望采纳!!点击该回答右侧的“采纳”按钮即可采纳!
我觉得是代码中有一些问题:

1.你可能需要在错误处理代码中加入scheduler.shutdown()函数,来停止定时任务的运行。
2.你的定时任务是每2分钟执行一次,所以你可能需要将scheduler.add_job()函数中的interval参数改为minutes=2。
3.你在scheduler.start()函数中没有加入任何参数,这可能会导致定时任务启动失败。
可以尝试在scheduler.start()函数中加入参数,例如scheduler.start(paused=True)或scheduler.start(blocking=True)。
修改后的代码:

import apscheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from django_apscheduler.jobstores import DjangoJobStore
import time
from shops.views import downloadOrder

scheduler = BackgroundScheduler(daemon=True)

def test_job():
try:
downloadOrder(1)
# print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))
except:
print('--------------------------------------')
print('出现问题啦')
print('重新启动任务…………')
print('--------------------------------------')
scheduler.remove_job('test')
scheduler.add_job(test_job, 'interval', minutes=2, id='test')
scheduler.start(blocking=True)
# print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))

#定时任务,打印当前的时间
scheduler.add_job(test_job, 'interval', minutes=2, id='test')

scheduler.remove_job('test')
scheduler.start(blocking=True)

提供参考实例【Django定时任务四种实现方法总结】,链接:https://blog.csdn.net/weixin_42782150/article/details/123212604

win系统不支持的 不要费劲了,直接linux。我之前这问题纠结了好久,后来google才发现win系统不支持。

linux里在 Django 中使用 APScheduler 进行定时任务的调度。

安装 Django APScheduler

pip install django-apscheduler

在 Django 项目的 settings.py 文件中添加 django_apscheduler 到 INSTALLED_APPS 中,并配置 Django APScheduler 的参数:

INSTALLED_APPS = [
    ...
    'django_apscheduler',
]

SCHEDULER_CONFIG = {
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///db.sqlite3'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'Asia/Shanghai',
}

在 Django 项目的 urls.py 文件中包含 Django APScheduler 的 URL:

from django.urls import include, path

urlpatterns = [
    ...
    path('django_apscheduler/', include('django_apscheduler.urls')),
]

在 Django 项目的 settings.py 文件中添加 django_apscheduler.schedulers.SchedulerConfig 到 INSTALLED_APPS 中

INSTALLED_APPS = [
    ...
    'django_apscheduler.schedulers.SchedulerConfig',
]

在 Django 项目的 settings.py 文件中添加 django_apscheduler.middleware.SchedulerMiddleware 到 MIDDLEWARE 中

MIDDLEWARE = [
    ...
    'django_apscheduler.middleware
]

Django 应用中创建一个调度器,例如,在 scheduler.py 文件中


from apscheduler.schedulers.background import BackgroundScheduler

scheduler = BackgroundScheduler()

@scheduler.scheduled_job('interval', minutes=1)
def my_job():
    print('This job is run every minute.')

scheduler.start()

在 Django 项目的 settings.py 文件中添加调度器的配置

SCHEDULER_CONFIG = {
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///db.sqlite3'
    },
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'Asia/Shanghai',
}


在 Django 项目的 settings.py 文件中添加调度器的启动命令

def start_scheduler():
    from .scheduler import scheduler
    scheduler.start()

def stop_scheduler():
    from .scheduler import scheduler
    scheduler.shutdown()


在 Django 项目的 settings.py 文件中添加调度器的启动和停止命令


if not settings.DEBUG:
    start_scheduler()
else:
    stop_scheduler()

django定时任务django-apscheduler
借鉴下
https://blog.csdn.net/qq_37674086/article/details/107993824