You're looking at it

Homepage: https://roundabout-host.com

By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 celery_tasks.py

View raw Download
text/x-script.python • 13.99 kiB
Python script, ASCII text executable
        
            
1
import common
2
import time
3
import os
4
import config
5
import email_send
6
import shutil
7
from celery import shared_task
8
from app import db, _
9
from smtplib import SMTP
10
from celery.utils.log import get_task_logger
11
from sqlalchemy.orm import make_transient
12
from datetime import datetime
13
14
15
@shared_task(ignore_result=False)
16
def send_notification(user_notification_id):
17
from models import UserNotification, Commit, Post, PullRequest
18
user_notification = db.session.get(UserNotification, user_notification_id)
19
user = user_notification.user
20
notification = user_notification.notification
21
22
if user.email:
23
with (SMTP(config.MAIL_SERVER) as mail):
24
match notification.data.get("type"):
25
case "welcome":
26
message = email_send.make_multipart_message(
27
f"[system] Welcome, {user.username}",
28
config.NOTIFICATION_EMAIL,
29
user.email,
30
"welcome",
31
username=user.username
32
)
33
case "commit":
34
commit = db.session.get(Commit, notification.data.get("commit"))
35
line_separator = "\n\n" # hack so it works in older Pythons
36
newline = "\n"
37
message = email_send.make_multipart_message(
38
f"[commit in {notification.data.get('repo')}] {commit.message.partition(line_separator)[0].replace(newline, ' ')}",
39
config.NOTIFICATION_EMAIL,
40
user.email,
41
"commit",
42
username=user.username,
43
commit=commit,
44
url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/repo/" + notification.data.get("repo") + "/commit/" + notification.data.get("commit")
45
)
46
case "post":
47
post = db.session.get(Post, notification.data.get("post"))
48
message = email_send.make_multipart_message(
49
f"[post in {notification.data.get('repo')}] {post.subject}",
50
config.NOTIFICATION_EMAIL,
51
user.email,
52
"forum",
53
username=user.username,
54
post=post,
55
url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + "/repo/" + notification.data.get("repo") + "/post/" + notification.data.get("post")
56
)
57
case "pr":
58
pr = db.session.get(PullRequest, notification.data.get("pr"))
59
message = email_send.make_multipart_message(
60
f"[PR in {notification.data.get('repo')}] {pr.head_route}:{pr.head_branch} -> {pr.base_route}:{pr.base_branch}",
61
config.NOTIFICATION_EMAIL,
62
user.email,
63
"pr",
64
username=user.username,
65
pr=pr,
66
url=("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + notification.data.get("base") + "/prs/"
67
)
68
69
mail.sendmail(config.NOTIFICATION_EMAIL, user.email, message)
70
71
return 0 # notification sent successfully
72
73
74
# Merge strategies understood by merge_heads().
_MERGE_METHODS = ("merge", "fast-forward", "rebase")


def _run_merge_method(repo_location, method, head_ref, base_branch, simulate, merge_flags=()):
    """Run, or dry-run, one merge strategy on the currently checked-out base branch.

    :param repo_location: path of the base repository on disk.
    :param method: one of ``_MERGE_METHODS``.
    :param head_ref: ref to bring in, e.g. ``heads/<branch>`` or ``NEW/<branch>``.
    :param base_branch: name of the branch that is merged into.
    :param simulate: when true, only try the operation and abort it afterwards.
    :param merge_flags: extra options placed right after ``git merge``; only
        used by the "merge" method.
    :return: the value of ``common.git_command(..., return_err=True,
        return_exit=True)``, which callers unpack as ``(out, err, exit)``.
    :raises ValueError: if ``method`` is not a known strategy.
    """
    if simulate:
        if method == "merge":
            result = common.git_command(repo_location, b"", "merge", *merge_flags,
                                        "--no-commit", "--no-ff", head_ref,
                                        return_err=True, return_exit=True)
            # Undo the merge.
            common.git_command(repo_location, b"", "merge", "--abort")
            return result
        if method == "fast-forward":
            # A fast-forward is possible exactly when base is an ancestor of head.
            return common.git_command(repo_location, b"", "merge-base", "--is-ancestor",
                                      f"heads/{base_branch}", head_ref,
                                      return_err=True, return_exit=True)
        if method == "rebase":
            # To attempt a rebase dry run, switch to a detached head.
            common_ancestor = common.git_command(repo_location, b"", "merge-base",
                                                 f"heads/{base_branch}",
                                                 head_ref).strip().decode()
            common.git_command(repo_location, b"", "checkout", "--detach", f"heads/{base_branch}")
            result = common.git_command(repo_location, b"", "rebase",
                                        "--onto", head_ref,
                                        common_ancestor, return_err=True,
                                        return_exit=True)
            # Undo the rebase.
            common.git_command(repo_location, b"", "rebase", "--abort")
            return result
    else:
        if method == "merge":
            return common.git_command(repo_location, b"", "merge", *merge_flags,
                                      "--no-ff", head_ref,
                                      return_err=True, return_exit=True)
        if method == "fast-forward":
            return common.git_command(repo_location, b"", "merge", "--ff-only", head_ref,
                                      return_err=True, return_exit=True)
        if method == "rebase":
            return common.git_command(repo_location, b"", "rebase", head_ref,
                                      return_err=True, return_exit=True)

    raise ValueError(f"Unknown merge method {method!r}, cannot merge.")


def _mark_pull_request_merged(pull_request):
    """Set the pull request to state 1, set every resolved post to state 0, and commit."""
    # Consider the PR merged.
    pull_request.state = 1

    for resolves in pull_request.resolves:
        resolves.post.state = 0

    db.session.commit()


@shared_task(ignore_result=False)
def merge_heads(head_route, head_branch, base_route, base_branch, pr_id, simulate=True, method="merge", username=None):
    """Merge (or test-merge) the head branch of a pull request into its base branch.

    When head and base live in the same repository the head branch is used
    directly; otherwise the head repository is fetched through a temporary
    remote called ``NEW``.  With ``simulate`` set, the operation is only tried
    and then aborted, and no database rows are changed.  Otherwise the pull
    request is marked as merged and, for cross-repository merges, the
    ``Commit`` rows of the new commits are copied to the base repository.

    :param head_route: route of the repository containing the changes.
    :param head_branch: branch containing the changes.
    :param base_route: route of the repository that receives the changes.
    :param base_branch: branch that receives the changes.
    :param pr_id: primary key of the ``PullRequest`` row.
    :param simulate: only check whether the merge would work.
    :param method: "merge", "fast-forward" or "rebase".
    :param username: user whose identity git should use; when ``None`` the
        repository's git identity is left untouched.
    :return: ``(task_name, out, err, head_route, head_branch, base_route,
        base_branch, merge_exit, new_commits)`` where ``task_name`` is
        "merge_simulator" or "merge".  ``new_commits`` is the raw
        ``git log --oneline`` output for same-repository merges and a list
        of lines for cross-repository ones.
    :raises FileNotFoundError: if the base repository does not exist on disk.
    :raises ValueError: if ``method`` is not a known strategy.
    """
    from models import Commit, PullRequest, User

    server_repo_location = os.path.join(config.REPOS_PATH, base_route.lstrip("/"))
    pull_request = db.session.get(PullRequest, pr_id)
    if not os.path.isdir(server_repo_location):
        raise FileNotFoundError(f"Repo {server_repo_location} not found, cannot merge.")
    # Reject bad input before the repository is touched at all.
    if method not in _MERGE_METHODS:
        raise ValueError(f"Unknown merge method {method!r}, cannot merge.")

    task_name = "merge_simulator" if simulate else "merge"
    override_identity = username is not None

    try:
        if override_identity:
            # Change to the user's identity.  A user that cannot be found (or
            # has no email) gets the noreply address instead of crashing.
            user = db.session.get(User, username)
            user_email = (user.email if user is not None else None) or f"noreply@{config.BASE_DOMAIN}"
            common.git_command(server_repo_location, b"", "config", "user.email", user_email)
            common.git_command(server_repo_location, b"", "config", "user.name", username)

        if base_route == head_route:
            common.git_command(server_repo_location, b"", "checkout", f"{base_branch}")
            out, err, merge_exit = _run_merge_method(server_repo_location, method,
                                                     f"heads/{head_branch}", base_branch, simulate)

            # NOTE(review): merge_exit is not checked before the PR is marked
            # as merged -- confirm that callers always simulate first.
            if not simulate:
                _mark_pull_request_merged(pull_request)

            # NOTE(review): this runs after the merge, so for a real merge the
            # range is presumably empty -- confirm that is intended.
            new_commits = common.git_command(server_repo_location, b"", "log", "--oneline",
                                             f"heads/{base_branch}..heads/{head_branch}")

            return task_name, out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits

        # Otherwise, we need to fetch the head repo.
        remote_url = ("https://" if config.suggest_https else "http://") + config.BASE_DOMAIN + (f":{config.port}" if config.port not in {80, 443} else "") + "/git" + head_route

        out, err = b"", b""
        part_out, part_err = common.git_command(server_repo_location, b"", "remote", "add", "NEW", remote_url, return_err=True)
        out += part_out
        err += part_err

        try:
            for git_args in (
                ("remote", "update"),
                ("fetch", "NEW", f"{head_branch}"),
                ("checkout", f"{base_branch}"),
            ):
                part_out, part_err = common.git_command(server_repo_location, b"", *git_args, return_err=True)
                out += part_out
                err += part_err

            new_commits, part_err = common.git_command(server_repo_location, b"", "log", "--pretty=format:\"%H\"", f"heads/{base_branch}..NEW/{head_branch}", "--", return_err=True)
            new_commits = new_commits.decode().splitlines()
            err += part_err

            part_out, part_err, merge_exit = _run_merge_method(
                server_repo_location, method, f"NEW/{head_branch}", base_branch, simulate,
                merge_flags=("--allow-unrelated-histories",),
            )

            if not simulate:
                # Copy the commits rows from the head repo to the base repo
                for commit in new_commits:
                    commit_data = Commit.query.filter_by(repo_name=head_route, sha=commit).first()
                    if commit_data is None:
                        # No row to copy; skipping is better than failing
                        # after git has already performed the merge.
                        import logging
                        logging.getLogger(__name__).warning(
                            "No Commit row for %s in %s; not copied to %s", commit, head_route, base_route)
                        continue

                    # Detach the row so it can be re-inserted as a new one.
                    db.session.expunge(commit_data)
                    make_transient(commit_data)

                    commit_data.repo_name = base_route
                    commit_data.identifier = f"{base_route}/{commit_data.sha}"
                    commit_data.receive_date = datetime.now()
                    db.session.add(commit_data)

                # NOTE(review): merge_exit is not checked here either.
                _mark_pull_request_merged(pull_request)

            out += part_out
            err += part_err
        finally:
            # Always drop the temporary remote, otherwise the next
            # "remote add NEW" on this repository would fail.
            part_out, part_err = common.git_command(server_repo_location, b"", "remote", "rm", "NEW", return_err=True)
            out += part_out
            err += part_err

        return task_name, out, err, head_route, head_branch, base_route, base_branch, merge_exit, new_commits
    finally:
        if override_identity:
            # Undo the identity change, even when something above failed.
            common.git_command(server_repo_location, b"", "config", "--unset", "user.email")
            common.git_command(server_repo_location, b"", "config", "--unset", "user.name")
215
216
217
@shared_task(ignore_result=False)
def copy_site(route):
    """Replace the published site of the repository at ``route`` with a fresh clone.

    The site lives under ``config.SITE_PATH/<owner username>/``; repositories
    whose ``has_site`` is 2 are placed directly in that directory, all others
    in a subdirectory named after the repository.  Any existing directory at
    the target is removed first.

    :param route: route (primary key) of the ``Repo`` row.
    """
    from models import Repo

    repo = db.session.get(Repo, route)
    repo_on_disk = os.path.join(config.REPOS_PATH, route.lstrip("/"))

    owner_directory = repo.owner.username
    if repo.has_site != 2:
        site_directory = repo.name
    else:
        site_directory = ""
    target = os.path.join(config.SITE_PATH, owner_directory, site_directory)

    # Use the configured site branch, falling back to the default branch.
    site_branch = repo.site_branch
    if not site_branch:
        site_branch = repo.default_branch

    # Delete the old site, if there is one.
    if os.path.isdir(target):
        shutil.rmtree(target)

    # A shallow clone is enough; the full git history is not needed for a site.
    clone_source = os.path.join(os.getcwd(), repo_on_disk)
    clone_target = os.path.join(owner_directory, site_directory)
    common.git_command(
        config.SITE_PATH,
        b"",
        "clone",
        "--depth=1",
        f"--branch={site_branch}",
        clone_source,
        clone_target,
    )
233
234
235
@shared_task(ignore_result=False)
def delete_site(route):
    """Remove the published site of the repository at ``route``.

    Afterwards, if the owner has a repository with ``has_site`` equal to 2,
    that site is copied again through ``copy_site``.

    :param route: route (primary key) of the ``Repo`` row.
    """
    from models import Repo

    repo = db.session.get(Repo, route)
    owner = repo.owner

    if repo.has_site != 2:
        site_directory = repo.name
    else:
        site_directory = "."
    target = os.path.join(config.SITE_PATH, owner.username, site_directory)

    if os.path.isdir(target):
        shutil.rmtree(target)

    # Redo the primary site.
    primary = Repo.query.filter_by(owner=repo.owner, has_site=2).first()
    if not primary:
        return
    copy_site(primary.route)
249
250
251
@shared_task(ignore_result=False)
def request_email_change(username, email):
    """Store an email change request and mail its confirmation link to the new address.

    :param username: primary key of the ``User`` asking for the change.
    :param email: the new address; the confirmation message is sent there.
    """
    from models import User, EmailChangeRequest

    account = db.session.get(User, username)

    # Persist the request first so its code is valid when the mail arrives.
    change_request = EmailChangeRequest(account, email)
    db.session.add(change_request)
    db.session.commit()

    subject = "Email change request for {username}".format(username=username)
    scheme = "https://" if config.suggest_https else "http://"
    template_values = {
        "username": username,
        "code": change_request.code,
        "new_email": email,
        "url": scheme + config.BASE_DOMAIN + "/settings/confirm-email/" + change_request.code,
    }
    message = email_send.make_multipart_message(
        subject,
        config.NOTIFICATION_EMAIL,
        email,
        "email-change",
        **template_values,
    )

    with SMTP(config.MAIL_SERVER) as mail:
        mail.sendmail(config.NOTIFICATION_EMAIL, email, message)
274