Flask-APScheduler使用教程

Flask-APScheduler使用教程

作者:lizhonglin
github: https://github.com/Leezhonglin/
blog: https://leezhonglin.github.io/

APScheduler是一个Python库,可让您安排稍后执行的Python代码,是一套任务调度框架,可以用来做定时任务控制器,可以添加删除任务。如果将作业存储在数据库中,它们也将在调度程序重新启动后继续运行并保持其状态。重新启动调度程序后,它将运行它应该在脱机时运行的所有作业。

除此之外,APScheduler还可以用作特定于平台的调度程序(如cron守护程序或Windows任务调度程序)的跨平台,特定于应用程序的替代程序。但请注意,APScheduler本身不是守护程序或服务,也不附带任何命令行工具。它主要用于在现有应用程序中运行。也就是说,APScheduler确实为您提供了一些构建块来构建调度程序服务或运行专用的调度程序进程。

APScheduler有三个可以使用的内置调度系统:

  • Cron式调度(可选的开始/结束时间)
  • 基于区间的执行(偶数间隔运行作业,可选的开始/结束时间)
  • 一次性延迟执行(在设定的日期/时间运行一次作业)

APScheduler是一个python的第三方库,用来提供python的后台程序。包含四个组件,分别是:

  • triggers: 任务触发器组件,提供任务触发方式
  • job stores: 任务商店组件,提供任务保存方式
  • executors: 任务调度组件,提供任务调度方式
  • schedulers: 任务调度组件,提供任务工作方式

具体内容如下:

triggers: 支持三种任务触发方式

  • date:固定日期触发器,任务只运行一次,运行完毕自动清除;若错过指定运行时间,任务不会被创建

    | 参数 | 说明 |
    | :——————————– | :——————- |
    | run_date (datetime 或 str) | 作业的运行日期或时间 |
    | timezone (datetime.tzinfo 或 str) | 指定时区 |

    1
    2
    例如# 在 2019-4-24 00:00:01 时刻运行一次 start_system 方法
    scheduler .add_job(start_system, 'date', run_date='2019-4-24 00:00:01', args=['text'])
  • interval:时间间隔触发器,每个一定时间间隔执行一次。

    | 参数 | 说明 |
    | —————————- | ———- |
    | weeks (int) | 间隔几周 |
    | days (int) | 间隔几天 |
    | hours (int) | 间隔几小时 |
    | minutes (int) | 间隔几分钟 |
    | seconds (int) | 间隔多少秒 |
    | start_date (datetime 或 str) | 开始日期 |
    | end_date (datetime 或 str) | 结束日期 |

    1
    2
    # 在 2019-4-24 00:00:00 - 2019-4-24 08:00:00 之间, 每隔两小时执行一次 alarm_job 方法
    scheduler .add_job(alarm_job, 'interval', hours=2, start_date='2019-4-24 00:00:00' , end_date='2019-4-24 08:00:00')
  • cron:cron风格的任务触发
参数 说明
year (int 或 str) 表示四位数的年份 (2019)
month(int\ str) 月 (范围1-12)
day(int\ str) 日 (范围1-31)
week(int\ str) 周 (范围1-53)
day_of_week (int\ str) 表示一周中的第几天,既可以用0-6表示也可以用其英语缩写表示
hour (int\ str) 表示取值范围为0-23时
minute (int\ str) 表示取值范围为0-59分
second (int\ str) 表示取值范围为0-59秒
start_date (datetime\ str) 表示开始时间
end_date (datetime\ str) 表示结束时间
timezone (datetime.tzinfo\ str) 表示时区取值

(int|str) 表示参数既可以是int类型,也可以是str类型
(datetime | str) 表示参数既可以是datetime类型,也可以是str类型

例如:表示每5秒执行该程序一次,相当于interval 间隔调度中seconds = 5

1
sched.add_job(my_job, 'cron',second = '*/5')

job stores: 支持四种任务存储方式

  • memory:默认配置任务存在内存中
  • mongdb:支持文档数据库存储
  • sqlalchemy:支持关系数据库存储
  • redis:支持键值对数据库存储

schedulers: 调度器主要分三种,一种独立运行的,一种是后台运行的,最后一种是配合其它程序使用

  • BlockingScheduler: 当这个调度器是你应用中 唯一要运行 的东西时使用
  • BackgroundScheduler: 当 不运行其它框架 的时候使用,并使你的任务在 后台运行
  • AsyncIOScheduler: 当你的程序是 异步IO模型 的时候使用
  • GeventScheduler: 和 gevent 框架配套使用
  • TornadoScheduler: 和 tornado 框架配套使用
  • TwistedScheduler: 和 Twisted 框架配套使用
  • QtScheduler: 开发 qt 应用的时候使用
1.安装
1
pip install Flask-APScheduler
2.在flask中使用调度器

首先在flask的create_app函数中创建最初的调度器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
from apscheduler.triggers.interval import IntervalTrigger

###此方法负责初始化app
def create_app(config_name):
app=Flask(__name__)
CORS(app)
app.config.from_object(config[config_name])
config[config_name].init_app(app)
###初始化数据库
db.init_app(app)
#####################################################上面代码忽略
# 这里是重点
# 初始化备份数据库定时器
scheduler.init_app(app)
# # 解决FLASK DEBUG模式定时任务执行两次
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
scheduler.api_enabled = True
scheduler.init_app(app)
# 实例话interval对象,如果不实例话的话有可能会报错没有interval这个
interval = IntervalTrigger(
days = 2,
start_date='2019-4-24 08:00:00',
end_date='2099-4-24 08:00:00',
timezone='Asia/Shanghai')
# dbDump 是一个备份数据库的函数,这个根据自己的实际情况来看
scheduler.add_job(func=dbDump,trigger=interval,id='bak_one')
scheduler.start()
######################################################下面代码忽略

###初始化上传下载和导出目录
# init_static_path()
###注册蓝图
register_app(app)
# ###初始化日志
init_log()
return app

如果需要重新替换上面的任务可以在任务中多添加一个参数replace_existing=True由上面的两天备份一次 变成每天备份一次。配置成功后他会自动去替换上面的定时器。

1
2
3
4
5
6
7
8
9
10
11
12
 # 判断配置文件变化 更新定时任务
if os.environ.get('WERKZEUG_RUN_MAIN') == 'true':
interval = IntervalTrigger(
days=1,
start_date='2019-4-24 08:00:00',
end_date='2099-4-24 08:00:00',
timezone='Asia/Shanghai')
scheduler.add_job(
func=dbDump,
trigger=interval,
id='bak_one',
replace_existing=True)

replace_existing=True 这个参数非常重要,在这个地方弄了很久,开始一直不能替换第一次任务。

问题解决:

1
scheduler.add_job(dbDump,'interval',id='bak_one',days=1,start_date='2019-4-24 08:00:00',end_date='2099-4-24 08:00:00',replace_existing=True)

如果是按照上面创建的任务的话在使用pyinstaller打包生成的可执行文件,运行的时候会出现如下面的问题

1
APScheduler: LookupError: No trigger by the name “interval” was found

解决方案是:替换成我上面的方式去创建任务就可以解决的。

-------------本文结束 感谢您的阅读-------------