python-定时任务模块schedule使用指南
python 中的 schedule 库是一个轻量级的定时任务调度库,可以让你轻松地管理周期性任务。你可以通过这个库定时运行任务,比如每隔一定时间执行一次某个函数,或每天定时执行某个任务。下面将详细介绍 schedule 模块的用法。
1. 安装 schedule 库
首先,你需要安装 schedule 库。可以通过 pip 安装:
pip install schedule
2. 基本使用
python-schedule 通过 schedule 模块提供了一个简单的 API 来安排任务。核心思想是将函数与定时规则(比如每隔多少分钟、每小时、每天等)结合。
示例:每隔 10 秒执行一次任务
import schedule
import time
# 定义要执行的任务
def job():
print("任务正在执行...")
# 安排任务
schedule.every(10).seconds.do(job)
# 持续运行调度器
while True:
schedule.run_pending() # 检查是否有待执行的任务
time.sleep(1) # 每秒检查一次
在上面的代码中,schedule.every(10).seconds.do(job) 会安排 job 函数每隔 10 秒执行一次。schedule.run_pending() 检查是否有任务需要执行,time.sleep(1) 控制检查的频率。
3. 常见时间间隔的设置
python-schedule 支持多种常见的时间间隔设置,包括:秒、分钟、小时、天、周等。
示例:设置任务的不同时间间隔
import schedule
import time
def job():
print("任务正在执行...")
# 每分钟执行一次
schedule.every(1).minute.do(job)
# 每小时执行一次
schedule.every().hour.do(job)
# 每天 9:00 执行一次
schedule.every().day.at("09:00").do(job)
# 每周一早上 8:30 执行一次
schedule.every().monday.at("08:30").do(job)
# 每月 1 号执行一次
schedule.every(1).month.do(job)
while True:
schedule.run_pending()
time.sleep(1)
在这段代码中,任务将按照以下规则定时执行:
4. 取消任务
你可以通过 schedule.cancel_job() 来取消已安排的任务。
示例:取消任务
import schedule
import time
def job():
print("任务正在执行...")
# 安排一个任务
job1 = schedule.every(5).seconds.do(job)
# 取消任务
schedule.cancel_job(job1)
while True:
schedule.run_pending()
time.sleep(1)
在上面的代码中,job1 任务被安排每隔 5 秒执行一次,但在之后通过 schedule.cancel_job(job1) 被取消了。即使调度器还在运行,该任务也不会再被执行。
5. 任务间隔的复杂设置
你可以设置更复杂的定时任务间隔,比如每隔一段时间执行某个任务,但只能在特定的时间段内执行。
示例:设置特定的任务间隔
import schedule
import time
def job():
print("任务正在执行...")
# 每小时的第 15 分钟执行任务
schedule.every().hour.at(":15").do(job)
# 每周的周五执行任务
schedule.every().friday.do(job)
while True:
schedule.run_pending()
time.sleep(1)
在上面的代码中,任务将安排在以下时刻执行:
6. 多任务调度
你可以安排多个任务在不同的时间间隔下执行。
示例:多个任务调度
import schedule
import time
def job1():
print("任务 1 正在执行...")
def job2():
print("任务 2 正在执行...")
# 安排任务
schedule.every(3).seconds.do(job1)
schedule.every(5).seconds.do(job2)
while True:
schedule.run_pending()
time.sleep(1)
在这段代码中,job1 每 3 秒执行一次,而 job2 每 5 秒执行一次,两个任务会同时运行。
7. 限制任务执行的次数
有时你可能希望限制任务执行的次数,可以通过 job.tag() 来实现。
示例:限制任务执行次数
import schedule
import time
def job():
print("任务正在执行...")
# 安排任务,每 2 秒执行一次
job1 = schedule.every(2).seconds.do(job)
# 只执行 3 次
for i in range(3):
schedule.run_pending()
time.sleep(1)
在此示例中,任务只执行 3 次。你也可以通过其他方式来设置次数,具体取决于实际需求。
8. 调度的高级功能
使用 Job.tag 添加标记
你可以给任务添加标签,以便在以后可以通过标签来取消或修改任务。
import schedule
import time
def job():
print("任务正在执行...")
# 安排任务并添加标签
job1 = schedule.every(3).seconds.do(job)
job1.tag('important')
# 根据标签取消任务
for job in schedule.get_jobs():
if 'important' in job.tags:
schedule.cancel_job(job)
while True:
schedule.run_pending()
time.sleep(1)
9. 处理异常和错误
你可以使用 try…except 语句来捕捉任务执行中的异常。
import schedule
import time
def job():
try:
print("任务开始执行...")
# 模拟可能会抛出异常的操作
1 / 0 # 故意制造一个异常
except Exception as e:
print(f"任务发生错误: {e}")
# 安排任务
schedule.every(5).seconds.do(job)
while True:
schedule.run_pending()
time.sleep(1)
在这个例子中,任务中故意抛出了异常,但程序会继续运行并捕获错误。
总结
作者:网络风云