By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 __init__.py

View raw Download
text/x-script.python • 21.49 kiB
Python script, ASCII text executable
        
            
1
#!/usr/bin/env python3
2
3
"""
4
Izvor - modular, GTK+ desktop search and launcher
5
Copyright (C) 2024 <root@roundabout-host.com>
6
7
This program is free software: you can redistribute it and/or modify
8
it under the terms of the GNU General Public License as published by
9
the Free Software Foundation, either version 3 of the License, or
10
(at your option) any later version.
11
12
This program is distributed in the hope that it will be useful,
13
but WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
GNU General Public License for more details.
16
17
You should have received a copy of the GNU General Public License
18
along with this program. If not, see <https://www.gnu.org/licenses/>.
19
"""
20
21
import os
22
import subprocess
23
24
import gi
25
from gi.events import GLibEventLoopPolicy
26
import asyncio
27
import importlib
28
from pathlib import Path
29
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
30
31
gi.require_version("Gtk", "3.0")
32
from gi.repository import Gtk, Gdk, Pango, GLib
33
import gettext
34
import json
35
import traceback
36
import sys
37
38
_ = gettext.gettext
39
40
_T = TypeVar("_T")
41
42
PACKAGE_DIRECTORY = Path(__file__).resolve().parent
43
44
def get_parent_package(path):
45
parent_path = os.path.split(path)[0]
46
while parent_path != "/":
47
if "__init__.py" in os.listdir(parent_path):
48
return Path(parent_path)
49
parent_path = os.path.split(parent_path)[0]
50
return None
51
52
53
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
54
*args, **kwargs):
55
iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
56
tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}
57
58
while tasks:
59
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
60
61
for task in done:
62
iterator = tasks.pop(task)
63
try:
64
result = task.result()
65
yield result
66
tasks[asyncio.create_task(iterator.__anext__())] = iterator
67
except StopAsyncIteration:
68
pass
69
70
71
class ResultInfoWidget(Gtk.ListBoxRow):
72
def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
73
super().__init__()
74
self.name = name
75
self.description = description
76
self.image = image
77
self.execute = execute
78
79
self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
80
self.add(self.box)
81
82
if icon_size:
83
# If the icon size is 0, have no icons
84
if image[0] == "logo":
85
self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
86
self.image.set_pixel_size(icon_size)
87
self.box.pack_start(self.image, False, False, 0)
88
89
self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
90
self.box.pack_start(self.label_box, True, True, 0)
91
92
self.name_label = Gtk.Label(label=self.name)
93
self.name_label.set_line_wrap(True)
94
self.name_label.set_justify(Gtk.Justification.LEFT)
95
self.name_label.set_halign(Gtk.Align.START)
96
attributes = Pango.AttrList()
97
attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
98
self.name_label.set_attributes(attributes)
99
self.label_box.pack_start(self.name_label, True, True, 0)
100
101
if show_description:
102
self.description_label = Gtk.Label(label=self.description)
103
self.description_label.set_line_wrap(True)
104
self.description_label.set_justify(Gtk.Justification.LEFT)
105
self.description_label.set_halign(Gtk.Align.START)
106
self.label_box.pack_start(self.description_label, True, True, 0)
107
108
109
class ProviderExceptionDialog(Gtk.Dialog):
110
def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
111
super().__init__(title=_("{provider} raised a {exception}".format(provider=get_parent_package(provider), exception=type(exception).__name__)), transient_for=parent)
112
self.add_button(_("Open config"), 0)
113
self.add_button(_("Reset config"), 1)
114
self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
115
self.set_default_response(Gtk.ResponseType.CLOSE)
116
self.set_default_size(360, 240)
117
self.set_resizable(True)
118
119
self.provider = provider
120
self.exception = exception
121
122
traceback_text = "\n".join(traceback.format_exception(type(exception), exception, tb))
123
124
self.label = Gtk.Label(label=traceback_text)
125
126
monospaced_font = Pango.FontDescription()
127
monospaced_font.set_family("monospace")
128
self.label.override_font(monospaced_font)
129
self.label.set_line_wrap(True)
130
self.label.set_justify(Gtk.Justification.LEFT)
131
self.label.set_halign(Gtk.Align.START)
132
self.label.set_selectable(True)
133
134
self.get_content_area().add(self.label)
135
self.label.show()
136
137
class Izvor(Gtk.Application):
138
USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
139
USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
140
USER_PROVIDERS = USER_DATA / "providers"
141
USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
142
SYSTEM_CONFIGS = Path("/etc/izvor")
143
SYSTEM_DATA = Path("/usr/share/izvor")
144
SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
145
SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
146
ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
147
APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"
148
149
def __init__(self):
150
super().__init__(application_id="com.roundabout-host.roundabout.izvor")
151
asyncio.set_event_loop_policy(GLibEventLoopPolicy())
152
self.loop = asyncio.get_event_loop()
153
self.builder = Gtk.Builder()
154
self.builder.add_from_file(str(PACKAGE_DIRECTORY / "izvor.ui"))
155
self.window = self.builder.get_object("root")
156
self.window.connect("destroy", self.kill)
157
self.window.connect("key-press-event", self.check_escape)
158
self.window.set_title(_("Launcher"))
159
160
if not os.getenv("XDG_DATA_HOME"):
161
print("XDG_DATA_HOME is not set. Using default path.")
162
if not os.getenv("XDG_CONFIG_HOME"):
163
print("XDG_CONFIG_HOME is not set. Using default path.")
164
if not self.USER_PROVIDER_CONFIGS.exists():
165
print("Creating user config directory.")
166
self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
167
if not self.USER_PROVIDERS.exists():
168
print("Creating user data directory.")
169
self.USER_PROVIDERS.mkdir(parents=True)
170
171
self.available_providers = {}
172
173
if self.SYSTEM_PROVIDERS.exists():
174
for provider in self.SYSTEM_PROVIDERS.iterdir():
175
if not (provider / "__init__.py").exists():
176
continue
177
self.available_providers[provider.stem] = provider
178
if self.USER_PROVIDERS.exists():
179
for provider in self.USER_PROVIDERS.iterdir():
180
if not (provider / "__init__.py").exists():
181
continue
182
self.available_providers[provider.stem] = provider
183
184
print(f"Available providers: {list(self.available_providers.keys())}")
185
186
for provider, path in self.available_providers.items():
187
if not (self.USER_PROVIDER_CONFIGS / f"{provider}.json").exists():
188
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "w") as f:
189
spec = importlib.util.spec_from_file_location(provider, path / "__init__.py")
190
module = importlib.util.module_from_spec(spec)
191
spec.loader.exec_module(module)
192
if hasattr(module, "config_template"):
193
json.dump(module.config_template, f)
194
else:
195
json.dump({}, f)
196
197
if not self.ENABLED_PROVIDERS_FILE.exists():
198
self.ENABLED_PROVIDERS_FILE.touch()
199
200
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
201
for provider in self.available_providers:
202
f.write(provider + "\n")
203
204
if not self.APPEARANCE_CONFIG_FILE.exists():
205
self.APPEARANCE_CONFIG_FILE.touch()
206
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
207
json.dump({"icon_size": 32, "show_result_descriptions": True}, f)
208
209
with open(self.APPEARANCE_CONFIG_FILE, "r") as f:
210
appearance_config = json.load(f)
211
self.icon_size = appearance_config.get("icon_size", 32)
212
self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)
213
214
self.permanently_enabled_providers = {}
215
216
with open(self.ENABLED_PROVIDERS_FILE, "r") as f:
217
for line in f:
218
provider = line.strip()
219
if provider in self.available_providers:
220
self.permanently_enabled_providers[provider] = self.available_providers[provider]
221
222
self.enabled_providers = self.permanently_enabled_providers.copy()
223
224
providers_menu = self.builder.get_object("providers-menu")
225
226
self.providers = []
227
self.provider_checkboxes = {}
228
229
for provider in self.permanently_enabled_providers.values():
230
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
231
module = importlib.util.module_from_spec(spec)
232
spec.loader.exec_module(module)
233
with open(self.USER_PROVIDER_CONFIGS / f"{provider.stem}.json", "r") as f:
234
provider_config = json.load(f)
235
loaded_provider = module.Provider(provider_config)
236
self.providers.append(loaded_provider)
237
self.provider_checkboxes[provider.stem] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
238
providers_menu.append(self.provider_checkboxes[provider.stem])
239
self.provider_checkboxes[provider.stem].set_active(True)
240
self.provider_checkboxes[provider.stem].show()
241
self.provider_checkboxes[provider.stem].connect("toggled", self.update_enabled_providers)
242
243
self.update_task = None
244
245
def call_update_results(widget):
246
if self.update_task is not None:
247
self.update_task.cancel()
248
self.update_task = asyncio.create_task(self.update_results(widget))
249
250
def execute_result(widget, row):
251
row.execute()
252
self.kill()
253
254
self.builder.connect_signals(
255
{
256
"update-search": call_update_results,
257
"result-activated": execute_result,
258
"providers-menu-all-toggled": self.update_enabled_providers_all,
259
"menu-about": self.about,
260
"menu-providers": self.provider_menu,
261
"menu-preferences": self.preferences,
262
}
263
)
264
265
GLib.idle_add(self.update_enabled_providers, None)
266
267
def update_enabled_providers(self, widget=None):
268
self.providers = []
269
for provider, checkbox in self.provider_checkboxes.items():
270
if checkbox.get_active():
271
self.enabled_providers[provider] = self.permanently_enabled_providers[provider]
272
spec = importlib.util.spec_from_file_location(provider, self.enabled_providers[provider] / "__init__.py")
273
module = importlib.util.module_from_spec(spec)
274
spec.loader.exec_module(module)
275
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "r") as f:
276
provider_config = json.load(f)
277
loaded_provider = module.Provider(provider_config)
278
self.providers.append(loaded_provider)
279
else:
280
self.enabled_providers.pop(provider, None)
281
282
if self.update_task is not None:
283
self.update_task.cancel()
284
self.update_task = asyncio.create_task(self.update_results(widget))
285
286
def update_enabled_providers_all(self, widget):
287
for checkbox in self.provider_checkboxes.values():
288
checkbox.set_active(widget.get_active())
289
self.update_enabled_providers()
290
291
def kill(self, widget=None):
292
self.loop.stop()
293
self.quit()
294
295
def check_escape(self, widget, event):
296
results_list = self.builder.get_object("results-list")
297
search_entry = self.builder.get_object("search-query")
298
rows = results_list.get_children()
299
current_row = results_list.get_selected_row()
300
301
if event.keyval == Gdk.KEY_Escape:
302
self.kill()
303
return True
304
305
if event.keyval == Gdk.KEY_Down:
306
# Move to the next row
307
if current_row:
308
current_index = rows.index(current_row)
309
next_index = (current_index + 1) % len(rows) # Wrap to the beginning if at the end
310
else:
311
next_index = 0
312
313
results_list.select_row(rows[next_index])
314
search_entry.grab_focus() # Refocus the search entry
315
self.scroll_to_row(rows[next_index], results_list.get_adjustment())
316
return True
317
318
if event.keyval == Gdk.KEY_Up:
319
# Move to the previous row
320
if current_row:
321
current_index = rows.index(current_row)
322
prev_index = (current_index - 1) % len(rows) # Wrap to the end if at the beginning
323
else:
324
prev_index = len(rows) - 1
325
326
results_list.select_row(rows[prev_index])
327
search_entry.grab_focus() # Refocus the search entry
328
self.scroll_to_row(rows[prev_index], results_list.get_adjustment())
329
return True
330
331
if event.keyval == Gdk.KEY_Return:
332
if current_row:
333
# Execute the selected row
334
current_row.activate()
335
return True
336
elif len(rows) > 0:
337
# Execute the first row
338
rows[0].activate()
339
return True
340
341
return False
342
343
def scroll_to_row(self, row, adjustment):
344
row_geometry = row.get_allocation()
345
row_y = row_geometry.y
346
row_height = row_geometry.height
347
adjustment_value = adjustment.get_value()
348
adjustment_upper = adjustment.get_upper() - adjustment.get_page_size()
349
350
if row_y + row_height > adjustment_value + adjustment.get_page_size():
351
adjustment.set_value(min(row_y + row_height - adjustment.get_page_size(), adjustment_upper))
352
elif row_y < adjustment_value:
353
adjustment.set_value(max(row_y, 0))
354
355
def about(self, widget):
356
about_builder = Gtk.Builder()
357
about_builder.add_from_file(str(PACKAGE_DIRECTORY / "about.ui"))
358
about_window = about_builder.get_object("about-dialog")
359
about_window.connect("destroy", lambda _: about_window.destroy())
360
about_window.connect("close", lambda _: about_window.destroy())
361
about_window.connect("response", lambda _, __: about_window.destroy())
362
about_window.show_all()
363
364
def provider_menu(self, widget):
365
providers_builder = Gtk.Builder()
366
providers_builder.add_from_file(str(PACKAGE_DIRECTORY / "providers.ui"))
367
providers_window = providers_builder.get_object("providers-window")
368
providers_window.connect("destroy", lambda _: providers_window.destroy())
369
370
providers_table = providers_builder.get_object("providers-table") # actually Gtk.Grid
371
rows = 0
372
for provider in self.available_providers.values():
373
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
374
module = importlib.util.module_from_spec(spec)
375
spec.loader.exec_module(module)
376
loaded_provider = module.Provider({})
377
378
provider_label = Gtk.Label(label=loaded_provider.name)
379
provider_label.set_halign(Gtk.Align.START)
380
provider_label.set_valign(Gtk.Align.CENTER)
381
font_desc = Pango.FontDescription()
382
font_desc.set_weight(Pango.Weight.BOLD)
383
provider_label.override_font(font_desc)
384
provider_label.set_line_wrap(True)
385
provider_label.set_justify(Gtk.Justification.LEFT)
386
387
provider_description = Gtk.Label(label=loaded_provider.description)
388
provider_description.set_halign(Gtk.Align.START)
389
provider_description.set_valign(Gtk.Align.CENTER)
390
provider_description.set_justify(Gtk.Justification.LEFT)
391
provider_description.set_line_wrap(True)
392
393
icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
394
icon.set_pixel_size(self.icon_size)
395
396
switch = Gtk.Switch()
397
switch.set_valign(Gtk.Align.CENTER)
398
switch.set_active(provider.stem in self.permanently_enabled_providers)
399
switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)
400
401
config_button = Gtk.Button.new_with_label(_("Configure"))
402
config_button.connect("clicked", lambda _, stem=provider.stem: subprocess.Popen(["xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{stem}.json")]))
403
config_button.set_relief(Gtk.ReliefStyle.NONE)
404
405
providers_table.attach(icon, 0, rows, 1, 1)
406
providers_table.attach(provider_label, 1, rows, 1, 1)
407
providers_table.attach(provider_description, 2, rows, 1, 1)
408
providers_table.attach(switch, 3, rows, 1, 1)
409
providers_table.attach(config_button, 4, rows, 1, 1)
410
411
provider_label.show()
412
provider_description.show()
413
414
rows += 1
415
416
providers_window.show_all()
417
418
def preferences(self, widget):
419
preferences_builder = Gtk.Builder()
420
preferences_builder.add_from_file(str(PACKAGE_DIRECTORY / "preferences.ui"))
421
preferences_window = preferences_builder.get_object("preferences-window")
422
preferences_window.connect("destroy", lambda _: preferences_window.destroy())
423
424
def update_appearance_preferences(widget, *args):
425
self.icon_size = preferences_builder.get_object("icon-size").get_value()
426
self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()
427
428
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
429
json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)
430
431
# Regenerate the results to reflect the changes
432
if self.update_task is not None:
433
self.update_task.cancel()
434
self.update_task = asyncio.create_task(self.update_results(widget))
435
436
preferences_builder.connect_signals(
437
{
438
"update-appearance": update_appearance_preferences,
439
}
440
)
441
442
preferences_window.show_all()
443
444
def update_permanently_enabled_providers(self, widget, param, provider):
445
if widget.get_active():
446
self.permanently_enabled_providers[provider] = self.available_providers[provider]
447
else:
448
self.permanently_enabled_providers.pop(provider, None)
449
450
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
451
f.write("\n".join(self.permanently_enabled_providers.keys()))
452
453
454
async def update_results(self, widget):
455
spinner = self.builder.get_object("spinner")
456
spinner.start()
457
generators = []
458
query = self.builder.get_object("search-query-buffer").get_text()
459
for provider in self.providers:
460
generators.append(provider.search)
461
462
# Clear the results list
463
results_list = self.builder.get_object("results-list")
464
for row in results_list.get_children():
465
results_list.remove(row)
466
467
try:
468
async for result in merge_generators_with_params(generators, query):
469
result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"], self.icon_size, self.show_result_descriptions)
470
results_list.add(result_box)
471
result_box.show_all()
472
except Exception as e:
473
exception_type, exception, tb = sys.exc_info()
474
filename, line_number, function_name, line = traceback.extract_tb(tb)[-1]
475
print(f"{filename} raised an exception!")
476
print(f"{type(e).__name__}: {str(e)}")
477
dialog = ProviderExceptionDialog(Path(filename), e, self.window, tb=tb)
478
dialog.show_all()
479
response = dialog.run()
480
if response == 0:
481
# Open config
482
subprocess.Popen(["xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json")])
483
484
self.kill()
485
elif response == 1:
486
# Reset config
487
spec = importlib.util.spec_from_file_location(get_parent_package(filename).stem, Path(filename))
488
module = importlib.util.module_from_spec(spec)
489
spec.loader.exec_module(module)
490
491
if hasattr(module, "config_template"):
492
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
493
json.dump(module.config_template, f)
494
else:
495
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
496
json.dump({}, f)
497
498
self.kill()
499
elif response == Gtk.ResponseType.CLOSE:
500
self.kill()
501
502
results_list.show_all()
503
spinner.stop()
504
505
506
if __name__ == "__main__":
507
izvor = Izvor()
508
izvor.window.show_all()
509
try:
510
izvor.loop.run_forever()
511
finally:
512
izvor.loop.close()
513