You're looking at it

Homepage: https://roundabout-host.com

By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 celery_tasks.py

View raw Download
text/plain • 20.22 kiB
Python script, ASCII text executable
        
            
1
"""
2
This module contains asynchronous tasks for Roundabout using Celery.
3
4
Roundabout - git hosting for everyone <https://roundabout-host.com>
5
Copyright (C) 2023-2025 Roundabout developers <root@roundabout-host.com>
6
7
This program is free software: you can redistribute it and/or modify
8
it under the terms of the GNU Affero General Public License as published by
9
the Free Software Foundation, either version 3 of the License, or
10
(at your option) any later version.
11
12
This program is distributed in the hope that it will be useful,
13
but WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
GNU Affero General Public License for more details.
16
17
You should have received a copy of the GNU Affero General Public License
18
along with this program. If not, see <http://www.gnu.org/licenses/>.
19
"""
20
21
import common
22
import time
23
import os
24
import config
25
import email_send
26
import shutil
27
from celery import shared_task
28
from app import db, _
29
from misc_utils import *
30
from smtplib import SMTP
31
from celery.utils.log import get_task_logger
32
from sqlalchemy.orm import make_transient
33
from datetime import datetime
34
import httpx
35
import bs4 as bs
36
37
38
def _get_foreign_commit_owner(server, repo_route, sha):
    """
    Ask another server's data API which user owns a commit.

    :param server: Host (optionally with port) of the foreign server.
    :param repo_route: Route of the repository; expected to be ``/<owner>/<repo>``.
    :param sha: Hash of the commit to look up.
    :return: ``"<owner>@<server>"``, or ``None`` when the server does not answer
             with status 200 or the answer carries no ``<commit>`` element with a
             non-empty ``owner`` attribute.
    """
    # repo_route is supposed to be /<owner>/<repo>, so the slash is already there
    response = httpx.get(f"http://{server}/data-api/commit{repo_route}/{sha}", follow_redirects=True)
    if response.status_code != 200:
        return None

    # A 200 answer is not guaranteed to have the expected shape; treat a missing
    # element or attribute like a failed lookup instead of raising TypeError/KeyError.
    commit_element = bs.BeautifulSoup(response.text, "lxml-xml").find("commit")
    owner = commit_element.get("owner") if commit_element is not None else None
    if not owner:
        return None

    return owner + "@" + server
43
44
45
def _parse_commit(directory, repo_route, sha, pusher, owner_name=None):
    """
    Build a ``Commit`` object from what git reports about a commit.

    The object is only constructed here; adding it to the session is left to the caller.

    :param directory: Path of the git repository to query.
    :param repo_route: Route (primary key) of the ``Repo`` the commit row belongs to.
    :param sha: Revision to describe.
    :param pusher: Passed through to ``Commit`` as the pusher.
    :param owner_name: Owner to record; when ``None`` it is taken from the
                       ``username`` of whatever ``get_commit_identity`` returns.
    :return: A new ``Commit``.
    """
    from models import User, Repo, Commit

    # One field per line: full hash, author timestamp, "committer name <email>";
    # everything after the third newline is the raw message body.
    raw_info = common.git_command(directory, b"", "show", "-s", "--format='%H%n%at%n%cn <%ce>%n%B'", sha)
    fields = raw_info.strip().decode().split("\n", 3)
    sha, timestamp, identity, body = fields

    if owner_name is None:
        resolved_owner = get_commit_identity(identity, pusher, db.session.get(Repo, repo_route))
        owner_name = resolved_owner.username

    return Commit(sha, pusher, db.session.get(Repo, repo_route), timestamp, body, identity, owner_name)
55
56
57
@shared_task(ignore_result=False)
58
def send_notification(user_notification_id):
59
from models import UserNotification, Commit, Post, PullRequest
60
user_notification = db.session.get(UserNotification, user_notification_id)
61
user = user_notification.user
62
notification = user_notification.notification
63
64
if user.email:
65
with (SMTP(config.MAIL_SERVER) as mail):
66
match notification.data.get("type"):
67
case "welcome":
68
message = email_send.make_multipart_message(
69
f"[system] Welcome, {user.username}",
70
config.NOTIFICATION_EMAIL,
71
user.email,
72
"welcome",
73
username=user.username
74
)
75
case "commit":
76
commit = db.session.get(Commit, notification.data.get("commit"))
77
line_separator = "\n\n" # hack so it works in older Pythons
78
newline = "\n"
79
message = email_send.make_multipart_message(
80
f"[commit in {notification.data.get('repo')}] {commit.message.partition(line_separator)[0].replace(newline, ' ')}",
81
config.NOTIFICATION_EMAIL,
82
user.email,
83
"commit",
84
username=user.username,
85
commit=commit,
86
url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/repo/" + notification.data.get("repo") + "/commit/" + notification.data.get("commit")
87
)
88
case "post":
89
post = db.session.get(Post, notification.data.get("post"))
90
message = email_send.make_multipart_message(
91
f"[post in {notification.data.get('repo')}] {post.subject}",
92
config.NOTIFICATION_EMAIL,
93
user.email,
94
"forum",
95
username=user.username,
96
post=post,
97
url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/repo/" + notification.data.get("repo") + "/post/" + notification.data.get("post")
98
)
99
case "pr":
100
pr = db.session.get(PullRequest, notification.data.get("pr"))
101
message = email_send.make_multipart_message(
102
f"[PR in {notification.data.get('repo')}] {pr.head_route}:{pr.head_branch} -> {pr.base_route}:{pr.base_branch}",
103
config.NOTIFICATION_EMAIL,
104
user.email,
105
"pr",
106
username=user.username,
107
pr=pr,
108
url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + notification.data.get("base") + "/prs/"
109
)
110
111
mail.sendmail(config.NOTIFICATION_EMAIL, user.email, message)
112
113
return 0 # notification sent successfully
114
115
116
_MERGE_METHODS = ("merge", "fast-forward", "rebase")


def _record_merge_commit(directory, base_route, username):
    """Add a ``Commit`` row for the commit at HEAD of *directory* unless one already exists."""
    from models import Repo, Commit, User

    last_commit = common.git_command(directory, b"", "log", "--pretty=format:\"%H\"", "-n", "1").strip().decode()
    # Identifiers have the form "<repo route>/<sha>" (the same form this module
    # assigns when it copies commit rows between repositories).
    if db.session.get(Commit, f"{base_route}/{last_commit}"):
        return

    last_commit_info = common.git_command(directory, b"", "show", "-s", "--format='%H%n%at%n%cn <%ce>%n%B'",
                                          last_commit).strip().decode()
    sha, timestamp, identity, body = last_commit_info.split("\n", 3)
    merger = db.session.get(User, username)
    db.session.add(Commit(sha, merger, db.session.get(Repo, base_route), timestamp, body, identity, merger))


def _mark_pull_request_merged(pull_request):
    """Set the PR's state to 1 and the state of every post it resolves to 0, then commit the session."""
    # Consider the PR merged.
    pull_request.state = 1

    for resolves in pull_request.resolves:
        resolves.post.state = 0

    db.session.commit()


@shared_task(ignore_result=False)
def merge_heads(head_route, head_branch, base_route, base_branch, pr_id, simulate=True, method="merge", username=None):
    """
    Merge (or try out merging) the head branch of a pull request into its base branch.

    The work happens inside the base repository under ``config.REPOS_PATH``. When the
    head lives in another repository, it is fetched through a temporary remote called
    ``NEW``. With ``simulate`` set, the operation is attempted and then aborted; otherwise
    the result is kept, commit rows are added to the database and the pull request is
    marked as merged.

    :param head_route: Route of the repository containing the changes.
    :param head_branch: Branch containing the changes.
    :param base_route: Route of the repository receiving the changes.
    :param base_branch: Branch receiving the changes.
    :param pr_id: Primary key of the ``PullRequest``.
    :param simulate: Only test whether the operation would succeed.
    :param method: ``"merge"``, ``"fast-forward"`` or ``"rebase"``.
    :param username: User performing the merge; used as git identity and as pusher.
    :return: Tuple ``(task name, stdout, stderr, head_route, head_branch, base_route,
             base_branch, exit code, new commits)``. The task name is
             ``"merge_simulator"`` or ``"merge"``. The new commits are the raw
             ``git log --oneline`` output, except for a real cross-repository merge
             where they are a list of hashes.
    :raises ValueError: If ``method`` is not one of the supported methods.
    :raises FileNotFoundError: If the base repository does not exist on disk.
    """
    from models import Repo, Commit, PullRequest, User

    # An unsupported method used to surface much later as an UnboundLocalError,
    # after the repository had already been touched.
    if method not in _MERGE_METHODS:
        raise ValueError(f"Unknown merge method {method!r}; expected one of {', '.join(_MERGE_METHODS)}.")

    server_repo_location = os.path.join(config.REPOS_PATH, base_route.lstrip("/"))
    pull_request = db.session.get(PullRequest, pr_id)
    if not os.path.isdir(server_repo_location):
        raise FileNotFoundError(f"Repo {server_repo_location} not found, cannot merge.")

    def git(*args, **kwargs):
        """Run a git command in the base repository with empty standard input."""
        return common.git_command(server_repo_location, b"", *args, **kwargs)

    merger = db.session.get(User, username)
    task_name = "merge_simulator" if simulate else "merge"
    base_ref = f"heads/{base_branch}"
    head_ref = f"heads/{head_branch}"

    # Change to the user's identity.
    git("config", "user.email", merger.email or f"noreply@{config.BASE_DOMAIN}")
    git("config", "user.name", username)

    try:
        if base_route == head_route:
            git("checkout", f"{base_branch}")
            if simulate:
                if method == "merge":
                    out, err, merge_exit = git("merge", "--no-commit", "--no-ff", head_ref,
                                               return_err=True, return_exit=True)
                    # Undo the merge.
                    git("merge", "--abort")
                elif method == "fast-forward":
                    out, err, merge_exit = git("merge-base", "--is-ancestor", base_ref, head_ref,
                                               return_err=True, return_exit=True)
                elif method == "rebase":
                    # To attempt a rebase dry run, switch to a detached head.
                    common_ancestor = git("merge-base", base_ref, head_ref).strip().decode()
                    git("checkout", "--detach", base_ref)
                    out, err, merge_exit = git("rebase", "--onto", head_ref, common_ancestor,
                                               return_err=True, return_exit=True)
                    # Undo the rebase.
                    git("rebase", "--abort")
            else:
                if method == "merge":
                    out, err, merge_exit = git("merge", "--no-ff", head_ref, return_err=True, return_exit=True)
                    # Add the merge commit.
                    _record_merge_commit(server_repo_location, base_route, username)
                elif method == "fast-forward":
                    out, err, merge_exit = git("merge", "--ff-only", head_ref, return_err=True, return_exit=True)
                elif method == "rebase":
                    # Decode the hash: interpolating the raw bytes would put "b'...'" in the range below.
                    old_head_commit = git("rev-parse", head_ref).strip().decode()
                    out, err, merge_exit = git("rebase", head_ref, return_err=True, return_exit=True)
                    # NOTE(review): the head branch is not moved by the rebase, so this range
                    # looks like it is always empty - confirm which range was intended.
                    rebased = git("log", "--pretty=format:\"%H\"", f"{old_head_commit}..{head_ref}",
                                  "--").decode().splitlines()
                    for sha in rebased:
                        db.session.add(_parse_commit(server_repo_location, base_route, sha, merger))

                # NOTE(review): the PR is closed regardless of merge_exit - confirm that callers
                # only run a real merge after a successful simulation.
                _mark_pull_request_merged(pull_request)

            new_commits = git("log", "--oneline", f"{base_ref}..{head_ref}")

            return task_name, out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits

        # Otherwise, we need to fetch the head repo.
        head_user = head_route.rpartition("/")[0].removeprefix("/")
        head_username = head_user.partition("@")[0] if "@" in head_user else head_user
        head_name = head_route.rpartition("/")[2]
        head_domain = head_user.partition("@")[2] if "@" in head_user else config.BASE_DOMAIN + (f":{config.port}" if config.port not in {80, 443} else "")
        remote_url = ("https://" if config.suggest_https else "http://") + head_domain + "/git/" + head_username + "/" + head_name
        remote_ref = f"NEW/{head_branch}"

        out, err = b"", b""
        part_out, part_err = git("remote", "add", "NEW", remote_url, return_err=True)
        out += part_out
        err += part_err

        try:
            for arguments in (
                ("remote", "update"),
                ("fetch", "NEW", f"refs/heads/{head_branch}:refs/remotes/NEW/{head_branch}"),
                ("checkout", f"{base_branch}"),
            ):
                part_out, part_err = git(*arguments, return_err=True)
                out += part_out
                err += part_err

            if simulate:
                if method == "merge":
                    part_out, part_err, merge_exit = git("merge", "--allow-unrelated-histories", "--no-commit",
                                                         "--no-ff", remote_ref, return_err=True, return_exit=True)
                    # Undo the merge.
                    git("merge", "--abort")
                elif method == "fast-forward":
                    part_out, part_err, merge_exit = git("merge-base", "--is-ancestor", base_ref, remote_ref,
                                                         return_err=True, return_exit=True)
                elif method == "rebase":
                    # To attempt a rebase dry run, switch to a detached head.
                    common_ancestor = git("merge-base", base_ref, remote_ref).strip().decode()
                    git("checkout", "--detach", base_ref)
                    part_out, part_err, merge_exit = git("rebase", "--onto", remote_ref, common_ancestor,
                                                         return_err=True, return_exit=True)
                    # Undo the rebase.
                    git("rebase", "--abort")

                # The simulation used to leave new_commits unset, so the return below failed.
                # Report the pending commits the same way the same-repository path does.
                new_commits = git("log", "--oneline", f"{base_ref}..{remote_ref}")
            else:
                if method == "merge":
                    new_commits = git("log", "--pretty=format:\"%H\"", f"{base_ref}..{remote_ref}",
                                      "--").decode().splitlines()
                    part_out, part_err, merge_exit = git("merge", "--allow-unrelated-histories", "--no-ff",
                                                         remote_ref, return_err=True, return_exit=True)
                elif method == "fast-forward":
                    new_commits = git("log", "--pretty=format:\"%H\"", f"{base_ref}..{remote_ref}",
                                      "--").decode().splitlines()
                    part_out, part_err, merge_exit = git("merge", "--ff-only", remote_ref,
                                                         return_err=True, return_exit=True)
                elif method == "rebase":
                    # Decode the hash: interpolating the raw bytes would put "b'...'" in the range below.
                    old_head_commit = git("rev-parse", head_ref).strip().decode()
                    part_out, part_err, merge_exit = git("rebase", remote_ref, return_err=True, return_exit=True)
                    new_commits = git("log", "--pretty=format:\"%H\"", f"{old_head_commit}..{head_ref}",
                                      "--").decode().splitlines()

                # Copy the commit rows from the head repo to the base repo.
                head_is_foreign = "@" in head_user
                for sha in new_commits:
                    if method == "rebase":
                        commit_data = _parse_commit(server_repo_location, base_route, sha, merger, head_username)
                    elif head_is_foreign:
                        # Different server, use the API.
                        commit_data = _parse_commit(server_repo_location, base_route, sha, merger,
                                                    _get_foreign_commit_owner(head_domain, head_route, sha))
                    else:
                        # Same server, no API call needed: detach the existing row
                        # from the session and re-insert it under the base repo.
                        commit_data = Commit.query.filter_by(repo_name=head_route, sha=sha).first()
                        db.session.expunge(commit_data)
                        make_transient(commit_data)

                        commit_data.repo_name = base_route
                        commit_data.identifier = f"{base_route}/{commit_data.sha}"
                        commit_data.receive_date = datetime.now()
                        commit_data.pusher = merger
                    db.session.add(commit_data)

                # Add the merge commit.
                if method == "merge":
                    _record_merge_commit(server_repo_location, base_route, username)

                # NOTE(review): the PR is closed regardless of merge_exit - confirm that callers
                # only run a real merge after a successful simulation.
                _mark_pull_request_merged(pull_request)

            out += part_out
            err += part_err
        finally:
            # Always drop the temporary remote; a leftover "NEW" would be in the
            # way of the next merge in this repository.
            for arguments in (("remote", "prune", "NEW"), ("remote", "rm", "NEW"), ("fetch", "--prune")):
                part_out, part_err = git(*arguments, return_err=True)
                out += part_out
                err += part_err

        return task_name, out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits
    finally:
        # Undo the identity change, also when the merge failed midway.
        git("config", "--unset", "user.email")
        git("config", "--unset", "user.name")
319
320
321
@shared_task(ignore_result=False)
def copy_site(route):
    """
    Publish a repository's site by cloning it into the sites directory.

    The site goes to ``<config.SITE_PATH>/<owner username>/<repo name>``. A repository
    whose ``has_site`` is 2 is placed directly in the owner's directory instead. Any
    directory already at the target is removed first.

    :param route: Route (primary key) of the ``Repo`` to publish.
    """
    from models import Repo

    repo = db.session.get(Repo, route)
    server_repo_location = os.path.join(config.REPOS_PATH, route.lstrip("/"))
    subdomain = repo.owner.username
    if repo.has_site != 2:
        subpath = repo.name
    else:
        subpath = ""
    # Path of the site relative to config.SITE_PATH, which is where git is run.
    relative_target = os.path.join(subdomain, subpath)
    site_location = os.path.join(config.SITE_PATH, relative_target)

    # Get the branch to be used for the site; if it somehow doesn't exist, use the default branch.
    branch = repo.site_branch or repo.default_branch

    # Delete the old site.
    if os.path.isdir(site_location):
        shutil.rmtree(site_location)

    # Make a shallow clone of the repo; this prevents getting the full git database when it's not needed.
    clone_source = os.path.join(os.getcwd(), server_repo_location)
    common.git_command(config.SITE_PATH, b"", "clone", "--depth=1", f"--branch={branch}", clone_source, relative_target)
337
338
339
@shared_task(ignore_result=False)
def delete_site(route):
    """
    Remove the published site of a repository.

    The directory ``<config.SITE_PATH>/<owner username>/<repo name>`` is deleted. For a
    repository whose ``has_site`` is 2, the owner's whole directory is deleted. Afterwards,
    if the owner has a repository with ``has_site`` equal to 2, that site is copied again.

    :param route: Route (primary key) of the ``Repo`` whose site is removed.
    """
    from models import Repo

    repo = db.session.get(Repo, route)
    subdomain = repo.owner.username
    # Use "" rather than "." for the primary site, like copy_site does: a path
    # ending in "/." cannot be removed, because rmdir rejects "." as the last
    # component, so rmtree would empty the directory and then raise.
    subpath = repo.name if repo.has_site != 2 else ""
    site_location = os.path.join(config.SITE_PATH, subdomain, subpath)
    if os.path.isdir(site_location):
        shutil.rmtree(site_location)

    # Redo the primary site.
    primary_site = Repo.query.filter_by(owner=repo.owner, has_site=2).first()
    if primary_site:
        copy_site(primary_site.route)
353
354
355
@shared_task(ignore_result=False)
def request_email_change(username, email):
    """
    Store an email change request and mail its confirmation link to the new address.

    :param username: Primary key of the ``User`` asking for the change.
    :param email: Address the user wants to switch to; the message is sent there.
    """
    from models import User, EmailChangeRequest

    change_request = EmailChangeRequest(db.session.get(User, username), email)
    db.session.add(change_request)
    db.session.commit()

    scheme = "https://" if config.suggest_https else "http://"
    confirmation_url = scheme + config.BASE_DOMAIN + "/settings/confirm-email/" + change_request.code

    message = email_send.make_multipart_message(
        f"Email change request for {username}",
        config.NOTIFICATION_EMAIL,
        email,
        "email-change",
        username=username,
        code=change_request.code,
        new_email=email,
        url=confirmation_url
    )

    with SMTP(config.MAIL_SERVER) as mail:
        mail.sendmail(config.NOTIFICATION_EMAIL, email, message)
378