APScheduler(Python 单体任务调度框架)一文拿下
https://github.com/agronholm/apscheduler/tree/3.x
https://apscheduler.readthedocs.io/en/3.x/
master 分支改动较大,目前(2025.02.10)还没发布新版本,本文基于3.x版本
pip install APScheduler
3.x
triggers(触发器):包含调度逻辑。每个作业都有自己的触发器,该触发器确定下一步应在何时运行该作业。除了其初始配置外,触发器完全是无状态的。
job stores(任务存储器):任务存储器是可以存储任务的地方,默认情况下任务保存在内存,也可将任务保存在各种数据库中。任务存储进去后,会进行序列化,然后也可以反序列化提取出来,继续执行。
job:任务对象
jobstore:持久化 job
executor(执行器):执行器会将任务放到进程或线程中执行,任务执行完成后,执行程序会通知调度器,然后触发一些事件
scheduler(调度器):任务调度器是属于整个调度的总指挥官。他会合理安排作业存储器、执行器、触发器进行工作,并进行添加和删除任务等。调度器通常是只有一个的。事件机制:https://apscheduler.readthedocs.io/en/3.x/modules/events.html#event-codes
不支持分布式调度
https://apscheduler.readthedocs.io/en/latest/faq.html#how-do-i-share-a-single-job-store-among-one-or-more-worker-processes
一个关于分布式调度的讨论:https://www.cnblogs.com/leffss/p/11912364.html
创建任务
任务调度
https://blog.csdn.net/kobepaul123/article/details/123616575
Job
来确定是否可执行
发时间
Job Store
https://apscheduler.readthedocs.io/en/latest/modules/jobstores/sqlalchemy.html
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
# tablename 若存在则不会再进行表创建
SCHEDULER = BackgroundScheduler(
jobstores={
'default': SQLAlchemyJobStore(
tablename='abc',
url='postgresql://127.0.1.1:)
})
3.x 版本中 SQLAlchemyJobStore 不支持异步操作,在使用异步web框架时,这一点可能是一个坑
trigger
# 任务时间参数设置
# year (int|str) – 年,4位数字
# month (int|str) – 月 (范围1-12)
# day (int|str) – 日 (范围1-31)
# week (int|str) – 周 (范围1-53)
# day_of_week (int|str) – 周内第几天或者星期几 (范围0-6 或者 mon,tue,wed,thu,fri,sat,sun)
# hour (int|str) – 时 (范围0-23)
# minute (int|str) – 分 (范围0-59)
# second (int|str) – 秒 (范围0-59)
# start_date (datetime|str) – 最早开始日期(包含)
# end_date (datetime|str) – 最晚结束时间(包含)
# timezone (datetime.tzinfo|str) – 指定时区
# CronTrigger
scheduler.add_job(job_func, 'cron', month='1-3,7-9',day='0, tue', hour='0-3')
# IntervalTrigger
# 每隔两分钟执行一次 job_func 方法
scheduler.add_job(job_func, 'interval', minutes=2)
# 在 2017-12-13 14:00:01 ~ 2017-12-13 14:00:10 之间, 每隔两分钟执行一次 job_func 方法
scheduler.add_job(job_func, 'interval', minutes=2, start_date='2017-12-13 14:00:01' , end_date='2017-12-13 14:00:10')
DateTrigger
# 在 2017-12-13 时刻运行一次 job_func 方法
scheduler.add_job(job_func, 'date', run_date=date(2017, 12, 13), args=['text'])
# 在 2017-12-13 14:00:00 时刻运行一次 job_func 方法
scheduler.add_job(job_func, 'date', run_date=datetime(2017, 12, 13, 14, 0, 0), args=['text'])
# 在 2017-12-13 14:00:01 时刻运行一次 job_func 方法
scheduler.add_job(job_func, 'date', run_date='2017-12-13 14:00:01', args=['text'])
executor
scheduler
scheduler 手动控制任务
job.remove
等操作方式本质上是调用该任务的scheduler
对应的remove_job
方法,故任务的控制由 scheduler 完成。
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
# 添加任务
# 使用装饰器
@scheduler.scheduled_job('interval', seconds=1)
def test_decorator_job():
print(f'{datetime.now():%H:%M:%S} Test decorator job')
# 手动添加
def test_job():
print(f'{datetime.now():%H:%M:%S} Test job')
job = scheduler.add_job(test_job, 'cron', id='test_job', second=1)
#
scheduler.get_jobs()
job = scheduler.get_job('test_job')
# 修改job(仅可修改job_id之外的字段)
scheduler.modify_job('test_job', minutes=5)
job.modify(minutes=5)
# 停止
scheduler.pause_job('test_job')
job.pause()
scheduler.resume_job('test_job')
job.resume()
# 删除
scheduler.remove_job('test_job')
job.remove()
scheduler.start()
if __name__ == '__main__':
job = scheduler.add_job(test_job, 'cron', id='test', second=1)
job.remove()
scheduler.start()
# 删除任务
# scheduler.remove_job('task_id')
# 停止任务
# scheduler.pause_job('task_id')
# 恢复任务
# scheduler.resume_job('task_id')
BlockingScheduler
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
scheduler = BlockingScheduler()
@scheduler.scheduled_job('interval', seconds=1)
def test_decorator_job():
print(f'{datetime.now():%H:%M:%S} Test decorator job')
def test_job():
print(f'{datetime.now():%H:%M:%S} Test job')
if __name__ == '__main__':
job = scheduler.add_job(test_job, 'cron', id='test', second=1)
job.remove()
scheduler.start()
AsyncIOScheduler
import asyncio
from datetime import datetime
from apscheduler.schedulers.asyncio import AsyncIOScheduler
async def my_task():
print(f'{datetime.now():%H:%M:%S} Test job')
async def main():
scheduler = AsyncIOScheduler()
scheduler.add_job(my_task, 'interval', seconds=5)
scheduler.start()
while True:
await asyncio.sleep(0) # skip one event loop run cycle
if __name__ == '__main__':
asyncio.run(main())
集成 FastAPI
import logging
from contextlib import asynccontextmanager
from datetime import datetime
from urllib.parse import quote_plus
import uvicorn
from apscheduler.events import EVENT_ALL, JobEvent
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI
from sqlalchemy import create_engine
def apscheduler_logging(level=logging.DEBUG):
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(level)
apscheduler_logging()
SQLALCHEMY_DATABASE_URL = f'mysql+pymysql://root:{quote_plus("mYsql123456_")}@localhost:3306/dev'
ASYNC_SQLALCHEMY_DATABASE_URL = f'mysql+aiomysql://root:{quote_plus("mYsql123456_")}@localhost:3306/dev'
engine = create_engine(SQLALCHEMY_DATABASE_URL)
jobstores = {
'default': SQLAlchemyJobStore(engine=engine)
}
executors = {
'default': AsyncIOExecutor() # AsyncIOScheduler 使用 AsyncIOExecutor 执行器
}
job_defaults = {
'coalesce': False, # 是否合并执行
'max_instances': 1, # 最大实例数
}
scheduler = AsyncIOScheduler()
scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults)
def listener(event: JobEvent) -> None:
print(event)
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler.add_listener(listener, EVENT_ALL)
scheduler.start()
yield
scheduler.shutdown()
app = FastAPI(lifespan=lifespan)
async def test_job():
print(f'{datetime.now():%H:%M:%S} Test job')
@app.get("/jobs")
async def get_jobs():
return [item.id for item in scheduler.get_jobs()]
@app.post("/jobs")
async def post_jobs():
job = scheduler.add_job(test_job, IntervalTrigger(seconds=1), name="mydemo", id="mydemo")
return dict(id=job.id, name=job.name, next_run_time=job.next_run_time)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
服务化项目参考:
作者:予早随笔