celery异步任务控制

程序架构: django+redis+celery+mysql

前端有任务暂停、任务重启按钮,需要实现celery任务的暂停和暂停后的继续开始功能

 

您可以将整个操作设计为分为多个chain任务

一旦有了这样的工作流程,就可以最终定义要在整个工作流程中暂停的点。在上述每一个点上,您都可以检查前端用户是否已请求操作暂停并相应地继续操作

一个复杂且耗时的操作O分为5个chain任务-T1,T2,T3,T4和T5-

这些任务(第一个任务除外)中的每一个都取决于前一个任务的返回值。

假设我们定义了每个任务后要暂停的点,因此工作流看起来像-

  • T1执行
  • T1完成,请检查用户是否已请求暂停
    • 如果用户未请求暂停-继续
    • 如果用户已经请求暂停,序列化的剩余工作流程链,并存储在某个地方后继续
from typing import Any, Optional

from celery import shared_task
from celery.canvas import Signature, chain, signature

@shared_task(bind=True)
def pause_or_continue(
    self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
):
    # 用于确定是否暂停操作链的任务
    if signature(clause)(retval):
        # 暂停请求,用retval和剩余的链调用给定的回调
        # 由于执行顺序从结束到开始,链条应该颠倒过来
        signature(callback)(retval, self.request.chain[::-1])
        self.request.chain = None
    else:
        # 继续下一个任务链
        return retval


def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
   
    newch = []
    for n, sig in enumerate(ch.tasks):
        if n != 0 and n % nth == nth - 1:
            newch.append(pause_or_continue.s(clause=clause, callback=callback))
        newch.append(sig)
    ch.tasks = tuple(newch)
    return ch

add_consumer和cancel_consumer远程命令可用于开始/停止使用

或者你用一个编号代替任务是否执行

if IsT1on=1

执行t1

if IsT2on=1

执行t2

https://github.com/ask/celery/commit/03b7a417e86df4a2334382705f17a706163d9704

小弟代码跑步起来了,悬赏送大佬了。小弟去解决代码的问题了。。