# celery_tasks.py
# (file viewer metadata: Python script, ASCII text executable)
1import common 2import time 3import os 4import config 5import email_send 6import shutil 7from celery import shared_task 8from app import db, _ 9from smtplib import SMTP 10from celery.utils.log import get_task_logger 11from sqlalchemy.orm import make_transient 12from datetime import datetime 13 14 15@shared_task(ignore_result=False) 16def send_notification(user_notification_id): 17from models import UserNotification, Commit 18user_notification = db.session.get(UserNotification, user_notification_id) 19user = user_notification.user 20notification = user_notification.notification 21 22if user.email: 23with (SMTP(config.MAIL_SERVER) as mail): 24match notification.data.get("type"): 25case "welcome": 26message = email_send.make_multipart_message( 27"Welcome, {username}".format(username=user.username), 28config.NOTIFICATION_EMAIL, 29user.email, 30"welcome", 31username=user.username) 32case "commit": 33commit = db.session.get(Commit, notification.data.get("commit")) 34line_separator = "\n\n" # hack so it works in older Pythons 35newline = "\n" 36message = email_send.make_multipart_message( 37f"[commit in {notification.data.get('repo')}] {commit.message.partition(line_separator)[0].replace(newline, ' ')}", 38config.NOTIFICATION_EMAIL, 39user.email, 40"commit", 41username=user.username, 42commit=commit, 43url="https://" if config.suggest_https else "http://" + config.BASE_DOMAIN + "/repo/" + notification.data.get("repo") + "/commit/" + notification.data.get("commit") 44) 45 46mail.sendmail(config.NOTIFICATION_EMAIL, user.email, message) 47 48return 0 # notification sent successfully 49 50 51@shared_task(ignore_result=False) 52def merge_heads(head_route, head_branch, base_route, base_branch, simulate=True): 53from models import Repo, Commit 54server_repo_location = os.path.join(config.REPOS_PATH, base_route.lstrip("/")) 55if not os.path.isdir(server_repo_location): 56raise FileNotFoundError(f"Repo {server_repo_location} not found, cannot merge.") 57 58if base_route == 
head_route: 59common.git_command(server_repo_location, b"", "checkout", f"{base_branch}") 60if simulate: 61out, err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--no-commit", "--no-ff", f"heads/{head_branch}", 62return_err=True, return_exit=True) 63 64# Undo the merge. 65common.git_command(server_repo_location, b"", "merge", "--abort") 66else: 67out, err, merge_exit = common.git_command(server_repo_location, b"", "merge", f"heads/{head_branch}", 68return_err=True, return_exit=True) 69 70new_commits = common.git_command(server_repo_location, b"", "log", "--oneline", f"heads/{base_branch}..heads/{head_branch}") 71 72return "merge_simulator" if simulate else "merge", out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits 73 74# Otherwise, we need to fetch the head repo. 75remote_url = "https://" if config.suggest_https else "http://" + os.path.join(config.BASE_DOMAIN + f":{config.port}" if config.port not in {80, 443} else "", "git", head_route.lstrip("/")) 76 77out, err = b"", b"" 78part_out, part_err = common.git_command(server_repo_location, b"", "remote", "add", "NEW", remote_url, return_err=True) 79out += part_out 80err += part_err 81part_out, part_err = common.git_command(server_repo_location, b"", "remote", "update", return_err=True) 82out += part_out 83err += part_err 84part_out, part_err = common.git_command(server_repo_location, b"", "fetch", "NEW", f"{head_branch}", return_err=True) 85out += part_out 86err += part_err 87part_out, part_err = common.git_command(server_repo_location, b"", "checkout", f"{base_branch}", return_err=True) 88out += part_out 89err += part_err 90new_commits, part_err = common.git_command(server_repo_location, b"", "log", "--pretty=format:\"%H\"", f"heads/{base_branch}..NEW/{head_branch}", "--", return_err=True) 91new_commits = new_commits.decode().splitlines() 92err += part_err 93 94if simulate: 95part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", 
"merge", "--allow-unrelated-histories", 96"--no-commit", "--no-ff", f"NEW/{head_branch}", return_err=True, return_exit=True) 97else: 98part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--allow-unrelated-histories", 99f"NEW/{head_branch}", return_err=True, return_exit=True) 100 101diff, diff_exit = common.git_command(server_repo_location, b"", "diff", "--check", return_exit=True) 102 103out += part_out 104err += part_err 105part_out, part_err = common.git_command(server_repo_location, b"", "remote", "rm", "NEW", return_err=True) 106out += part_out 107err += part_err 108if simulate: 109# Undo the merge. 110common.git_command(server_repo_location, b"", "merge", "--abort") 111else: 112# Copy the commits rows from the head repo to the base repo 113for commit in new_commits: 114commit_data = Commit.query.filter_by(repo_name=head_route, sha=commit).first() 115 116db.session.expunge(commit_data) 117make_transient(commit_data) 118 119commit_data.repo_name = base_route 120commit_data.identifier = f"{base_route}/{commit_data.sha}" 121commit_data.receive_date = datetime.now() 122db.session.add(commit_data) 123 124db.session.commit() 125 126return "merge_simulator" if simulate else "merge", out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits 127 128 129@shared_task(ignore_result=False) 130def copy_site(route): 131from models import Repo 132repo = db.session.get(Repo, route) 133server_repo_location = os.path.join(config.REPOS_PATH, route.lstrip("/")) 134subdomain = repo.owner.username 135subpath = repo.name if repo.has_site != 2 else "" 136site_location = os.path.join(config.SITE_PATH, subdomain, subpath) 137# Get the branch to be used for the site; if it somehow doesn't exist, use the default branch. 138branch = repo.site_branch or repo.default_branch 139# Make a shallow clone of the repo; this prevents getting the full git database when it's not needed. 
140if os.path.isdir(site_location): 141# Delete the old site. 142shutil.rmtree(site_location) 143 144common.git_command(config.SITE_PATH, b"", "clone", "--depth=1", f"--branch={branch}", os.path.join(os.getcwd(), server_repo_location), os.path.join(subdomain, subpath)) 145 146 147@shared_task(ignore_result=False) 148def delete_site(route): 149from models import Repo 150repo = db.session.get(Repo, route) 151subdomain = repo.owner.username 152subpath = repo.name if repo.has_site != 2 else "." 153site_location = os.path.join(config.SITE_PATH, subdomain, subpath) 154if os.path.isdir(site_location): 155shutil.rmtree(site_location) 156 157# Redo the primary site. 158primary_site = Repo.query.filter_by(owner=repo.owner, has_site=2).first() 159if primary_site: 160copy_site(primary_site.route) 161 162 163@shared_task(ignore_result=False) 164def request_email_change(username, email): 165from models import User, EmailChangeRequest 166user = db.session.get(User, username) 167 168request = EmailChangeRequest(user, email) 169 170db.session.add(request) 171db.session.commit() 172 173message = email_send.make_multipart_message( 174"Email change request for {username}".format(username=username), 175config.NOTIFICATION_EMAIL, 176email, 177"email-change", 178username=username, 179code=request.code, 180new_email=email, 181url="https://" if config.suggest_https else "http://" + config.BASE_DOMAIN + "/settings/confirm-email/" + request.code 182) 183 184with (SMTP(config.MAIL_SERVER) as mail): 185mail.sendmail(config.NOTIFICATION_EMAIL, email, message) 186