Celery技术预研
Celery技术预研
1. 简介
Celery是一个专注于实时处理和任务调度的分布式任务队列。
使用场景
-
异步任务
当用户触发的一个操作需要较长时间才能执行完成时,可以把它作为任务交给Celery去异步执行,执行完再返回给用户。这段时间用户不需要等待,提高了网站的整体吞吐量和响应时间。
-
定时任务
生产环境经常会跑一些定时任务。假如你有上千台的服务器、上千种任务,定时任务的管理很困难,Celery可以帮助我们快速在不同的机器设定不同种任务。
2. 特性
Celery还提供了如下的特性
-
提供任务服务及任务监控功能,需安装flower插件
-
强大的工作流功能
-
可选prefork、线程、协程(gevent,eventlet)三种模式并发执行
-
支持多种消息代理(RabbitMQ, Redis,)和存储后端(AMQP, Redis,memcached,,MongoDB,SQLAlchemy,Django ORM)
3. 架构图
4. 安装
$ pip install -v celery==3.1.25
#celery最新版本是4.1.0但是因为4以上的版本需在python2.7以上版本运行固我们安装3.1.25
$pip install –v redis==3.0.504
#因为celery需消息队列的支持,这里我们使用redis
$pip install –v flower==0.9.1
#如果需要开启对celery的监控需安装测插件,该插件是具有tornado框架开发的一个应用,flower 0.9.2版本及以后
#的版本需在python2.7及以上版本的支持。
$pip install -v django-celery==3.1.9
#如果需要celery任务的后台管理功能需安装此插件
5. 消息交换的方式
Celery的任务分发跟任务发送的Queue、routing_key、Exchange三个相关。
-
Direct
直接交换方式,这种方式任务必须和routing_key一样一样的。
消息消费的流程图如下
-
Topic:主题交换方式,消息通过routing_key 匹配的方式分发到对应的队列,也就是一个消息可以投递到多个匹配的队列中。
#如有两个队列
Queue('topic_queue1', topic_exchange, routing_key='usa.#')
Queue('topic_queue2', topic_exchange, routing_key='#.news')
#投递任务
r=add.apply_async(args=[1440,900], queue='topic_queue2', routing_key='usa.news')
这个任务会投递到这两个队列中进行消息。
如下图:
-
Fanout:广播交换方式,只用是该类型的队列都会收到消息,一个任务可以被多次消费。
6. Djcelery应用配置
import djcelery
from kombu import Queue,Exchange
djcelery.setup_loader()
USE_TZ = False
BROKER_URL = 'redis://:@127.0.0.1:6379/10'
BROKER_TRANSPORT = 'redis'
CELERY_RESULT_BACKEND = 'redis://:@127.0.0.1:6379/10'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_ACCEPT_CONTENT = ['json'] # 指定接受的内容类型
CELERY_RESULT_SERIALIZER = 'json' #读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
CELERY_TASK_SERIALIZER = None
CELERY_TRACK_STARTED = False
CELERY_TIMEZONE = TIME_ZONE
CELERY_ENABLE_UTC = False # 非UTC时间必须设置否则会影响定时任务
7. 部署运行
-
Worker启动
$ celery -A demo1.myapp worker -l info --workdir="E:\python_works\project\" --loglevel=debug --logfile="D:/tmp/celery.log"
参数介绍:
--loglevel 日志级别
--logfile 日志文件名
--workdir 项目目录
-A 应用包路径,如果是celery命名不需要myapp
--Q 队列名可以指定多个
-
Beat 启动:启动检测定时任务的脚本
$ celery beat -A demo1.myapp --workdir="E:/python_works/demo1/" --loglevel=debug --logfile="D:/tmp/celery_beat.log"
Flower:启动服务监控后台
$ celery flower -A demo1.myapp --broker=redis://:@127.0.0.1:6379/10 --workdir="E:/python_works/demo1/"
#默认绑定127.0.0.1:5555 端口
-
效果如下
-
-
-
djcelery:此插件是基于django框架,安装完之后需初始化数据库
$ python manage.py migrate djcelery #django 1.7以后
$ python manage.py syncdb # django 1.7之前
基于django创建一个应用并且配置要celery,进入项目目录
python manage.py runserver
启动django应用
效果如下