By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 celery_tasks.py

View raw Download
text/x-script.python • 1.01 kiB
Python script, ASCII text executable
        
            
1
import time
2
import os
3
import config
4
from celery import shared_task
5
from app import db
6
from misc_utils import *
7
from models import *
8
from smtplib import SMTP
9
10
11
@shared_task(ignore_result=False)
def send_notification(notification_id, users, level):
    """Attach an existing notification to each of the given users.

    Looks up the ``Notification`` with primary key ``notification_id`` and
    creates one ``UserNotification`` row per entry in ``users``.

    Args:
        notification_id: Primary key of the ``Notification`` to deliver.
        users: Iterable of ``User`` primary keys; each one is resolved with
            ``db.session.get(User, ...)``.
        level: Passed through unchanged to ``UserNotification``
            (presumably an attention/importance level -- confirm against
            the model).

    Returns:
        ``0`` once all rows have been committed.
    """
    notification = db.session.get(Notification, notification_id)

    for user in users:
        db.session.add(UserNotification(db.session.get(User, user), notification, level))

    # Commit once, after every row has been queued, so the whole fan-out is
    # persisted as a single transaction instead of one commit per recipient
    # (which could leave a partially delivered notification on failure).
    db.session.commit()

    return 0  # notification sent successfully
20
21
22
@shared_task(ignore_result=False)
def merge_heads(head_route, head_branch, base_route, base_branch):
    """Merge ``head_branch`` into ``base_branch`` in the base repository.

    The repository directory is ``config.REPOS_PATH`` joined with
    ``base_route`` (leading slashes removed). The base branch is force
    checked out, then ``heads/<head_branch>`` is merged into it.

    Args:
        head_route: Route of the head repository. Accepted for interface
            compatibility; not used by this function.
        head_branch: Name of the branch to merge from.
        base_route: Route of the repository to merge into.
        base_branch: Name of the branch to merge into.

    Returns:
        The two-element result of the merge ``git_command`` call made with
        ``return_err=True``, as an ``(out, err)`` tuple.

    Raises:
        FileNotFoundError: If the resolved repository directory does not exist.
    """
    base_ref = f"heads/{base_branch}"
    head_ref = f"heads/{head_branch}"

    repo_dir = os.path.join(config.REPOS_PATH, base_route.lstrip("/"))
    if not os.path.isdir(repo_dir):
        raise FileNotFoundError(f"Repo {repo_dir} not found, cannot merge.")

    # Force the checkout so the merge always starts from the base branch.
    git_command(repo_dir, b"", "checkout", "-f", base_ref)

    merge_out, merge_err = git_command(repo_dir, b"", "merge", head_ref, return_err=True)
    return merge_out, merge_err
32