celery任务调度框架实践
celery任务调度框架实践
celery架构图:
Celery Beat:任务调度器,Beat进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
Celery Worker:执行任务的消费者,通常会在多台服务器运行多个消费者来提高执行效率。
Broker:消息代理,或者叫作消息中间件,接受任务生产者发送过来的任务消息,存进队列再按序分发给任务消费方(通常是消息队列或者数据库)。
Producer:调用了Celery提供的API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery默认已支持Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy等方式。
初始化:
1.运行celery
mac环境下的celery一下载就能直接运行。
但是centos6.9 环境下好像下载之后并不能直接运行,估计是环境变量没有配好。
需要加上celery的安装路径。
使用/usr/local/bin/celery -A your_app worker --loglevel=info
2.以守护进程运行celery:
需要一个初始化脚本:celeryd
使用方法:/etc/init.d/celeryd {start|stop|restart|status}
配置文件:/etc/default/celeryd
参考:
3.使用redis:
mac下安装redis
- brew install redis
- 如果需要后台运行 redis 服务,使用命令 brew services start redis
- 如果不需要后台服务,则使用命令 redis-server /usr/local/etc/redis.conf。
- mac os 安装 redis
安装redis后,启动时指定配置文件
redis-server ./redis.conf
检测后台进程是否存在
ps -ef |grep redis
使用分布式时,其他worker机子无法访问redis去取任务:
- redis默认的安全策略,只准许本地访问。
- 需要通过简单配置,完成允许外网访问。
- 参考:开启redis 允许外网IP 访问
celery基本操作命令:
参考:
celery worker:
1.启动worker:
export PYTHONOPTIMIZE=1 && /usr/local/bin/celery -A your_app worker --loglevel=debug --workdir=/your_dir/your_dir/your_dir/
2.停止worker:
ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
3.在celery中使用多进程:
我用的是from multiprocessing import Pool
来实现多进程。
但是在运行过程中会直接报出这个错误 AssertionError: daemonic processes are not allowed to have children
解决方法:
重写一个Mypool
https://stackoverflow.com/questions/6974695/python-process-pool-non-daemonic
设置环境变量
export PYTHONOPTIMIZE=1
由于过段时间就会失效。。所以每次启动worker的时候的时候: export PYTHONOPTIMIZE=1 && /usr/local/bin/celery -A your_app worker --loglevel=debug --workdir=/your_dir/your_dir/your_dir/
以下 的方法没试过:
there are two method to solve this problem ,disable assert:
1.where celery starts set export PYTHONOPTIMIZE=1 OR start celery with this parameter -O OPTIMIZATION
2.disable python packet multiprocessing process.py line 102:
assert not _current_process._config.get(‘daemon’), \ ‘daemonic processes are not allowed to have children’
4.调用worker、添加任务:
delay()和apply_async()
我们之前调用任务使用了”delay()”方法,它其实是对”apply_async()”方法的封装,使得你只要传入任务所需的参数即可。对于特殊的任务调度需求,你需要使用”apply_async()”,其常用的参数有:
- countdown: 指定多少秒后任务才被执行
- eta: 指定任务被调度的时间,参数类型是datetime
- expires: 任务过期时间,参数类型可以是int(秒),也可以是datetime
- retry: 任务发送失败的重试次数
- priority: 任务优先级,范围是0-9
- serializer: 参数和返回值的序列化方式
celery beat 定时任务
一个时间的Bug:
- 当前使用pip安装的celery,默认是安装的最新版本4.1.0,但是在这个版本中在获取当前时间的逻辑中存在bug,会导致定时任务配置后并不能在指定的时间被执行
- 回退版本 到4.0.2才行
- celery 4.1.0 版本定时任务执行时间 bug
另一种可行但麻烦的思路:
celery的定时任务会有一定时间的延迟。比如,我规定模拟登陆新浪微博任务每隔10个小时执行一次,那么定时任务第一次执行就会在开启定时任务之后的10个小时后才会执行。而我抓取微博需要马上执行,需要带上cookie,所以不能等那1个小时。这个没有一个比较好的解决方法,可以使用celery的crontab()来代替schdule做定时,它会在启动的时候就执行。我采用的方法是第一次手动执行该任务,然后再通过schedule执行。
使用命令:
进入到对应your_app对应的目录下:cd /your_dir/your_dir/your_dir
再执行:/usr/local/bin/celery -A your_app beat -l info
最方便的是,在命令中指定工作目录,一条命令即可:
/usr/local/bin/celery -A your_dir beat -l info --workdir=/your_dir/your_dir/your_dir/
定时任务参数参考表:
动态管理定时任务:
查过挺多资料,有两种解决方法。跟celery运行的调度器(schedule)息息相关的。
- 使用第三方schedule:
如django-celery-beat库会将定时任务的规则存入到数据库中,而不用通过配置文件来定义。
try to install django-celery instead of django-celery-beat. django-celery works with Celery 3 (unlike django-celery-beat). You can then, for example import PeriodicTask from djcelery.models instead of from django_celery_beat.models . This allows you to add/delete/manipulate tasks both dynamically AND PROGRAMMATICALLY (not only from the Django admin site). The drawback to this workaround is that if one doesn’t need django-celery for anything other than this, then it bloats one’s app. Thus, It would be better to have Celery 4.0 included in cookiecutter-django so that django-celery-beat models can be used
- 使用框架默认schedule
我使用的是这一种方法。
只能管理celery的配置文件了,每次增加或减少定时任务的时候,都要对配置文件进行相应的修改。
每次修改完都要重启celerybeat,试过多种方法,发现用supervisor来管理celery beat的进程是比较好的。
管理celery beat进程
用supervisor来管理celery beat进程。
安装:
pip install supervisor
-
Supervisor配置
/usr/local/bin/echo_supervisord_conf > /etc/supervisord.conf
-
操作:
- 开启:
supervisord -c /etc/supervisord.conf
- 重启:
supervisorctl -c /etc/supervisord.conf reload
- 关闭:
supervisorctl -c /etc/supervisord.conf shutdown
- 开启:
other:
分布式:
队列:
flower:
图形化管理celery界面:
/usr/local/bin/celery -A celery_app flower --port=5555