Celery与Redis结合:Python分布式任务队列与异步处理的高效实践
Celery+Redis:高效实现Python分布式任务队列与异步处理
Celery 是一个简单、灵活且可靠的分布式任务队列,专注于实时处理和任务调度。它能够高效处理大量消息,广泛应用于 Python 项目中,适用于发送电子邮件、数据清洗等耗时任务。
Celery有广泛的用户与贡献者社区,Github地址https://github.com/celery/celery。
send task to broker by celery
receive task from broker
receive task from broker
receive task from broker
User Application
Broker
Redis or RabbitMQ
Celery Worker 1
Celery Worker 2
Celery Worker 3
第一部分 基础概述
一、Celery 核心概念
-
分布式任务队列
Celery 是一个基于 Python 的分布式任务队列系统,专注于实时任务处理和调度,通过异步执行任务提升系统性能。其核心设计支持多台计算机协同工作,实现任务分发与并行处理。 -
消息中间件(Broker)
负责接收和传递任务消息,常用中间件包括 Redis 和 RabbitMQ。它是 Celery 架构的核心组件,确保任务生产者与消费者解耦。 -
任务执行单元(Worker)
Worker 是实际执行任务的进程,通过监听 Broker 队列获取任务并处理。支持动态扩展 Worker 数量以适应负载变化。 -
结果存储(Backend)
用于保存任务执行结果,可选组件如 Redis、数据库或内存。若不需结果存储,可配置为忽略。 -
任务(Task)
用户定义的具体操作逻辑,通过装饰器@app.task
标记为异步任务。任务可以是函数或类方法,支持同步/异步调用。
二、安装
pip install celery redis
三、典型配置示例
# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
task_default_queue = 'default'
task_routes = {
'tasks.add': {'queue': 'math'},
'tasks.send_email': {'queue': 'email'}
}
task_acks_late = True
worker_prefetch_multiplier = 1 # 公平调度
四、关键场景说明
-
任务优先级
Redis 传输支持优先级(Celery 4.0+),配置 broker_transport_options 中的 priority_steps 参数,定义优先级范围:app.conf.broker_transport_options = { 'priority_steps': [0, 1, 2] # 支持0(低)、1(中)、2(高)三个优先级 'queue_order_strategy': 'priority', }
路由配置,确保任务路由到同一队列,不同优先级通过参数区分:
app.conf.task_routes = { 'high_priority_task': {'queue': 'default', 'priority': 2}, 'medium_priority_task': {'queue': 'default', 'priority': 1}, 'low_priority_task': {'queue': 'default', 'priority': 0} }
指定任务优先级,发送任务时通过 priority 参数设置优先级:
high_priority_task.apply_async(priority=2) # 最高优先级
-
防止内存泄漏
启动 Worker 时限制子进程任务数:celery -A proj worker --max-tasks-per-child=100
-
动态任务重试
在任务内部使用self.retry
控制重试逻辑:@app.task(bind=True, max_retries=5) def fetch_data(self, url): try: response = requests.get(url) return response.json() except requests.Timeout as e: self.retry(exc=e, countdown=60) # 60 秒后重试
第二部分 Celery Worker
一、基础命令结构
celery -A <模块名> worker [参数]
示例:
celery -A tasks worker --loglevel=info --concurrency=4
二、核心参数说明
参数 | 说明 | 典型值 |
---|---|---|
--concurrency / -c |
工作进程数(默认=CPU核心数)。I/O密集型任务可增加,CPU密集型任务建议等于CPU数 | 4 , 10 |
--queues / -Q |
监听的队列(默认监听所有队列)。用于路由任务优先级 | high_priority,low_priority |
--loglevel / -l |
日志级别 | info , debug , warning |
--hostname / -n |
Worker名称(默认自动生成)。用于监控识别 | worker1@%h |
--autoscale |
动态伸缩进程数。格式:最大进程数,最小进程数 |
10,2 |
--max-tasks-per-child |
子进程处理任务数上限。防内存泄漏 | 100 |
--time-limit |
任务硬超时时间(秒)。超时强制终止任务 | 300 |
--soft-time-limit |
任务软超时时间(秒)。超时抛出异常,任务可捕获处理 | 240 |
--without-gossip |
禁用节点间通信。减少网络开销 | 无值 |
--without-mingle |
禁用启动时节点同步。加速启动 | 无值 |
--without-heartbeat |
禁用心跳检测。可能影响监控 | 无值 |
--prefetch-multiplier |
预取任务倍数(默认=4)。控制并发消费速度 | 1 (公平调度) |
--pool / -P |
执行池类型。prefork (默认), eventlet , gevent , solo |
gevent |
--detach |
后台运行(守护进程模式) | 无值 |
三、高级参数说明
参数 | 说明 | 默认值 |
---|---|---|
--statedb |
持久化 Worker 状态到文件(用于恢复) | None |
--scheduler |
自定义调度器(如配合 redbeat 使用) |
None (使用默认调度逻辑) |
--exclude-queues |
排除特定队列 | None (不排除任何队列) |
--pidfile |
指定 PID 文件路径 | None (不生成 PID 文件) |
--heartbeat-interval |
心跳检测间隔(秒) | 30 (秒) |
四、场景化配置示例
1. 高并发处理(I/O密集型)
celery -A tasks worker -P gevent -c 100 --prefetch-multiplier=1
gevent
协程池,启动 100 个协程prefetch-multiplier=1
确保公平调度2. CPU密集型任务
celery -A tasks worker -c 4 --max-tasks-per-child=100
3. 生产环境标准配置
celery -A tasks worker -c 8 -Q pay,email --autoscale=16,4 --time-limit=600
order_pay
和 email
队列4. 调试模式
celery -A tasks worker -l debug -P solo --without-gossip --without-mingle
debug
solo
池),避免多进程干扰调试五、关键操作命令
-
优雅关闭 Worker
celery -A tasks control shutdown
-
查看活跃 Worker
# 查看活跃的工作进程 celery -A tasks inspect active # 查看工作进程的状态 celery -A tasks status # 查看当前正在执行的任务 celery inspect active # 查看已注册的任务 celery inspect registered
-
远程清除队列
celery -A tasks purge -Q queue_name
六、配置文件集成
在 celeryconfig.py
中定义参数:
# celeryconfig.py
worker_concurrency = 8
worker_prefetch_multiplier = 2
worker_max_tasks_per_child = 100
启动时自动加载配置:
celery -A tasks worker --config=celeryconfig
七、官方文档参考
第三部份 Celery Beat
一、基础命令结构
celery -A <模块名> beat [参数]
示例:
celery -A tasks beat --loglevel=info --scheduler=redbeat.RedBeatScheduler
二、核心参数说明
参数 | 说明 | 典型值 |
---|---|---|
--scheduler / -S |
指定调度器。默认 celery.beat.PersistentScheduler (本地文件存储) |
redbeat.RedBeatScheduler (Redis)django_celery_beat.schedulers:DatabaseScheduler (Django) |
--schedule / -s |
调度文件路径(仅限默认调度器)。存储任务元数据 .db |
/var/run/celery/beat-schedule |
--max-interval |
最大调度循环间隔(秒)。默认 0(尽可能快) | 30 (降低 CPU 占用) |
--loglevel / -l |
日志级别 | DEBUG INFO WARNING ERROR CRITICAL FATAL |
--logfile / -f |
日志 文件路径。默认 stderr |
/var/run/celery/beat.pid |
--pidfile |
PID 文件路径。防止多实例冲突 | /var/run/celery/beat.pid |
--detach |
后台运行(守护进程模式) | 无值 |
三、高级参数说明
参数 | 说明 | 默认值 |
---|---|---|
--timezone |
设置时区(需 celery 启用时区支持) |
UTC |
--enable-utc |
强制使用 UTC 时区 | True |
--ssl |
True SSL 连接 Broker |
False |
--workdir |
指定工作目录 | None (当前目录) |
四、场景化配置示例
-
使用默认调度器(本地文件存储)
celery -A tasks beat --schedule=/data/celerybeat-schedule --loglevel=info
- 调度记录存储在文件中
- 适合单机部署
-
使用 RedBeat(Redis 存储)
celery -A tasks beat \ --scheduler=redbeat.RedBeatScheduler \ --redbeat_redis_url=redis://localhost:6379/2 \ --max-interval=10
- 任务配置持久化到 Redis
- 支持分布式部署
-
生产环境高可用
celery -A tasks beat \ --scheduler=redbeat.RedBeatScheduler \ --redbeat_redis_url=redis://localhost:6379/2 \ --pidfile=/var/run/celery/beat.pid \ --max-interval=5 \ --loglevel=warning
- 使用 Redis 集群
- 限制调度间隔为 5 秒
- 日志级别为警告
-
调试模式
celery -A tasks beat --loglevel=debug --max-interval=1
- 实时打印调试日志
- 每秒检查一次任务
五、配置文件集成
在 celeryconfig.py
中定义参数:
# celeryconfig.py
beat_scheduler = "redbeat.RedBeatScheduler"
redbeat_redis_url = "redis://localhost:6379/2"
max_interval = 10
timezone = "Asia/Shanghai"
启动时自动加载:
celery -A tasks beat --config=celeryconfig
六、关键操作
-
防止多实例冲突
celery -A tasks beat --pidfile=/tmp/celerybeat.pid
-
动态添加任务(RedBeat)
from redbeat import RedBeatSchedulerEntry from celery.schedules import crontab entry = RedBeatSchedulerEntry( name='daily-report', task='tasks.generate_report', schedule=crontab(hour=8, minute=0), args=['sales'] ) entry.save()
-
监控 Beat 状态
celery -A tasks inspect scheduled
七、注意事项
- 时区一致性
Broker、Worker、Beat 需配置相同时区。 - Redis 高可用
使用 RedBeat 时建议配置 Redis 哨兵或集群。 - 文件权限
使用--schedule
时确保进程有写入权限。
八、官方文档参考
第四部分 Celery RedBeat
一、RedBeat 核心特性
二、核心参数详解
参数 | 默认值 | 说明 |
---|---|---|
--scheduler |
celery.beat.PersistentScheduler |
必须显式指定为 redbeat.RedBeatScheduler |
--redbeat_redis_url |
redis://localhost:6379/0 |
Redis 连接字符串(支持哨兵/集群) |
--redbeat_key_prefix |
redbeat: |
Redis 键名前缀(建议按项目区分) |
--redbeat_lock_key |
redbeat::lock |
分布式锁键名(多实例需相同) |
--redbeat_lock_timeout |
30 (秒) | 锁自动释放时间(需 > 调度间隔) |
--redbeat_schedule_interval |
30 (秒) | 任务检查频率(影响 CPU 使用率) |
--redbeat_expire_seconds |
604800 (7天) | 任务元数据过期时间 |
--redbeat_initial_task_load |
True | 启动时预加载所有任务 |
--redbeat_max_loop_interval |
300 (秒) | 最大休眠间隔(防止时钟漂移) |
三、配置方法
安装
pip install celery-redbeat
方式一:命令行启动
celery beat -A proj \
--scheduler=redbeat.RedBeatScheduler \
--redbeat_redis_url=redis://:密码@集群节点:6379/2 \
--redbeat_key_prefix=prod:celery:beat \
--redbeat_lock_timeout=60 \
--redbeat_schedule_interval=10
方式二:配置文件(celery.py)
app.conf.update(
# 基础配置
broker_url='redis://localhost:6379/0',
result_backend='redis://localhost:6379/1',
# RedBeat 专用配置
redbeat_redis_url='redis://:密码@集群节点:6379/2',
redbeat_key_prefix='prod:celery:beat',
redbeat_lock_key='prod:celery:cluster_lock',
redbeat_lock_timeout=60,
redbeat_schedule_interval=10
)
四、核心操作示例
1. 动态任务管理
from redbeat import RedBeatSchedulerEntry
from celery.schedules import crontab
# 创建每小时执行的任务
entry = RedBeatSchedulerEntry(
name='hourly_cleanup', # 全局唯一标识
task='tasks.cleanup',
schedule=crontab(minute=0),
args=('logs',),
kwargs={'retry': True},
options={'queue': 'system'}
)
entry.save() # 持久化到Redis
# 修改现有任务
entry = RedBeatSchedulerEntry.from_key('redbeat:hourly_cleanup')
entry.schedule = crontab(minute=0, hour='*/2') # 改为每2小时
entry.save()
# 删除任务
entry.delete()
2. 多实例高可用部署
# 实例1
celery beat -A proj --scheduler=redbeat.RedBeatScheduler \
--redbeat_lock_key=cluster1 --redbeat_lock_timeout=60
# 实例2(自动故障转移)
celery beat -A proj --scheduler=redbeat.RedBeatScheduler \
--redbeat_lock_key=cluster1 --redbeat_lock_timeout=60
五、监控与维护
1. Redis 数据查看
# 列出所有任务
redis-cli --scan --pattern 'redbeat:*'
# 查看任务详情
redis-cli HGETALL redbeat:my_task_name
# 强制清除旧数据
redis-cli --scan --pattern 'redbeat:*' | xargs redis-cli del
2. 日志监控要点
[INFO] redbeat: Starting redbeat scheduler.
[DEBUG] redbeat: Acquired lock
[WARNING] redbeat: Lock timeout reached, releasing lock.
[ERROR] redbeat: Redis connection error: Error 111 connecting to...
六、最佳实践
-
容量规划
Redis 内存建议预留:任务数 * 1KB
,例如 1000 个任务约需 1MB -
安全配置
# 禁用危险操作 app.conf.redbeat_allow_unsafe_write = False
-
监控指标
Prometheus 监控建议采集: redbeat_tasks_total
(Counter)redbeat_lock_acquired
(Gauge)redbeat_errors_total
(Counter)-
灾备方案
定期导出任务配置:import json from redbeat import RedBeatSchedulerEntry tasks = RedBeatSchedulerEntry.all() with open('backup.json', 'w') as f: json.dump([task.to_dict() for task in tasks], f)
七、故障排查指南
现象 | 可能原因 | 解决方案 |
---|---|---|
任务不执行 | Redis 连接失败 | 检查 redbeat_redis_url 网络连通性 |
多实例同时运行 | 锁竞争失败 | 增加 redbeat_lock_timeout |
任务重复执行 | 系统时钟不同步 | 部署 NTP 时间同步服务 |
CPU 使用率高 | schedule_interval 过小 |
适当增大检查间隔 |
八、官方文档参考
第五部分 Celery Flower
一、Flower 核心特性
二、核心参数详解
参数 | 默认值 | 说明 |
---|---|---|
--port |
5555 | 监听端口 |
--address |
0.0.0.0 | 绑定 IP(0.0.0.0 表示所有网络接口) |
--auth |
None | 认证方式(格式:username:password ) |
--basic_auth |
None | 多用户认证(格式:user1:pass1,user2:pass2 ) |
--persistent |
False | 启用持久化(保存状态至 SQLite) |
--db |
flower | 持久化数据库文件名 |
--max_tasks |
10000 | 内存中保留的最大任务记录数 |
--enable_events |
True | 实时接收 Celery 事件 |
--xheaders |
False | 支持代理服务器(如 Nginx)的 X-Forwarded 头 |
--url_prefix |
None | URL 前缀(用于反向代理场景,如 /flower ) |
三、配置方法
安装
pip install flower
命令行启动
celery flower -A proj \
--port=5555 \
--basic_auth=admin:admin123,user:userpass \
--persistent=True \
--db=/data/flower.db
四、核心操作示例
1. 任务管理
# 通过 API 终止任务
import requests
resp = requests.post(
'http://localhost:5555/api/task/revoke/任务ID',
auth=('admin', 'admin123')
)
print(resp.json())
2. 数据导出
# 导出所有任务数据
curl -u admin:admin123 http://localhost:5555/api/tasks > tasks.json
# 查询指定 Worker 状态
curl -u admin:admin123 http://localhost:5555/api/worker/worker1@host
五、监控与告警
1. Prometheus 集成
# Prometheus 配置
scrape_configs:
- job_name: 'flower'
metrics_path: '/metrics'
static_configs:
- targets: ['flower-host:5555']
# 关键指标:
- `flower_workers_total` 存活 Worker 数
- `flower_tasks_total` 历史任务总数
- `flower_task_events` 任务状态变化计数器
2. 告警规则
# Alertmanager 配置示例
- alert: CeleryWorkerDown
expr: flower_workers_total < 3
for: 5m
labels:
severity: critical
annotations:
summary: "Celery 节点宕机 (实例: {{ $labels.instance }})"
六、最佳实践
1. 安全加固
# 强制 HTTPS(需配合反向代理)
flower --xheaders=true --url_prefix=flower
# 使用 JWT 认证
flower --auth_provider=flower.jwt.JWTAuth --auth=JWT_SECRET_KEY
2. 高可用部署
# 多实例负载均衡(需共享持久化存储)
# 节点1
flower --persistent --db=/shared-storage/flower.db --port=5555
# 节点2
flower --persistent --db=/shared-storage/flower.db --port=5555
3. 数据清理策略
# 自动清理旧数据(需自定义插件)
from flower.models import Task
from datetime import datetime, timedelta
def cleanup_old_tasks():
cutoff = datetime.now() - timedelta(days=7)
Task.objects.filter(received__lt=cutoff).delete()
七、故障排查指南
现象 | 可能原因 | 解决方案 |
---|---|---|
无法访问 UI | 防火墙限制 | 检查 --address 和 --port 配置 |
图表数据不更新 | 未启用事件模式 | 启动时添加 --enable_events=true |
认证失败 | 特殊字符未转义 | 使用 URL 编码处理密码中的 @ 和 : |
持久化数据丢失 | 文件权限问题 | 确保 --db 路径可写 |
Prometheus 指标为空 | 未暴露 metrics 端点 | 确认启动参数无 --disable_metrics |
八、官方文档参考
附录 Celery 详细配置参数说明
1. General settings(通用设置)
参数名 | 默认值 | 说明 |
---|---|---|
accept_content |
{'json'} |
允许的内容类型/序列化器列表 |
result_accept_content |
None |
结果后端允许的内容类型/序列化器列表 |
2. Time and date settings(时间和日期设置)
参数名 | 默认值 | 说明 |
---|---|---|
enable_utc |
True |
是否使用 UTC 时间 |
timezone |
"UTC" |
配置 Celery 使用的时区 |
3. Task settings(任务设置)
参数名 | 默认值 | 说明 |
---|---|---|
task_annotations |
None |
用于重写任务属性的注解 |
task_compression |
None |
任务消息的默认压缩方式 |
task_protocol |
2 |
发送任务时使用的默认任务消息协议版本 |
task_serializer |
"json" |
默认序列化方法 |
task_publish_retry |
True |
发布任务消息时是否重试 |
task_publish_retry_policy |
见文档 | 重试发布任务消息的策略 |
4. Task execution settings(任务执行设置)
参数名 | 默认值 | 说明 |
---|---|---|
task_always_eager |
False |
是否总是本地执行任务 |
task_eager_propagates |
False |
是否传播本地执行任务的异常 |
task_store_eager_result |
False |
是否保存本地执行任务的结果 |
task_remote_tracebacks |
False |
是否在任务结果中包含远程追踪信息 |
task_ignore_result |
False |
是否忽略任务的返回值 |
task_store_errors_even_if_ignored |
False |
是否即使忽略任务结果也存储错误 |
task_track_started |
False |
是否报告任务的 ‘started’ 状态 |
task_time_limit |
无 | 任务的硬时限 |
task_soft_time_limit |
无 | 任务的软时限 |
task_acks_late |
False |
是否在任务执行后确认消息 |
task_acks_on_failure_or_timeout |
True |
是否在任务失败或超时时确认消息 |
task_reject_on_worker_lost |
False |
是否在工作进程丢失时拒绝任务 |
task_default_rate_limit |
无 | 全局默认任务速率限制 |
5. Task result backend settings(任务结果后端设置)
参数名 | 默认值 | 说明 |
---|---|---|
result_backend |
无 | 存储任务结果的后端 |
result_backend_always_retry |
False |
是否总是重试可恢复的异常 |
result_backend_max_sleep_between_retries_ms |
10000 |
重试之间的最大睡眠时间 |
result_backend_base_sleep_between_retries_ms |
10 |
重试之间的基础睡眠时间 |
result_backend_max_retries |
无限 | 最大重试次数 |
result_backend_thread_safe |
False |
后端对象是否线程安全 |
result_backend_transport_options |
{} |
传递给底层传输的额外选项 |
result_serializer |
"json" |
结果序列化格式 |
result_compression |
无 | 结果的可选压缩方式 |
result_extended |
False |
是否启用扩展任务结果属性 |
result_expires |
86400 |
存储的任务结果的过期时间 |
result_cache_max |
False |
客户端缓存结果的总数 |
result_chord_join_timeout |
3.0 |
在 chord 中连接结果的超时时间 |
result_chord_retry_interval |
1.0 |
chord 任务的默认重试间隔 |
6. Database backend settings(数据库后端设置)
参数名 | 默认值 | 说明 |
---|---|---|
database_engine_options |
{} |
SQLAlchemy 数据库引擎的额外选项 |
database_short_lived_sessions |
False |
是否使用短生命周期的会话 |
database_table_schemas |
{} |
数据库结果后端的表架构自定义 |
database_table_names |
{} |
数据库结果后端的表名自定义 |
7. RPC backend settings(RPC 后端设置)
参数名 | 默认值 | 说明 |
---|---|---|
result_persistent |
False |
是否持久化结果消息 |
8. Cache backend settings(缓存后端设置)
参数名 | 默认值 | 说明 |
---|---|---|
cache_backend_options |
{} |
传递给缓存后端的额外选项 |
9. MongoDB backend settings(MongoDB 后端设置)
mongodb_backend_settings = {配置字典
}
参数名 | 默认值 | 说明 |
---|---|---|
database |
"celery" |
连接的数据库名称 |
taskmeta_collection |
"celery_taskmeta" |
存储任务元数据的集合名称 |
max_pool_size |
10 |
PyMongo 连接的最大池大小 |
options |
{} |
传递给 MongoDB 连接构造函数的其他关键字参数 |
10. Redis backend settings(Redis 后端设置)
参数名 | 默认值 | 说明 |
---|---|---|
result_backend |
未配置 | Redis 后端连接 'redis://username:password@host:port/db' |
redis_backend_health_check_interval |
未配置 | Redis 后端支持健康检查,设置为两次健康检查之间的秒数 |
redis_backend_use_ssl |
False |
Redis 后端是否使用 SSL 连接 |
redis_max_connections |
无限制 | Redis 连接池的最大连接数 |
redis_socket_connect_timeout |
None |
Redis 套接字连接超时时间(秒) |
redis_socket_timeout |
120.0 秒 |
Redis 套接字读写操作的超时时间 |
redis_retry_on_timeout |
False |
在 Redis 套接字超时时是否重试读写操作 |
redis_socket_keepalive |
False |
是否保持 Redis 套接字连接的活动状态 |
10. File-system backend settings(文件系统 后端设置)
参数名 | 默认值 | 说明 |
---|---|---|
result_backend |
未配置 | 文件系统后端连接 'file:///var/celery/results' |
11. Message Routing(消息路由)
参数名 | 默认值 | 说明 |
---|---|---|
task_queues |
None |
工作进程将从中消费的任务队列列表 |
task_routes |
None |
用于将任务路由到队列的路由表 |
task_queue_max_priority |
None |
队列的最大优先级 |
task_default_priority |
None |
任务的默认优先级 |
task_inherit_parent_priority |
False |
子任务是否继承父任务的优先级 |
worker_direct |
False |
是否启用直接队列 |
task_create_missing_queues |
True |
是否自动创建缺失的队列 |
task_default_queue |
"celery" |
默认队列名称 |
task_default_exchange |
使用 task_default_queue 的值 |
默认交换机名称 |
task_default_exchange_type |
"direct" |
默认交换机类型 |
task_default_routing_key |
使用 task_default_queue 的值 |
默认路由键 |
task_default_delivery_mode |
"persistent" |
默认投递模式 |
12. Broker settings(消息中间件设置)
参数名 | 默认值 | 说明 |
---|---|---|
broker_url |
"amqp://localhost" |
消息中间件地址 |
broker_read_url |
从 broker_url 继承 |
用于读取操作的 broker 连接地址 |
broker_write_url |
从 broker_url 继承 |
用于写入操作的 broker 连接地址 |
broker_failover_strategy |
"round-robin" |
broker 连接失败时的备用策略 |
broker_heartbeat |
120.0 |
broker 连接的心跳间隔 |
broker_heartbeat_checkrate |
2.0 |
broker 心跳检查的频率 |
broker_use_ssl |
False |
是否使用 SSL 连接 broker |
broker_pool_limit |
10 |
broker 连接池的最大连接数 |
broker_connection_timeout |
4.0 |
broker 连接超时时间 |
broker_connection_retry |
True |
是否在连接失败时重试连接 |
broker_connection_retry_on_startup |
True |
在启动时是否重试连接 |
broker_connection_max_retries |
100 |
连接失败时的最大重试次数 |
broker_channel_error_retry |
False |
是否在通道错误时重试连接 |
broker_login_method |
"AMQPLAIN" |
broker 登录方法 |
broker_transport_options |
{} |
传递给底层传输的额外选项 |
13. Worker(工作进程)
参数名 | 默认值 | 说明 |
---|---|---|
imports |
[] |
工作进程启动时导入的模块列表 |
include |
[] |
与 imports 类似的模块导入列表 |
worker_deduplicate_successful_tasks |
False |
是否检查重复的任务 |
worker_concurrency |
CPU 核心数 | 并发工作进程/线程数 |
worker_prefetch_multiplier |
4 |
每个工作进程预取的消息数 |
worker_enable_prefetch_count_reduction |
True |
是否在连接丢失后减少预取数 |
worker_lost_wait |
10.0 |
等待丢失工作进程的时间 |
worker_max_tasks_per_child |
无限制 | 每个工作进程的最大任务数 |
worker_max_memory_per_child |
无限制 | 每个工作进程的最大内存 |
worker_disable_rate_limits |
False |
是否禁用所有任务的速率限制 |
worker_state_db |
None |
存储持久工作进程状态的文件名 |
worker_timer_precision |
1.0 |
定时器的精度 |
worker_enable_remote_control |
True |
是否启用远程控制 |
worker_proc_alive_timeout |
4.0 |
等待新工作进程启动的时间 |
worker_cancel_long_running_tasks_on_connection_loss |
False |
是否在连接丢失时取消长运行任务 |
14. Events(事件)
参数名 | 默认值 | 说明 |
---|---|---|
worker_send_task_events |
False |
是否发送任务相关事件 |
task_send_sent_event |
False |
是否发送任务发送事件 |
event_queue_ttl |
5.0 |
事件队列中消息的过期时间 |
event_queue_expires |
60.0 |
事件队列的过期时间 |
event_queue_prefix |
"celeryev" |
事件接收队列名称的前缀 |
event_exchange |
"celeryev" |
事件交换机名称 |
event_serializer |
"json" |
事件消息的序列化格式 |
events_logfile |
None |
事件日志文件路径 |
events_pidfile |
None |
事件进程 ID 文件路径 |
events_uid |
None |
事件进程的用户 ID |
events_gid |
None |
事件进程的组 ID |
events_umask |
None |
事件进程创建文件时的 umask |
events_executable |
None |
事件进程使用的 Python 可执行路径 |
15. Remote Control Commands(远程控制命令)
参数名 | 默认值 | 说明 |
---|---|---|
control_queue_ttl |
300.0 |
远程控制命令队列中消息的过期时间 |
control_queue_expires |
10.0 |
远程控制命令队列的过期时间 |
control_exchange |
"celery" |
远程控制命令交换机名称 |
16. Logging(日志)
| worker_hijack_root_logger
| True
| 是否劫持根日志记录器 |
| worker_log_color
| True
| 是否在日志输出中使用颜色 |
| worker_log_format
| "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s"
| 工作进程日志格式 |
| worker_task_log_format
| "[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s"
| 任务日志格式 |
| worker_redirect_stdouts
| True
| 是否将标准输出和标准错误重定向到日志 |
| worker_redirect_stdouts_level
| WARNING
| 重定向的标准输出和标准错误的日志级别 |
17. Security(安全)
参数名 | 默认值 | 说明 |
---|---|---|
security_key |
None |
用于消息签名的私钥文件路径 |
security_key_password |
None |
私钥文件的密码 |
security_certificate |
None |
用于消息签名的证书文件路径 |
security_cert_store |
None |
存储 X.509 证书的目录路径 |
security_digest |
sha256 |
消息签名使用的加密摘要算法 |
18. Custom Component Classes(自定义组件类)
参数名 | 默认值 | 说明 |
---|---|---|
worker_pool |
"prefork" |
工作进程池的类名 |
worker_pool_restarts |
False |
是否允许重启工作进程池 |
worker_autoscaler |
"celery.worker.autoscale:Autoscaler" |
自动缩放器类名 |
worker_consumer |
"celery.worker.consumer:Consumer" |
消费者类名 |
worker_timer |
"kombu.asynchronous.hub.timer:Timer" |
定时器类名 |
worker_logfile |
None |
工作进程日志文件路径 |
worker_pidfile |
None |
工作进程 ID 文件路径 |
worker_uid |
None |
工作进程的用户 ID |
worker_gid |
None |
工作进程的组 ID |
worker_umask |
None |
工作进程创建文件时的 umask |
worker_executable |
None |
工作进程使用的 Python 可执行路径 |
19. Beat settings(定时任务设置)
参数名 | 默认值 | 说明 |
---|---|---|
beat_schedule |
{} |
定时任务调度表 |
beat_scheduler |
"celery.beat:PersistentScheduler" |
定时任务调度器类名 |
beat_schedule_filename |
"celerybeat-schedule" |
定时任务调度文件名 |
beat_sync_every |
0 |
定时任务同步间隔 |
beat_max_loop_interval |
0 |
定时任务循环的最大间隔 |
beat_cron_starting_deadline |
None |
定时任务的启动截止时间 |
beat_logfile |
None |
定时任务日志文件路径 |
beat_pidfile |
None |
定时任务进程 ID 文件路径 |
beat_uid |
None |
定时任务进程的用户 ID |
beat_gid |
None |
定时任务进程的组 ID |
beat_umask |
None |
定时任务进程创建文件时的 umask |
beat_executable |
None |
定时任务进程使用的 Python 可执行路径 |
官方文档参考
作者:船长@Quant