在Flask中成功提交后执行celery任务?
问题描述:
将相对长时间运行的任务委托给另一台服务器上的芹菜工作人员,这些工作人员正在单独运行。在Flask中成功提交后执行celery任务?
但是,结果被添加回关系数据库(根据task_descr.id
作为关键字更新表,见下文),工作人员使用ignore_result
。
任务从瓶的应用要求:
task = app.celery.send_task('tasks.mytask', [task_descr.id, attachments])
的问题是,被请求的任务,而交易尚未烧瓶上侧封闭。这会导致竞争状态,因为有时候芹菜工作者会在Flask应用程序的事务结束之前完成任务。
成功事务后发送任务的正确方法是什么?
还是应该在工作人员检查task_descr.id
可用性,然后尝试条件UPDATE
并重试该任务(这种感觉过于复杂)?
对Run function after a certain type of model is committed的回复讨论了类似的情况,但这里任务发送是明确的,所以不需要在某些模型中侦听更新/插入。
答
的方法之一,是Per-Request After-Request Callbacks,由于阿明Ronacher:
from flask import g
def after_this_request(func):
if not hasattr(g, 'call_after_request'):
g.call_after_request = []
g.call_after_request.append(func)
return func
@app.after_request
def per_request_callbacks(response):
for func in getattr(g, 'call_after_request',()):
response = func(response)
return response
在我的情况的用法是:
@after_this_request
def send_mytask(response):
if response.status_code in {200, 302}:
task = app.celery.send_task('tasks.mytask', [task_descr.id, attachments])
return response
不理想,但似乎工作。我的任务仅限于成功提供服务的请求,所以我不在乎500或其他错误条件。