By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 celery_integration.py

View raw Download
text/x-script.python • 463 B
Python script, ASCII text executable
        
            
1
import flask
2
from celery import Celery, Task
3
4
5
def init_celery_app(app: flask.Flask):
6
class FlaskTask(Task):
7
def __call__(self, *args: object, **kwargs: object):
8
with app.app_context():
9
return self.run(*args, **kwargs)
10
11
celery_app = Celery(app.name, task_cls=FlaskTask)
12
celery_app.config_from_object(app.config["CELERY"])
13
celery_app.set_default()
14
app.extensions["celery"] = celery_app
15
return celery_app
16