Celery 是一个强大的 分布式任务队列 的 异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。
Celery的核心模块和架构
Celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
Celery 主要包含以下几个模块:
任务模块 Task
包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
消息中间件 Broker
Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
任务执行单元 Worker
Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
Beat
定时任务调度器,根据配置定时将任务发送给Broler
任务结果存储 Backend
Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redis 和 MongoDB 等。
综上celery总结如下:celery 是一个处理大量消息的分布式系统,能异步任务、定时任务,使用场景一般用于耗时操作的多任务或者定时性的任务。
celery 分布式脚本架构图
常见使用场景
使用Celery实现异步任务
创建Celery实例
启动Celery Worker,通过delay()或者apply_async()将任务发布到broker
应用程序调用异步任务
存储结果 Celery Beat: 任务调度器,Beat进程会读取配置文件的内容,周期性的将配置中到期需要执行的任务发送给任务队列
使用Celery定时任务
创建Celery实例
配置文件中配置任务,发送任务celery -A xxx beat
启动Celery Worker celery -A xxx worker -l info -P eventlet
存储结果
celery安装与使用
版本选择
celery 3.1.26.post2
django-celery 3.3.0
django-redis 4.10.0
redis 2.10.6
特殊说明:redis版本过高会报错,具体错误信息为:
1 2
return iter(x.items()) AttributeError: 'str' object has no attribute 'items'
安装命令
因为是集成在django项目中,所以需要往外安装django-celery,当然也可以不安装。
1 2 3 4
pip3 install celery==3.1.26.post2 pip3 install django-redis==4.11.0 pip3 install django-celery==3.3.1 pip3 install redis==2.10.6
django 进行注册
在django的settings文件中将djcelery进行注册:
1 2 3 4 5 6 7 8 9 10
INSTALLED_APPS = [ 'django.contrib.admin' , 'django.contrib.auth' , 'django.contrib.contenttypes' , 'django.contrib.sessions' , 'django.contrib.messages' , 'django.contrib.staticfiles' , #注册 'djcelery' , ]
代码配置目录
1 2 3 4 5 6 7 8 9 10 11 12
任务所在目录 ├── celery_tasss # celery包 如果celery_task只是建了普通文件夹__init__可以没有,如果是包一定要有 │ ├── __init__.py # 包文件 看情况要不要存在 │ ├── celery.py # celery连接和配置相关文件,且名字必须交celery.py,其实也不是必须的不然你指令可能要修改 │ └── tasks.py # 所有任务函数 ├── celery_tasks │ ├── celeryconfig.py # celery 实例 │ ├── celery.py # celery 配置相关文件 │ ├── __init__.py # 包文件,看情况要不要存在 │ └── tasks.py # 所有任务函数
配置 celeryconfig.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60
from kombu import Queue, Exchange BROKER_URL = 'redis://127.0.0.1:6379/7' CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/8' CELERY_IMPORTS = ( 'celery_tasks.tasks', ) # 任务序列化和反序列化 pickle CELERY_TASK_SERIALIZER = 'json' # 结果序列化格式,默认为pickle CELERY_RESULT_SERIALIZER = 'json' # 指定接受的内容类型(默认为允许所有格式) ['pickle', 'json', 'msgpack', 'yaml'] CELERY_ACCEPT_CONTENT = ['json'] # 启动时区设置 CELERY_ENABLE_UTC = True # 设置时区 CELERY_TIMEZONE = 'Asia/Shanghai' # 任务结果过期时间 CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24 # 任务过期时间,600s后结果结束 # CELERY_TASK_TIME_LIMIT= 10 * 60 # 并发的worker数量,也是命令行-c指定的数目,worker数量不是越多越好,保证任务不堆积,加上一些新增任务的预留就可以了 CELERYD_CONCURRENCY = 10 # celery worker 每次去BROKER中预取任务的数量 CELERYD_PREFETCH_MULTIPLIER = 4 # 每个worker最多执行1000个任务就会被销毁,可防止内存泄露 CELERYD_MAX_TASKS_PER_CHILD = 1000 # 任务过期时间,celery任务执行结果的超时时间 CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60 # Worker在任务执行完后才向Broker发送acks,告诉队列这个任务已经处理了,可靠性较强,但也可能出现重复执行 CELERY_ACKS_LATE = True # 设置默认的队列名称,未指定的情况下都会放入默认队列中 CELERY_DEFAULT_QUEUE = "default" # 再不关注结果的情况下,可以设置成True,忽略结果上传broker # CELERY_IGNORE_RESULT = True CELERY_QUEUES = ( Queue('default', exchange=Exchange('default'), routing_key='default'), Queue('for_email', exchange=Exchange('for_email'), routing_key='for_email'), ) CELERY_ROUTES = { 'celery_tasks.tasks.send_mail_task': {'queue': 'for_email', 'routing_key': 'for_email'}, }
celery.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
from celery import Celery from celery_tasks import celeryconfig from vdw_mws import settings import os # 为celery设置环境变量 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "vdw_mws.settings") ## 创建celery app app = Celery('celery_tasks') # 从单独的配置模块中加载配置 app.config_from_object(celeryconfig) # 设置app自动加载任务 app.autodiscover_tasks(['celery_tasks'])
task.py 邮件服务task
1 2 3 4 5 6 7 8
from celery_tasks.celery import app as celery_app # 导入创建好的celery应用 from django.core.mail import send_mail # 使用django内置函数发送邮件 from vdw_mws import settings # 导入django的配置 @celery_app.task def send_mail_task(title,email,msg): # 使用django内置函数发送邮件 send_mail(title, '', settings.EMAIL_FROM,[email],html_message=msg)
在django的settings文件中增加邮箱测试配置文件
1 2 3 4 5 6 7 8 9 10
EMAIL_BACKEND = 'django.core.mail.backends.smtp.EmailBackend' EMAIL_HOST = 'smtp.163.com' EMAIL_PORT = 465 EMAIL_USE_SSL = True # SSL加密方式 # #发送邮件的邮箱 EMAIL_HOST_USER = '[email protected] ' # #在邮箱中设置的客户端授权密码 EMAIL_HOST_PASSWORD = 'token95' # #收件人看到的发件人,必须和上面的邮箱一样,否则发不出去 EMAIL_FROM = '天天生鲜<[email protected] >'
启动服务
1 2 3 4 5 6 7 8 9 10 11 12 13
celery worker -A celery_tasks -l INFO -c 2 -Q for_email [tasks] . celery_tasks.tasks.send_mail_task [2020-03-29 23:10:16,922: INFO/MainProcess] Connected to redis://127.0.0.1:6379/8 [2020-03-29 23:10:16,933: INFO/MainProcess] mingle: searching for neighbors [2020-03-29 23:10:17,939: INFO/MainProcess] mingle: all alone [2020-03-29 23:10:17,958: WARNING/MainProcess] /Users/baihe/.virtualenvs/mws/lib/python3.6/site-packages/celery/fixups/django.py:265: UserWarning: Using settings.DEBUG leads to a memory leak, never use this setting in production environments! warnings.warn('Using settings.DEBUG leads to a memory leak, never ' [2020-03-29 23:10:17,959: WARNING/MainProcess] [email protected] ready.
测试邮件发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
(mws) ➜ mws git:(personal/baihe/2020-03-22-celery-framework) python3 Python 3.6.6 (v3.6.6:4cf1f54eb7, Jun 26 2018, 19:50:54) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.57)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> >>> >>> >>> from celery_tasks.tasks import send_mail_task >>> title = '访问百度' >>> msg = '<a href="http://www.baidu.com/" target="_blank">访问百度</a>' >>> email = '[email protected] ' >>> send_mail_task.delay(title, email, msg) <AsyncResult: dd90e491-aeed-44c9-b8f9-08785672d829> # 查看队列服务输出 [2020-03-29 23:11:54,849: INFO/MainProcess] Received task: celery_tasks.tasks.send_mail_task[dd90e491-aeed-44c9-b8f9-08785672d829] [2020-03-29 23:11:56,906: INFO/MainProcess] Task celery_tasks.tasks.send_mail_task[dd90e491-aeed-44c9-b8f9-08785672d829] succeeded in 2.05399569599831s: None
启动命令参数说明
1 2 3 4 5 6 7 8 9 10 11
celery worker -A celery_tasks -Q for_email -l INFO -c 2 -f logs/message.log 参数1:celery 固定参数,用于启动celery 参数2:启动的celery组件,这里启动的是worker,用于执行任务 参数3、4:参数为一组,启动的task任务 参数5、6:参数为一组,用于指定消费队列,参数中,'-Q', 'message_queue'两个参数,是指定这个worker消费名为“message_queue”的队列 参数7、8:参数为一组,指定日志的级别,这里记录级别为info的日志 参数9、10:参数为一组,-c 2,指定启动多少个work进程 参数11、12:参数为一组,指定日志文件的位置,这里将日志记录在log/message.log 额外说明:当在高并发场景下,使用gevent或者event时,可以用-P 参数,-P eventlet。
celery 其他相关命令
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# 发布任务 celery -A celery_task beat # 执行任务 celery -A celery_task worker -l info -P eventlet # 将以上两条合并 celery -B -A celery_task worker # 后台启动celery worker进程 celery multi start work_1 -A appcelery # 停止worker进程,如果无法停止,加上-A celery multi stop WORKNAME # 重启worker进程 celery multi restart WORKNAME # 查看进程数 celery status -A celery_task
celery 监控软件flower
安装flower
启动flower
1 2 3 4 5
# 指定对外访问IP及redis为broker celery flower --address=127.0.0.1 --broker='redis://localhost' # basic_auth开启认证 celery flower --address=127.0.0.1 --broker='redis://localhost' --basic_auth=admin:123456
查看页面
celery 和 supervisor 结合使用
supervisor 安装和使用
supervisor 在centos下的安装和使用文档
celery 邮件队列服务在supervisor中的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
cd /etc/supervisord.d vim send-email-task.conf [program:send-email-task] process_name=%(program_name)s_%(process_num)02d command=/root/.virtualenvs/vdw_mws/bin/python /root/.virtualenvs/vdw_mws/bin/celery worker -A celery_tasks -l INFO -c 8 -Q for_adv_finances_count environment=PATH="~/.virtualenvs/vdw_mws/bin" directory=/data/wwwroot/mws autostart=true autorestart=true user=root numprocs=1 redirect_stderr=true stdout_logfile=/data/log/vdw_mws/celery-adv-finances-count.log stopwatisecs=60 priority=994 stdout_logfile_maxbytes = 10MB
重启supervisor服务
1
systemctl restart supervisord