By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 celery_tasks.py

View raw Download
text/x-script.python • 1.51 kiB
Python script, ASCII text executable
        
            
1
import time
2
import os
3
import config
4
from celery import shared_task
5
from app import db
6
from misc_utils import *
7
from models import *
8
from smtplib import SMTP
9
10
11
@shared_task(ignore_result=False)
def send_notification(notification_id, users, level):
    """Attach one stored notification to every user in ``users``.

    The notification is looked up by primary key, then a
    ``UserNotification`` row is created for each user and everything is
    committed in a single transaction.

    Args:
        notification_id: Primary key passed to ``db.session.get`` to load
            the ``Notification``.
        users: Iterable of primary keys, each passed to ``db.session.get``
            to load a ``User``.
        level: Forwarded unchanged as the third argument of
            ``UserNotification``.

    Returns:
        Always ``0``, used here as the "sent successfully" status.
    """
    session = db.session
    notification = session.get(Notification, notification_id)

    for user_key in users:
        recipient = session.get(User, user_key)
        link = UserNotification(recipient, notification, level)
        session.add(link)

    # One commit for the whole batch rather than one per recipient.
    session.commit()

    return 0  # notification sent successfully
20
21
22
@shared_task(ignore_result=False)
def merge_heads(head_route, head_branch, base_route, base_branch):
    """Merge ``head_branch`` of the head repository into ``base_branch``.

    The merge is performed inside the base repository's directory under
    ``config.REPOS_PATH``. When head and base are the same repository the
    branch is merged directly; otherwise the head repository is added as a
    temporary remote named ``NEW``, fetched, merged, and the remote is
    removed again.

    Args:
        head_route: Route of the repository that contains the changes.
        head_branch: Branch in the head repository to merge from.
        base_route: Route of the repository that receives the merge.
        base_branch: Branch in the base repository to merge into.

    Returns:
        The ``(out, err)`` pair produced by ``git_command`` for the merge
        step (called with ``return_err=True``).

    Raises:
        FileNotFoundError: If the base repository directory does not exist.
    """
    server_repo_location = os.path.join(config.REPOS_PATH, base_route.lstrip("/"))
    if not os.path.isdir(server_repo_location):
        raise FileNotFoundError(f"Repo {server_repo_location} not found, cannot merge.")

    if base_route == head_route:
        # Same repository: the head branch is already available locally.
        git_command(server_repo_location, b"", "checkout", f"{base_branch}")
        out, err = git_command(
            server_repo_location, b"", "merge", "--no-ff",
            f"heads/{head_branch}", return_err=True,
        )
        return out, err

    # The temporary remote must point at the *head* repository; pointing it
    # at the base route would make the base repository fetch from itself.
    # NOTE(review): the URL is still assembled with os.path.join as before;
    # confirm config.BASE_DOMAIN carries the scheme git expects.
    remote_url = os.path.join(config.BASE_DOMAIN, "git", head_route.lstrip("/"))

    git_command(server_repo_location, b"", "remote", "add", "NEW", remote_url)
    try:
        git_command(server_repo_location, b"", "remote", "update")
        git_command(server_repo_location, b"", "checkout", f"{base_branch}")
        out, err = git_command(
            server_repo_location, b"", "merge", "--allow-unrelated-histories",
            f"NEW/{head_branch}", return_err=True,
        )
    finally:
        # Always drop the temporary remote so that a failure above does not
        # leave a stale "NEW" behind and break the next "remote add".
        git_command(server_repo_location, b"", "remote", "rm", "NEW")

    # Return the merge output here too, matching the same-repository path.
    return out, err
41