芹菜和弦在所有任务完成之前执行
问题描述:
我有一堆可以同时执行的任务,但是一旦一切就绪,我想执行最后一个任务。我使用下面的代码:芹菜和弦在所有任务完成之前执行
chunk_tasks = []
for index, chunk in enumerate(chunks):
chunk_tasks.append(import_chunk.s(meta.pk))
g = group(chunk_tasks)
chord(g)(import_completed.s(meta.pk, max_lines=max_lines))
但是它看起来像完成所有任务之前import_completed
执行。 import_chunk
任务看起来像:
@task(bind=True, ignore_result=IGNORE_RESULTS)
def import_chunk(self, meta_pk):
try:
# do some stuff
except Exception, e:
if self.max_retries == self.request.retries:
logger.exception('Unexpected error in import_chunk')
raise self.retry(countdown=60, max_retries=3)
所以问题是我在做什么错了?
答
和弦是只有在组中的所有任务都执行完毕后才执行的任务。所以,它的需要在其头部的任务状态进行同步。
但是,当您将ignore_result
设置为task
时,工作人员将不存储任务状态并返回此任务的值。
这将导致根据您的工作流程重试任务或抛出异常或任何故障。
所以,chord(add.s(i, i) for i in range(10))(tsum.s()).get()
是完全有效的,并给出结果的情形1,但它给出了CASE 2.一些麻烦
案例1:
@app.task
def add(x, y):
return x + y
@app.task
def tsum(numbers):
return sum(numbers)
案例2:
@app.task(ignore_result=True)
def add(x, y):
return x + y
@app.task(ignore_result=True)
def tsum(numbers):
return sum(numbers)
所以,你必须改变ignore_result
或改变窝你的任务的流程。
从文档:
您应该避免使用和弦尽可能。尽管如此,和弦仍然是工具箱中强大的基础,因为同步是许多并行算法的必需步骤。
是的,我已经偶然发现了关于ignore_result的问题,它被设置为False。 – 2014-09-27 08:13:33