# celery_integration.py
# Python script, ASCII text executable
import flask
from celery import Celery, Task


def init_celery_app(app: flask.Flask) -> Celery:
    """Create a Celery application bound to the given Flask application.

    The Celery app is named after ``app.name``, configured from the
    ``"CELERY"`` entry of ``app.config``, marked as the default Celery app
    via ``set_default()``, and stored under ``app.extensions["celery"]``.

    Args:
        app: The Flask application whose context tasks should run in and
            whose ``config["CELERY"]`` entry supplies the Celery settings.

    Returns:
        The newly created Celery application.

    Raises:
        KeyError: If ``app.config`` has no ``"CELERY"`` entry.
    """

    class FlaskTask(Task):
        """Task base class that executes the task body in ``app``'s context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            # Push the Flask application context around the task body so
            # code inside ``run`` executes with ``app``'s context active.
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()

    # Register on the Flask app so other code can retrieve the instance
    # from ``app.extensions["celery"]``.
    app.extensions["celery"] = celery_app
    return celery_app