Celery的使用(2)---从配置开始

前言

上文中我们将配置直接设置在了tasks.py中:

tasks.py
from celery import Celery
app = Celery('tasks', broker='redis://:password@host:port/database_num',backend='redis://:password@host:port/database_num')
#第一个次数‘tasks’是当前模块的名字,第二个参数是broker代理的url,这里选用了redis

这样的做法适合于小项目,如果想规范化的话还是建议单独存放在一个配置文件中。

开始配置

配置文件化的好处是容易管理,可以增加复用率,不然每个tasks.py文件中你都写一遍冗长的redis、MQ的url,一方面麻烦,另一方面容易泄露一些敏感字段。

配置文件化

这时我们可以在与tasks.py 同级的目录下新建一个celeryconfig.py的文件,文件名并不是指定的,可以任意取,但是最好能一眼就能看出它的功能。
打开celeryconfig.py,我们开始将tasks.py中的broker和backend信息搬到配置文件中,接下介绍一下用到的两个配置,有些配置celery4.x和老版本有一些区别,有区别的在后面已注释:

broker的url
CELERY_BROKER_URL='redis://:password@host:port/database_num' ##4.x之前用这个
BROKER_URL = 'redis://:password@host:port/database_num' ##4.x之后用这个
接受结果的url
CELERY_RESULT_BACKEND = 'redis://:password@host:port/database_num'

此时,celeryconfig.py文件内容如下:

CELERY_BROKER_URL='redis://:password@host:port/database_num'
CELERY_RESULT_BACKEND = 'redis://:password@host:port/database_num'

相应的我们将tasks.py中的内容也做一些调整:

from celery import Celery
app = Celery("tasks")
app.config_from_object('celeryconfig', namespace='CELERY')

可以看出,配置信息已经没有在task.py中出现,内容已减少,看着没那么乱了,创建celery实例也只用传递一个参数,与此同时,我们引入了一个新的函数:

app.config_from_object(self, obj,silent=False, force=False, namespace=None)
#obj:即为我们配置文件或者是一个带有配置的对象;
#silent:为True时导入发生错误会被忽略;
#force:为True时回强制立刻读取配置文件,默认只有需要的时候回被调用;
#namespace:命名;

这个函数可以使我们很容易的就读取配置文件。配置文件所在目录不同导入方式也可以不同:

app.config_from_object('func_app.celeryconfig')
#func_app为celeryconfig所在目录名称

这种情况适用于tasks.py与不在一个目录下,如果都在一个目录下则直接引入即可,不必加前面的func_app,还有一种方式:

##这里假设celeryconfig 在func_app 下且tasks.py不在func_app 下。
from func_app import celeryconfig 
app.config_from_object(celeryconfig)

需要注意的是,调用config_from_object函数将重置前面的所有配置,你如果想添加额外的设置则需要在这个函数调用后添加。

前面说到config_from_object中的obj可以为配置对象,这里举一个小例,如果你确实需要这么做那么这个例子可以作为一个参考:

from celery import Celery

app = Celery("tasks")

class Config:
    enable_utc = True#启动时区
    timezone = 'Europe/London' #设置时区

app.config_from_object(Config)#Config为类,上面我们用的是配置文件,这里使用类

从环境变量中读取配置

还可以从环境变量中读取配置,这里使用到了下面这个函数:

 app.config_from_envvar()
import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

作为一个了解吧,很少会用到这种方式,大多数应该还是采用了文件或者类的方式。

检查配置

如果你想把配置打印出来作为调试信息,同时还希望能屏蔽一些敏感的信息,可以使用Celery提供的这几个API:
humanize()函数:

app.conf.humanize(with_defaults=False, censored=True)

这个方法默认情况下只会以字符串返回你配置的内容,但是你可以通过设置with_defaults来返回所有默认配置,以下是执行后返回的内容。

>>>app.conf.humanize(with_defaults=False, censored=True)
CELERY_BROKER_URL: 'redis://:********@127.0.0.1:6379/1'
CELERY_RESULT_BACKEND: 'redis://:********@127.0.0.1:6379/2'

可以看出它把用户名和密码字段加密了。

如果想以字典形式返回配置信息请使用:
table()函数

>>>app.conf.table(with_defaults=False, censored=True)
{'CELERY_BROKER_URL': 'redis://:********@127.0.0.1:6379/1', 'CELERY_RESULT_BACKEND': 'redis://:********@127.0.0.1:6379/2'}

注意,celery并不能屏蔽所有敏感字段,它也只是依照规则使用正则表达式进行识别,如果你想为你的一些自定义特殊字段进行加密,应该使用celery规定的命名方式,如果你自定义配置里包含这些字符串则会被加密:

API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE

其他常用配置

并发的worker数量,也是命令行-c指定的数目
事实上并不是worker数量越多越好,保证任务不堆积,加上一些新增任务的预留就可以了

CELERYD_CONCURRENCY = 20

celery worker每次去redis取任务的数量,默认值就是4

CELERYD_PREFETCH_MULTIPLIER = 4

每个worker执行了多少次任务后就会死掉,建议数量大一些

CELERYD_MAX_TASKS_PER_CHILD = 200

celery任务执行结果的超时时间

CELERY_TASK_RESULT_EXPIRES = 1200

单个任务的运行时间限制,否则会被杀死

CELERYD_TASK_TIME_LIMIT = 60

使用redis存储任务执行结果,默认不使用

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/1'

将任务结果使用’pickle’序列化成’json’格式
任务序列化方式

CELERY_TASK_SERIALIZER = 'pickle'

任务执行结果序列化方式

CELERY_RESULT_SERIALIZER = 'json'

也可以直接在Celery对象中设置序列化方式

app = Celery('tasks', broker='...', task_serializer='yaml')

指定任务序列化方式

CELERY_TASK_SERIALIZER = 'msgpack'

指定结果序列化方式

CELERY_RESULT_SERIALIZER = 'msgpack'

指定任务接受的序列化类型.

CELERY_ACCEPT_CONTENT = ['msgpack']

任务过期时间,celery任务执行结果的超时时间

CELERY_TASK_RESULT_EXPIRES = 24 * 60 * 60

任务发送完成是否需要确认,对性能会稍有影响

CELERY_ACKS_LATE = True

压缩方案选择,可以是zlib, bzip2,默认是发送没有压缩的数据

CELERY_MESSAGE_COMPRESSION = 'zlib'

规定完成任务的时间
在5s内完成任务,否则执行该任务的worker将被杀死,任务移交给父进程

CELERYD_TASK_TIME_LIMIT = 5

celery worker的并发数,默认是服务器的内核数目,也是命令行-c参数指定的数目

CELERYD_CONCURRENCY = 4

celery worker 每次去BROKER中预取任务的数量

CELERYD_PREFETCH_MULTIPLIER = 4

每个worker执行了多少任务就会死掉,默认是无限的

CELERYD_MAX_TASKS_PER_CHILD = 40

设置默认的队列名称,如果一个消息不符合其他的队列就会放在默认队列里面,如果什么都不设置的话,数据都会发送到默认的队列中

CELERY_DEFAULT_QUEUE = "default"

设置时区

CELERY_TIMEZONE = 'Asia/Shanghai'

启动时区设置

CELERY_ENABLE_UTC = True

限制任务的执行频率
下面这个就是限制tasks模块下的add函数,每秒钟只能执行10次

CELERY_ANNOTATIONS = {'tasks.add':{'rate_limit':'10/s'}}

或者限制所有的任务的刷新频率

CELERY_ANNOTATIONS = {'*':{'rate_limit':'10/s'}}

结束

以上就是配置相关的内容,更多资料请移步从今天开始种树


   转载规则


《Celery的使用(2)---从配置开始》 罗华 采用 知识共享署名 4.0 国际许可协议 进行许可。
 上一篇
docker安装mysql docker安装mysql
最近在部署django,不想在手动安装一遍mysql,便尝试使用docker,总结了安装的心得,这些前提都是在安装了docker后: 1. 查看mysql镜像; docker search mysql 2.有镜像,直接拉取最新镜像dock
2020-01-14
下一篇 
Celery的使用(1)---简单配置与初次运行 Celery的使用(1)---简单配置与初次运行
前言celery是一个基于python开发的简单、灵活且可靠的分布式任务队列框架, 是一个分布式队列的管理工具, 可以用 Celery 提供的接口快速实现并管理一个分布式的任务队列.。它采用了典型的生产者-消费者模型,主要由三部分组成:
2019-11-21
  目录