By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 main.py

View raw Download
text/plain • 21.3 kiB
Python script, ASCII text executable
        
            
1
"""
2
Izvor - modular, GTK+ desktop search and launcher
3
Copyright (C) 2024 <root@roundabout-host.com>
4
5
This program is free software: you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation, either version 3 of the License, or
8
(at your option) any later version.
9
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
14
15
You should have received a copy of the GNU General Public License
16
along with this program. If not, see <https://www.gnu.org/licenses/>.
17
"""
18
19
import os
20
import subprocess
21
22
import gi
23
import asyncio
24
import importlib
25
import gbulb
26
from pathlib import Path
27
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
28
29
from xdg.Config import icon_size
30
31
gi.require_version("Gtk", "3.0")
32
from gi.repository import Gtk, Gdk, Pango, GLib
33
import gettext
34
import json
35
import traceback
36
import sys
37
38
_ = gettext.gettext
39
40
_T = TypeVar("_T")
41
42
43
def get_parent_package(path):
44
parent_path = os.path.split(path)[0]
45
while parent_path != "/":
46
if "__init__.py" in os.listdir(parent_path):
47
return Path(parent_path)
48
parent_path = os.path.split(parent_path)[0]
49
return None
50
51
52
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
53
*args, **kwargs):
54
iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
55
tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}
56
57
while tasks:
58
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
59
60
for task in done:
61
iterator = tasks.pop(task)
62
try:
63
result = task.result()
64
yield result
65
tasks[asyncio.create_task(iterator.__anext__())] = iterator
66
except StopAsyncIteration:
67
pass
68
69
70
class ResultInfoWidget(Gtk.ListBoxRow):
71
def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
72
super().__init__()
73
self.name = name
74
self.description = description
75
self.image = image
76
self.execute = execute
77
78
self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
79
self.add(self.box)
80
81
if icon_size:
82
# If the icon size is 0, have no icons
83
if image[0] == "logo":
84
self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
85
self.image.set_pixel_size(icon_size)
86
self.box.pack_start(self.image, False, False, 0)
87
88
self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
89
self.box.pack_start(self.label_box, True, True, 0)
90
91
self.name_label = Gtk.Label(label=self.name)
92
self.name_label.set_line_wrap(True)
93
self.name_label.set_justify(Gtk.Justification.LEFT)
94
self.name_label.set_halign(Gtk.Align.START)
95
attributes = Pango.AttrList()
96
attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
97
self.name_label.set_attributes(attributes)
98
self.label_box.pack_start(self.name_label, True, True, 0)
99
100
if show_description:
101
self.description_label = Gtk.Label(label=self.description)
102
self.description_label.set_line_wrap(True)
103
self.description_label.set_justify(Gtk.Justification.LEFT)
104
self.description_label.set_halign(Gtk.Align.START)
105
self.label_box.pack_start(self.description_label, True, True, 0)
106
107
108
class ProviderExceptionDialog(Gtk.Dialog):
109
def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
110
super().__init__(title=_("{provider} raised a {exception}".format(provider=get_parent_package(provider), exception=type(exception).__name__)), transient_for=parent)
111
self.add_button(_("Open config"), 0)
112
self.add_button(_("Reset config"), 1)
113
self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
114
self.set_default_response(Gtk.ResponseType.CLOSE)
115
self.set_default_size(360, 240)
116
self.set_resizable(True)
117
118
self.provider = provider
119
self.exception = exception
120
121
traceback_text = "\n".join(traceback.format_exception(type(exception), exception, tb))
122
123
self.label = Gtk.Label(label=traceback_text)
124
125
monospaced_font = Pango.FontDescription()
126
monospaced_font.set_family("monospace")
127
self.label.override_font(monospaced_font)
128
self.label.set_line_wrap(True)
129
self.label.set_justify(Gtk.Justification.LEFT)
130
self.label.set_halign(Gtk.Align.START)
131
self.label.set_selectable(True)
132
133
self.get_content_area().add(self.label)
134
self.label.show()
135
136
class Izvor(Gtk.Application):
137
USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
138
USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
139
USER_PROVIDERS = USER_DATA / "providers"
140
USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
141
SYSTEM_CONFIGS = Path("/etc/izvor")
142
SYSTEM_DATA = Path("/usr/share/izvor")
143
SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
144
SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
145
ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
146
APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"
147
148
def __init__(self):
149
super().__init__(application_id="com.roundabout-host.roundabout.izvor")
150
gbulb.install(gtk=True)
151
self.loop = asyncio.get_event_loop()
152
self.builder = Gtk.Builder()
153
self.builder.add_from_file("izvor.ui")
154
self.window = self.builder.get_object("root")
155
self.window.connect("destroy", self.kill)
156
self.window.connect("key-press-event", self.check_escape)
157
self.window.set_title(_("Launcher"))
158
159
if not os.getenv("XDG_DATA_HOME"):
160
print("XDG_DATA_HOME is not set. Using default path.")
161
if not os.getenv("XDG_CONFIG_HOME"):
162
print("XDG_CONFIG_HOME is not set. Using default path.")
163
if not self.USER_PROVIDER_CONFIGS.exists():
164
print("Creating user config directory.")
165
self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
166
if not self.USER_PROVIDERS.exists():
167
print("Creating user data directory.")
168
self.USER_PROVIDERS.mkdir(parents=True)
169
170
self.available_providers = {}
171
172
if self.SYSTEM_PROVIDERS.exists():
173
for provider in self.SYSTEM_PROVIDERS.iterdir():
174
if not (provider / "__init__.py").exists():
175
continue
176
self.available_providers[provider.stem] = provider
177
if self.USER_PROVIDERS.exists():
178
for provider in self.USER_PROVIDERS.iterdir():
179
if not (provider / "__init__.py").exists():
180
continue
181
self.available_providers[provider.stem] = provider
182
183
print(f"Available providers: {list(self.available_providers.keys())}")
184
185
for provider, path in self.available_providers.items():
186
if not (self.USER_PROVIDER_CONFIGS / f"{provider}.json").exists():
187
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "w") as f:
188
spec = importlib.util.spec_from_file_location(provider, path / "__init__.py")
189
module = importlib.util.module_from_spec(spec)
190
spec.loader.exec_module(module)
191
if hasattr(module, "config_template"):
192
json.dump(module.config_template, f)
193
else:
194
json.dump({}, f)
195
196
if not self.ENABLED_PROVIDERS_FILE.exists():
197
self.ENABLED_PROVIDERS_FILE.touch()
198
199
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
200
for provider in self.available_providers:
201
f.write(provider + "\n")
202
203
if not self.APPEARANCE_CONFIG_FILE.exists():
204
self.APPEARANCE_CONFIG_FILE.touch()
205
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
206
json.dump({"icon_size": 32, "show_result_descriptions": True}, f)
207
208
with open(self.APPEARANCE_CONFIG_FILE, "r") as f:
209
appearance_config = json.load(f)
210
self.icon_size = appearance_config.get("icon_size", 32)
211
self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)
212
213
self.permanently_enabled_providers = {}
214
215
with open(self.ENABLED_PROVIDERS_FILE, "r") as f:
216
for line in f:
217
provider = line.strip()
218
if provider in self.available_providers:
219
self.permanently_enabled_providers[provider] = self.available_providers[provider]
220
221
self.enabled_providers = self.permanently_enabled_providers.copy()
222
223
providers_menu = self.builder.get_object("providers-menu")
224
225
self.providers = []
226
self.provider_checkboxes = {}
227
228
for provider in self.permanently_enabled_providers.values():
229
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
230
module = importlib.util.module_from_spec(spec)
231
spec.loader.exec_module(module)
232
with open(self.USER_PROVIDER_CONFIGS / f"{provider.stem}.json", "r") as f:
233
provider_config = json.load(f)
234
loaded_provider = module.Provider(provider_config)
235
self.providers.append(loaded_provider)
236
self.provider_checkboxes[provider.stem] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
237
providers_menu.append(self.provider_checkboxes[provider.stem])
238
self.provider_checkboxes[provider.stem].set_active(True)
239
self.provider_checkboxes[provider.stem].show()
240
self.provider_checkboxes[provider.stem].connect("toggled", self.update_enabled_providers)
241
242
self.update_task = None
243
244
def call_update_results(widget):
245
if self.update_task is not None:
246
self.update_task.cancel()
247
self.update_task = asyncio.create_task(self.update_results(widget))
248
249
def execute_result(widget, row):
250
row.execute()
251
self.kill()
252
253
self.builder.connect_signals(
254
{
255
"update-search": call_update_results,
256
"result-activated": execute_result,
257
"providers-menu-all-toggled": self.update_enabled_providers_all,
258
"menu-about": self.about,
259
"menu-providers": self.provider_menu,
260
"menu-preferences": self.preferences,
261
}
262
)
263
264
GLib.idle_add(self.update_enabled_providers, None)
265
266
def update_enabled_providers(self, widget=None):
267
self.providers = []
268
for provider, checkbox in self.provider_checkboxes.items():
269
if checkbox.get_active():
270
self.enabled_providers[provider] = self.permanently_enabled_providers[provider]
271
spec = importlib.util.spec_from_file_location(provider, self.enabled_providers[provider] / "__init__.py")
272
module = importlib.util.module_from_spec(spec)
273
spec.loader.exec_module(module)
274
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "r") as f:
275
provider_config = json.load(f)
276
loaded_provider = module.Provider(provider_config)
277
self.providers.append(loaded_provider)
278
else:
279
self.enabled_providers.pop(provider, None)
280
281
if self.update_task is not None:
282
self.update_task.cancel()
283
self.update_task = asyncio.create_task(self.update_results(widget))
284
285
def update_enabled_providers_all(self, widget):
286
for checkbox in self.provider_checkboxes.values():
287
checkbox.set_active(widget.get_active())
288
self.update_enabled_providers()
289
290
def kill(self, widget=None):
291
self.loop.stop()
292
self.quit()
293
294
def check_escape(self, widget, event):
295
results_list = self.builder.get_object("results-list")
296
search_entry = self.builder.get_object("search-query")
297
rows = results_list.get_children()
298
current_row = results_list.get_selected_row()
299
300
if event.keyval == Gdk.KEY_Escape:
301
self.kill()
302
return True
303
304
if event.keyval == Gdk.KEY_Down:
305
# Move to the next row
306
if current_row:
307
current_index = rows.index(current_row)
308
next_index = (current_index + 1) % len(rows) # Wrap to the beginning if at the end
309
else:
310
next_index = 0
311
312
results_list.select_row(rows[next_index])
313
search_entry.grab_focus() # Refocus the search entry
314
self.scroll_to_row(rows[next_index], results_list.get_adjustment())
315
return True
316
317
if event.keyval == Gdk.KEY_Up:
318
# Move to the previous row
319
if current_row:
320
current_index = rows.index(current_row)
321
prev_index = (current_index - 1) % len(rows) # Wrap to the end if at the beginning
322
else:
323
prev_index = len(rows) - 1
324
325
results_list.select_row(rows[prev_index])
326
search_entry.grab_focus() # Refocus the search entry
327
self.scroll_to_row(rows[prev_index], results_list.get_adjustment())
328
return True
329
330
if event.keyval == Gdk.KEY_Return:
331
if current_row:
332
# Execute the selected row
333
current_row.activate()
334
return True
335
elif len(rows) > 0:
336
# Execute the first row
337
rows[0].activate()
338
return True
339
340
return False
341
342
def scroll_to_row(self, row, adjustment):
343
row_geometry = row.get_allocation()
344
row_y = row_geometry.y
345
row_height = row_geometry.height
346
adjustment_value = adjustment.get_value()
347
adjustment_upper = adjustment.get_upper() - adjustment.get_page_size()
348
349
if row_y + row_height > adjustment_value + adjustment.get_page_size():
350
adjustment.set_value(min(row_y + row_height - adjustment.get_page_size(), adjustment_upper))
351
elif row_y < adjustment_value:
352
adjustment.set_value(max(row_y, 0))
353
354
def about(self, widget):
355
about_builder = Gtk.Builder()
356
about_builder.add_from_file("about.ui")
357
about_window = about_builder.get_object("about-dialog")
358
about_window.connect("destroy", lambda _: about_window.destroy())
359
about_window.connect("close", lambda _: about_window.destroy())
360
about_window.connect("response", lambda _, __: about_window.destroy())
361
about_window.show_all()
362
363
def provider_menu(self, widget):
364
providers_builder = Gtk.Builder()
365
providers_builder.add_from_file("providers.ui")
366
providers_window = providers_builder.get_object("providers-window")
367
providers_window.connect("destroy", lambda _: providers_window.destroy())
368
369
providers_table = providers_builder.get_object("providers-table") # actually Gtk.Grid
370
rows = 0
371
for provider in self.available_providers.values():
372
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
373
module = importlib.util.module_from_spec(spec)
374
spec.loader.exec_module(module)
375
loaded_provider = module.Provider({})
376
377
provider_label = Gtk.Label(label=loaded_provider.name)
378
provider_label.set_halign(Gtk.Align.START)
379
provider_label.set_valign(Gtk.Align.CENTER)
380
font_desc = Pango.FontDescription()
381
font_desc.set_weight(Pango.Weight.BOLD)
382
provider_label.override_font(font_desc)
383
provider_label.set_line_wrap(True)
384
provider_label.set_justify(Gtk.Justification.LEFT)
385
386
provider_description = Gtk.Label(label=loaded_provider.description)
387
provider_description.set_halign(Gtk.Align.START)
388
provider_description.set_valign(Gtk.Align.CENTER)
389
provider_description.set_justify(Gtk.Justification.LEFT)
390
provider_description.set_line_wrap(True)
391
392
icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
393
icon.set_pixel_size(self.icon_size)
394
395
switch = Gtk.Switch()
396
switch.set_valign(Gtk.Align.CENTER)
397
switch.set_active(provider.stem in self.permanently_enabled_providers)
398
switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)
399
400
config_button = Gtk.Button.new_with_label(_("Configure"))
401
config_button.connect("clicked", lambda _, stem=provider.stem: subprocess.Popen(["xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{stem}.json")]))
402
config_button.set_relief(Gtk.ReliefStyle.NONE)
403
404
providers_table.attach(icon, 0, rows, 1, 1)
405
providers_table.attach(provider_label, 1, rows, 1, 1)
406
providers_table.attach(provider_description, 2, rows, 1, 1)
407
providers_table.attach(switch, 3, rows, 1, 1)
408
providers_table.attach(config_button, 4, rows, 1, 1)
409
410
provider_label.show()
411
provider_description.show()
412
413
rows += 1
414
415
providers_window.show_all()
416
417
def preferences(self, widget):
418
preferences_builder = Gtk.Builder()
419
preferences_builder.add_from_file("preferences.ui")
420
preferences_window = preferences_builder.get_object("preferences-window")
421
preferences_window.connect("destroy", lambda _: preferences_window.destroy())
422
423
def update_appearance_preferences(widget, *args):
424
self.icon_size = preferences_builder.get_object("icon-size").get_value()
425
self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()
426
427
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
428
json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)
429
430
# Regenerate the results to reflect the changes
431
if self.update_task is not None:
432
self.update_task.cancel()
433
self.update_task = asyncio.create_task(self.update_results(widget))
434
435
preferences_builder.connect_signals(
436
{
437
"update-appearance": update_appearance_preferences,
438
}
439
)
440
441
preferences_window.show_all()
442
443
def update_permanently_enabled_providers(self, widget, param, provider):
444
if widget.get_active():
445
self.permanently_enabled_providers[provider] = self.available_providers[provider]
446
else:
447
self.permanently_enabled_providers.pop(provider, None)
448
449
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
450
f.write("\n".join(self.permanently_enabled_providers.keys()))
451
452
453
async def update_results(self, widget):
454
spinner = self.builder.get_object("spinner")
455
spinner.start()
456
generators = []
457
query = self.builder.get_object("search-query-buffer").get_text()
458
for provider in self.providers:
459
generators.append(provider.search)
460
461
# Clear the results list
462
results_list = self.builder.get_object("results-list")
463
for row in results_list.get_children():
464
results_list.remove(row)
465
466
try:
467
async for result in merge_generators_with_params(generators, query):
468
result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"], self.icon_size, self.show_result_descriptions)
469
results_list.add(result_box)
470
result_box.show_all()
471
except Exception as e:
472
exception_type, exception, tb = sys.exc_info()
473
filename, line_number, function_name, line = traceback.extract_tb(tb)[-1]
474
print(f"{filename} raised an exception!")
475
print(f"{type(e).__name__}: {str(e)}")
476
dialog = ProviderExceptionDialog(Path(filename), e, self.window, tb=tb)
477
dialog.show_all()
478
response = dialog.run()
479
if response == 0:
480
# Open config
481
subprocess.Popen(["xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json")])
482
483
self.kill()
484
elif response == 1:
485
# Reset config
486
spec = importlib.util.spec_from_file_location(get_parent_package(filename).stem, Path(filename))
487
module = importlib.util.module_from_spec(spec)
488
spec.loader.exec_module(module)
489
490
if hasattr(module, "config_template"):
491
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
492
json.dump(module.config_template, f)
493
else:
494
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
495
json.dump({}, f)
496
497
self.kill()
498
elif response == Gtk.ResponseType.CLOSE:
499
self.kill()
500
501
results_list.show_all()
502
spinner.stop()
503
504
505
if __name__ == "__main__":
506
izvor = Izvor()
507
izvor.window.show_all()
508
try:
509
izvor.loop.run_forever()
510
finally:
511
izvor.loop.close()
512