By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

Support celery

roundabout,
created on Wednesday, 10 January 2024, 17:06:36 (1704906396), received on Wednesday, 31 July 2024, 06:54:40 (1722408880)
Author identity: vlad <vlad.muntoiu@gmail.com>

7a1b303a72981c7eaf5cf49d2d8f979059d84e33

app.py

@@ -1,29 +1,36 @@

                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            import os
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        import shutil
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            import random
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            import subprocess
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from functools import wraps
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        import cairosvg
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        import flask
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from flask_sqlalchemy import SQLAlchemy
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        import platform
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            import git
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            import mimetypes
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            import magic
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        import flask
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        import cairosvg
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from functools import wraps
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from datetime import datetime
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from enum import Enum
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from cairosvg import svg2png
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from flask_sqlalchemy import SQLAlchemy
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            from flask_bcrypt import Bcrypt
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            from markupsafe import escape, Markup
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            from flask_migrate import Migrate
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from datetime import datetime
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from enum import Enum
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        import shutil
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            from PIL import Image
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from cairosvg import svg2png
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        import platform
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from flask_httpauth import HTTPBasicAuth
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from celery import Celery, Task
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            import config
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        import celeryIntegration
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            app = flask.Flask(__name__)
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from flask_httpauth import HTTPBasicAuth
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        app.config.from_mapping(
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            CELERY=dict(
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                                broker_url=config.REDIS_URI,
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                                result_backend=config.REDIS_URI,
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                                task_ignore_result=True,
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            ),
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        )
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        worker = celeryIntegration.initCeleryApp(app)
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            auth = HTTPBasicAuth()
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                            
                                

celeryIntegration.py

@@ -0,0 +1,15 @@

                                
                                
                                
                            
                                
                                    
                                        
                                        import flask
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        from celery import Celery, Task
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        def initCeleryApp(app: flask.Flask):
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            class FlaskTask(Task):
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                                def __call__(self, *args: object, **kwargs: object):
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                                    with app.app_context():
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                                        return self.run(*args, **kwargs)
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            celeryApp = Celery(app.name, task_cls=FlaskTask)
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            celeryApp.config_from_object(app.config["CELERY"])
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            celeryApp.set_default()
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            app.extensions["celery"] = celeryApp
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            return celeryApp
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                            
                                

config.py

@@ -4,6 +4,7 @@ load_dotenv("secrets.env")

                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            DB_PASSWORD: str = os.environ.get("DB_PASSWORD")
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            DB_URI: str = f"postgresql://root:{DB_PASSWORD}@localhost/roundabout"
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                        REDIS_URI: str = "redis://localhost"
                                        
                                        
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            REPOS_PATH: str = "./repos"
                                        
                                        
                                            
                                            
                                            
                                            
                                        
                                    
                                
                                
                                
                            
                                
                                    
                                        
                                            
                                            USERDATA_PATH: str = "./userdata"