By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 main.py

View raw Download
text/plain • 21.16 kiB
Python script, ASCII text executable
        
            
1
"""
2
Izvor - modular, GTK+ desktop search and launcher
3
Copyright (C) 2024 <root@roundabout-host.com>
4
5
This program is free software: you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation, either version 3 of the License, or
8
(at your option) any later version.
9
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
14
15
You should have received a copy of the GNU General Public License
16
along with this program. If not, see <https://www.gnu.org/licenses/>.
17
"""
18
19
import os
20
import subprocess
21
22
import gi
23
import asyncio
24
import importlib
25
import gbulb
26
from pathlib import Path
27
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
28
29
from xdg.Config import icon_size
30
31
gi.require_version("Gtk", "3.0")
32
from gi.repository import Gtk, Gdk, Pango, GLib
33
import gettext
34
import json
35
import traceback
36
import sys
37
38
_ = gettext.gettext
39
40
_T = TypeVar("_T")
41
42
43
def get_parent_package(path):
44
parent_path = os.path.split(path)[0]
45
while parent_path != "/":
46
if "__init__.py" in os.listdir(parent_path):
47
return Path(parent_path)
48
parent_path = os.path.split(parent_path)[0]
49
return None
50
51
52
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
53
*args, **kwargs):
54
iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
55
tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}
56
57
while tasks:
58
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
59
60
for task in done:
61
iterator = tasks.pop(task)
62
try:
63
result = task.result()
64
yield result
65
tasks[asyncio.create_task(iterator.__anext__())] = iterator
66
except StopAsyncIteration:
67
pass
68
69
70
class ResultInfoWidget(Gtk.ListBoxRow):
71
def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
72
super().__init__()
73
self.name = name
74
self.description = description
75
self.image = image
76
self.execute = execute
77
78
self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
79
self.add(self.box)
80
81
if icon_size:
82
# If the icon size is 0, have no icons
83
if image[0] == "logo":
84
self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
85
self.image.set_pixel_size(icon_size)
86
self.box.pack_start(self.image, False, False, 0)
87
88
self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
89
self.box.pack_start(self.label_box, True, True, 0)
90
91
self.name_label = Gtk.Label(label=self.name)
92
self.name_label.set_line_wrap(True)
93
self.name_label.set_justify(Gtk.Justification.LEFT)
94
self.name_label.set_halign(Gtk.Align.START)
95
attributes = Pango.AttrList()
96
attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
97
self.name_label.set_attributes(attributes)
98
self.label_box.pack_start(self.name_label, True, True, 0)
99
100
if show_description:
101
self.description_label = Gtk.Label(label=self.description)
102
self.description_label.set_line_wrap(True)
103
self.description_label.set_justify(Gtk.Justification.LEFT)
104
self.description_label.set_halign(Gtk.Align.START)
105
self.label_box.pack_start(self.description_label, True, True, 0)
106
107
108
class ProviderExceptionDialog(Gtk.Dialog):
109
def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
110
super().__init__(title=_("{provider} raised a {exception}".format(provider=get_parent_package(provider), exception=type(exception).__name__)), transient_for=parent)
111
self.add_button(_("Open config"), 0)
112
self.add_button(_("Reset config"), 1)
113
self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
114
self.set_default_response(Gtk.ResponseType.CLOSE)
115
self.set_default_size(360, 240)
116
self.set_resizable(True)
117
118
self.provider = provider
119
self.exception = exception
120
121
traceback_text = "\n".join(traceback.format_exception(type(exception), exception, tb))
122
123
self.label = Gtk.Label(label=traceback_text)
124
125
monospaced_font = Pango.FontDescription()
126
monospaced_font.set_family("monospace")
127
self.label.override_font(monospaced_font)
128
self.label.set_line_wrap(True)
129
self.label.set_justify(Gtk.Justification.LEFT)
130
self.label.set_halign(Gtk.Align.START)
131
self.label.set_selectable(True)
132
133
self.get_content_area().add(self.label)
134
self.label.show()
135
136
class Izvor(Gtk.Application):
137
USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
138
USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
139
USER_PROVIDERS = USER_DATA / "providers"
140
USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
141
SYSTEM_CONFIGS = Path("/etc/izvor")
142
SYSTEM_DATA = Path("/usr/share/izvor")
143
SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
144
SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
145
ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
146
APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"
147
148
def __init__(self):
149
super().__init__(application_id="com.roundabout-host.roundabout.izvor")
150
gbulb.install(gtk=True)
151
self.loop = asyncio.get_event_loop()
152
self.builder = Gtk.Builder()
153
self.builder.add_from_file("izvor.ui")
154
self.window = self.builder.get_object("root")
155
self.window.connect("destroy", self.kill)
156
self.window.connect("key-press-event", self.check_escape)
157
self.window.set_title(_("Launcher"))
158
159
if not os.getenv("XDG_DATA_HOME"):
160
print("XDG_DATA_HOME is not set. Using default path.")
161
if not os.getenv("XDG_CONFIG_HOME"):
162
print("XDG_CONFIG_HOME is not set. Using default path.")
163
if not self.USER_PROVIDER_CONFIGS.exists():
164
print("Creating user config directory.")
165
self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
166
if not self.USER_PROVIDERS.exists():
167
print("Creating user data directory.")
168
self.USER_PROVIDERS.mkdir(parents=True)
169
170
self.available_providers = {}
171
172
if self.SYSTEM_PROVIDERS.exists():
173
for provider in self.SYSTEM_PROVIDERS.iterdir():
174
if not (provider / "__init__.py").exists():
175
continue
176
self.available_providers[provider.stem] = provider
177
if self.USER_PROVIDERS.exists():
178
for provider in self.USER_PROVIDERS.iterdir():
179
if not (provider / "__init__.py").exists():
180
continue
181
self.available_providers[provider.stem] = provider
182
183
print(f"Available providers: {list(self.available_providers.keys())}")
184
185
for provider, path in self.available_providers.items():
186
if not (self.USER_PROVIDER_CONFIGS / f"{provider}.json").exists():
187
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "w") as f:
188
spec = importlib.util.spec_from_file_location(provider, path / "__init__.py")
189
module = importlib.util.module_from_spec(spec)
190
spec.loader.exec_module(module)
191
if hasattr(module, "config_template"):
192
json.dump(module.config_template, f)
193
else:
194
json.dump({}, f)
195
196
if not self.ENABLED_PROVIDERS_FILE.exists():
197
self.ENABLED_PROVIDERS_FILE.touch()
198
199
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
200
for provider in self.available_providers:
201
f.write(provider + "\n")
202
203
if not self.APPEARANCE_CONFIG_FILE.exists():
204
self.APPEARANCE_CONFIG_FILE.touch()
205
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
206
json.dump({"icon_size": 32, "show_result_descriptions": True}, f)
207
208
with open(self.APPEARANCE_CONFIG_FILE, "r") as f:
209
appearance_config = json.load(f)
210
self.icon_size = appearance_config.get("icon_size", 32)
211
self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)
212
213
self.permanently_enabled_providers = {}
214
215
with open(self.ENABLED_PROVIDERS_FILE, "r") as f:
216
for line in f:
217
provider = line.strip()
218
if provider in self.available_providers:
219
self.permanently_enabled_providers[provider] = self.available_providers[provider]
220
221
self.enabled_providers = self.permanently_enabled_providers.copy()
222
223
providers_menu = self.builder.get_object("providers-menu")
224
225
self.providers = []
226
self.provider_checkboxes = {}
227
228
for provider in self.permanently_enabled_providers.values():
229
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
230
module = importlib.util.module_from_spec(spec)
231
spec.loader.exec_module(module)
232
with open(self.USER_PROVIDER_CONFIGS / f"{provider.stem}.json", "r") as f:
233
provider_config = json.load(f)
234
loaded_provider = module.Provider(provider_config)
235
self.providers.append(loaded_provider)
236
print(f"Loaded provider: {loaded_provider.name}")
237
print(loaded_provider.description)
238
self.provider_checkboxes[provider.stem] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
239
providers_menu.append(self.provider_checkboxes[provider.stem])
240
self.provider_checkboxes[provider.stem].set_active(True)
241
self.provider_checkboxes[provider.stem].show()
242
self.provider_checkboxes[provider.stem].connect("toggled", self.update_enabled_providers)
243
244
self.update_task = None
245
246
def call_update_results(widget):
247
if self.update_task is not None:
248
self.update_task.cancel()
249
self.update_task = asyncio.create_task(self.update_results(widget))
250
251
def execute_result(widget, row):
252
row.execute()
253
self.kill()
254
255
self.builder.connect_signals(
256
{
257
"update-search": call_update_results,
258
"result-activated": execute_result,
259
"providers-menu-all-toggled": self.update_enabled_providers_all,
260
"menu-about": self.about,
261
"menu-providers": self.provider_menu,
262
"menu-preferences": self.preferences,
263
}
264
)
265
266
GLib.idle_add(self.update_enabled_providers, None)
267
268
def update_enabled_providers(self, widget=None):
269
self.providers = []
270
for provider, checkbox in self.provider_checkboxes.items():
271
if checkbox.get_active():
272
self.enabled_providers[provider] = self.permanently_enabled_providers[provider]
273
spec = importlib.util.spec_from_file_location(provider, self.enabled_providers[provider] / "__init__.py")
274
module = importlib.util.module_from_spec(spec)
275
spec.loader.exec_module(module)
276
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "r") as f:
277
provider_config = json.load(f)
278
loaded_provider = module.Provider(provider_config)
279
self.providers.append(loaded_provider)
280
else:
281
self.enabled_providers.pop(provider, None)
282
283
if self.update_task is not None:
284
self.update_task.cancel()
285
self.update_task = asyncio.create_task(self.update_results(widget))
286
287
def update_enabled_providers_all(self, widget):
288
for checkbox in self.provider_checkboxes.values():
289
checkbox.set_active(widget.get_active())
290
self.update_enabled_providers()
291
292
def kill(self, widget=None):
293
self.loop.stop()
294
self.quit()
295
296
def check_escape(self, widget, event):
297
results_list = self.builder.get_object("results-list")
298
search_entry = self.builder.get_object("search-query")
299
rows = results_list.get_children()
300
current_row = results_list.get_selected_row()
301
302
if event.keyval == Gdk.KEY_Escape:
303
self.kill()
304
return True
305
306
if event.keyval == Gdk.KEY_Down:
307
# Move to the next row
308
if current_row:
309
current_index = rows.index(current_row)
310
next_index = (current_index + 1) % len(rows) # Wrap to the beginning if at the end
311
else:
312
next_index = 0
313
314
results_list.select_row(rows[next_index])
315
search_entry.grab_focus() # Refocus the search entry
316
self.scroll_to_row(rows[next_index], results_list.get_adjustment())
317
return True
318
319
if event.keyval == Gdk.KEY_Up:
320
# Move to the previous row
321
if current_row:
322
current_index = rows.index(current_row)
323
prev_index = (current_index - 1) % len(rows) # Wrap to the end if at the beginning
324
else:
325
prev_index = len(rows) - 1
326
327
results_list.select_row(rows[prev_index])
328
search_entry.grab_focus() # Refocus the search entry
329
self.scroll_to_row(rows[prev_index], results_list.get_adjustment())
330
return True
331
332
if event.keyval == Gdk.KEY_Return:
333
if current_row:
334
# Execute the selected row
335
current_row.activate()
336
return True
337
elif len(rows) > 0:
338
# Execute the first row
339
rows[0].activate()
340
return True
341
342
return False
343
344
def scroll_to_row(self, row, adjustment):
345
row_geometry = row.get_allocation()
346
row_y = row_geometry.y
347
row_height = row_geometry.height
348
adjustment_value = adjustment.get_value()
349
adjustment_upper = adjustment.get_upper() - adjustment.get_page_size()
350
351
if row_y + row_height > adjustment_value + adjustment.get_page_size():
352
adjustment.set_value(min(row_y + row_height - adjustment.get_page_size(), adjustment_upper))
353
elif row_y < adjustment_value:
354
adjustment.set_value(max(row_y, 0))
355
356
def about(self, widget):
357
about_builder = Gtk.Builder()
358
about_builder.add_from_file("about.ui")
359
about_window = about_builder.get_object("about-dialog")
360
about_window.connect("destroy", lambda _: about_window.destroy())
361
about_window.connect("close", lambda _: about_window.destroy())
362
about_window.connect("response", lambda _, __: about_window.destroy())
363
about_window.show_all()
364
365
def provider_menu(self, widget):
366
providers_builder = Gtk.Builder()
367
providers_builder.add_from_file("providers.ui")
368
providers_window = providers_builder.get_object("providers-window")
369
providers_window.connect("destroy", lambda _: providers_window.destroy())
370
371
providers_table = providers_builder.get_object("providers-table") # actually Gtk.Grid
372
rows = 0
373
for provider in self.available_providers.values():
374
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
375
module = importlib.util.module_from_spec(spec)
376
spec.loader.exec_module(module)
377
loaded_provider = module.Provider({})
378
379
provider_label = Gtk.Label(label=loaded_provider.name)
380
provider_label.set_halign(Gtk.Align.START)
381
provider_label.set_valign(Gtk.Align.CENTER)
382
font_desc = Pango.FontDescription()
383
font_desc.set_weight(Pango.Weight.BOLD)
384
provider_label.override_font(font_desc)
385
provider_label.set_line_wrap(True)
386
provider_label.set_justify(Gtk.Justification.LEFT)
387
388
provider_description = Gtk.Label(label=loaded_provider.description)
389
provider_description.set_halign(Gtk.Align.START)
390
provider_description.set_valign(Gtk.Align.CENTER)
391
provider_description.set_justify(Gtk.Justification.LEFT)
392
provider_description.set_line_wrap(True)
393
394
icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
395
icon.set_pixel_size(self.icon_size)
396
397
switch = Gtk.Switch()
398
switch.set_valign(Gtk.Align.CENTER)
399
switch.set_active(provider.stem in self.permanently_enabled_providers)
400
switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)
401
402
providers_table.attach(icon, 0, rows, 1, 1)
403
providers_table.attach(provider_label, 1, rows, 1, 1)
404
providers_table.attach(provider_description, 2, rows, 1, 1)
405
providers_table.attach(switch, 3, rows, 1, 1)
406
407
provider_label.show()
408
provider_description.show()
409
410
rows += 1
411
412
providers_window.show_all()
413
414
def preferences(self, widget):
415
preferences_builder = Gtk.Builder()
416
preferences_builder.add_from_file("preferences.ui")
417
preferences_window = preferences_builder.get_object("preferences-window")
418
preferences_window.connect("destroy", lambda _: preferences_window.destroy())
419
420
def update_appearance_preferences(widget, *args):
421
self.icon_size = preferences_builder.get_object("icon-size").get_value()
422
self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()
423
424
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
425
json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)
426
427
# Regenerate the results to reflect the changes
428
if self.update_task is not None:
429
self.update_task.cancel()
430
self.update_task = asyncio.create_task(self.update_results(widget))
431
432
preferences_builder.connect_signals(
433
{
434
"update-appearance": update_appearance_preferences,
435
}
436
)
437
438
preferences_window.show_all()
439
440
def update_permanently_enabled_providers(self, widget, param, provider):
441
if widget.get_active():
442
self.permanently_enabled_providers[provider] = self.available_providers[provider]
443
else:
444
self.permanently_enabled_providers.pop(provider, None)
445
446
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
447
f.write("\n".join(self.permanently_enabled_providers.keys()))
448
449
450
async def update_results(self, widget):
451
print("Updating results...")
452
spinner = self.builder.get_object("spinner")
453
spinner.start()
454
generators = []
455
query = self.builder.get_object("search-query-buffer").get_text()
456
# print(f"Query: {query}")
457
for provider in self.providers:
458
generators.append(provider.search)
459
460
# Clear the results list
461
results_list = self.builder.get_object("results-list")
462
for row in results_list.get_children():
463
results_list.remove(row)
464
465
try:
466
async for result in merge_generators_with_params(generators, query):
467
# print(result)
468
result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"], self.icon_size, self.show_result_descriptions)
469
results_list.add(result_box)
470
result_box.show_all()
471
except Exception as e:
472
exception_type, exception, tb = sys.exc_info()
473
filename, line_number, function_name, line = traceback.extract_tb(tb)[-1]
474
print(f"{filename} raised an exception!")
475
print(f"{type(e).__name__}: {str(e)}")
476
dialog = ProviderExceptionDialog(Path(filename), e, self.window, tb=tb)
477
dialog.show_all()
478
response = dialog.run()
479
if response == 0:
480
# Open config
481
subprocess.Popen(["xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json")])
482
483
self.kill()
484
elif response == 1:
485
# Reset config
486
spec = importlib.util.spec_from_file_location(get_parent_package(filename).stem, Path(filename))
487
module = importlib.util.module_from_spec(spec)
488
spec.loader.exec_module(module)
489
490
if hasattr(module, "config_template"):
491
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
492
json.dump(module.config_template, f)
493
else:
494
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
495
json.dump({}, f)
496
497
self.kill()
498
elif response == Gtk.ResponseType.CLOSE:
499
self.kill()
500
501
results_list.show_all()
502
spinner.stop()
503
504
505
if __name__ == "__main__":
506
izvor = Izvor()
507
izvor.window.show_all()
508
try:
509
izvor.loop.run_forever()
510
finally:
511
izvor.loop.close()
512