flask配置celery异步任务
flask 配置 celery 异步任务
# 前言#
在使用 flask 开发的时候,接口的返回需要很少的时间,所以我们需要将一些耗时的任务,放到异步后台去处理,例如:发送邮件,耗时的 CPU 任务等。在 python web 框架中 celery 这个库,可能是最合适的。
由于我使用 flask 的时间比较多,但是当我想把 celery 很好的与 flask 进行集成的时候,却发现并不是那么如意。花费了很久的时间去实践最后却是各种报错。出现了循环导入、app 上下文、tasks not found 等问题,尝试了种种却总是不如人意。
好在功夫不负有心人,在结合官方文档并查阅了大量资料后,终于把 celery 很好得集成在了 flask 项目中。我在这里记录一下,同时也希望对你们有所帮助。
# 配置#
开发环境 | Windows10 |
---|---|
python | 3.8.6 |
flask | 2.0.x |
celery | 5.x |
broker | redis |
pool | eventlet |
# simple 模式#
由于 celery 5.0 后推荐小写模式,与 flask config 大写规范有冲突,所以我们当同目录下创建一个 celeryconfig.py
文件
1 | celeryconfig.py |
flask simple 模式。
1 | simple.py |
这些就是单文件模式的代码,这其中我们添加了一个任务 add2
,然后启动 flask。
1 | python simple.py |
由于 celery 和 flask 是同级别的 app,所以我们需要一个新的窗口启动 celery,加入 - P 参数指定异步 worker eventlet
1 | celery -A simple.celery_app worker -l info -P eventlet |
当我们启动 celery 之后。看到最后一行的 ready 的时候,说明我们的 celery 已经启动成功了。
然后再看有下面标识说明我们的任务已经被添加成功了。
1 | [tasks] |
同时我们查看一下 celery 的窗口:
simple 模式就结束了
# Factory 模式#
当然我们如果用 flask 写一个稍微复杂的东西的话,其实工厂模式我们应该用的更多。下面我们一起来看看工厂模式中的配置。
# 目录结构#
首先我们先规划一个 flask+celery 的目录结构。然后创建下面的文件:
1 | . |
我们先创建一个注册 celery 的函数,主要功能是使用 flask 应用上下文。
1 | def register_celery(celery, app): |
然后我们创建 create_app 函数,将写好的注册 celery 函数加进去。
1 | def create_app(**kwargs): |
上面这些都是我们在 __init__
文件中创建的,下面我们来创建 celery 的 app
1 | from celery import Celery |
我们把 celery 配置文件和 flask 工厂应用导入进来。然后创建 make_celery 函数生成 celery 应用。
生成 celery 应用后把 celery 传入到 flask 应用函数中去。这样把生成和注册分开写,解决了循环导入的问题。
接着我们创建一个 tasks.py 文件。
1 | from server import my_celery |
从 server 文件中导入 celery 应用,然后创建任务。
然后在视图中引用任务。
1 | from flask import Blueprint, jsonify |
celery 的任务可以通过 delay, 方法调用,参数在 delay 中直接传入。
详细介绍:
# celery 文档#
这些 API 定义了标准的执行选项集,也就是下面这三个方法:
-
apply_async(args[, kwargs[, …]])
发送一个任务消息。
-
delay(*args, **kwargs)
直接发送一个任务消息,但是不支持运行参数。
-
calling(
__call__
)应用一个支持调用接口(例如,add (2,2))的对象,意味着任务不会被一个 worker 执行,但是会在当前线程中执行 (但是消息不会被发送)。
速查表
-
T.delay(arg, kwarg=value)
调用 apply_async 的快捷方式(.delay (_args, *_kwargs) 等价于调用 .apply_async (args, kwargs))。
-
T.apply_async((arg,), {'kwarg': value})
-
T.apply_async(countdown=10)
从现在起,十秒内执行。
-
T.apply_async(eta=now + timedelta(seconds=10))
从现在起十秒内执行,指明使用 eta。
-
T.apply_async(countdown=60, expires=120)
从现在起一分钟执行,但在两分钟后过期。
-
T.apply_async(expires=now + timedelta(days=2))
两天内过期,使用 datetime 对象。
例子
delay()
方法就像一个很规则的函数,很方便去调用它:
1 | task.delay(arg1, arg2, kwarg1='x', kwarg2='y') |
用 apply_async()
替代你写的:
1 | task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'}) |
尽管运行十分方便,但是如果像设置额外的行参数,你必须用 apply_async
# 运行一下#
运行之前我们需要先创建一个 .flaskenv
文件,指定以下我们的 FLASK_APP 环境变量是 server.py
1 | FLASK_APP=server.py |
好了之后,启动 flask
1 | flask run |
启动 celery
1 | celery -A server.my_celery worker -l info -P eventlet |
老规矩,看一下任务注册成功没
1 | [tasks] |
我们打开浏览器查看
可以看到执行成功了。再看看命令行。
任务已经成功的执行了。
就这样我们弄好了 flask+celery 项目的配置,并成功执行了任务。
(待补充 django+celery)
flask配置celery异步任务