You're looking at it

Homepage: https://roundabout-host.com

By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 celery_integration.py

View raw Download
text/plain • 1.29 kiB
Python script, ASCII text executable
        
            
1
"""
2
This module helps integrate the Celery task manager with a Flask application.
3
4
Roundabout - git hosting for everyone <https://roundabout-host.com>
5
Copyright (C) 2023-2025 Roundabout developers <root@roundabout-host.com>
6
7
This program is free software: you can redistribute it and/or modify
8
it under the terms of the GNU Affero General Public License as published by
9
the Free Software Foundation, either version 3 of the License, or
10
(at your option) any later version.
11
12
This program is distributed in the hope that it will be useful,
13
but WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
GNU Affero General Public License for more details.
16
17
You should have received a copy of the GNU Affero General Public License
18
along with this program. If not, see <http://www.gnu.org/licenses/>.
19
"""
20
21
import flask
22
from celery import Celery, Task
23
24
25
def init_celery_app(app: flask.Flask):
    """
    Create a Celery application tied to the given Flask application.

    The Celery app is named after ``app.name`` and reads its settings from
    the ``"CELERY"`` entry of ``app.config``. Every task it runs is invoked
    inside ``app.app_context()``. ``set_default()`` is called on the new
    Celery app, and it is stored under ``app.extensions["celery"]``.

    :param app: The Flask application to integrate with.
    :return: The configured Celery application.
    :raises KeyError: If ``app.config`` has no ``"CELERY"`` entry.
    """

    # Task base class used for every task of this Celery app; it closes over
    # ``app`` so the task body executes with the Flask app context pushed.
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object):
            with app.app_context():
                outcome = self.run(*args, **kwargs)
            return outcome

    worker = Celery(app.name, task_cls=FlaskTask)

    # The Celery settings live in the Flask config under the "CELERY" key.
    settings = app.config["CELERY"]
    worker.config_from_object(settings)
    worker.set_default()

    # Register on the Flask app so other code can reach it via app.extensions.
    app.extensions["celery"] = worker
    return worker
36