By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 git_http.py

View raw Download
text/x-script.python • 7.72 kiB
Python script, ASCII text executable
        
            
1
import uuid
2
from models import *
3
from app import app, db, bcrypt
4
from misc_utils import *
5
from common import git_command
6
import os
7
import shutil
8
import config
9
import flask
10
import git
11
import subprocess
12
from flask_httpauth import HTTPBasicAuth
13
import zlib
14
import re
15
import datetime
16
17
auth = HTTPBasicAuth(realm=config.AUTH_REALM)
18
19
auth_required = flask.Response("Unauthorized Access", 401,
20
{"WWW-Authenticate": 'Basic realm="Login Required"'})
21
22
23
@auth.verify_password
24
def verify_password(username, password):
25
user = User.query.filter_by(username=username).first()
26
27
if user and bcrypt.check_password_hash(user.password_hashed, password):
28
flask.g.user = username
29
return True
30
31
return False
32
33
34
def get_commit_identity(identity, pusher, repo):
35
email = identity.rpartition("<")[2].rpartition(">")[0].strip()
36
# If the email is not valid, attribute the commit to the pusher.
37
if not email:
38
return pusher
39
email_users = db.query(User).filter_by(email=email).all()
40
# If no user has the email, attribute the commit to the pusher.
41
if not email_users:
42
return pusher
43
44
# If only one user has the email, attribute the commit to them.
45
if email_users.count() == 1:
46
return email_users[0]
47
48
# If it's ambiguous, attribute the commit to an user with a higher permission level.
49
for user in email_users:
50
if repo.owner == user:
51
return user
52
53
for user in email_users:
54
relationship = db.query(RepoAccess).filter_by(user=user, repo=repo).first()
55
if relationship.permission_level == 2:
56
return user
57
58
for user in email_users:
59
relationship = db.query(RepoAccess).filter_by(user=user, repo=repo).first()
60
if relationship.permission_level == 1:
61
return user
62
63
# If no user has a higher permission level, attribute the commit to the pusher.:(
64
return pusher
65
66
67
@app.route("/<username>/<repository>/git-upload-pack", methods=["POST"])
68
@app.route("/git/<username>/<repository>/git-upload-pack", methods=["POST"])
69
@auth.login_required(optional=True)
70
def git_upload_pack(username, repository):
71
if auth.current_user() is None and not get_visibility(username, repository):
72
return auth_required
73
if not (get_visibility(username, repository) or get_permission_level(flask.g.user, username,
74
repository) is not None):
75
flask.abort(403)
76
77
server_repo_location = os.path.join(config.REPOS_PATH, username, repository, ".git")
78
text = git_command(server_repo_location, flask.request.data, "upload-pack",
79
"--stateless-rpc", ".")
80
81
return flask.Response(text, content_type="application/x-git-upload-pack-result")
82
83
84
@app.route("/<username>/<repository>/git-receive-pack", methods=["POST"])
85
@app.route("/git/<username>/<repository>/git-receive-pack", methods=["POST"])
86
@auth.login_required
87
def git_receive_pack(username, repository):
88
if not get_permission_level(flask.g.user, username, repository):
89
flask.abort(403)
90
91
server_repo_location = os.path.join(config.REPOS_PATH, username, repository, ".git")
92
text = git_command(server_repo_location, flask.request.data, "receive-pack",
93
"--stateless-rpc", ".")
94
95
if flask.request.data == b"0000":
96
return flask.Response("", content_type="application/x-git-receive-pack-result")
97
98
push_info = flask.request.data.split(b"\x00")[0].decode()
99
if not push_info:
100
return flask.Response(text, content_type="application/x-git-receive-pack-result")
101
102
old_sha, new_sha, ref = push_info[4:].split() # discard first 4 characters, used for line length which we don't need
103
104
if old_sha == "0" * 40:
105
commits_list = subprocess.check_output(["git", "rev-list", new_sha],
106
cwd=server_repo_location).decode().strip().split("\n")
107
else:
108
commits_list = subprocess.check_output(["git", "rev-list", f"{old_sha}..{new_sha}"],
109
cwd=server_repo_location).decode().strip().split("\n")
110
111
for sha in reversed(commits_list):
112
info = git_command(server_repo_location, None, "show", "-s",
113
"--format='%H%n%at%n%cn <%ce>%n%B'", sha).decode()
114
115
sha, time, identity, body = info.split("\n", 3)
116
login = flask.g.user
117
118
if not Commit.query.filter_by(identifier=f"/{username}/{repository}/{sha}").first():
119
logged_in_user = User.query.filter_by(username=login).first()
120
user = get_commit_identity(identity, logged_in_user, db.session.get(Repo, f"/{username}/{repository}"))
121
repo = Repo.query.filter_by(route=f"/{username}/{repository}").first()
122
123
commit = Commit(sha, user, repo, time, body, identity)
124
125
db.session.add(commit)
126
db.session.commit()
127
128
if ref.startswith("refs/heads/"): # if the push is to a branch
129
ref = ref.rpartition("/")[2] # get the branch name only
130
repo_data = db.session.get(Repo, f"/{username}/{repository}")
131
if ref == repo_data.site_branch:
132
# Update the site
133
from celery_tasks import copy_site
134
copy_site.delay(repo_data.route)
135
136
return flask.Response(text, content_type="application/x-git-receive-pack-result")
137
138
139
@app.route("/<username>/<repository>/info/refs", methods=["GET", "POST"])
140
@app.route("/git/<username>/<repository>/info/refs", methods=["GET", "POST"])
141
@auth.login_required(optional=True)
142
def git_info_refs(username, repository):
143
server_repo_location = os.path.join(config.REPOS_PATH, username, repository, ".git")
144
145
repo = git.Repo(server_repo_location)
146
repo_data = Repo.query.filter_by(route=f"/{username}/{repository}").first()
147
if not repo_data.default_branch:
148
if repo.heads:
149
repo_data.default_branch = repo.heads[0].name
150
repo.git.checkout("-f", repo_data.default_branch)
151
152
if auth.current_user() is None and (
153
not get_visibility(username, repository) or flask.request.args.get(
154
"service") == "git-receive-pack"):
155
return auth_required
156
try:
157
if not (get_visibility(username, repository) or get_permission_level(flask.g.user,
158
username,
159
repository) is not None):
160
flask.abort(403)
161
except AttributeError:
162
return auth_required
163
164
service = flask.request.args.get("service")
165
166
if service.startswith("git"):
167
service = service[4:]
168
else:
169
flask.abort(403)
170
171
if service == "receive-pack":
172
try:
173
if not get_permission_level(flask.g.user, username, repository):
174
flask.abort(403)
175
except AttributeError:
176
return auth_required
177
178
service_line = f"# service=git-{service}\n"
179
service_line = (f"{len(service_line) + 4:04x}" + service_line).encode()
180
181
if service == "upload-pack":
182
text = service_line + b"0000" + git_command(server_repo_location, None, "upload-pack",
183
"--stateless-rpc",
184
"--advertise-refs",
185
"--http-backend-info-refs", ".")
186
elif service == "receive-pack":
187
refs = git_command(server_repo_location, None, "receive-pack",
188
"--http-backend-info-refs", ".")
189
text = service_line + b"0000" + refs
190
else:
191
flask.abort(403)
192
193
response = flask.Response(text, content_type=f"application/x-git-{service}-advertisement")
194
response.headers["Cache-Control"] = "no-cache"
195
196
return response
197