如何判断任务是否已在django-celery中排队?

问题描述:

这里是我的设置:如何判断任务是否已在django-celery中排队?

  • 的Django 1.3
  • 芹菜2.2.6
  • Django的芹菜2.2.4
  • djkombu 0.9.2

在我的settings.py文件我有

BROKER_BACKEND = "djkombu.transport.DatabaseTransport" 

即我只是usi使数据库排队任务。

现在到我的问题:我有一个用户启动的任务,可能需要几分钟才能完成。我希望任务只能为每个用户运行一次,并且我会将任务的结果缓存到一个临时文件中,这样如果用户再次启动任务,我只需返回缓存的文件。我在我的视图函数中看起来像这样的代码:

task_id = "long-task-%d" % user_id 
result = tasks.some_long_task.AsyncResult(task_id) 

if result.state == celery.states.PENDING: 
    # The next line makes a duplicate task if the user rapidly refreshes the page 
    tasks.some_long_task.apply_async(task_id=task_id) 
    return HttpResponse("Task started...") 
elif result.state == celery.states.STARTED: 
    return HttpResponse("Task is still running, please wait...") 
elif result.state == celery.states.SUCCESS: 
    if cached_file_still_exists(): 
     return get_cached_file() 
    else: 
     result.forget() 
     tasks.some_long_task.apply_async(task_id=task_id) 
     return HttpResponse("Task started...") 

此代码几乎可用。但是当用户快速重新加载页面时,我遇到了一个问题。任务排队和任务最终从队列中取出并发给工作人员之间的延迟为1-3秒。在此期间,任务的状态保持为PENDING,这会导致视图逻辑启动重复的任务。

我需要的是某种方式来判断任务是否已经提交给队列,所以我最终没有提交两次。芹菜中有这样做的标准方法吗?

+0

可以'kick_off_the_long_task_again()'检查以确保任务移出待定?如果是这样,这可能足以延迟用户和芹菜之间的竞争条件。 – 2011-05-04 19:19:54

+0

kick_off_the_long_task_again()不会导致重复的任务。我更新了我的示例以显示代码将执行重复任务的位置。 – cwick 2011-05-04 19:34:00

+0

这不是我的问题。可以'kick_off_the_long_task_again()'检查并等待确定任务在完成之前移出待定吗? – 2011-05-04 19:42:09

您可以通过手动将结果存储在数据库中来作弊。让我解释这将如何帮助。

例如,如果使用RDBMS(表列 - TASK_ID,状态,结果):

查看部分:

  1. 使用事务管理。
  2. 使用SELECT FOR UPDATE获取行,其中task_id ==“long-task-%d”%user_id。 SELECT FOR UPDATE将阻止其他请求,直到这一个COMMIT或ROLLBACK。
  3. 如果它不存在 - 将状态设置为PENDING并启动'some_long_task',则结束请求。
  4. 如果状态为PENDING - 通知用户。
  5. 如果状态为SUCCESS - 将状态设置为PENDING,则启动该任务,返回'result'列指向的文件。我基于这个假设,你想重新运行获得结果的任务。 COMMIT
  6. 如果状态为ERROR - 将状态设置为PENDING,请启动任务,通知用户。 COMMIT

任务部分:

  1. 准备文件,在尝试,catch块包裹。
  2. 成功 - 更新状态为= SUCCESS的正确行,结果。
  3. 失败时 - 使用state = ERROR更新正确的行。

我用Redis解决了这个问题。只需在每个任务的redis中设置一个密钥,然后在任务的after_return方法中从redis中删除密钥。 Redis重量轻,速度快。

我不认为(正如Tomek和其他人所建议的那样)使用数据库的方式是执行此锁定。 Django具有内置的缓存框架,它应该足以完成这种锁定,并且必须更快。请参阅:

http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html#cookbook-task-serial

Django的可以配置为使用memcached作为缓存后端,这样就可以在多台机器......这似乎是更好的我被分配。思考?

+0

一个美丽的解决方案,正是我在找的东西。感谢您的链接! – 2014-09-18 15:32:05