如何判断任务是否已在django-celery中排队?
这里是我的设置:如何判断任务是否已在django-celery中排队?
- 的Django 1.3
- 芹菜2.2.6
- Django的芹菜2.2.4
- djkombu 0.9.2
在我的settings.py文件我有
BROKER_BACKEND = "djkombu.transport.DatabaseTransport"
即我只是usi使数据库排队任务。
现在到我的问题:我有一个用户启动的任务,可能需要几分钟才能完成。我希望任务只能为每个用户运行一次,并且我会将任务的结果缓存到一个临时文件中,这样如果用户再次启动任务,我只需返回缓存的文件。我在我的视图函数中看起来像这样的代码:
task_id = "long-task-%d" % user_id
result = tasks.some_long_task.AsyncResult(task_id)
if result.state == celery.states.PENDING:
# The next line makes a duplicate task if the user rapidly refreshes the page
tasks.some_long_task.apply_async(task_id=task_id)
return HttpResponse("Task started...")
elif result.state == celery.states.STARTED:
return HttpResponse("Task is still running, please wait...")
elif result.state == celery.states.SUCCESS:
if cached_file_still_exists():
return get_cached_file()
else:
result.forget()
tasks.some_long_task.apply_async(task_id=task_id)
return HttpResponse("Task started...")
此代码几乎可用。但是当用户快速重新加载页面时,我遇到了一个问题。任务排队和任务最终从队列中取出并发给工作人员之间的延迟为1-3秒。在此期间,任务的状态保持为PENDING,这会导致视图逻辑启动重复的任务。
我需要的是某种方式来判断任务是否已经提交给队列,所以我最终没有提交两次。芹菜中有这样做的标准方法吗?
您可以通过手动将结果存储在数据库中来作弊。让我解释这将如何帮助。
例如,如果使用RDBMS(表列 - TASK_ID,状态,结果):
查看部分:
- 使用事务管理。
- 使用SELECT FOR UPDATE获取行,其中task_id ==“long-task-%d”%user_id。 SELECT FOR UPDATE将阻止其他请求,直到这一个COMMIT或ROLLBACK。
- 如果它不存在 - 将状态设置为PENDING并启动'some_long_task',则结束请求。
- 如果状态为PENDING - 通知用户。
- 如果状态为SUCCESS - 将状态设置为PENDING,则启动该任务,返回'result'列指向的文件。我基于这个假设,你想重新运行获得结果的任务。 COMMIT
- 如果状态为ERROR - 将状态设置为PENDING,请启动任务,通知用户。 COMMIT
任务部分:
- 准备文件,在尝试,catch块包裹。
- 成功 - 更新状态为= SUCCESS的正确行,结果。
- 失败时 - 使用state = ERROR更新正确的行。
我用Redis解决了这个问题。只需在每个任务的redis中设置一个密钥,然后在任务的after_return方法中从redis中删除密钥。 Redis重量轻,速度快。
我不认为(正如Tomek和其他人所建议的那样)使用数据库的方式是执行此锁定。 Django具有内置的缓存框架,它应该足以完成这种锁定,并且必须更快。请参阅:
http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial
Django的可以配置为使用memcached
作为缓存后端,这样就可以在多台机器......这似乎是更好的我被分配。思考?
一个美丽的解决方案,正是我在找的东西。感谢您的链接! – 2014-09-18 15:32:05
可以'kick_off_the_long_task_again()'检查以确保任务移出待定?如果是这样,这可能足以延迟用户和芹菜之间的竞争条件。 – 2011-05-04 19:19:54
kick_off_the_long_task_again()不会导致重复的任务。我更新了我的示例以显示代码将执行重复任务的位置。 – cwick 2011-05-04 19:34:00
这不是我的问题。可以'kick_off_the_long_task_again()'检查并等待确定任务在完成之前移出待定吗? – 2011-05-04 19:42:09