app.py
Python script, ASCII text executable
1
import datetime
2
3
import celery
4
import flask
5
import sqlalchemy
6
from flask_sqlalchemy import SQLAlchemy
7
from flask_bcrypt import Bcrypt
8
from flask_migrate import Migrate
9
10
from sqlalchemy.orm import declarative_base
11
from celery import shared_task
12
import httpx
13
14
app = flask.Flask(__name__)
15
from celery import Celery, Task
16
17
18
def celery_init_app(app_) -> Celery:
19
class FlaskTask(Task):
20
def __call__(self, *args: object, **kwargs: object) -> object:
21
with app_.app_context():
22
return self.run(*args, **kwargs)
23
24
celery_app = Celery(app_.name, task_cls=FlaskTask)
25
celery_app.config_from_object(app_.config["CELERY"])
26
celery_app.set_default()
27
app_.extensions["celery"] = celery_app
28
return celery_app
29
30
31
app.config.from_mapping(
32
CELERY=dict(
33
broker_url="redis://localhost:6379",
34
result_backend="redis://localhost:6379"
35
),
36
)
37
celery_app = celery_init_app(app)
38
39
app.config["SQLALCHEMY_DATABASE_URI"] = \
40
"postgresql://echo:1234@localhost:5432/echo"
41
app.config["SQLALCHEMY_ENGINE_OPTIONS"] = {
42
"connect_args": {
43
"options": "-c timezone=utc"
44
}
45
}
46
db = SQLAlchemy(app)
47
bcrypt = Bcrypt(app)
48
migrate = Migrate(app, db)
49
app.config["SESSION_TYPE"] = "filesystem"
50
app.config["SECRET_KEY"] = "super secret"
51
52
with app.app_context():
53
class User(db.Model):
54
username = db.Column(db.String(64), unique=True, nullable=False, primary_key=True)
55
password = db.Column(db.String(72), nullable=False)
56
admin = db.Column(db.Boolean, nullable=False, default=False)
57
58
applications = db.relationship("Application", back_populates="owner")
59
60
def __init__(self, username, password, admin=False):
61
self.username = username
62
self.password = bcrypt.generate_password_hash(password).decode("utf-8")
63
self.admin = admin
64
65
class Application(db.Model):
66
id = db.Column(db.Integer, primary_key=True, autoincrement=True, unique=True, default=0)
67
name = db.Column(db.String(64), unique=True, nullable=False)
68
owner_name = db.Column(db.String(64), db.ForeignKey("user.username"), nullable=False)
69
70
owner = db.relationship("User", back_populates="applications")
71
72
endpoints = db.relationship("Endpoint", back_populates="application")
73
74
def __init__(self, name, owner):
75
self.name = name
76
self.owner_name = owner.username
77
78
class Endpoint(db.Model):
79
id = db.Column(db.Integer, unique=True, nullable=False, primary_key=True, autoincrement=True)
80
application_id = db.Column(db.Integer, db.ForeignKey("application.id"), nullable=False)
81
address = db.Column(db.String(2048), nullable=False)
82
name = db.Column(db.String(64), nullable=False)
83
comment = db.Column(db.String(2048), nullable=True)
84
ping_interval = db.Column(db.Integer, default=300, nullable=False)
85
buggy = db.Column(db.Boolean, default=False)
86
87
application = db.relationship("Application", back_populates="endpoints")
88
statuses = db.relationship("Status", back_populates="endpoint", lazy="dynamic")
89
90
def __init__(self, application, name, address, ping_interval, comment=""):
91
self.application_id = application.id
92
self.name = name
93
self.address = address
94
self.comment = comment
95
self.ping_interval = ping_interval
96
97
98
class Status(db.Model):
99
id = db.Column(db.Integer, nullable=False, autoincrement=True, primary_key=True)
100
endpoint_id = db.Column(db.Integer, db.ForeignKey("endpoint.id"), nullable=False)
101
time = db.Column(db.DateTime, default=datetime.datetime.utcnow)
102
103
status = db.Column(db.SmallInteger, nullable=False)
104
buggy = db.Column(db.Boolean, default=False)
105
endpoint = db.relationship("Endpoint", back_populates="statuses")
106
107
def __init__(self, endpoint_id, status, buggy):
108
self.endpoint_id = endpoint_id
109
self.status = status
110
self.buggy = buggy
111
112
113
@celery.shared_task(name="ping")
114
def ping(id, address, next_ping):
115
if not db.session.get(Endpoint, id):
116
return
117
elif db.session.get(Endpoint, id).buggy:
118
buggy = True
119
else:
120
buggy = False
121
url = address
122
print(f"Pinging {url}")
123
response = httpx.get(url, verify=False)
124
reading = Status(id, response.status_code, buggy)
125
db.session.add(reading)
126
db.session.commit()
127
128
# Schedule the next ping
129
ping.apply_async(args=(id, address, next_ping), countdown=next_ping)
130
131
@celery.shared_task(name="ping_all")
132
def ping_all():
133
endpoints = Endpoint.query.all()
134
for endpoint in endpoints:
135
ping.delay(endpoint.id, endpoint.address, endpoint.ping_interval)
136
137
138
task = ping_all.delay()
139
140
print()
141
print()
142
print(task)
143
print()
144
print()
145
146
147
@app.context_processor
148
def default():
149
return {
150
"session": flask.session,
151
}
152
153
154
@app.route("/")
155
def dashboard():
156
return flask.render_template("dashboard.html", apps=Application.query.all())
157
158
159
@app.route("/login", methods=["GET"])
160
def login():
161
return flask.render_template("login.html")
162
163
164
@app.route("/signup", methods=["GET"])
165
def signup():
166
return flask.render_template("signup.html")
167
168
169
@app.route("/new-app", methods=["GET"])
170
def new_app():
171
if not flask.session.get("username"):
172
return flask.redirect("/login", code=303)
173
return flask.render_template("new-app.html")
174
175
176
@app.route("/new-app", methods=["POST"])
177
def new_app_post():
178
if not flask.session.get("username"):
179
return flask.redirect("/login", code=303)
180
if Application.query.filter_by(name=flask.request.form["name"]).first():
181
flask.flash("Application already exists")
182
return flask.redirect("/new-app", code=303)
183
184
new_app_ = Application(
185
flask.request.form["name"],
186
db.session.get(User, flask.session["username"]),
187
)
188
db.session.add(new_app_)
189
db.session.commit()
190
return flask.redirect("/", code=303)
191
192
193
@app.route("/login", methods=["POST"])
194
def login_post():
195
user = db.session.get(User, flask.request.form["username"])
196
if not user:
197
flask.flash("Username doesn't exist")
198
return flask.redirect("/signup", code=303)
199
if not bcrypt.check_password_hash(user.password, flask.request.form["password"]):
200
flask.flash("Wrong password")
201
return flask.redirect("/signup", code=303)
202
203
flask.session["username"] = user.username
204
return flask.redirect("/", code=303)
205
206
207
@app.route("/logout")
208
def logout():
209
flask.session.pop("username", None)
210
return flask.redirect("/", code=303)
211
212
213
@app.route("/signup", methods=["POST"])
214
def signup_post():
215
if flask.request.form["password"] != flask.request.form["password2"]:
216
flask.flash("Passwords do not match")
217
return flask.redirect("/signup", code=303)
218
if db.session.get(User, flask.request.form["username"]):
219
flask.flash("Username already exists")
220
return flask.redirect("/signup", code=303)
221
if len(flask.request.form["password"]) < 8:
222
flask.flash("Password must be at least 8 characters")
223
return flask.redirect("/signup", code=303)
224
if len(flask.request.form["username"]) < 4:
225
flask.flash("Username must be at least 4 characters")
226
return flask.redirect("/signup", code=303)
227
228
new_user = User(
229
flask.request.form["username"],
230
flask.request.form["password"],
231
)
232
db.session.add(new_user)
233
db.session.commit()
234
flask.session["username"] = new_user.username
235
return flask.redirect("/", code=303)
236
237
238
# UTC filter
239
@app.template_filter("utc")
240
def utc_filter(timestamp):
241
return datetime.datetime.utcfromtimestamp(timestamp)
242
243
244
@app.route("/app/<int:app_id>/")
245
def app_info(app_id):
246
app_ = db.session.get(Application, app_id)
247
248
time_slices = [(datetime.datetime.utcnow() - datetime.timedelta(minutes=int(flask.request.args.get("bar_duration", 30)) * (i+1)),
249
datetime.datetime.utcnow() - datetime.timedelta(minutes=int(flask.request.args.get("bar_duration", 30)) * i))
250
for i in range(int(flask.request.args.get("time_period", 30)) // int(flask.request.args.get("bar_duration", 1)))]
251
252
slice_results = []
253
254
for slice_ in time_slices:
255
slice_results.append(db.session.query(Status).filter(
256
sqlalchemy.and_(Status.endpoint.has(application_id=app_id),
257
Status.time >= slice_[0],
258
Status.time < slice_[1])).all())
259
260
return flask.render_template("app.html", app=app_, sorted=sorted, list=list,
261
sorting=lambda x: x.time, reverse=True,
262
is_ok=lambda x: all(status.status in (200, 201, 202, 203, 204, 205, 206, 207, 208, 226, 302, 304, 307)
263
for status in x), and_=sqlalchemy.and_,
264
bar_duration=int(flask.request.args.get("bar_duration", 30)), int=int, Status=Status,
265
time_period=int(flask.request.args.get("time_period", 1440)),
266
now=round(datetime.datetime.utcnow().timestamp()), func=sqlalchemy.func,
267
reversed=reversed, fromtimestamp=datetime.datetime.utcfromtimestamp,
268
slices=slice_results)
269
270
271
@app.route("/app/<int:app_id>/edit/")
272
def app_editor(app_id):
273
if flask.session.get("username") != db.session.get(Application, app_id).owner_name:
274
flask.abort(403)
275
app_ = db.session.get(Application, app_id)
276
return flask.render_template("app-editor.html", app=app_)
277
278
279
@app.route("/app/<int:app_id>/edit/<int:endpoint_id>", methods=["POST"])
280
def endpoint_edit(app_id, endpoint_id):
281
if flask.session.get("username") != db.session.get(Application, app_id).owner_name:
282
flask.abort(403)
283
endpoint = db.session.get(Endpoint, endpoint_id)
284
if flask.request.form.get("delete") == "delete":
285
statuses = db.session.query(Status).filter_by(endpoint_id=endpoint_id).all()
286
for status in statuses:
287
db.session.delete(status)
288
db.session.delete(endpoint)
289
db.session.commit()
290
else:
291
endpoint.name = flask.request.form["name"]
292
endpoint.address = flask.request.form["url"]
293
endpoint.ping_interval = max(15, int(flask.request.form["ping_interval"]))
294
endpoint.comment = flask.request.form["comment"]
295
db.session.commit()
296
return flask.redirect(f"/app/{app_id}/edit", code=303)
297
298
299
@app.route("/app/<int:app_id>/add-endpoint", methods=["POST"])
300
def app_add_endpoint(app_id):
301
if flask.session.get("username") != db.session.get(Application, app_id).owner_name:
302
flask.abort(403)
303
app_ = db.session.get(Application, app_id)
304
endpoint = Endpoint(app_,
305
flask.request.form["name"],
306
flask.request.form["url"],
307
max(15, int(flask.request.form["ping_interval"])),
308
flask.request.form["comment"])
309
db.session.add(endpoint)
310
db.session.commit()
311
312
ping.delay(endpoint.id, endpoint.address, endpoint.ping_interval)
313
314
return flask.redirect(f"/app/{app_id}/edit", code=303)
315
316