芹菜任务路线工作不正常

问题描述:

我在练芹菜,我想我分配任务特定的队列但预期它不工作芹菜任务路线工作不正常

__init__.py

import os 
import sys 
from celery import Celery 

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) 

sys.path.append(CURRENT_DIR) 

app = Celery() 
app.config_from_object('celery_config') 

celery_config.py

amqp = 'amqp://guest:[email protected]:5672//' 
broker_url = amqp 
result_backend = amqp 

task_routes = ([ 
    ('import_feed', {'queue': 'queue_import_feed'}) 
]) 

tasks.py

from . import app 

@app.task(name='import_feed') 
def import_feed(): 
    pass 

我如何运行我的工人:

celery -A subscriber1.tasks worker -l info 

我的客户__init__.py

import os 
import sys 
from celery import Celery 

CURRENT_DIR = os.path.dirname(os.path.abspath(__file__)) 

sys.path.append(CURRENT_DIR) 

app = Celery() 
app.config_from_object('celery_config') 

我的客户的celery_config.py

from kombu.common import Broadcast 

amqp = 'amqp://guest:[email protected]:5672//' 
BROKER_URL = amqp 
CELERY_RESULT_BACKEND = amqp 

然后在我的客户的壳我想:

from publisher import app 
result = app.send_task('import_feed') 

然后我的工人拿到的任务?我期望的不应该是因为我将它分配给了特定的队列。我在客户端尝试下面的命令并没有任务已经被我的工作人员接收到我期待已收到,而不是在第一个

result = app.send_task('import_feed', queue='queue_import_feed') 

好像我误解路由部分的东西。但我真正想要的是import_feed任务,只有在发送任务时指定queue_import_feed队列时才能运行

您可以更改工作程序处理的默认队列。

app.send_task('import_feed')将任务发送到celery队列。

app.send_task('import_feed', queue='queue_import_feed')将任务发送到queue_import_feed,但您的工作人员只处理celery队列中的任务。

处理特定队列,使用-Q开关

celery -A subscriber1.tasks worker -l info -Q 'queue_import_feed' 

编辑

为了将限制在send_task使得工人反应import_feed任务,只有当它与队列发布,您需要覆盖send_taskCelery,并提供AMQP的定制设置为None

反应器。PY

from celery.app.amqp import AMQP 
from celery import Celery 

class MyCelery(Celery): 
    def send_task(self, name=None, args=None, kwargs=None, **options): 
     if 'queue' in options: 
      return super(MyCelery, self).send_task(name, args, kwargs, **options) 


class MyAMQP(AMQP): 
    default_queue = None 

celery_config.py

from kombu import Exchange, Queue 

... 

task_exchange = Exchange('default', type='direct') 
task_create_missing_queues = False 

task_queues = [ 
    Queue('feed_queue', task_exchange, routing_key='feeds'), 
] 

task_routes = { 
    'import_feed': {'queue': 'feed_queue', 'routing_key': 'feeds'} 
} 

__init__.py

celeree = MyCelery(amqp='reactor.MyAMQP') 
+0

不好意思,但真的没有回答这个问题。我知道可以这样做。我只是想让我的最后一个陈述发生 –

+0

您的最后一个陈述是要求只有在为其指定队列时才运行特定任务。您希望禁用[自动路由](http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-create-missing-queues),以便像这样的任务不会在'芹菜队队列 –

+0

准确地说,我想要的只是运行'celery -A subscriber1.tasks worker -l info'不指定任何运行worker的队列,并且会限制'import_feed'任务在任务发布'queue_import_feed'队列。这不可能吗? –