By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 __init__.py

View raw Download
text/x-script.python • 21.75 kiB
Python script, ASCII text executable
        
            
1
#!/usr/bin/env python3
2
3
"""
4
Izvor - modular, GTK+ desktop search and launcher
5
Copyright (C) 2024 <root@roundabout-host.com>
6
7
This program is free software: you can redistribute it and/or modify
8
it under the terms of the GNU General Public License as published by
9
the Free Software Foundation, either version 3 of the License, or
10
(at your option) any later version.
11
12
This program is distributed in the hope that it will be useful,
13
but WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
GNU General Public License for more details.
16
17
You should have received a copy of the GNU General Public License
18
along with this program. If not, see <https://www.gnu.org/licenses/>.
19
"""
20
21
import asyncio
import gettext
import importlib
import importlib.util
import json
import os
import subprocess
import sys
import traceback
from pathlib import Path
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any

import gi
from gi.events import GLibEventLoopPolicy

gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, Gdk, Pango, GLib
37
38
# Conventional gettext alias used to mark user-visible strings for translation.
_ = gettext.gettext

# Generic type variable available for annotations.
_T = TypeVar("_T")

# Absolute directory containing this file; the .ui files are loaded from here.
PACKAGE_DIRECTORY = Path(__file__).resolve().parents[0]
43
44
def get_parent_package(path):
    """Return the nearest ancestor directory of *path* that contains ``__init__.py``.

    The search starts at the directory holding *path* and walks upwards.
    The top-most ancestor (the filesystem root, or ``.`` for a relative
    path) is never inspected.

    :param path: file path as a ``str`` or ``Path``.
    :return: the package directory as a ``Path``, or ``None`` if no ancestor
        contains ``__init__.py``.
    """
    for candidate in Path(path).parents:
        if candidate == candidate.parent:
            # Top of the hierarchy reached; stopping here also guarantees
            # termination on platforms whose root is not "/".
            break
        # A direct existence test avoids listing the directory and simply
        # yields False for directories that do not exist.
        if (candidate / "__init__.py").exists():
            return candidate
    return None
51
52
53
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
                                       *args, **kwargs):
    """Run several async generators concurrently and yield items as they arrive.

    Every callable in *generator_funcs* is called with ``*args, **kwargs``
    and the resulting async iterators are advanced concurrently, one task
    per iterator. Items are yielded in completion order, not in the order
    of *generator_funcs*.

    An exception raised by any generator propagates to the consumer. When
    this generator stops for any reason (exhaustion, an error, the consumer
    closing it or being cancelled), every still-pending task is cancelled
    so that no generator keeps running in the background.
    """
    iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
    # Maps each in-flight "next item" task to the iterator it is advancing.
    tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}

    try:
        while tasks:
            done, _pending = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                iterator = tasks.pop(task)
                try:
                    result = task.result()
                except StopAsyncIteration:
                    # This iterator is exhausted; do not reschedule it.
                    continue
                yield result
                # Only ask for the next item once the consumer has taken this one.
                tasks[asyncio.create_task(iterator.__anext__())] = iterator
    finally:
        # Cancelling an already finished task is a no-op, so this is safe
        # for every task that is still registered.
        for task in tasks:
            task.cancel()
69
70
71
class ResultInfoWidget(Gtk.ListBoxRow):
    """A results-list row showing an optional icon, a name and an optional description.

    The row keeps ``name``, ``description`` and ``execute`` as attributes.
    ``image`` holds the ``(kind, value)`` tuple it was given, unless an icon
    is rendered, in which case it is replaced by the ``Gtk.Image`` widget.
    """

    def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
        super().__init__()
        self.name, self.description = name, description
        self.image = image
        self.execute = execute

        self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.add(self.box)

        # An icon size of 0 means "no icons"; only ("logo", icon_name) images are drawn.
        if icon_size and image[0] == "logo":
            icon_widget = Gtk.Image.new_from_icon_name(image[1], Gtk.IconSize.LARGE_TOOLBAR)
            icon_widget.set_pixel_size(icon_size)
            self.image = icon_widget
            self.box.pack_start(icon_widget, False, False, 0)

        def left_aligned_label(text):
            """Create a wrapping label that is justified and aligned to the left."""
            label = Gtk.Label(label=text)
            label.set_line_wrap(True)
            label.set_justify(Gtk.Justification.LEFT)
            label.set_halign(Gtk.Align.START)
            return label

        self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        self.box.pack_start(self.label_box, True, True, 0)

        # The name is drawn at 16pt via a Pango size attribute.
        self.name_label = left_aligned_label(self.name)
        name_attributes = Pango.AttrList()
        name_attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
        self.name_label.set_attributes(name_attributes)
        self.label_box.pack_start(self.name_label, True, True, 0)

        if show_description:
            self.description_label = left_aligned_label(self.description)
            self.label_box.pack_start(self.description_label, True, True, 0)
107
108
109
class ProviderExceptionDialog(Gtk.Dialog):
    """Dialog shown when a provider raises: displays the traceback and recovery actions.

    Response IDs: ``0`` for "Open config", ``1`` for "Reset config" and
    ``Gtk.ResponseType.CLOSE`` for "Quit app" (the default response).

    :param provider: path of the file the exception is attributed to; the
        title names the package directory found by ``get_parent_package``,
        or this path itself when no package is found.
    :param exception: the exception that was raised.
    :param parent: window the dialog is transient for.
    :param tb: traceback object to render together with the exception.
    """

    def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
        # Translate the template first and substitute afterwards; a string
        # that is already formatted can never match a catalogue entry.
        title = _("{provider} raised a {exception}").format(
            provider=get_parent_package(provider) or provider,
            exception=type(exception).__name__,
        )
        super().__init__(title=title, transient_for=parent)
        self.add_button(_("Open config"), 0)
        self.add_button(_("Reset config"), 1)
        self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
        self.set_default_response(Gtk.ResponseType.CLOSE)
        self.set_default_size(360, 240)
        self.set_resizable(True)

        self.provider = provider
        self.exception = exception

        # format_exception() returns lines that already end in "\n", so they
        # are concatenated as-is; joining with "\n" would double-space them.
        traceback_text = "".join(traceback.format_exception(type(exception), exception, tb))

        self.label = Gtk.Label(label=traceback_text)

        monospaced_font = Pango.FontDescription()
        monospaced_font.set_family("monospace")
        self.label.override_font(monospaced_font)
        self.label.set_line_wrap(True)
        self.label.set_justify(Gtk.Justification.LEFT)
        self.label.set_halign(Gtk.Align.START)
        # Selectable so the traceback can be copied out of the dialog.
        self.label.set_selectable(True)

        self.get_content_area().add(self.label)
        self.label.show()
136
137
class Izvor(Gtk.Application):
    """The launcher application: builds the main window, loads providers and runs searches.

    A provider is a directory containing ``__init__.py`` located under
    ``SYSTEM_PROVIDERS`` or ``USER_PROVIDERS``; a user provider replaces a
    system provider that has the same directory name. A provider module must
    expose a ``Provider`` class that is constructed with the provider's JSON
    config, and may expose ``config_template``, which is used to create or
    reset that config.
    """

    # An unset *or empty* XDG variable falls back to the default location;
    # an empty value would otherwise produce a path relative to the CWD.
    USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME") or Path.home() / ".config") / "izvor"
    USER_DATA = Path(os.getenv("XDG_DATA_HOME") or Path.home() / ".local" / "share") / "izvor"
    USER_PROVIDERS = USER_DATA / "providers"
    USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
    SYSTEM_CONFIGS = Path("/etc/izvor")
    SYSTEM_DATA = Path("/usr/share/izvor")
    SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
    SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
    ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
    APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"

    def __init__(self):
        """Build the UI, create missing config files and load the enabled providers."""
        super().__init__(application_id="com.roundabout-host.roundabout.izvor")
        asyncio.set_event_loop_policy(GLibEventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.builder = Gtk.Builder()
        self.builder.add_from_file(str(PACKAGE_DIRECTORY / "izvor.ui"))
        self.window = self.builder.get_object("root")
        self.window.connect("destroy", self.kill)
        self.window.connect("key-press-event", self.check_escape)
        self.window.set_title(_("Launcher"))

        if not os.getenv("XDG_DATA_HOME"):
            print("XDG_DATA_HOME is not set. Using default path.")
        if not os.getenv("XDG_CONFIG_HOME"):
            print("XDG_CONFIG_HOME is not set. Using default path.")
        if not self.USER_PROVIDER_CONFIGS.exists():
            print("Creating user config directory.")
            self.USER_PROVIDER_CONFIGS.mkdir(parents=True, exist_ok=True)
        if not self.USER_PROVIDERS.exists():
            print("Creating user data directory.")
            self.USER_PROVIDERS.mkdir(parents=True, exist_ok=True)

        # Provider name -> provider directory. The user directory is scanned
        # last so that it overrides a system provider of the same name.
        self.available_providers = {}
        for providers_directory in (self.SYSTEM_PROVIDERS, self.USER_PROVIDERS):
            if not providers_directory.exists():
                continue
            for provider in providers_directory.iterdir():
                if (provider / "__init__.py").exists():
                    self.available_providers[provider.stem] = provider

        print(f"Available providers: {list(self.available_providers.keys())}")

        # Give every provider that has no config yet its default one.
        for provider, path in self.available_providers.items():
            if not self._provider_config_path(provider).exists():
                self._write_default_config(provider, path / "__init__.py")

        # On first run every available provider is enabled.
        if not self.ENABLED_PROVIDERS_FILE.exists():
            with open(self.ENABLED_PROVIDERS_FILE, "w", encoding="utf-8") as f:
                for provider in self.available_providers:
                    f.write(provider + "\n")

        if not self.APPEARANCE_CONFIG_FILE.exists():
            with open(self.APPEARANCE_CONFIG_FILE, "w", encoding="utf-8") as f:
                json.dump({"icon_size": 32, "show_result_descriptions": True}, f)

        with open(self.APPEARANCE_CONFIG_FILE, "r", encoding="utf-8") as f:
            appearance_config = json.load(f)
        self.icon_size = appearance_config.get("icon_size", 32)
        self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)

        # Providers listed in the enabled-providers file (persisted choice).
        self.permanently_enabled_providers = {}

        with open(self.ENABLED_PROVIDERS_FILE, "r", encoding="utf-8") as f:
            for line in f:
                provider = line.strip()
                if provider in self.available_providers:
                    self.permanently_enabled_providers[provider] = self.available_providers[provider]

        # Providers ticked in the menu for this session only.
        self.enabled_providers = self.permanently_enabled_providers.copy()

        providers_menu = self.builder.get_object("providers-menu")

        self.providers = []
        self.provider_checkboxes = {}

        for name, path in self.permanently_enabled_providers.items():
            loaded_provider = self._instantiate_provider(name, path)
            self.providers.append(loaded_provider)
            checkbox = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
            self.provider_checkboxes[name] = checkbox
            providers_menu.append(checkbox)
            # Activated before the handler is connected, so no "toggled"
            # callback runs during construction.
            checkbox.set_active(True)
            checkbox.show()
            checkbox.connect("toggled", self.update_enabled_providers)

        self.update_task = None

        def call_update_results(widget):
            self._restart_search(widget)

        def execute_result(widget, row):
            row.execute()
            self.kill()

        self.builder.connect_signals(
            {
                "update-search": call_update_results,
                "result-activated": execute_result,
                "providers-menu-all-toggled": self.update_enabled_providers_all,
                "menu-about": self.about,
                "menu-providers": self.provider_menu,
                "menu-preferences": self.preferences,
            }
        )

        # The callback returns None (falsy), so GLib runs it only once.
        GLib.idle_add(self.update_enabled_providers, None)

    @staticmethod
    def _load_provider_module(name, module_file):
        """Execute the Python file *module_file* as a module named *name* and return it."""
        spec = importlib.util.spec_from_file_location(name, module_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    def _provider_config_path(self, name):
        """Return the path of the user's JSON config for provider *name*."""
        return self.USER_PROVIDER_CONFIGS / f"{name}.json"

    def _instantiate_provider(self, name, directory):
        """Load provider *name* from *directory* and construct it with its stored config."""
        module = self._load_provider_module(name, directory / "__init__.py")
        with open(self._provider_config_path(name), "r", encoding="utf-8") as f:
            provider_config = json.load(f)
        return module.Provider(provider_config)

    def _write_default_config(self, name, module_file):
        """Write the module's ``config_template`` (or ``{}``) as the config of *name*.

        The module is loaded before the file is opened, so a provider that
        fails to import does not leave an empty, invalid JSON file behind.
        """
        module = self._load_provider_module(name, module_file)
        with open(self._provider_config_path(name), "w", encoding="utf-8") as f:
            json.dump(getattr(module, "config_template", {}), f)

    def _restart_search(self, widget):
        """Cancel the running search task, if any, and start a new one."""
        if self.update_task is not None:
            self.update_task.cancel()
        self.update_task = asyncio.create_task(self.update_results(widget))

    def _blame_provider(self, tb):
        """Work out which provider the traceback *tb* should be attributed to.

        Frames are scanned from the innermost outwards for a file located
        inside one of the available provider directories, so an error
        raised in a library called by a provider is still attributed to
        that provider.

        :return: ``(provider_name, module_file)``. If no frame belongs to a
            known provider, ``module_file`` is the innermost frame's file and
            ``provider_name`` is the name of its enclosing package directory,
            or ``None`` when there is none.
        """
        frames = traceback.extract_tb(tb)
        for frame in reversed(frames):
            frame_path = Path(frame.filename)
            for name, directory in self.available_providers.items():
                if directory in frame_path.parents:
                    return name, directory / "__init__.py"

        innermost = Path(frames[-1].filename)
        try:
            package = get_parent_package(innermost)
        except OSError:
            # The frame's file name may not be a real path on disk.
            package = None
        return (package.stem if package is not None else None), innermost

    def update_enabled_providers(self, widget=None):
        """Rebuild ``self.providers`` from the ticked menu checkboxes and rerun the search."""
        self.providers = []
        for provider, checkbox in self.provider_checkboxes.items():
            if checkbox.get_active():
                # Looked up in available_providers: the entry may have been
                # removed from permanently_enabled_providers through the
                # providers window while its checkbox is still ticked.
                path = self.available_providers[provider]
                self.enabled_providers[provider] = path
                self.providers.append(self._instantiate_provider(provider, path))
            else:
                self.enabled_providers.pop(provider, None)

        self._restart_search(widget)

    def update_enabled_providers_all(self, widget):
        """Set every provider checkbox to the state of *widget*, then refresh."""
        for checkbox in self.provider_checkboxes.values():
            checkbox.set_active(widget.get_active())
        self.update_enabled_providers()

    def kill(self, widget=None):
        """Stop the event loop and quit the application."""
        self.loop.stop()
        self.quit()

    def check_escape(self, widget, event):
        """Handle key presses on the main window.

        Escape quits, Up/Down move the selection (wrapping around) while
        keeping focus in the search entry, and Return activates the selected
        row or, without a selection, the first row.

        :return: ``True`` if the key was handled, ``False`` to let GTK
            continue processing it.
        """
        results_list = self.builder.get_object("results-list")
        search_entry = self.builder.get_object("search-query")
        rows = results_list.get_children()
        current_row = results_list.get_selected_row()

        if event.keyval == Gdk.KEY_Escape:
            self.kill()
            return True

        if event.keyval in (Gdk.KEY_Down, Gdk.KEY_Up):
            if not rows:
                # Nothing to select; avoid indexing an empty list.
                return True

            step = 1 if event.keyval == Gdk.KEY_Down else -1
            if current_row:
                # The modulo wraps around at either end of the list.
                target_index = (rows.index(current_row) + step) % len(rows)
            else:
                # No selection yet: Down starts at the top, Up at the bottom.
                target_index = 0 if step == 1 else len(rows) - 1

            results_list.select_row(rows[target_index])
            search_entry.grab_focus()  # Refocus the search entry
            self.scroll_to_row(rows[target_index], results_list.get_adjustment())
            return True

        if event.keyval == Gdk.KEY_Return:
            if current_row:
                # Execute the selected row
                current_row.activate()
                return True
            elif len(rows) > 0:
                # Execute the first row
                rows[0].activate()
                return True

        return False

    def scroll_to_row(self, row, adjustment):
        """Scroll *adjustment* just far enough to bring *row* fully into view."""
        row_geometry = row.get_allocation()
        row_y = row_geometry.y
        row_height = row_geometry.height
        page_size = adjustment.get_page_size()
        adjustment_value = adjustment.get_value()
        # Largest value the adjustment can take while still showing a full page.
        adjustment_upper = adjustment.get_upper() - page_size

        if row_y + row_height > adjustment_value + page_size:
            # Row ends below the visible area: align its bottom edge.
            adjustment.set_value(min(row_y + row_height - page_size, adjustment_upper))
        elif row_y < adjustment_value:
            # Row starts above the visible area: align its top edge.
            adjustment.set_value(max(row_y, 0))

    def about(self, widget):
        """Show the about dialog defined in ``about.ui``."""
        about_builder = Gtk.Builder()
        about_builder.add_from_file(str(PACKAGE_DIRECTORY / "about.ui"))
        about_window = about_builder.get_object("about-dialog")
        about_window.connect("destroy", lambda _dialog: about_window.destroy())
        about_window.connect("close", lambda _dialog: about_window.destroy())
        about_window.connect("response", lambda _dialog, _response: about_window.destroy())
        about_window.show_all()

    def provider_menu(self, widget):
        """Show the providers window: one grid row per available provider.

        Each row has the provider's icon, name, description, a switch that
        persists whether it is enabled, and a button that opens its config
        file with ``xdg-open``.
        """
        providers_builder = Gtk.Builder()
        providers_builder.add_from_file(str(PACKAGE_DIRECTORY / "providers.ui"))
        providers_window = providers_builder.get_object("providers-window")
        providers_window.connect("destroy", lambda _window: providers_window.destroy())

        providers_table = providers_builder.get_object("providers-table")  # actually Gtk.Grid
        for row, provider in enumerate(self.available_providers.values()):
            module = self._load_provider_module(provider.stem, provider / "__init__.py")
            # Constructed with an empty config; only name, description and
            # icon are read from the instance here.
            loaded_provider = module.Provider({})

            provider_label = Gtk.Label(label=loaded_provider.name)
            provider_label.set_halign(Gtk.Align.START)
            provider_label.set_valign(Gtk.Align.CENTER)
            font_desc = Pango.FontDescription()
            font_desc.set_weight(Pango.Weight.BOLD)
            provider_label.override_font(font_desc)
            provider_label.set_line_wrap(True)
            provider_label.set_justify(Gtk.Justification.LEFT)

            provider_description = Gtk.Label(label=loaded_provider.description)
            provider_description.set_halign(Gtk.Align.START)
            provider_description.set_valign(Gtk.Align.CENTER)
            provider_description.set_justify(Gtk.Justification.LEFT)
            provider_description.set_line_wrap(True)

            icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
            icon.set_pixel_size(self.icon_size)

            switch = Gtk.Switch()
            switch.set_valign(Gtk.Align.CENTER)
            switch.set_active(provider.stem in self.permanently_enabled_providers)
            switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)

            config_button = Gtk.Button.new_with_label(_("Configure"))
            # The stem is bound as a default argument so that each button
            # keeps its own provider rather than the loop's last value.
            config_button.connect("clicked", lambda _button, stem=provider.stem: subprocess.Popen(["xdg-open", str(self._provider_config_path(stem))]))
            config_button.set_relief(Gtk.ReliefStyle.NONE)

            providers_table.attach(icon, 0, row, 1, 1)
            providers_table.attach(provider_label, 1, row, 1, 1)
            providers_table.attach(provider_description, 2, row, 1, 1)
            providers_table.attach(switch, 3, row, 1, 1)
            providers_table.attach(config_button, 4, row, 1, 1)

            provider_label.show()
            provider_description.show()

        def open_provider_directory(widget):
            subprocess.Popen(["xdg-open", str(self.USER_PROVIDERS)])

        providers_builder.connect_signals(
            {
                "open-provider-directory": open_provider_directory,
            }
        )

        providers_window.show_all()

    def preferences(self, widget):
        """Show the preferences window; every change is saved and applied immediately."""
        preferences_builder = Gtk.Builder()
        preferences_builder.add_from_file(str(PACKAGE_DIRECTORY / "preferences.ui"))
        preferences_window = preferences_builder.get_object("preferences-window")
        preferences_window.connect("destroy", lambda _window: preferences_window.destroy())

        def update_appearance_preferences(widget, *args):
            # Stored as an int, like the default of 32: the value is later
            # passed to Gtk.Image.set_pixel_size(), which takes an integer.
            self.icon_size = int(preferences_builder.get_object("icon-size").get_value())
            self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()

            with open(self.APPEARANCE_CONFIG_FILE, "w", encoding="utf-8") as f:
                json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)

            # Regenerate the results to reflect the changes
            self._restart_search(widget)

        preferences_builder.connect_signals(
            {
                "update-appearance": update_appearance_preferences,
            }
        )

        preferences_window.show_all()

    def update_permanently_enabled_providers(self, widget, param, provider):
        """Persist the state of a provider's switch to the enabled-providers file."""
        if widget.get_active():
            self.permanently_enabled_providers[provider] = self.available_providers[provider]
        else:
            self.permanently_enabled_providers.pop(provider, None)

        with open(self.ENABLED_PROVIDERS_FILE, "w", encoding="utf-8") as f:
            f.write("\n".join(self.permanently_enabled_providers.keys()))

    def _handle_provider_exception(self, exception):
        """Report an exception raised during a search and act on the user's choice.

        Shows a ``ProviderExceptionDialog``. "Open config" opens the blamed
        provider's config with ``xdg-open`` and "Reset config" rewrites it
        from the provider's template; both of these, as well as "Quit app",
        then quit the application. Any other response only closes the dialog.
        """
        tb = exception.__traceback__
        filename = traceback.extract_tb(tb)[-1].filename
        print(f"{filename} raised an exception!")
        print(f"{type(exception).__name__}: {str(exception)}")

        provider_name, module_file = self._blame_provider(tb)

        dialog = ProviderExceptionDialog(module_file, exception, self.window, tb=tb)
        dialog.show_all()
        response = dialog.run()
        # run() does not close the dialog by itself; without this it stays
        # on screen whenever the application is not quit below.
        dialog.destroy()

        if response == 0:
            # Open config
            if provider_name is not None:
                subprocess.Popen(["xdg-open", str(self._provider_config_path(provider_name))])
            self.kill()
        elif response == 1:
            # Reset config
            if provider_name is not None:
                self._write_default_config(provider_name, module_file)
            self.kill()
        elif response == Gtk.ResponseType.CLOSE:
            self.kill()

    async def update_results(self, widget):
        """Clear the results list and refill it from every active provider.

        The current query text is passed to each provider's ``search`` and
        results are added as they arrive. An exception raised by a provider
        is reported through ``_handle_provider_exception``.
        """
        spinner = self.builder.get_object("spinner")
        spinner.start()
        query = self.builder.get_object("search-query-buffer").get_text()
        generators = [provider.search for provider in self.providers]

        # Clear the results list
        results_list = self.builder.get_object("results-list")
        for row in results_list.get_children():
            results_list.remove(row)

        try:
            async for result in merge_generators_with_params(generators, query):
                result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"], self.icon_size, self.show_result_descriptions)
                results_list.add(result_box)
                result_box.show_all()
        except Exception as e:
            self._handle_provider_exception(e)

        results_list.show_all()
        spinner.stop()
513
514
515
if __name__ == "__main__":
    # Build the application, show its main window, then run the event loop.
    izvor = Izvor()
    izvor.window.show_all()
    event_loop = izvor.loop
    try:
        event_loop.run_forever()
    finally:
        # Always close the loop, even if run_forever() raised.
        event_loop.close()
522