celery_integration.py
Python script, ASCII text executable
1""" 2This module helps integrate the Celery task manager with a Flask application. 3 4Roundabout - git hosting for everyone <https://roundabout-host.com> 5Copyright (C) 2023-2025 Roundabout developers <root@roundabout-host.com> 6 7This program is free software: you can redistribute it and/or modify 8it under the terms of the GNU Affero General Public License as published by 9the Free Software Foundation, either version 3 of the License, or 10(at your option) any later version. 11 12This program is distributed in the hope that it will be useful, 13but WITHOUT ANY WARRANTY; without even the implied warranty of 14MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15GNU Affero General Public License for more details. 16 17You should have received a copy of the GNU Affero General Public License 18along with this program. If not, see <http://www.gnu.org/licenses/>. 19""" 20 21import flask 22from celery import Celery, Task 23 24 25def init_celery_app(app: flask.Flask): 26class FlaskTask(Task): 27def __call__(self, *args: object, **kwargs: object): 28with app.app_context(): 29return self.run(*args, **kwargs) 30 31celery_app = Celery(app.name, task_cls=FlaskTask) 32celery_app.config_from_object(app.config["CELERY"]) 33celery_app.set_default() 34app.extensions["celery"] = celery_app 35return celery_app 36