flask配置celery异步任务

flask配置celery异步任务

flask 配置 celery 异步任务

# 前言#

转载自:https://www.cnblogs.com/wxhou/p/14399237.html

在使用 flask 开发的时候,接口的返回需要很少的时间,所以我们需要将一些耗时的任务,放到异步后台去处理,例如:发送邮件,耗时的 CPU 任务等。在 python web 框架中 celery 这个库,可能是最合适的。

由于我使用 flask 的时间比较多,但是当我想把 celery 很好的与 flask 进行集成的时候,却发现并不是那么如意。花费了很久的时间去实践最后却是各种报错。出现了循环导入、app 上下文、tasks not found 等问题,尝试了种种却总是不如人意。

好在功夫不负有心人,在结合官方文档并查阅了大量资料后,终于把 celery 很好得集成在了 flask 项目中。我在这里记录一下,同时也希望对你们有所帮助。

# 配置#

开发环境 Windows10
python 3.8.6
flask 2.0.x
celery 5.x
broker redis
pool eventlet

# simple 模式#

由于 celery 5.0 后推荐小写模式,与 flask config 大写规范有冲突,所以我们当同目录下创建一个 celeryconfig.py 文件

1
2
3
celeryconfig.py
broker_url='redis://127.0.0.1:6379/1'
result_backend='redis://127.0.0.1:6379/2'

flask simple 模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
simple.py
from flask import Flask
from celery import Celery
import celeryconfig

app = Flask(__name__)
celery_app = Celery(app.import_name,
broker=celeryconfig.broker_url,
backend=celeryconfig.result_backend)
celery_app.config_from_object(celeryconfig)


@celery_app.task(name='simple/add2')
def add2(x, y):
return x + y


@app.route('/')
def index():
results = add2.delay(3, 5)
return str(results.wait())


if __name__ == '__main__':
app.run(debug=True)

这些就是单文件模式的代码,这其中我们添加了一个任务 add2 ,然后启动 flask。

1
python simple.py

由于 celery 和 flask 是同级别的 app,所以我们需要一个新的窗口启动 celery,加入 - P 参数指定异步 worker eventlet

1
celery -A simple.celery_app worker -l info -P eventlet

当我们启动 celery 之后。看到最后一行的 ready 的时候,说明我们的 celery 已经启动成功了。

然后再看有下面标识说明我们的任务已经被添加成功了。

1
2
[tasks]
. simple/add2

访问网址:http://127.0.0.1:5000/

image-20210211174559291

同时我们查看一下 celery 的窗口:

image-20210211174707759

simple 模式就结束了

# Factory 模式#

当然我们如果用 flask 写一个稍微复杂的东西的话,其实工厂模式我们应该用的更多。下面我们一起来看看工厂模式中的配置。

# 目录结构#

首先我们先规划一个 flask+celery 的目录结构。然后创建下面的文件:

1
2
3
4
5
6
7
8
9
10
11
.
├── app
│ ├── __init__.py ——app主体文件
│ ├── celeryconfig.py ——celery配置文件
│ ├── config.py ——flask配置文件
│ ├── models.py ——模型文件
│ ├── tasks.py ——后台任务
│ └── views.py ——视图文件
├── data.db
├── .flaskenv ——flask环境变量
└── server.py ——运行文件

我们先创建一个注册 celery 的函数,主要功能是使用 flask 应用上下文。

1
2
3
4
5
6
7
8
9
def register_celery(celery, app):
class ContextTask(celery.Task):
abstract = True

def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)

celery.Task = ContextTask

然后我们创建 create_app 函数,将写好的注册 celery 函数加进去。

1
2
3
4
5
6
7
8
def create_app(**kwargs):
app = Flask(__name__)
app.config.from_pyfile('config.py')
db.init_app(app)
register_celery(celery=kwargs.get('celery'), app=app) # >> 注册celery
register_blueprints(app)
register_commands(app)
return app

上面这些都是我们在 __init__ 文件中创建的,下面我们来创建 celery 的 app

打开 server.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from celery import Celery
from app import create_app, celeryconfig


def make_celery(app_name):
celery = Celery(app_name,
broker=celeryconfig.broker_url,
backend=celeryconfig.result_backend)
celery.config_from_object(celeryconfig)
return celery


my_celery = make_celery(__name__)

app = create_app(celery=my_celery)

我们把 celery 配置文件和 flask 工厂应用导入进来。然后创建 make_celery 函数生成 celery 应用。

生成 celery 应用后把 celery 传入到 flask 应用函数中去。这样把生成注册分开写,解决了循环导入的问题。

接着我们创建一个 tasks.py 文件。

1
2
3
4
5
6
7
8
9
10
from server import my_celery
from .models import db, Message


@my_celery.task()
def add2(msg):
message = Message(details=msg)
db.session.add(message)
db.session.commit()
return "success"

从 server 文件中导入 celery 应用,然后创建任务。

然后在视图中引用任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from flask import Blueprint, jsonify
from .models import db, Message
from .tasks import add2

th = Blueprint('', __name__)


@th.route('/')
def index():
res = add2.delay("hello word")
return jsonify(res.wait())


@th.get('/msgs')
def msg_list():
messages = Message.query.all()
results = []
for message in messages:
results.append(message.to_json())
return jsonify(results)

celery 的任务可以通过 delay, 方法调用,参数在 delay 中直接传入。

详细介绍:

# celery 文档#

这些 API 定义了标准的执行选项集,也就是下面这三个方法:

  • apply_async(args[, kwargs[, …]])

    发送一个任务消息。

  • delay(*args, **kwargs)

    直接发送一个任务消息,但是不支持运行参数。

  • calling( __call__ )

    应用一个支持调用接口(例如,add (2,2))的对象,意味着任务不会被一个 worker 执行,但是会在当前线程中执行 (但是消息不会被发送)。

速查表

  • T.delay(arg, kwarg=value)

    调用 apply_async 的快捷方式(.delay (_args, *_kwargs) 等价于调用 .apply_async (args, kwargs))。

  • T.apply_async((arg,), {'kwarg': value})

  • T.apply_async(countdown=10)

    从现在起,十秒内执行。

  • T.apply_async(eta=now + timedelta(seconds=10))

    从现在起十秒内执行,指明使用 eta。

  • T.apply_async(countdown=60, expires=120)

    从现在起一分钟执行,但在两分钟后过期。

  • T.apply_async(expires=now + timedelta(days=2))

    两天内过期,使用 datetime 对象。

例子

delay() 方法就像一个很规则的函数,很方便去调用它:

1
task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

apply_async() 替代你写的:

1
task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

尽管运行十分方便,但是如果像设置额外的行参数,你必须用 apply_async

# 运行一下#

运行之前我们需要先创建一个 .flaskenv 文件,指定以下我们的 FLASK_APP 环境变量是 server.py

1
FLASK_APP=server.py

好了之后,启动 flask

1
flask run

启动 celery

1
celery -A server.my_celery worker -l info -P eventlet

老规矩,看一下任务注册成功没

1
2
[tasks]
. app.tasks.add2

我们打开浏览器查看

image

可以看到执行成功了。再看看命令行。

image-20210821235926547

任务已经成功的执行了。

就这样我们弄好了 flask+celery 项目的配置,并成功执行了任务。

(待补充 django+celery)

Author

y1seco

Posted on

2022-02-13

Updated on

2022-02-13

Licensed under

Comments

:D 一言句子获取中...