Python Celery快速入门教程

Celery 是一个简单、灵活且可靠的分布式任务队列框架,用于处理大量的异步任务、定时任务等。它允许你将任务发送到消息队列,然后由后台的工作进程(worker)来执行这些任务,并且支持多种消息中间件,如 RabbitMQ、Redis 等。

Celery简介

Celery是一个简单、灵活、可靠的分布式系统,可以处理大量的消息,同时为操作提供维护这样一个系统所需的工具。

Celery有很多应用场景,典型示例如下:

  • 发送电子邮件:可以将发送电子邮件的任务交给Celery,并向用户显示一个感谢页面,而不是让用户在填写完注册表格后等待。你可能会说,执行电子邮件发送代码不需要花费时间,但是如果电子邮件服务器没有响应,如果将这部分设置为同步,站点访问者将不得不等待,直到超时发生。
  • 图片/其他文件上传任务:现在通过网页上传图片或其他类型的文档是非常常见的。假设希望提供一个工具来上传包含产品图像的产品信息,同时还需要根据要求调整图像大小,并增加与品牌相关的水印,用户在所有这些操作期间等待看起来不太好。他想要的只是看到他的过程已经完成的文本,然后继续前进。你可以创建多个celery任务来实现目标。
  • 计划任务:celery也可以作为一个调度程序,执行周期性任务。
  • Celery主要概念

    Celery基本架构如下图所示:

    生产者:这个应用程序负责推送消息与所有需要的信息。

    Broker: 这个模块实际上是作为消息队列服务的,像Redis或RabbitMQ这样的应用程序可以在这里使用。

    任务:任务是序列化后在代理中排队的Python函数或任务。然后,任务函数由负责反序列化并执行它的工作程序挑选。默认的序列化格式是JSON,您可以将其更改为msgpack, YAML或pickle。

    后端:该组件负责存储函数产生的结果.

    环境准备

  • 安装redis
  • 这里为了快速演示,直接适用docker容器:

    docker run -d -p 6379:6379 redis
    
  • 安装依赖
  • 首先安装 Celery 和 Redis(python连接redis):

    pip install celery redis
    

    基本示例

  • 创建任务
  • 创建项目(如使用poetry工具),以下是简单Celery任务模块示例:

    # tasks.py 文件
    from celery import Celery
    
    # 创建Celery实例,指定名称和消息中间件(这里是Redis)的URL
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y
    

    首先导入Celery类,然后创建一个Celery实例,名称为tasks,并指定broker(消息中间件)为本地 Redis 服务器(redis://localhost:6379/0)。

    定义了一个名为add的任务,它是一个被@app.task装饰的函数。这个任务接收两个参数xy,并返回它们的和。

  • 启动Celery工作进程
  • 在终端运行命令启动Celery工作进程:

    celery -A tasks worker --loglevel=info
    

    这里-A tasks表示任务模块是tasks.pyworker表示启动工作进程,--loglevel=info设置日志级别为info,这样可以看到任务执行的相关信息。

  • 调用任务
  • 在另一个python脚本(如:main.py)中调用任务:

    # main.py
    from tasks import add
    
    # 异步调用任务
    result = add.delay(66, 4)
    print("任务已发送,等待结果...")
    # 获取任务结果
    print("结果:", result.get())
    

    首先从tasks.py模块中导入add任务。

    然后使用add.delay(66, 4)异步调用add任务,传递参数664。这会将任务发送到消息队列,由 Celery 工作进程来执行。接着打印出任务已发送的消息,等待任务执行结果。

    最后,使用result.get()获取任务的最终结果。当任务还没有执行完成时,get()方法会阻塞,直到任务完成并返回结果。在这里,最终会打印出70,即66 + 4的结果。

    定时任务示例

    要定时执行任务,我们需要重构task模块,添加定时配置:

    # tasks.py 文件
    from celery import Celery
    from celery.schedules import crontab
    
    app = Celery('tasks', broker='redis://localhost:6379/0')
    
    @app.task
    def add(x, y):
        return x + y
    
    @app.on_after_configure.connect
    def setup_periodic_tasks(sender, **kwargs):
        # 每30秒执行一次add任务,参数为4和6
        sender.add_periodic_task(30.0, add.s(66, 4))
    

    在这里,新增了一个setup_periodic_tasks函数,它通过@app.on_after_configure.connect装饰器连接到 Celery 的配置完成后的事件。在这个函数中,使用sender.add_periodic_task来添加一个定时任务,每隔30秒执行一次add任务,参数为664。add_periodic_task函数表示增加周期性任务,第一个参数以秒为单位,5分钟可以设置为300。第二个参数表示任务,这里是add.s(66, 4),其中add是之前定义好的 Celery 任务函数(被@app.task装饰过的函数),而.s是 Celery 的一种语法糖,用于对任务进行签名(signature)操作,它可以固定任务执行时的参数。

  • 重新启动Celery工作进程
  • 像之前一样,在终端中运行celery -A tasks worker --loglevel=info来启动工作进程。同时,你还可以启动一个 Celery 调度器(beat)来管理定时任务。这样,Celery 就会按照配置的时间间隔定期执行add任务,并且可以在工作进程的日志中看到任务执行的记录。

    运行任务过程

    首先在命令行输出任务ID:

    e8460939-7bff-4541-8843-c22448ba81a6
    

    在redis中有记录:

    {"body": "W1s2NiwgNF0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==", "content-encoding": "utf-8", "content-type": "application/json", "headers": {"lang": "py", "task": "Add two numbers", "id": "020e9130-e22a-4cea-8463-ba95c2a03103", "shadow": null, "eta": null, "expires": null, "group": null, "group_index": null, "retries": 0, "timelimit": [null, null], "root_id": "020e9130-e22a-4cea-8463-ba95c2a03103", "parent_id": null, "argsrepr": "(66, 4)", "kwargsrepr": "{}", "origin": "gen17869@LAPTOP-F569632U", "ignore_result": false, "replaced_task_nesting": 0, "stamped_headers": null, "stamps": {}}, "properties": {"correlation_id": "020e9130-e22a-4cea-8463-ba95c2a03103", "reply_to": "d40ebae3-1285-35bf-a96f-bdde29d95121", "delivery_mode": 2, "delivery_info": {"exchange": "", "routing_key": "celery"}, "priority": 0, "body_encoding": "base64", "delivery_tag": "d2cd8663-fba8-458f-aee7-c0342ad3253e"}}
    

    body内容是base64编码,对W1s2NiwgNF0sIHt9LCB7ImNhbGxiYWNrcyI6IG51bGwsICJlcnJiYWNrcyI6IG51bGwsICJjaGFpbiI6IG51bGwsICJjaG9yZCI6IG51bGx9XQ==进行解码,内容如下:

    [[66, 4], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]
    

    注意值为66和4的列表。这些是你传递给函数的参数。其他信息,如任务名称、语言等,也可以在上面的JSON中看到。

    到目前为止,你所做的就是把数据推入队列,然后序列化。它还没有被消费,因此你需要启动工人。你像下面这样调用工人(如果之前没有启动,现在启动):

    celery -A tasks worker --loglevel=INFO
    

    -A 指定应用程序名称,这里是tasks。该名称本身来自任务文件tasks.py的名称。然后告诉你希望调用worker和日志级别。

    这时你可以注意到任务Add两个数字以及任务id,它首先被接收并成功执行。注意70这个数字,它是66和4的和。因此,推送到队列中的任务不一定要立即处理。

    由于我们也使用后端,所以任务执行的结果存储如下:

    [2024-12-30 20:55:56,562: INFO/MainProcess] Task Add two numbers[e8460939-7bff-4541-8843-c22448ba81a6] received
    [2024-12-30 20:55:56,566: INFO/ForkPoolWorker-14] Task Add two numbers[e8460939-7bff-4541-8843-c22448ba81a6] succeeded in 0.003371259997948073s: 70
    

    总结

    对于那些可以推迟的任务,Celery是很好的选择。其灵活的架构使其可用于多种用途。我只是讨论了它的基本用法。后续我们继续分享,一起学习Celery。

    作者:梦想画家

    物联沃分享整理
    物联沃-IOTWORD物联网 » Python Celery快速入门教程

    发表回复