celery_tasks.py
Python script, ASCII text executable
1import common 2import time 3import os 4import config 5import email_send 6import shutil 7from celery import shared_task 8from app import db, _ 9from smtplib import SMTP 10from celery.utils.log import get_task_logger 11from sqlalchemy.orm import make_transient 12from datetime import datetime 13 14 15@shared_task(ignore_result=False) 16def send_notification(user_notification_id): 17from models import UserNotification, Commit, Post, PullRequest 18user_notification = db.session.get(UserNotification, user_notification_id) 19user = user_notification.user 20notification = user_notification.notification 21 22if user.email: 23with (SMTP(config.MAIL_SERVER) as mail): 24match notification.data.get("type"): 25case "welcome": 26message = email_send.make_multipart_message( 27f"[system] Welcome, {user.username}", 28config.NOTIFICATION_EMAIL, 29user.email, 30"welcome", 31username=user.username 32) 33case "commit": 34commit = db.session.get(Commit, notification.data.get("commit")) 35line_separator = "\n\n" # hack so it works in older Pythons 36newline = "\n" 37message = email_send.make_multipart_message( 38f"[commit in {notification.data.get('repo')}] {commit.message.partition(line_separator)[0].replace(newline, ' ')}", 39config.NOTIFICATION_EMAIL, 40user.email, 41"commit", 42username=user.username, 43commit=commit, 44url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/repo/" + notification.data.get("repo") + "/commit/" + notification.data.get("commit") 45) 46case "post": 47post = db.session.get(Post, notification.data.get("post")) 48message = email_send.make_multipart_message( 49f"[post in {notification.data.get('repo')}] {post.subject}", 50config.NOTIFICATION_EMAIL, 51user.email, 52"forum", 53username=user.username, 54post=post, 55url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/repo/" + notification.data.get("repo") + "/post/" + notification.data.get("post") 56) 57case "pr": 58pr = db.session.get(PullRequest, notification.data.get("pr")) 59message = email_send.make_multipart_message( 60f"[PR in {notification.data.get('repo')}] {pr.head_route}:{pr.head_branch} -> {pr.base_route}:{pr.base_branch}", 61config.NOTIFICATION_EMAIL, 62user.email, 63"pr", 64username=user.username, 65pr=pr, 66url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + notification.data.get("base") + "/prs/" 67) 68 69mail.sendmail(config.NOTIFICATION_EMAIL, user.email, message) 70 71return 0 # notification sent successfully 72 73 74@shared_task(ignore_result=False) 75def merge_heads(head_route, head_branch, base_route, base_branch, pr_id, simulate=True, method="merge", username=None): 76from models import Repo, Commit, PullRequest, User 77server_repo_location = os.path.join(config.REPOS_PATH, base_route.lstrip("/")) 78pull_request = db.session.get(PullRequest, pr_id) 79if not os.path.isdir(server_repo_location): 80raise FileNotFoundError(f"Repo {server_repo_location} not found, cannot merge.") 81 82# Change to the user's identity. 83common.git_command(server_repo_location, b"", "config", "user.email", db.session.get(User, username).email or f"noreply@{config.BASE_DOMAIN}") 84common.git_command(server_repo_location, b"", "config", "user.name", username) 85 86if base_route == head_route: 87common.git_command(server_repo_location, b"", "checkout", f"{base_branch}") 88if simulate: 89if method == "merge": 90out, err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--no-commit", "--no-ff", f"heads/{head_branch}", 91return_err=True, return_exit=True) 92# Undo the merge. 93common.git_command(server_repo_location, b"", "merge", "--abort") 94elif method == "fast-forward": 95out, err, merge_exit = common.git_command(server_repo_location, b"", "merge-base", "--is-ancestor", f"heads/{base_branch}", f"heads/{head_branch}", 96return_err=True, return_exit=True) 97elif method == "rebase": 98# To attempt a rebase dry run, switch to a detached head. 99common_ancestor = common.git_command(server_repo_location, b"", "merge-base", 100f"heads/{base_branch}", 101f"heads/{head_branch}").strip().decode() 102common.git_command(server_repo_location, b"", "checkout", "--detach", f"heads/{base_branch}") 103out, err, merge_exit = common.git_command(server_repo_location, b"", "rebase", 104"--onto", f"heads/{head_branch}", 105common_ancestor, return_err=True, 106return_exit=True) 107# Undo the rebase. 108common.git_command(server_repo_location, b"", "rebase", "--abort") 109else: 110if method == "merge": 111out, err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--no-ff", f"heads/{head_branch}", 112return_err=True, return_exit=True) 113elif method == "fast-forward": 114out, err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--ff-only", f"heads/{head_branch}", 115return_err=True, return_exit=True) 116elif method == "rebase": 117out, err, merge_exit = common.git_command(server_repo_location, b"", "rebase", f"heads/{head_branch}", return_err=True, return_exit=True) 118 119pull_request.state = 1 120 121for resolves in pull_request.resolves: 122resolves.post.state = 0 123 124db.session.commit() 125 126new_commits = common.git_command(server_repo_location, b"", "log", "--oneline", f"heads/{base_branch}..heads/{head_branch}") 127 128# Undo the identity change. 129common.git_command(server_repo_location, b"", "config", "--unset", "user.email") 130common.git_command(server_repo_location, b"", "config", "--unset", "user.name") 131 132return "merge_simulator" if simulate else "merge", out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits 133 134# Otherwise, we need to fetch the head repo. 135remote_url = ("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + (f":{config.port}" if config.port not in {80, 443} else "") + "/git" + head_route 136 137out, err = b"", b"" 138part_out, part_err = common.git_command(server_repo_location, b"", "remote", "add", "NEW", remote_url, return_err=True) 139out += part_out 140err += part_err 141part_out, part_err = common.git_command(server_repo_location, b"", "remote", "update", return_err=True) 142out += part_out 143err += part_err 144part_out, part_err = common.git_command(server_repo_location, b"", "fetch", "NEW", f"{head_branch}", return_err=True) 145out += part_out 146err += part_err 147part_out, part_err = common.git_command(server_repo_location, b"", "checkout", f"{base_branch}", return_err=True) 148out += part_out 149err += part_err 150new_commits, part_err = common.git_command(server_repo_location, b"", "log", "--pretty=format:\"%H\"", f"heads/{base_branch}..NEW/{head_branch}", "--", return_err=True) 151new_commits = new_commits.decode().splitlines() 152err += part_err 153 154if simulate: 155if method == "merge": 156part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--allow-unrelated-histories", 157"--no-commit", "--no-ff", f"NEW/{head_branch}", return_err=True, return_exit=True) 158# Undo the merge. 159common.git_command(server_repo_location, b"", "merge", "--abort") 160elif method == "fast-forward": 161part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "merge-base", "--is-ancestor", f"heads/{base_branch}", f"NEW/{head_branch}", 162return_err=True, return_exit=True) 163elif method == "rebase": 164# To attempt a rebase dry run, switch to a detached head. 165common_ancestor = common.git_command(server_repo_location, b"", "merge-base", 166f"heads/{base_branch}", 167f"NEW/{head_branch}").strip().decode() 168common.git_command(server_repo_location, b"", "checkout", "--detach", f"heads/{base_branch}") 169part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "rebase", 170"--onto", f"NEW/{head_branch}", 171common_ancestor, return_err=True, 172return_exit=True) 173# Undo the rebase. 174common.git_command(server_repo_location, b"", "rebase", "--abort") 175else: 176if method == "merge": 177part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--allow-unrelated-histories", 178"--no-ff", f"NEW/{head_branch}", return_err=True, return_exit=True) 179elif method == "fast-forward": 180part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--ff-only", f"NEW/{head_branch}", return_err=True, return_exit=True) 181elif method == "rebase": 182part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "rebase", f"NEW/{head_branch}", return_err=True, return_exit=True) 183 184# Copy the commits rows from the head repo to the base repo 185for commit in new_commits: 186commit_data = Commit.query.filter_by(repo_name=head_route, sha=commit).first() 187 188db.session.expunge(commit_data) 189make_transient(commit_data) 190 191commit_data.repo_name = base_route 192commit_data.identifier = f"{base_route}/{commit_data.sha}" 193commit_data.receive_date = datetime.now() 194db.session.add(commit_data) 195 196# Consider the PR merged. 197pull_request.state = 1 198 199for resolves in pull_request.resolves: 200resolves.post.state = 0 201 202db.session.commit() 203 204out += part_out 205err += part_err 206part_out, part_err = common.git_command(server_repo_location, b"", "remote", "rm", "NEW", return_err=True) 207out += part_out 208err += part_err 209 210# Undo the identity change. 211common.git_command(server_repo_location, b"", "config", "--unset", "user.email") 212common.git_command(server_repo_location, b"", "config", "--unset", "user.name") 213 214return "merge_simulator" if simulate else "merge", out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits 215 216 217@shared_task(ignore_result=False) 218def copy_site(route): 219from models import Repo 220repo = db.session.get(Repo, route) 221server_repo_location = os.path.join(config.REPOS_PATH, route.lstrip("/")) 222subdomain = repo.owner.username 223subpath = repo.name if repo.has_site != 2 else "" 224site_location = os.path.join(config.SITE_PATH, subdomain, subpath) 225# Get the branch to be used for the site; if it somehow doesn't exist, use the default branch. 226branch = repo.site_branch or repo.default_branch 227# Make a shallow clone of the repo; this prevents getting the full git database when it's not needed. 228if os.path.isdir(site_location): 229# Delete the old site. 230shutil.rmtree(site_location) 231 232common.git_command(config.SITE_PATH, b"", "clone", "--depth=1", f"--branch={branch}", os.path.join(os.getcwd(), server_repo_location), os.path.join(subdomain, subpath)) 233 234 235@shared_task(ignore_result=False) 236def delete_site(route): 237from models import Repo 238repo = db.session.get(Repo, route) 239subdomain = repo.owner.username 240subpath = repo.name if repo.has_site != 2 else "." 241site_location = os.path.join(config.SITE_PATH, subdomain, subpath) 242if os.path.isdir(site_location): 243shutil.rmtree(site_location) 244 245# Redo the primary site. 246primary_site = Repo.query.filter_by(owner=repo.owner, has_site=2).first() 247if primary_site: 248copy_site(primary_site.route) 249 250 251@shared_task(ignore_result=False) 252def request_email_change(username, email): 253from models import User, EmailChangeRequest 254user = db.session.get(User, username) 255 256request = EmailChangeRequest(user, email) 257 258db.session.add(request) 259db.session.commit() 260 261message = email_send.make_multipart_message( 262"Email change request for {username}".format(username=username), 263config.NOTIFICATION_EMAIL, 264email, 265"email-change", 266username=username, 267code=request.code, 268new_email=email, 269url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/settings/confirm-email/" + request.code 270) 271 272with (SMTP(config.MAIL_SERVER) as mail): 273mail.sendmail(config.NOTIFICATION_EMAIL, email, message) 274