python-定时任务模块schedule使用指南

python 中的 schedule 库是一个轻量级的定时任务调度库,可以让你轻松地管理周期性任务。你可以通过这个库定时运行任务,比如每隔一定时间执行一次某个函数,或每天定时执行某个任务。下面将详细介绍 schedule 模块的用法。

1. 安装 schedule 库

首先,你需要安装 schedule 库。可以通过 pip 安装:

pip install schedule

2. 基本使用

python-schedule 通过 schedule 模块提供了一个简单的 API 来安排任务。核心思想是将函数与定时规则(比如每隔多少分钟、每小时、每天等)结合。

示例:每隔 10 秒执行一次任务
import schedule
import time


# 定义要执行的任务
def job():
    print("任务正在执行...")

# 安排任务
schedule.every(10).seconds.do(job)

# 持续运行调度器
while True:
    schedule.run_pending()  # 检查是否有待执行的任务
    time.sleep(1)  # 每秒检查一次

在上面的代码中,schedule.every(10).seconds.do(job) 会安排 job 函数每隔 10 秒执行一次。schedule.run_pending() 检查是否有任务需要执行,time.sleep(1) 控制检查的频率。

3. 常见时间间隔的设置

python-schedule 支持多种常见的时间间隔设置,包括:秒、分钟、小时、天、周等。

示例:设置任务的不同时间间隔
import schedule
import time

def job():
    print("任务正在执行...")

# 每分钟执行一次
schedule.every(1).minute.do(job)

# 每小时执行一次
schedule.every().hour.do(job)

# 每天 9:00 执行一次
schedule.every().day.at("09:00").do(job)

# 每周一早上 8:30 执行一次
schedule.every().monday.at("08:30").do(job)

# 每月 1 号执行一次
schedule.every(1).month.do(job)

while True:
    schedule.run_pending()
    time.sleep(1)

在这段代码中,任务将按照以下规则定时执行:

  • 每分钟一次
  • 每小时一次
  • 每天 9:00 执行一次
  • 每周一 8:30 执行一次
  • 每月 1 号执行一次
  • 4. 取消任务

    你可以通过 schedule.cancel_job() 来取消已安排的任务。

    示例:取消任务
    import schedule
    import time
    
    
    def job():
        print("任务正在执行...")
    
    # 安排一个任务
    job1 = schedule.every(5).seconds.do(job)
    
    # 取消任务
    schedule.cancel_job(job1)
    
    while True:
        schedule.run_pending()
        time.sleep(1)

    在上面的代码中,job1 任务被安排每隔 5 秒执行一次,但在之后通过 schedule.cancel_job(job1) 被取消了。即使调度器还在运行,该任务也不会再被执行。

    5. 任务间隔的复杂设置

    你可以设置更复杂的定时任务间隔,比如每隔一段时间执行某个任务,但只能在特定的时间段内执行。

    示例:设置特定的任务间隔
    import schedule
    import time
    
    
    def job():
        print("任务正在执行...")
    
    # 每小时的第 15 分钟执行任务
    schedule.every().hour.at(":15").do(job)
    
    # 每周的周五执行任务
    schedule.every().friday.do(job)
    
    while True:
        schedule.run_pending()
        time.sleep(1)

    在上面的代码中,任务将安排在以下时刻执行:

  • 每小时的第 15 分钟执行
  • 每周五执行
  • 6. 多任务调度

    你可以安排多个任务在不同的时间间隔下执行。

    示例:多个任务调度
    import schedule
    import time
    
    
    def job1():
        print("任务 1 正在执行...")
    
    def job2():
        print("任务 2 正在执行...")
    
    
    # 安排任务
    schedule.every(3).seconds.do(job1)
    
    schedule.every(5).seconds.do(job2)
    
    while True:
        schedule.run_pending()
        time.sleep(1)

    在这段代码中,job1 每 3 秒执行一次,而 job2 每 5 秒执行一次,两个任务会同时运行。

    7. 限制任务执行的次数

    有时你可能希望限制任务执行的次数,可以通过 job.tag() 来实现。

    示例:限制任务执行次数
    import schedule
    import time
    
    
    def job():
        print("任务正在执行...")
    
    # 安排任务,每 2 秒执行一次
    job1 = schedule.every(2).seconds.do(job)
    
    # 只执行 3 次
    for i in range(3):
        schedule.run_pending()
        time.sleep(1)

    在此示例中,任务只执行 3 次。你也可以通过其他方式来设置次数,具体取决于实际需求。

    8. 调度的高级功能

    使用 Job.tag 添加标记

    你可以给任务添加标签,以便在以后可以通过标签来取消或修改任务。

    import schedule
    import time
    
    
    def job():
        print("任务正在执行...")
    
    # 安排任务并添加标签
    job1 = schedule.every(3).seconds.do(job)
    
    job1.tag('important')
    
    # 根据标签取消任务
    for job in schedule.get_jobs():
        if 'important' in job.tags:
            schedule.cancel_job(job)
    
    while True:
        schedule.run_pending()
        time.sleep(1)

    9. 处理异常和错误

    你可以使用 try…except 语句来捕捉任务执行中的异常。

    import schedule
    import time
    
    
    def job():
        try:
            print("任务开始执行...")
    
            # 模拟可能会抛出异常的操作
            1 / 0  # 故意制造一个异常
    
        except Exception as e:
            print(f"任务发生错误: {e}")
    
    # 安排任务
    schedule.every(5).seconds.do(job)
    
    while True:
        schedule.run_pending()
        time.sleep(1)

    在这个例子中,任务中故意抛出了异常,但程序会继续运行并捕获错误。

    总结

  • schedule 是一个非常轻量且易用的 python 定时任务调度库。
  • 你可以用它来安排简单的定时任务,或通过灵活的时间间隔设定来处理复杂的调度需求。
  • 可以安排任务、取消任务、限制任务的执行次数,并且支持任务标签和错误处理。
  • 作者:网络风云

    物联沃分享整理
    物联沃-IOTWORD物联网 » python-定时任务模块schedule使用指南

    发表回复