# 定时任务
# import apscheduler
:Python 的第三方库,用于执行定时任务。
安装:
pip install apscheduler
原理:当一个任务达到触发时刻时,就创建一个子线程,来执行该任务。
例:
from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime scheduler = BlockingScheduler() # 创建一个调度器,可以往其中添加多个任务 # 添加一个任务,只会在指定时刻,执行一次 scheduler.add_job(func=print, args=('hello',), kwargs=None, trigger='date', next_run_time=datetime(2020, 10, 1, 0, 0, 0)) # 添加一个任务,每隔一段时间就执行一次。间隔时间可以是 seconds、minutes、hours 等单位 scheduler.add_job(func=print, args=('hello',), trigger='interval', seconds=3) # 添加一个任务,在每天的 12:30 执行一次 scheduler.add_job(func=print, args=('hello',), trigger='cron', hour=12, minute=30) scheduler.start() # 启动调度器 scheduler.get_jobs() # 返回一个列表,包含所有待执行的任务,不包含已经失效的任务 scheduler.shutdown() # 终止调度器
- trigger 参数,表示任务的触发方式。
trigger='interval'
类型的任务,可输入以下几种参数,控制触发时间:weeks # 每隔几周。取值为 int 类型 days hours minutes seconds start_date # 开始日期。取值为 datetime 或 str end_date # 结束日志。取值为 datetime 或 str
trigger='cron'
类型的任务,可输入以下几种参数,控制触发时间:year month week day_of_week # 每周的第几天,取值为 0~6 hour minute second start_date end_date
上例创建的 scheduler ,属于
BlockingScheduler
。执行scheduler.start()
时,会阻塞当前线程。如果不想阻塞当前线程,可以使用BackgroundScheduler
:from apscheduler.schedulers.background import BackgroundScheduler scheduler = BackgroundScheduler()
这样,所有任务会在后台运行,不会阻塞当前线程,但依然会将 stdout、stderr 输出到当前终端。
添加到 scheduler 的任务,可以被删除:
job = scheduler.add_job(func=print, args=('hello',), trigger='interval', seconds=3) job.remove()
# import celery
:Python 的第三方库,用于执行大量定时任务。
安装:
pip install celery
celery 是一个分布式系统,架构如下:
- 部署一个 Redis 或 RabbitMQ 服务器,用于存储 celery 待执行的任务信息。称为 broker 。
- 部署一个 Redis 或 RabbitMQ 服务器,用于存储 celery 已执行的任务信息。称为 backend 。broker 与 backend 可以共用一个服务器。
- 部署一个或多个 Python 进程,负责执行 broker 中的任务。称为 worker 。
- 在其它 Python 进程中,调用 celery 的 API ,添加任务到 broker 。
例:启动 worker 进程
>>> import celery >>> app = celery.Celery(backend='redis://127.0.0.1:6379/0', broker='redis://127.0.0.1:6379/0') >>> app.worker_main()
例:创建异步任务
>>> @app.task # 使用装饰器,添加任务 ... def fun1(*args): ... print(*args) ... >>> fun1('hello') # 直接执行该函数 hello >>> task = fun1.delay('hello') # 异步执行该函数。这会封装为一个异步任务,发送到 broker >>> task [2019-10-28 16:00:35,998: WARNING/MainProcess] <AsyncResult: c399d950-c16e-4912-8d19-624393abf1ef> >>> result = celery.result.AsyncResult(id=task.id, app=cel) # 获取任务的执行结果 >>> result.status # 查看任务的状态 [2019-10-28 16:01:01,766: WARNING/MainProcess] 'PENDING'
例:创建定时任务
>>> result = fun1.apply_async(args=('hello',), kwargs=None, countdown=10) >>> result.status 'PENDING'
例:创建周期性任务
app.conf.beat_schedule = { 'schedule1': { 'task': 'task1.fun1', # 将 task1.py 文件中的 fun1() 函数,添加成为一个任务 'args': ('hello',) # 给函数输入参数 'schedule': timedelta(seconds=2), # 间隔时间 }, 'schedule2': { ... }, }