APScheduler(Python 单体任务调度框架)一文拿下

https://github.com/agronholm/apscheduler/tree/3.x

https://apscheduler.readthedocs.io/en/3.x/

master 分支改动较大,目前(2025.02.10)还没发布新版本,本文基于3.x版本

pip install APScheduler

3.x

triggers(触发器):包含调度逻辑。每个作业都有自己的触发器,该触发器确定下一步应在何时运行该作业。除了其初始配置外,触发器完全是无状态的。
job stores(任务存储器):任务存储器是可以存储任务的地方,默认情况下任务保存在内存,也可将任务保存在各种数据库中。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。

job:任务对象

jobstore:持久化 job

executor(执行器):执行器会将任务放到进程或线程中执行,任务执行完成后,执行程序会通知调度器,然后触发一些事件

scheduler(调度器):任务调度器是属于整个调度的总指挥官。他会合理安排作业存储器、执行器、触发器进行工作,并进行添加和删除任务等。调度器通常是只有一个的。事件机制:https://apscheduler.readthedocs.io/en/3.x/modules/events.html#event-codes

不支持分布式调度

https://apscheduler.readthedocs.io/en/latest/faq.html#how-do-i-share-a-single-job-store-among-one-or-more-worker-processes

一个关于分布式调度的讨论:https://www.cnblogs.com/leffss/p/11912364.html

创建任务

任务调度

https://blog.csdn.net/kobepaul123/article/details/123616575

Job

  • id:指定作业的唯一ID
  • name:指定作业的名字
  • trigger:apscheduler定义的触发器,用于确定Job的执行时间,根据设置的trigger规则,计算得到下次执行此job的时间, 满足时将会执行
  • executor:apscheduler定义的执行器,job创建时设置执行器的名字,根据字符串你名字到scheduler获取到执行此
  • job的 执行器,执行job指定的函数
  • max_instances:执行此job的最大实例数,executor执行job时,根据job的id来计算执行次数,根据设置的最大实例数
    来确定是否可执行
  • next_run_time:Job下次的执行时间,创建Job时可以指定一个时间[datetime],不指定的话则默认根据trigger获取触
    发时间
  • misfire_grace_time:Job的延迟执行时间,例如Job的计划执行时间是21:00:00,但因服务重启或其他原因导致 21:00:31才执行,如果设置此key为40,则该job会继续执行,否则将会丢弃此job
  • coalesce:Job是否合并执行,是一个bool值。例如scheduler停止20s后重启启动,而job的触发器设置为5s执行一次,因此此job错过了4个执行时间,如果设置为是,则会合并到一次执行,否则会逐个执行
  • func:Job执行的函数
  • args:Job执行函数需要的位置参数
  • kwargs:Job执行函数需要的关键字参数
  • Job Store

    https://apscheduler.readthedocs.io/en/latest/modules/jobstores/sqlalchemy.html

    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.schedulers.background import BackgroundScheduler
    
    # tablename 若存在则不会再进行表创建
    SCHEDULER = BackgroundScheduler(
        jobstores={
            'default': SQLAlchemyJobStore(
                tablename='abc',
                url='postgresql://127.0.1.1:)
        })
    

    3.x 版本中 SQLAlchemyJobStore 不支持异步操作,在使用异步web框架时,这一点可能是一个坑

    trigger

  • CronTrigger
  • IntervalTrigger
  • DateTrigger
  • # 任务时间参数设置
    # year (int|str) – 年,4位数字
    # month (int|str) – 月 (范围1-12)
    # day (int|str) – 日 (范围1-31)
    # week (int|str) – 周 (范围1-53)
    # day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
    # hour (int|str) – 时 (范围0-23)
    # minute (int|str) – 分 (范围0-59)
    # second (int|str) – 秒 (范围0-59)
    # start_date (datetime|str) – 最早开始日期(包含)
    # end_date (datetime|str) – 最晚结束时间(包含)
    # timezone (datetime.tzinfo|str) – 指定时区
    
    # CronTrigger
    scheduler.add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
    
    # IntervalTrigger
    # 每隔两分钟执行一次 job_func 方法
    scheduler.add_job(job_func, 'interval', minutes=2)
    # 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
    scheduler.add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
    
    DateTrigger
    # 在 2017-12-13 时刻运行一次 job_func 方法
    scheduler.add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
    # 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
    scheduler.add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
    # 在 2017-12-13 14:00:01 时刻运行一次 job_func 方法
    scheduler.add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text'])
    

    executor

    scheduler

  • BlockingScheduler:适用于调度程序是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回。
  • BackgroundScheduler:适用于调度程序在应用程序的后台运行,调用start后主线程不会阻塞。
  • AsyncIOScheduler:适用于使用了 asyncio 模块的应用程序。
  • GeventScheduler:适用于使用 gevent 模块的应用程序。
  • TwistedScheduler:适用于构建Twisted的应用程序。
  • QtScheduler:适用于构建Qt的应用程序。
  • scheduler 手动控制任务

    job.remove等操作方式本质上是调用该任务的scheduler对应的remove_job方法,故任务的控制由 scheduler 完成。

    from datetime import datetime
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    scheduler = BlockingScheduler()
    
    
    # 添加任务
    # 使用装饰器
    @scheduler.scheduled_job('interval', seconds=1)
    def test_decorator_job():
        print(f'{datetime.now():%H:%M:%S} Test decorator job')
    
    
    # 手动添加
    def test_job():
        print(f'{datetime.now():%H:%M:%S} Test job')
    
    
    job = scheduler.add_job(test_job, 'cron', id='test_job', second=1)
    
    #
    scheduler.get_jobs()
    
    job = scheduler.get_job('test_job')
    
    # 修改job(仅可修改job_id之外的字段)
    scheduler.modify_job('test_job', minutes=5)
    job.modify(minutes=5)
    
    # 停止
    scheduler.pause_job('test_job')
    job.pause()
    
    scheduler.resume_job('test_job')
    job.resume()
    
    # 删除
    scheduler.remove_job('test_job')
    job.remove()
    
    scheduler.start()
    
    if __name__ == '__main__':
        job = scheduler.add_job(test_job, 'cron', id='test', second=1)
        job.remove()
        scheduler.start()
        # 删除任务
        # scheduler.remove_job('task_id')
        # 停止任务
        # scheduler.pause_job('task_id')
        # 恢复任务
        # scheduler.resume_job('task_id')
    
    

    BlockingScheduler

    from datetime import datetime
    
    from apscheduler.schedulers.blocking import BlockingScheduler
    
    scheduler = BlockingScheduler()
    
    
    @scheduler.scheduled_job('interval', seconds=1)
    def test_decorator_job():
        print(f'{datetime.now():%H:%M:%S} Test decorator job')
    
    
    def test_job():
        print(f'{datetime.now():%H:%M:%S} Test job')
    
    
    if __name__ == '__main__':
        job = scheduler.add_job(test_job, 'cron', id='test', second=1)
        job.remove()
        scheduler.start()
    

    AsyncIOScheduler

    import asyncio
    from datetime import datetime
    
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    
    
    async def my_task():
        print(f'{datetime.now():%H:%M:%S} Test job')
    
    
    async def main():
        scheduler = AsyncIOScheduler()
        scheduler.add_job(my_task, 'interval', seconds=5)
        scheduler.start()
        while True:
            await asyncio.sleep(0)  # skip one event loop run cycle
    
    
    if __name__ == '__main__':
        asyncio.run(main())
    
    

    集成 FastAPI

    import logging
    from contextlib import asynccontextmanager
    from datetime import datetime
    from urllib.parse import quote_plus
    
    import uvicorn
    from apscheduler.events import EVENT_ALL, JobEvent
    from apscheduler.executors.asyncio import AsyncIOExecutor
    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
    from apscheduler.schedulers.asyncio import AsyncIOScheduler
    from apscheduler.triggers.interval import IntervalTrigger
    from fastapi import FastAPI
    from sqlalchemy import create_engine
    
    
    def apscheduler_logging(level=logging.DEBUG):
        logging.basicConfig()
        logging.getLogger('apscheduler').setLevel(level)
    
    
    apscheduler_logging()
    
    SQLALCHEMY_DATABASE_URL = f'mysql+pymysql://root:{quote_plus("mYsql123456_")}@localhost:3306/dev'
    ASYNC_SQLALCHEMY_DATABASE_URL = f'mysql+aiomysql://root:{quote_plus("mYsql123456_")}@localhost:3306/dev'
    
    engine = create_engine(SQLALCHEMY_DATABASE_URL)
    
    jobstores = {
        'default': SQLAlchemyJobStore(engine=engine)
    }
    
    executors = {
        'default': AsyncIOExecutor()  # AsyncIOScheduler 使用 AsyncIOExecutor 执行器
    }
    
    job_defaults = {
        'coalesce': False,  # 是否合并执行
        'max_instances': 1,  # 最大实例数
    }
    
    scheduler = AsyncIOScheduler()
    scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
    
    
    def listener(event: JobEvent) -> None:
        print(event)
    
    
    @asynccontextmanager
    async def lifespan(app: FastAPI):
        scheduler.add_listener(listener, EVENT_ALL)
        scheduler.start()
        yield
        scheduler.shutdown()
    
    
    app = FastAPI(lifespan=lifespan)
    
    
    async def test_job():
        print(f'{datetime.now():%H:%M:%S} Test job')
    
    
    @app.get("/jobs")
    async def get_jobs():
        return [item.id for item in scheduler.get_jobs()]
    
    
    @app.post("/jobs")
    async def post_jobs():
        job = scheduler.add_job(test_job, IntervalTrigger(seconds=1), name="mydemo", id="mydemo")
        return dict(id=job.id, name=job.name, next_run_time=job.next_run_time)
    
    
    if __name__ == "__main__":
        uvicorn.run(app, host="0.0.0.0", port=8000)
    
    

    服务化项目参考:

  • https://github.com/guomaoqiu/JobCenter
  • 作者:予早随笔

    物联沃分享整理
    物联沃-IOTWORD物联网 » APScheduler(Python 单体任务调度框架)一文拿下

    发表回复