celery_tasks.py
Python script, ASCII text executable
1""" 2This module contains asynchronous tasks for Roundabout using Celery. 3 4Roundabout - git hosting for everyone <https://roundabout-host.com> 5Copyright (C) 2023-2025 Roundabout developers <root@roundabout-host.com> 6 7This program is free software: you can redistribute it and/or modify 8it under the terms of the GNU Affero General Public License as published by 9the Free Software Foundation, either version 3 of the License, or 10(at your option) any later version. 11 12This program is distributed in the hope that it will be useful, 13but WITHOUT ANY WARRANTY; without even the implied warranty of 14MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 15GNU Affero General Public License for more details. 16 17You should have received a copy of the GNU Affero General Public License 18along with this program. If not, see <http://www.gnu.org/licenses/>. 19""" 20 21import common 22import time 23import os 24import config 25import email_send 26import shutil 27from celery import shared_task 28from app import db, _ 29from misc_utils import * 30from smtplib import SMTP 31from celery.utils.log import get_task_logger 32from sqlalchemy.orm import make_transient 33from datetime import datetime 34import httpx 35import bs4 as bs 36 37 38def _get_foreign_commit_owner(server, repo_route, sha): 39response = httpx.get(f"http://{server}/data-api/commit{repo_route}/{sha}", follow_redirects=True) # repo_route is supposed to be /<owner>/<repo>, so the slash is already there 40if response.status_code == 200: 41soup = bs.BeautifulSoup(response.text, "lxml-xml") 42return soup.find("commit")["owner"] + "@" + server 43 44 45def _parse_commit(directory, repo_route, sha, pusher, owner_name=None): 46from models import User, Repo, Commit 47commit_info = common.git_command(directory, b"", "show", "-s", "--format='%H%n%at%n%cn <%ce>%n%B'", sha).strip().decode() 48 49sha, time, identity, body = commit_info.split("\n", 3) 50 51if owner_name is None: 52owner_name = get_commit_identity(identity, pusher, db.session.get(Repo, repo_route)).username 53 54return Commit(sha, pusher, db.session.get(Repo, repo_route), time, body, identity, owner_name) 55 56 57@shared_task(ignore_result=False) 58def send_notification(user_notification_id): 59from models import UserNotification, Commit, Post, PullRequest 60user_notification = db.session.get(UserNotification, user_notification_id) 61user = user_notification.user 62notification = user_notification.notification 63 64if user.email: 65with (SMTP(config.MAIL_SERVER) as mail): 66match notification.data.get("type"): 67case "welcome": 68message = email_send.make_multipart_message( 69f"[system] Welcome, {user.username}", 70config.NOTIFICATION_EMAIL, 71user.email, 72"welcome", 73username=user.username 74) 75case "commit": 76commit = db.session.get(Commit, notification.data.get("commit")) 77line_separator = "\n\n" # hack so it works in older Pythons 78newline = "\n" 79message = email_send.make_multipart_message( 80f"[commit in {notification.data.get('repo')}] {commit.message.partition(line_separator)[0].replace(newline, ' ')}", 81config.NOTIFICATION_EMAIL, 82user.email, 83"commit", 84username=user.username, 85commit=commit, 86url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/repo/" + notification.data.get("repo") + "/commit/" + notification.data.get("commit") 87) 88case "post": 89post = db.session.get(Post, notification.data.get("post")) 90message = email_send.make_multipart_message( 91f"[post in {notification.data.get('repo')}] {post.subject}", 92config.NOTIFICATION_EMAIL, 93user.email, 94"forum", 95username=user.username, 96post=post, 97url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/repo/" + notification.data.get("repo") + "/post/" + notification.data.get("post") 98) 99case "pr": 100pr = db.session.get(PullRequest, notification.data.get("pr")) 101message = email_send.make_multipart_message( 102f"[PR in {notification.data.get('repo')}] {pr.head_route}:{pr.head_branch} -> {pr.base_route}:{pr.base_branch}", 103config.NOTIFICATION_EMAIL, 104user.email, 105"pr", 106username=user.username, 107pr=pr, 108url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + notification.data.get("base") + "/prs/" 109) 110 111mail.sendmail(config.NOTIFICATION_EMAIL, user.email, message) 112 113return 0 # notification sent successfully 114 115 116@shared_task(ignore_result=False) 117def merge_heads(head_route, head_branch, base_route, base_branch, pr_id, simulate=True, method="merge", username=None): 118from models import Repo, Commit, PullRequest, User 119server_repo_location = os.path.join(config.REPOS_PATH, base_route.lstrip("/")) 120pull_request = db.session.get(PullRequest, pr_id) 121if not os.path.isdir(server_repo_location): 122raise FileNotFoundError(f"Repo {server_repo_location} not found, cannot merge.") 123 124# Change to the user's identity. 125common.git_command(server_repo_location, b"", "config", "user.email", db.session.get(User, username).email or f"noreply@{config.BASE_DOMAIN}") 126common.git_command(server_repo_location, b"", "config", "user.name", username) 127 128if base_route == head_route: 129common.git_command(server_repo_location, b"", "checkout", f"{base_branch}") 130if simulate: 131if method == "merge": 132out, err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--no-commit", "--no-ff", f"heads/{head_branch}", 133return_err=True, return_exit=True) 134# Undo the merge. 135common.git_command(server_repo_location, b"", "merge", "--abort") 136elif method == "fast-forward": 137out, err, merge_exit = common.git_command(server_repo_location, b"", "merge-base", "--is-ancestor", f"heads/{base_branch}", f"heads/{head_branch}", 138return_err=True, return_exit=True) 139elif method == "rebase": 140# To attempt a rebase dry run, switch to a detached head. 141common_ancestor = common.git_command(server_repo_location, b"", "merge-base", 142f"heads/{base_branch}", 143f"heads/{head_branch}").strip().decode() 144common.git_command(server_repo_location, b"", "checkout", "--detach", f"heads/{base_branch}") 145out, err, merge_exit = common.git_command(server_repo_location, b"", "rebase", 146"--onto", f"heads/{head_branch}", 147common_ancestor, return_err=True, 148return_exit=True) 149# Undo the rebase. 150common.git_command(server_repo_location, b"", "rebase", "--abort") 151else: 152if method == "merge": 153out, err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--no-ff", f"heads/{head_branch}", 154return_err=True, return_exit=True) 155elif method == "fast-forward": 156out, err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--ff-only", f"heads/{head_branch}", 157return_err=True, return_exit=True) 158elif method == "rebase": 159old_head_commit = common.git_command(server_repo_location, b"", "rev-parse", f"heads/{head_branch}") 160out, err, merge_exit = common.git_command(server_repo_location, b"", "rebase", f"heads/{head_branch}", return_err=True, return_exit=True) 161 162# Add the merge commit. 163if method == "merge": 164last_commit = common.git_command(server_repo_location, b"", "log", 165"--pretty=format:\"%H\"", "-n", "1").strip().decode() 166last_commit_info = common.git_command(server_repo_location, b"", "show", "-s", 167"--format='%H%n%at%n%cn <%ce>%n%B'", 168last_commit).strip().decode() 169 170if not db.session.get(Commit, f"{base_route}/{last_commit}"): 171sha, time, identity, body = last_commit_info.split("\n", 3) 172if not db.session.get(Commit, f"/{base_route}/{sha}"): 173commit = Commit(sha, db.session.get(User, username), 174db.session.get(Repo, base_route), time, body, identity, 175db.session.get(User, username)) 176db.session.add(commit) 177elif method == "rebase": 178new_commits = common.git_command(server_repo_location, b"", "log", "--pretty=format:\"%H\"", 179f"{old_head_commit}..heads/{head_branch}", "--").decode().splitlines() 180 181# Copy the commit rows from the head repo to the base repo. 182for commit in new_commits: 183commit_data = _parse_commit(server_repo_location, base_route, commit, db.session.get(User, username)) 184db.session.add(commit_data) 185 186# Consider the PR merged. 187pull_request.state = 1 188 189for resolves in pull_request.resolves: 190resolves.post.state = 0 191 192db.session.commit() 193 194new_commits = common.git_command(server_repo_location, b"", "log", "--oneline", f"heads/{base_branch}..heads/{head_branch}") 195 196# Undo the identity change. 197common.git_command(server_repo_location, b"", "config", "--unset", "user.email") 198common.git_command(server_repo_location, b"", "config", "--unset", "user.name") 199 200return "merge_simulator" if simulate else "merge", out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits 201 202# Otherwise, we need to fetch the head repo. 203head_user = head_route.rpartition("/")[0].removeprefix("/") 204head_username = head_user.partition("@")[0] if "@" in head_user else head_user 205head_name = head_route.rpartition("/")[2] 206head_domain = head_user.partition("@")[2] if "@" in head_user else config.BASE_DOMAIN + (f":{config.port}" if config.port not in {80, 443} else "") 207remote_url = ("https://" if config.suggest_https else "http://") + head_domain + "/git/" + head_username + "/" + head_name 208 209out, err = b"", b"" 210part_out, part_err = common.git_command(server_repo_location, b"", "remote", "add", "NEW", remote_url, return_err=True) 211out += part_out 212err += part_err 213part_out, part_err = common.git_command(server_repo_location, b"", "remote", "update", return_err=True) 214out += part_out 215err += part_err 216part_out, part_err = common.git_command(server_repo_location, b"", "fetch", "NEW", f"refs/heads/{head_branch}:refs/remotes/NEW/{head_branch}", return_err=True) 217out += part_out 218err += part_err 219part_out, part_err = common.git_command(server_repo_location, b"", "checkout", f"{base_branch}", return_err=True) 220out += part_out 221err += part_err 222 223if simulate: 224if method == "merge": 225part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--allow-unrelated-histories", 226"--no-commit", "--no-ff", f"NEW/{head_branch}", return_err=True, return_exit=True) 227# Undo the merge. 228common.git_command(server_repo_location, b"", "merge", "--abort") 229elif method == "fast-forward": 230part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "merge-base", "--is-ancestor", f"heads/{base_branch}", f"NEW/{head_branch}", 231return_err=True, return_exit=True) 232elif method == "rebase": 233# To attempt a rebase dry run, switch to a detached head. 234common_ancestor = common.git_command(server_repo_location, b"", "merge-base", 235f"heads/{base_branch}", 236f"NEW/{head_branch}").strip().decode() 237common.git_command(server_repo_location, b"", "checkout", "--detach", f"heads/{base_branch}") 238part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "rebase", 239"--onto", f"NEW/{head_branch}", 240common_ancestor, return_err=True, 241return_exit=True) 242# Undo the rebase. 243common.git_command(server_repo_location, b"", "rebase", "--abort") 244else: 245if method == "merge": 246new_commits = common.git_command(server_repo_location, b"", "log", "--pretty=format:\"%H\"", f"heads/{base_branch}..NEW/{head_branch}", "--").decode().splitlines() 247part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--allow-unrelated-histories", 248"--no-ff", f"NEW/{head_branch}", return_err=True, return_exit=True) 249elif method == "fast-forward": 250new_commits = common.git_command(server_repo_location, b"", "log", "--pretty=format:\"%H\"", f"heads/{base_branch}..NEW/{head_branch}", "--").decode().splitlines() 251part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "merge", "--ff-only", f"NEW/{head_branch}", return_err=True, return_exit=True) 252elif method == "rebase": 253old_head_commit = common.git_command(server_repo_location, b"", "rev-parse", f"heads/{head_branch}") 254part_out, part_err, merge_exit = common.git_command(server_repo_location, b"", "rebase", f"NEW/{head_branch}", return_err=True, return_exit=True) 255new_commits = common.git_command(server_repo_location, b"", "log", "--pretty=format:\"%H\"", f"{old_head_commit}..heads/{head_branch}", "--").decode().splitlines() 256 257# Copy the commit rows from the head repo to the base repo. 258if "@" not in head_user: 259# Same server, no API call needed. 260for commit in new_commits: 261if method != "rebase": 262commit_data = Commit.query.filter_by(repo_name=head_route, sha=commit).first() 263db.session.expunge(commit_data) 264make_transient(commit_data) 265 266commit_data.repo_name = base_route 267commit_data.identifier = f"{base_route}/{commit_data.sha}" 268commit_data.receive_date = datetime.now() 269commit_data.pusher = db.session.get(User, username) 270else: 271commit_data = _parse_commit(server_repo_location, base_route, commit, db.session.get(User, username), head_username) 272db.session.add(commit_data) 273else: 274# Different server, use the API. 275for commit in new_commits: 276if method != "rebase": 277commit_data = _parse_commit(server_repo_location, base_route, commit, db.session.get(User, username), _get_foreign_commit_owner(head_domain, head_route, commit)) 278else: 279commit_data = _parse_commit(server_repo_location, base_route, commit, db.session.get(User, username), head_username) 280db.session.add(commit_data) 281 282# Add the merge commit. 283if method == "merge": 284last_commit = common.git_command(server_repo_location, b"", "log", "--pretty=format:\"%H\"", "-n", "1").strip().decode() 285last_commit_info = common.git_command(server_repo_location, b"", "show", "-s", "--format='%H%n%at%n%cn <%ce>%n%B'", last_commit).strip().decode().split("\n") 286err += part_err 287 288if not db.session.get(Commit, f"{base_route}/{last_commit}"): 289sha, time, identity, body = last_commit_info.split("\n", 3) 290if not db.session.get(Commit, f"/{base_route}/{sha}"): 291commit = Commit(sha, db.session.get(User, username), db.session.get(Repo, base_route), time, body, identity, db.session.get(User, username)) 292db.session.add(commit) 293 294# Consider the PR merged. 295pull_request.state = 1 296 297for resolves in pull_request.resolves: 298resolves.post.state = 0 299 300db.session.commit() 301 302out += part_out 303err += part_err 304part_out, part_err = common.git_command(server_repo_location, b"", "remote", "prune", "NEW", return_err=True) 305out += part_out 306err += part_err 307part_out, part_err = common.git_command(server_repo_location, b"", "remote", "rm", "NEW", return_err=True) 308out += part_out 309err += part_err 310part_out, part_err = common.git_command(server_repo_location, b"", "fetch", "--prune", return_err=True) 311out += part_out 312err += part_err 313 314# Undo the identity change. 315common.git_command(server_repo_location, b"", "config", "--unset", "user.email") 316common.git_command(server_repo_location, b"", "config", "--unset", "user.name") 317 318return "merge_simulator" if simulate else "merge", out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits 319 320 321@shared_task(ignore_result=False) 322def copy_site(route): 323from models import Repo 324repo = db.session.get(Repo, route) 325server_repo_location = os.path.join(config.REPOS_PATH, route.lstrip("/")) 326subdomain = repo.owner.username 327subpath = repo.name if repo.has_site != 2 else "" 328site_location = os.path.join(config.SITE_PATH, subdomain, subpath) 329# Get the branch to be used for the site; if it somehow doesn't exist, use the default branch. 330branch = repo.site_branch or repo.default_branch 331# Make a shallow clone of the repo; this prevents getting the full git database when it's not needed. 332if os.path.isdir(site_location): 333# Delete the old site. 334shutil.rmtree(site_location) 335 336common.git_command(config.SITE_PATH, b"", "clone", "--depth=1", f"--branch={branch}", os.path.join(os.getcwd(), server_repo_location), os.path.join(subdomain, subpath)) 337 338 339@shared_task(ignore_result=False) 340def delete_site(route): 341from models import Repo 342repo = db.session.get(Repo, route) 343subdomain = repo.owner.username 344subpath = repo.name if repo.has_site != 2 else "." 345site_location = os.path.join(config.SITE_PATH, subdomain, subpath) 346if os.path.isdir(site_location): 347shutil.rmtree(site_location) 348 349# Redo the primary site. 350primary_site = Repo.query.filter_by(owner=repo.owner, has_site=2).first() 351if primary_site: 352copy_site(primary_site.route) 353 354 355@shared_task(ignore_result=False) 356def request_email_change(username, email): 357from models import User, EmailChangeRequest 358user = db.session.get(User, username) 359 360request = EmailChangeRequest(user, email) 361 362db.session.add(request) 363db.session.commit() 364 365message = email_send.make_multipart_message( 366"Email change request for {username}".format(username=username), 367config.NOTIFICATION_EMAIL, 368email, 369"email-change", 370username=username, 371code=request.code, 372new_email=email, 373url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/settings/confirm-email/" + request.code 374) 375 376with (SMTP(config.MAIL_SERVER) as mail): 377mail.sendmail(config.NOTIFICATION_EMAIL, email, message) 378