程序架构: django+redis+celery+mysql
前端有任务暂停、任务重启按钮,需要实现celery任务的暂停和暂停后的继续开始功能
您可以将整个操作设计为分为多个chain
任务
一旦有了这样的工作流程,就可以最终定义要在整个工作流程中暂停的点。在上述每一个点上,您都可以检查前端用户是否已请求操作暂停并相应地继续操作
一个复杂且耗时的操作O分为5个chain任务-T1,T2,T3,T4和T5-
这些任务(第一个任务除外)中的每一个都取决于前一个任务的返回值。
假设我们定义了每个任务后要暂停的点,因此工作流看起来像-
from typing import Any, Optional
from celery import shared_task
from celery.canvas import Signature, chain, signature
@shared_task(bind=True)
def pause_or_continue(
self, retval: Optional[Any] = None, clause: dict = None, callback: dict = None
):
# 用于确定是否暂停操作链的任务
if signature(clause)(retval):
# 暂停请求,用retval和剩余的链调用给定的回调
# 由于执行顺序从结束到开始,链条应该颠倒过来
signature(callback)(retval, self.request.chain[::-1])
self.request.chain = None
else:
# 继续下一个任务链
return retval
def tappable(ch: chain, clause: Signature, callback: Signature, nth: Optional[int] = 1):
newch = []
for n, sig in enumerate(ch.tasks):
if n != 0 and n % nth == nth - 1:
newch.append(pause_or_continue.s(clause=clause, callback=callback))
newch.append(sig)
ch.tasks = tuple(newch)
return ch
add_consumer和cancel_consumer远程命令可用于开始/停止使用
或者你用一个编号代替任务是否执行
if IsT1on=1
执行t1
if IsT2on=1
执行t2
https://github.com/ask/celery/commit/03b7a417e86df4a2334382705f17a706163d9704
小弟代码跑步起来了,悬赏送大佬了。小弟去解决代码的问题了。。