By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 celeryIntegration.py

View raw Download
text/x-script.python • 456 B
Python script, ASCII text executable
        
            
1
import flask
2
from celery import Celery, Task
3
4
5
def initCeleryApp(app: flask.Flask):
    """Build a Celery application tied to the given Flask application.

    The returned Celery app:

    * is named after ``app.name``;
    * uses a ``Task`` subclass whose ``__call__`` executes the task body
      inside ``app.app_context()``;
    * is configured from the object stored at ``app.config["CELERY"]``;
    * has ``set_default()`` called on it;
    * is stored under ``app.extensions["celery"]``.

    Args:
        app: The Flask application supplying the name, the ``"CELERY"``
            configuration entry and the application context.

    Returns:
        The newly created and configured Celery application.

    Raises:
        KeyError: If ``app.config`` has no ``"CELERY"`` entry.
    """

    class FlaskTask(Task):
        """Task base class that runs the task inside the Flask app context."""

        def __call__(self, *args: object, **kwargs: object):
            # Enter the application context first so the task body executes
            # with it active; the context is exited before the value is
            # handed back to the caller.
            context = app.app_context()
            with context:
                outcome = self.run(*args, **kwargs)
            return outcome

    celery_instance = Celery(app.name, task_cls=FlaskTask)

    # The Celery settings are read from the "CELERY" entry of the Flask config.
    celery_settings = app.config["CELERY"]
    celery_instance.config_from_object(celery_settings)

    celery_instance.set_default()

    # Keep a reference on the Flask app so other code can look it up there.
    app.extensions["celery"] = celery_instance
    return celery_instance
16