Celery与Redis结合:Python分布式任务队列与异步处理的高效实践

Celery+Redis:高效实现Python分布式任务队列与异步处理

Celery 是一个简单、灵活且可靠的分布式任务队列,专注于实时处理和任务调度。它能够高效处理大量消息,广泛应用于 Python 项目中,适用于发送电子邮件、数据清洗等耗时任务。

Celery有广泛的用户与贡献者社区,Github地址https://github.com/celery/celery。

send task to broker by celery

receive task from broker

receive task from broker

receive task from broker

User Application

Broker

Redis or RabbitMQ

Celery Worker 1

Celery Worker 2

Celery Worker 3

第一部分 基础概述

一、Celery 核心概念

  1. 分布式任务队列
    Celery 是一个基于 Python 的分布式任务队列系统,专注于实时任务处理和调度,通过异步执行任务提升系统性能。其核心设计支持多台计算机协同工作,实现任务分发与并行处理。

  2. 消息中间件(Broker)
    负责接收和传递任务消息,常用中间件包括 Redis 和 RabbitMQ。它是 Celery 架构的核心组件,确保任务生产者与消费者解耦。

  3. 任务执行单元(Worker)
    Worker 是实际执行任务的进程,通过监听 Broker 队列获取任务并处理。支持动态扩展 Worker 数量以适应负载变化。

  4. 结果存储(Backend)
    用于保存任务执行结果,可选组件如 Redis、数据库或内存。若不需结果存储,可配置为忽略。

  5. 任务(Task)
    用户定义的具体操作逻辑,通过装饰器 @app.task 标记为异步任务。任务可以是函数或类方法,支持同步/异步调用。

二、安装

pip install celery redis

三、典型配置示例

# celery_config.py
broker_url = 'redis://localhost:6379/0'
result_backend = 'redis://localhost:6379/1'
task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Asia/Shanghai'
task_default_queue = 'default'
task_routes = {
    'tasks.add': {'queue': 'math'},
    'tasks.send_email': {'queue': 'email'}
}
task_acks_late = True
worker_prefetch_multiplier = 1  # 公平调度

四、关键场景说明

  1. 任务优先级
    Redis 传输支持优先级(Celery 4.0+),配置 broker_transport_options 中的 priority_steps 参数,定义优先级范围:

    app.conf.broker_transport_options = {
     'priority_steps': [0, 1, 2]  # 支持0(低)、1(中)、2(高)三个优先级
     'queue_order_strategy': 'priority',
     }
    

    路由配置,确保任务路由到同一队列,不同优先级通过参数区分:

    app.conf.task_routes = {
     'high_priority_task': {'queue': 'default', 'priority': 2},
     'medium_priority_task': {'queue': 'default', 'priority': 1},
     'low_priority_task': {'queue': 'default', 'priority': 0}
     }
    

    指定任务优先级,发送任务时通过 priority 参数设置优先级:

    high_priority_task.apply_async(priority=2)  # 最高优先级
    
  2. 防止内存泄漏
    启动 Worker 时限制子进程任务数:

    celery -A proj worker --max-tasks-per-child=100
    
  3. 动态任务重试
    在任务内部使用 self.retry 控制重试逻辑:

    @app.task(bind=True, max_retries=5)
    def fetch_data(self, url):
        try:
            response = requests.get(url)
            return response.json()
        except requests.Timeout as e:
            self.retry(exc=e, countdown=60)  # 60 秒后重试
    

第二部分 Celery Worker

一、基础命令结构

celery -A <模块名> worker [参数]

示例

celery -A tasks worker --loglevel=info --concurrency=4

二、核心参数说明

参数 说明 典型值
--concurrency / -c 工作进程数(默认=CPU核心数)。I/O密集型任务可增加,CPU密集型任务建议等于CPU数 4, 10
--queues / -Q 监听的队列(默认监听所有队列)。用于路由任务优先级 high_priority,low_priority
--loglevel / -l 日志级别 info, debug, warning
--hostname / -n Worker名称(默认自动生成)。用于监控识别 worker1@%h
--autoscale 动态伸缩进程数。格式:最大进程数,最小进程数 10,2
--max-tasks-per-child 子进程处理任务数上限。防内存泄漏 100
--time-limit 任务硬超时时间(秒)。超时强制终止任务 300
--soft-time-limit 任务软超时时间(秒)。超时抛出异常,任务可捕获处理 240
--without-gossip 禁用节点间通信。减少网络开销 无值
--without-mingle 禁用启动时节点同步。加速启动 无值
--without-heartbeat 禁用心跳检测。可能影响监控 无值
--prefetch-multiplier 预取任务倍数(默认=4)。控制并发消费速度 1(公平调度)
--pool / -P 执行池类型prefork(默认), eventlet, gevent, solo gevent
--detach 后台运行(守护进程模式) 无值

三、高级参数说明

参数 说明 默认值
--statedb 持久化 Worker 状态到文件(用于恢复) None
--scheduler 自定义调度器(如配合 redbeat 使用) None(使用默认调度逻辑)
--exclude-queues 排除特定队列 None(不排除任何队列)
--pidfile 指定 PID 文件路径 None(不生成 PID 文件)
--heartbeat-interval 心跳检测间隔(秒) 30(秒)

四、场景化配置示例

1. 高并发处理(I/O密集型)

celery -A tasks worker -P gevent -c 100 --prefetch-multiplier=1
  • 使用 gevent 协程池,启动 100 个协程
  • prefetch-multiplier=1 确保公平调度
  • 2. CPU密集型任务

    celery -A tasks worker -c 4 --max-tasks-per-child=100
    
  • 进程数等于 CPU 核心数
  • 每进程处理 100 个任务后重启,避免内存泄漏
  • 3. 生产环境标准配置

    celery -A tasks worker -c 8 -Q pay,email --autoscale=16,4 --time-limit=600
    
  • 监听 order_payemail 队列
  • 进程数动态调整(4~16)
  • 任务超时 10 分钟强制终止
  • 4. 调试模式

    celery -A tasks worker -l debug -P solo --without-gossip --without-mingle
    
  • 日志级别 debug
  • 单进程运行(solo 池),避免多进程干扰调试
  • 禁用节点通信加速启动
  • 五、关键操作命令

    1. 优雅关闭 Worker

      celery -A tasks control shutdown
      
    2. 查看活跃 Worker

      # 查看活跃的工作进程
      celery -A tasks inspect active
      # 查看工作进程的状态
      celery -A tasks status
      # 查看当前正在执行的任务
      celery inspect active
      # 查看已注册的任务
      celery inspect registered
      
    3. 远程清除队列

      celery -A tasks purge -Q queue_name
      

    六、配置文件集成

    celeryconfig.py 中定义参数:

    # celeryconfig.py
    worker_concurrency = 8
    worker_prefetch_multiplier = 2
    worker_max_tasks_per_child = 100
    

    启动时自动加载配置:

    celery -A tasks worker --config=celeryconfig
    

    七、官方文档参考

  • Celery Worker 官方文档
  • Celery Worker 参数列表
  • 第三部份 Celery Beat

    一、基础命令结构

    celery -A <模块名> beat [参数]
    

    示例

    celery -A tasks beat --loglevel=info --scheduler=redbeat.RedBeatScheduler
    

    二、核心参数说明

    参数 说明 典型值
    --scheduler / -S 指定调度器。默认 celery.beat.PersistentScheduler(本地文件存储) redbeat.RedBeatScheduler(Redis)
    django_celery_beat.schedulers:DatabaseScheduler(Django)
    --schedule / -s 调度文件路径(仅限默认调度器)。存储任务元数据 .db /var/run/celery/beat-schedule
    --max-interval 最大调度循环间隔(秒)。默认 0(尽可能快) 30(降低 CPU 占用)
    --loglevel / -l 日志级别 DEBUG INFO WARNING ERROR CRITICAL FATAL
    --logfile / -f 日志 文件路径。默认 stderr /var/run/celery/beat.pid
    --pidfile PID 文件路径。防止多实例冲突 /var/run/celery/beat.pid
    --detach 后台运行(守护进程模式) 无值

    三、高级参数说明

    参数 说明 默认值
    --timezone 设置时区(需 celery 启用时区支持) UTC
    --enable-utc 强制使用 UTC 时区 True
    --ssl True SSL 连接 Broker False
    --workdir 指定工作目录 None(当前目录)

    四、场景化配置示例

    1. 使用默认调度器(本地文件存储)

      celery -A tasks beat --schedule=/data/celerybeat-schedule --loglevel=info
      
    2. 调度记录存储在文件中
    3. 适合单机部署
    4. 使用 RedBeat(Redis 存储)

      celery -A tasks beat \
      --scheduler=redbeat.RedBeatScheduler \
      --redbeat_redis_url=redis://localhost:6379/2 \
      --max-interval=10
      
    5. 任务配置持久化到 Redis
    6. 支持分布式部署
    7. 生产环境高可用

      celery -A tasks beat \
      --scheduler=redbeat.RedBeatScheduler \
      --redbeat_redis_url=redis://localhost:6379/2 \
      --pidfile=/var/run/celery/beat.pid \
      --max-interval=5 \
      --loglevel=warning
      
    8. 使用 Redis 集群
    9. 限制调度间隔为 5 秒
    10. 日志级别为警告
    11. 调试模式

      celery -A tasks beat --loglevel=debug --max-interval=1
      
    12. 实时打印调试日志
    13. 每秒检查一次任务

    五、配置文件集成

    celeryconfig.py 中定义参数:

    # celeryconfig.py
    beat_scheduler = "redbeat.RedBeatScheduler"
    redbeat_redis_url = "redis://localhost:6379/2"
    max_interval = 10
    timezone = "Asia/Shanghai"
    

    启动时自动加载:

    celery -A tasks beat --config=celeryconfig
    

    六、关键操作

    1. 防止多实例冲突

      celery -A tasks beat --pidfile=/tmp/celerybeat.pid
      
    2. 动态添加任务(RedBeat)

      from redbeat import RedBeatSchedulerEntry
      from celery.schedules import crontab
      
      entry = RedBeatSchedulerEntry(
          name='daily-report',
          task='tasks.generate_report',
          schedule=crontab(hour=8, minute=0),
          args=['sales']
      )
      entry.save()
      
    3. 监控 Beat 状态

      celery -A tasks inspect scheduled
      

    七、注意事项

    1. 时区一致性
      Broker、Worker、Beat 需配置相同时区。
    2. Redis 高可用
      使用 RedBeat 时建议配置 Redis 哨兵或集群。
    3. 文件权限
      使用 --schedule 时确保进程有写入权限。

    八、官方文档参考

  • Celery Beat 官方文档
  • Celery Beat 参数列表
  • 第四部分 Celery RedBeat

    一、RedBeat 核心特性

  • 分布式调度:基于 Redis 实现多 Beat 实例高可用
  • 动态任务管理:支持运行时增删改查定时任务
  • 持久化存储:任务配置存储在 Redis 中,重启不丢失
  • 精确调度:毫秒级任务触发精度
  • 二、核心参数详解

    参数 默认值 说明
    --scheduler celery.beat.PersistentScheduler 必须显式指定为 redbeat.RedBeatScheduler
    --redbeat_redis_url redis://localhost:6379/0 Redis 连接字符串(支持哨兵/集群)
    --redbeat_key_prefix redbeat: Redis 键名前缀(建议按项目区分)
    --redbeat_lock_key redbeat::lock 分布式锁键名(多实例需相同)
    --redbeat_lock_timeout 30 (秒) 锁自动释放时间(需 > 调度间隔)
    --redbeat_schedule_interval 30 (秒) 任务检查频率(影响 CPU 使用率)
    --redbeat_expire_seconds 604800 (7天) 任务元数据过期时间
    --redbeat_initial_task_load True 启动时预加载所有任务
    --redbeat_max_loop_interval 300 (秒) 最大休眠间隔(防止时钟漂移)

    三、配置方法

    安装

    pip install celery-redbeat
    

    方式一:命令行启动

    celery beat -A proj \
      --scheduler=redbeat.RedBeatScheduler \
      --redbeat_redis_url=redis://:密码@集群节点:6379/2 \
      --redbeat_key_prefix=prod:celery:beat \
      --redbeat_lock_timeout=60 \
      --redbeat_schedule_interval=10
    

    方式二:配置文件(celery.py)

    app.conf.update(
        # 基础配置
        broker_url='redis://localhost:6379/0',
        result_backend='redis://localhost:6379/1',
        
        # RedBeat 专用配置
        redbeat_redis_url='redis://:密码@集群节点:6379/2',
        redbeat_key_prefix='prod:celery:beat',
        redbeat_lock_key='prod:celery:cluster_lock',
        redbeat_lock_timeout=60,
        redbeat_schedule_interval=10
    )
    

    四、核心操作示例

    1. 动态任务管理

    from redbeat import RedBeatSchedulerEntry
    from celery.schedules import crontab
    
    # 创建每小时执行的任务
    entry = RedBeatSchedulerEntry(
        name='hourly_cleanup',  # 全局唯一标识
        task='tasks.cleanup',
        schedule=crontab(minute=0),
        args=('logs',),
        kwargs={'retry': True},
        options={'queue': 'system'}
    )
    entry.save()  # 持久化到Redis
    
    # 修改现有任务
    entry = RedBeatSchedulerEntry.from_key('redbeat:hourly_cleanup')
    entry.schedule = crontab(minute=0, hour='*/2')  # 改为每2小时
    entry.save()
    
    # 删除任务
    entry.delete()
    

    2. 多实例高可用部署

    # 实例1
    celery beat -A proj --scheduler=redbeat.RedBeatScheduler \
      --redbeat_lock_key=cluster1 --redbeat_lock_timeout=60
    
    # 实例2(自动故障转移)
    celery beat -A proj --scheduler=redbeat.RedBeatScheduler \
      --redbeat_lock_key=cluster1 --redbeat_lock_timeout=60
    

    五、监控与维护

    1. Redis 数据查看

    # 列出所有任务
    redis-cli --scan --pattern 'redbeat:*'
    
    # 查看任务详情
    redis-cli HGETALL redbeat:my_task_name
    
    # 强制清除旧数据
    redis-cli --scan --pattern 'redbeat:*' | xargs redis-cli del
    

    2. 日志监控要点

    [INFO] redbeat: Starting redbeat scheduler.
    [DEBUG] redbeat: Acquired lock
    [WARNING] redbeat: Lock timeout reached, releasing lock.
    [ERROR] redbeat: Redis connection error: Error 111 connecting to...
    

    六、最佳实践

    1. 容量规划
      Redis 内存建议预留:任务数 * 1KB,例如 1000 个任务约需 1MB

    2. 安全配置

      # 禁用危险操作
      app.conf.redbeat_allow_unsafe_write = False
      
    3. 监控指标
      Prometheus 监控建议采集:

    4. redbeat_tasks_total (Counter)
    5. redbeat_lock_acquired (Gauge)
    6. redbeat_errors_total (Counter)
    7. 灾备方案
      定期导出任务配置:

      import json
      from redbeat import RedBeatSchedulerEntry
      
      tasks = RedBeatSchedulerEntry.all()
      with open('backup.json', 'w') as f:
          json.dump([task.to_dict() for task in tasks], f)
      

    七、故障排查指南

    现象 可能原因 解决方案
    任务不执行 Redis 连接失败 检查 redbeat_redis_url 网络连通性
    多实例同时运行 锁竞争失败 增加 redbeat_lock_timeout
    任务重复执行 系统时钟不同步 部署 NTP 时间同步服务
    CPU 使用率高 schedule_interval 过小 适当增大检查间隔

    八、官方文档参考

  • RedBeat 官方文档
  • RedBeat GitHub 仓库
  • Celery 分布式任务调度
  • 第五部分 Celery Flower

    一、Flower 核心特性

  • 实时监控:动态展示任务状态、Worker 心跳及队列详情
  • 任务追踪:记录任务历史,支持按结果、参数、状态筛选
  • 远程控制:支持终止任务、重启 Worker 等管理操作
  • Web API:提供 RESTful 接口,便于集成第三方监控系统
  • 可视化统计:生成任务执行时间分布图、成功率统计等图表
  • 安全机制:支持 Basic Auth、OAuth 等认证方式
  • 告警通知:可配置任务失败时触发邮件或 Webhook

  • 二、核心参数详解

    参数 默认值 说明
    --port 5555 监听端口
    --address 0.0.0.0 绑定 IP(0.0.0.0 表示所有网络接口)
    --auth None 认证方式(格式:username:password
    --basic_auth None 多用户认证(格式:user1:pass1,user2:pass2
    --persistent False 启用持久化(保存状态至 SQLite)
    --db flower 持久化数据库文件名
    --max_tasks 10000 内存中保留的最大任务记录数
    --enable_events True 实时接收 Celery 事件
    --xheaders False 支持代理服务器(如 Nginx)的 X-Forwarded 头
    --url_prefix None URL 前缀(用于反向代理场景,如 /flower

    三、配置方法

    安装

    pip install flower
    

    命令行启动

    celery flower -A proj \
      --port=5555 \
      --basic_auth=admin:admin123,user:userpass \
      --persistent=True \
      --db=/data/flower.db
    

    四、核心操作示例

    1. 任务管理

    # 通过 API 终止任务
    import requests
    
    resp = requests.post(
        'http://localhost:5555/api/task/revoke/任务ID',
        auth=('admin', 'admin123')
    )
    print(resp.json())
    

    2. 数据导出

    # 导出所有任务数据
    curl -u admin:admin123 http://localhost:5555/api/tasks > tasks.json
    
    # 查询指定 Worker 状态
    curl -u admin:admin123 http://localhost:5555/api/worker/worker1@host
    

    五、监控与告警

    1. Prometheus 集成

    # Prometheus 配置
    scrape_configs:
      - job_name: 'flower'
        metrics_path: '/metrics'
        static_configs:
          - targets: ['flower-host:5555']
    
    # 关键指标:
    - `flower_workers_total` 存活 Worker 数
    - `flower_tasks_total` 历史任务总数
    - `flower_task_events` 任务状态变化计数器
    

    2. 告警规则

    # Alertmanager 配置示例
    - alert: CeleryWorkerDown
      expr: flower_workers_total < 3
      for: 5m
      labels:
        severity: critical
      annotations:
        summary: "Celery 节点宕机 (实例: {{ $labels.instance }})"
    

    六、最佳实践

    1. 安全加固

    # 强制 HTTPS(需配合反向代理)
    flower --xheaders=true --url_prefix=flower
    
    # 使用 JWT 认证
    flower --auth_provider=flower.jwt.JWTAuth --auth=JWT_SECRET_KEY
    

    2. 高可用部署

    # 多实例负载均衡(需共享持久化存储)
    # 节点1
    flower --persistent --db=/shared-storage/flower.db --port=5555
    
    # 节点2
    flower --persistent --db=/shared-storage/flower.db --port=5555
    

    3. 数据清理策略

    # 自动清理旧数据(需自定义插件)
    from flower.models import Task
    from datetime import datetime, timedelta
    
    def cleanup_old_tasks():
        cutoff = datetime.now() - timedelta(days=7)
        Task.objects.filter(received__lt=cutoff).delete()
    

    七、故障排查指南

    现象 可能原因 解决方案
    无法访问 UI 防火墙限制 检查 --address--port 配置
    图表数据不更新 未启用事件模式 启动时添加 --enable_events=true
    认证失败 特殊字符未转义 使用 URL 编码处理密码中的 @:
    持久化数据丢失 文件权限问题 确保 --db 路径可写
    Prometheus 指标为空 未暴露 metrics 端点 确认启动参数无 --disable_metrics

    八、官方文档参考

  • Flower 官方文档
  • Flower REST API 示例
  • Flower GitHub 仓库
  • 附录 Celery 详细配置参数说明

    1. General settings(通用设置)

    参数名 默认值 说明
    accept_content {'json'} 允许的内容类型/序列化器列表
    result_accept_content None 结果后端允许的内容类型/序列化器列表

    2. Time and date settings(时间和日期设置)

    参数名 默认值 说明
    enable_utc True 是否使用 UTC 时间
    timezone "UTC" 配置 Celery 使用的时区

    3. Task settings(任务设置)

    参数名 默认值 说明
    task_annotations None 用于重写任务属性的注解
    task_compression None 任务消息的默认压缩方式
    task_protocol 2 发送任务时使用的默认任务消息协议版本
    task_serializer "json" 默认序列化方法
    task_publish_retry True 发布任务消息时是否重试
    task_publish_retry_policy 见文档 重试发布任务消息的策略

    4. Task execution settings(任务执行设置)

    参数名 默认值 说明
    task_always_eager False 是否总是本地执行任务
    task_eager_propagates False 是否传播本地执行任务的异常
    task_store_eager_result False 是否保存本地执行任务的结果
    task_remote_tracebacks False 是否在任务结果中包含远程追踪信息
    task_ignore_result False 是否忽略任务的返回值
    task_store_errors_even_if_ignored False 是否即使忽略任务结果也存储错误
    task_track_started False 是否报告任务的 ‘started’ 状态
    task_time_limit 任务的硬时限
    task_soft_time_limit 任务的软时限
    task_acks_late False 是否在任务执行后确认消息
    task_acks_on_failure_or_timeout True 是否在任务失败或超时时确认消息
    task_reject_on_worker_lost False 是否在工作进程丢失时拒绝任务
    task_default_rate_limit 全局默认任务速率限制

    5. Task result backend settings(任务结果后端设置)

    参数名 默认值 说明
    result_backend 存储任务结果的后端
    result_backend_always_retry False 是否总是重试可恢复的异常
    result_backend_max_sleep_between_retries_ms 10000 重试之间的最大睡眠时间
    result_backend_base_sleep_between_retries_ms 10 重试之间的基础睡眠时间
    result_backend_max_retries 无限 最大重试次数
    result_backend_thread_safe False 后端对象是否线程安全
    result_backend_transport_options {} 传递给底层传输的额外选项
    result_serializer "json" 结果序列化格式
    result_compression 结果的可选压缩方式
    result_extended False 是否启用扩展任务结果属性
    result_expires 86400 存储的任务结果的过期时间
    result_cache_max False 客户端缓存结果的总数
    result_chord_join_timeout 3.0 在 chord 中连接结果的超时时间
    result_chord_retry_interval 1.0 chord 任务的默认重试间隔

    6. Database backend settings(数据库后端设置)

    参数名 默认值 说明
    database_engine_options {} SQLAlchemy 数据库引擎的额外选项
    database_short_lived_sessions False 是否使用短生命周期的会话
    database_table_schemas {} 数据库结果后端的表架构自定义
    database_table_names {} 数据库结果后端的表名自定义

    7. RPC backend settings(RPC 后端设置)

    参数名 默认值 说明
    result_persistent False 是否持久化结果消息

    8. Cache backend settings(缓存后端设置)

    参数名 默认值 说明
    cache_backend_options {} 传递给缓存后端的额外选项

    9. MongoDB backend settings(MongoDB 后端设置)

    mongodb_backend_settings = {配置字典}

    参数名 默认值 说明
    database "celery" 连接的数据库名称
    taskmeta_collection "celery_taskmeta" 存储任务元数据的集合名称
    max_pool_size 10 PyMongo 连接的最大池大小
    options {} 传递给 MongoDB 连接构造函数的其他关键字参数

    10. Redis backend settings(Redis 后端设置)

    参数名 默认值 说明
    result_backend 未配置 Redis 后端连接 'redis://username:password@host:port/db'
    redis_backend_health_check_interval 未配置 Redis 后端支持健康检查,设置为两次健康检查之间的秒数
    redis_backend_use_ssl False Redis 后端是否使用 SSL 连接
    redis_max_connections 无限制 Redis 连接池的最大连接数
    redis_socket_connect_timeout None Redis 套接字连接超时时间(秒)
    redis_socket_timeout 120.0 Redis 套接字读写操作的超时时间
    redis_retry_on_timeout False 在 Redis 套接字超时时是否重试读写操作
    redis_socket_keepalive False 是否保持 Redis 套接字连接的活动状态

    10. File-system backend settings(文件系统 后端设置)

    参数名 默认值 说明
    result_backend 未配置 文件系统后端连接 'file:///var/celery/results'

    11. Message Routing(消息路由)

    参数名 默认值 说明
    task_queues None 工作进程将从中消费的任务队列列表
    task_routes None 用于将任务路由到队列的路由表
    task_queue_max_priority None 队列的最大优先级
    task_default_priority None 任务的默认优先级
    task_inherit_parent_priority False 子任务是否继承父任务的优先级
    worker_direct False 是否启用直接队列
    task_create_missing_queues True 是否自动创建缺失的队列
    task_default_queue "celery" 默认队列名称
    task_default_exchange 使用 task_default_queue 的值 默认交换机名称
    task_default_exchange_type "direct" 默认交换机类型
    task_default_routing_key 使用 task_default_queue 的值 默认路由键
    task_default_delivery_mode "persistent" 默认投递模式

    12. Broker settings(消息中间件设置)

    参数名 默认值 说明
    broker_url "amqp://localhost" 消息中间件地址
    broker_read_url broker_url 继承 用于读取操作的 broker 连接地址
    broker_write_url broker_url 继承 用于写入操作的 broker 连接地址
    broker_failover_strategy "round-robin" broker 连接失败时的备用策略
    broker_heartbeat 120.0 broker 连接的心跳间隔
    broker_heartbeat_checkrate 2.0 broker 心跳检查的频率
    broker_use_ssl False 是否使用 SSL 连接 broker
    broker_pool_limit 10 broker 连接池的最大连接数
    broker_connection_timeout 4.0 broker 连接超时时间
    broker_connection_retry True 是否在连接失败时重试连接
    broker_connection_retry_on_startup True 在启动时是否重试连接
    broker_connection_max_retries 100 连接失败时的最大重试次数
    broker_channel_error_retry False 是否在通道错误时重试连接
    broker_login_method "AMQPLAIN" broker 登录方法
    broker_transport_options {} 传递给底层传输的额外选项

    13. Worker(工作进程)

    参数名 默认值 说明
    imports [] 工作进程启动时导入的模块列表
    include [] imports 类似的模块导入列表
    worker_deduplicate_successful_tasks False 是否检查重复的任务
    worker_concurrency CPU 核心数 并发工作进程/线程数
    worker_prefetch_multiplier 4 每个工作进程预取的消息数
    worker_enable_prefetch_count_reduction True 是否在连接丢失后减少预取数
    worker_lost_wait 10.0 等待丢失工作进程的时间
    worker_max_tasks_per_child 无限制 每个工作进程的最大任务数
    worker_max_memory_per_child 无限制 每个工作进程的最大内存
    worker_disable_rate_limits False 是否禁用所有任务的速率限制
    worker_state_db None 存储持久工作进程状态的文件名
    worker_timer_precision 1.0 定时器的精度
    worker_enable_remote_control True 是否启用远程控制
    worker_proc_alive_timeout 4.0 等待新工作进程启动的时间
    worker_cancel_long_running_tasks_on_connection_loss False 是否在连接丢失时取消长运行任务

    14. Events(事件)

    参数名 默认值 说明
    worker_send_task_events False 是否发送任务相关事件
    task_send_sent_event False 是否发送任务发送事件
    event_queue_ttl 5.0 事件队列中消息的过期时间
    event_queue_expires 60.0 事件队列的过期时间
    event_queue_prefix "celeryev" 事件接收队列名称的前缀
    event_exchange "celeryev" 事件交换机名称
    event_serializer "json" 事件消息的序列化格式
    events_logfile None 事件日志文件路径
    events_pidfile None 事件进程 ID 文件路径
    events_uid None 事件进程的用户 ID
    events_gid None 事件进程的组 ID
    events_umask None 事件进程创建文件时的 umask
    events_executable None 事件进程使用的 Python 可执行路径

    15. Remote Control Commands(远程控制命令)

    参数名 默认值 说明
    control_queue_ttl 300.0 远程控制命令队列中消息的过期时间
    control_queue_expires 10.0 远程控制命令队列的过期时间
    control_exchange "celery" 远程控制命令交换机名称

    16. Logging(日志)

    | worker_hijack_root_logger | True | 是否劫持根日志记录器 |
    | worker_log_color | True | 是否在日志输出中使用颜色 |
    | worker_log_format | "[%(asctime)s: %(levelname)s/%(processName)s] %(message)s" | 工作进程日志格式 |
    | worker_task_log_format | "[%(asctime)s: %(levelname)s/%(processName)s] %(task_name)s[%(task_id)s]: %(message)s" | 任务日志格式 |
    | worker_redirect_stdouts | True | 是否将标准输出和标准错误重定向到日志 |
    | worker_redirect_stdouts_level | WARNING | 重定向的标准输出和标准错误的日志级别 |

    17. Security(安全)

    参数名 默认值 说明
    security_key None 用于消息签名的私钥文件路径
    security_key_password None 私钥文件的密码
    security_certificate None 用于消息签名的证书文件路径
    security_cert_store None 存储 X.509 证书的目录路径
    security_digest sha256 消息签名使用的加密摘要算法

    18. Custom Component Classes(自定义组件类)

    参数名 默认值 说明
    worker_pool "prefork" 工作进程池的类名
    worker_pool_restarts False 是否允许重启工作进程池
    worker_autoscaler "celery.worker.autoscale:Autoscaler" 自动缩放器类名
    worker_consumer "celery.worker.consumer:Consumer" 消费者类名
    worker_timer "kombu.asynchronous.hub.timer:Timer" 定时器类名
    worker_logfile None 工作进程日志文件路径
    worker_pidfile None 工作进程 ID 文件路径
    worker_uid None 工作进程的用户 ID
    worker_gid None 工作进程的组 ID
    worker_umask None 工作进程创建文件时的 umask
    worker_executable None 工作进程使用的 Python 可执行路径

    19. Beat settings(定时任务设置)

    参数名 默认值 说明
    beat_schedule {} 定时任务调度表
    beat_scheduler "celery.beat:PersistentScheduler" 定时任务调度器类名
    beat_schedule_filename "celerybeat-schedule" 定时任务调度文件名
    beat_sync_every 0 定时任务同步间隔
    beat_max_loop_interval 0 定时任务循环的最大间隔
    beat_cron_starting_deadline None 定时任务的启动截止时间
    beat_logfile None 定时任务日志文件路径
    beat_pidfile None 定时任务进程 ID 文件路径
    beat_uid None 定时任务进程的用户 ID
    beat_gid None 定时任务进程的组 ID
    beat_umask None 定时任务进程创建文件时的 umask
    beat_executable None 定时任务进程使用的 Python 可执行路径

    官方文档参考

  • Celery 参数列表
  • 作者:船长@Quant

    物联沃分享整理
    物联沃-IOTWORD物联网 » Celery与Redis结合:Python分布式任务队列与异步处理的高效实践

    发表回复