# celeryIntegration.py
# Python script, ASCII text executable
import flask
from celery import Celery, Task


def initCeleryApp(app: flask.Flask) -> Celery:
    """Create a Celery application bound to the given Flask application.

    Every task created through the returned Celery app uses ``FlaskTask`` as
    its base class, so the task body executes inside ``app.app_context()``.

    Args:
        app: The Flask application. Its ``config["CELERY"]`` entry is handed
            to ``Celery.config_from_object`` and therefore must be present.

    Returns:
        The configured Celery application. The same object is also stored
        under ``app.extensions["celery"]``.

    Raises:
        KeyError: Presumably raised by the ``app.config["CELERY"]`` lookup
            when that key has not been set -- confirm against Flask's
            ``Config`` mapping behavior.
    """

    class FlaskTask(Task):
        """Task base class that runs the task inside ``app``'s app context."""

        def __call__(self, *args: object, **kwargs: object) -> object:
            # Push the application context for the duration of the task so
            # code inside ``run`` executes with ``app``'s context active.
            with app.app_context():
                return self.run(*args, **kwargs)

    celeryApp = Celery(app.name, task_cls=FlaskTask)
    celeryApp.config_from_object(app.config["CELERY"])
    # Register this instance as Celery's default application.
    celeryApp.set_default()
    # Keep a handle on the Flask app so other code can look it up later.
    app.extensions["celery"] = celeryApp
    return celeryApp