# 定时任务

# import apscheduler

  • :Python 的第三方库,用于执行定时任务。

  • 官方文档 (opens new window)

  • 安装:pip install apscheduler

  • 原理:当一个任务达到触发时刻时,就创建一个子线程,来执行该任务。

  • 例:

    from apscheduler.schedulers.blocking import BlockingScheduler
    from datetime import datetime
    
    scheduler = BlockingScheduler()  # 创建一个调度器,可以往其中添加多个任务
    
    # 添加一个任务,只会在指定时刻,执行一次
    scheduler.add_job(func=print, args=('hello',), kwargs=None, trigger='date', next_run_time=datetime(2020, 10, 1, 0, 0, 0))
    
    # 添加一个任务,每隔一段时间就执行一次。间隔时间可以是 seconds、minutes、hours 等单位
    scheduler.add_job(func=print, args=('hello',), trigger='interval', seconds=3)
    
    # 添加一个任务,在每天的 12:30 执行一次
    scheduler.add_job(func=print, args=('hello',), trigger='cron', hour=12, minute=30)
    
    scheduler.start()      # 启动调度器
    scheduler.get_jobs()   # 返回一个列表,包含所有待执行的任务,不包含已经失效的任务
    scheduler.shutdown()   # 终止调度器
    
    • trigger 参数,表示任务的触发方式。
    • trigger='interval' 类型的任务,可输入以下几种参数,控制触发时间:
      weeks       # 每隔几周。取值为 int 类型
      days
      hours
      minutes
      seconds
      
      start_date  # 开始日期。取值为 datetime 或 str
      end_date    # 结束日志。取值为 datetime 或 str
      
    • trigger='cron' 类型的任务,可输入以下几种参数,控制触发时间:
      year
      month
      week
      day_of_week # 每周的第几天,取值为 0~6
      hour
      minute
      second
      
      start_date
      end_date
      
  • 上例创建的 scheduler ,属于 BlockingScheduler 。执行 scheduler.start() 时,会阻塞当前线程。如果不想阻塞当前线程,可以使用 BackgroundScheduler

    from apscheduler.schedulers.background import BackgroundScheduler
    scheduler = BackgroundScheduler()
    

    这样,所有任务会在后台运行,不会阻塞当前线程,但依然会将 stdout、stderr 输出到当前终端。

  • 添加到 scheduler 的任务,可以被删除:

    job = scheduler.add_job(func=print, args=('hello',), trigger='interval', seconds=3)
    job.remove()
    

# import celery

  • :Python 的第三方库,用于执行大量定时任务。

  • 官方文档 (opens new window)

  • 安装:pip install celery

  • celery 是一个分布式系统,架构如下:

    • 部署一个 Redis 或 RabbitMQ 服务器,用于存储 celery 待执行的任务信息。称为 broker 。
    • 部署一个 Redis 或 RabbitMQ 服务器,用于存储 celery 已执行的任务信息。称为 backend 。broker 与 backend 可以共用一个服务器。
    • 部署一个或多个 Python 进程,负责执行 broker 中的任务。称为 worker 。
    • 在其它 Python 进程中,调用 celery 的 API ,添加任务到 broker 。
  • 例:启动 worker 进程

    >>> import celery
    >>> app = celery.Celery(backend='redis://127.0.0.1:6379/0', broker='redis://127.0.0.1:6379/0')
    >>> app.worker_main()
    
  • 例:创建异步任务

    >>> @app.task         # 使用装饰器,添加任务
    ... def fun1(*args):
    ...     print(*args)
    ...
    >>> fun1('hello')               # 直接执行该函数
    hello
    >>> task = fun1.delay('hello')  # 异步执行该函数。这会封装为一个异步任务,发送到 broker
    >>> task
    [2019-10-28 16:00:35,998: WARNING/MainProcess] <AsyncResult: c399d950-c16e-4912-8d19-624393abf1ef>
    >>> result = celery.result.AsyncResult(id=task.id, app=cel) # 获取任务的执行结果
    >>> result.status               # 查看任务的状态
    [2019-10-28 16:01:01,766: WARNING/MainProcess] 'PENDING'
    
  • 例:创建定时任务

    >>> result = fun1.apply_async(args=('hello',), kwargs=None, countdown=10)
    >>> result.status
    'PENDING'
    
  • 例:创建周期性任务

    app.conf.beat_schedule = {
        'schedule1': {
            'task': 'task1.fun1',             # 将 task1.py 文件中的 fun1() 函数,添加成为一个任务
            'args': ('hello',)                # 给函数输入参数
            'schedule': timedelta(seconds=2), # 间隔时间
        },
        'schedule2': {
            ...
        },
    }