By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 __init__.py

View raw Download
text/x-script.python • 22.85 kiB
Python script, ASCII text executable
        
            
1
#!/usr/bin/env python3
2
3
"""
4
Izvor - modular, GTK+ desktop search and launcher
5
Copyright (C) 2024 <root@roundabout-host.com>
6
7
This program is free software: you can redistribute it and/or modify
8
it under the terms of the GNU General Public License as published by
9
the Free Software Foundation, either version 3 of the License, or
10
(at your option) any later version.
11
12
This program is distributed in the hope that it will be useful,
13
but WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
GNU General Public License for more details.
16
17
You should have received a copy of the GNU General Public License
18
along with this program. If not, see <https://www.gnu.org/licenses/>.
19
"""
20
21
import os
22
import subprocess
23
24
import gi
25
from gi.events import GLibEventLoopPolicy
26
import asyncio
27
import importlib
28
from pathlib import Path
29
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
30
31
gi.require_version("Gtk", "3.0")
32
gi.require_version("Pango", "1.0")
33
from gi.repository import Gtk, Gdk, Pango, GLib
34
import gettext
35
import json
36
import traceback
37
import sys
38
39
_ = gettext.gettext
40
41
_T = TypeVar("_T")
42
43
PACKAGE_DIRECTORY = Path(__file__).resolve().parent
44
45
def get_parent_package(path):
46
parent_path = os.path.split(path)[0]
47
while parent_path != "/":
48
if "__init__.py" in os.listdir(parent_path):
49
return Path(parent_path)
50
parent_path = os.path.split(parent_path)[0]
51
return None
52
53
54
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
55
*args, **kwargs):
56
iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
57
tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}
58
59
while tasks:
60
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
61
62
for task in done:
63
iterator = tasks.pop(task)
64
try:
65
result = task.result()
66
yield result
67
tasks[asyncio.create_task(iterator.__anext__())] = iterator
68
except StopAsyncIteration:
69
pass
70
71
72
class ResultInfoWidget(Gtk.ListBoxRow):
73
def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
74
super().__init__()
75
self.name = name
76
self.description = description
77
self.image = image
78
self.execute = execute
79
80
self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
81
self.add(self.box)
82
83
if icon_size:
84
# If the icon size is 0, have no icons
85
if image[0] == "logo":
86
self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
87
self.image.set_pixel_size(icon_size)
88
self.box.pack_start(self.image, False, False, 0)
89
90
self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
91
self.label_box.set_valign(Gtk.Align.START)
92
self.box.pack_start(self.label_box, True, True, 0)
93
94
self.name_label = Gtk.Label(label=self.name)
95
self.name_label.set_line_wrap(True)
96
self.name_label.set_justify(Gtk.Justification.LEFT)
97
self.name_label.set_halign(Gtk.Align.START)
98
self.name_label.set_line_wrap_mode(Pango.WrapMode.WORD_CHAR)
99
attributes = Pango.AttrList()
100
attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
101
self.name_label.set_attributes(attributes)
102
self.label_box.pack_start(self.name_label, True, True, 0)
103
104
if show_description:
105
self.description_label = Gtk.Label(label=self.description)
106
self.description_label.set_line_wrap(True)
107
self.description_label.set_justify(Gtk.Justification.LEFT)
108
self.description_label.set_halign(Gtk.Align.START)
109
self.description_label.set_line_wrap_mode(Pango.WrapMode.WORD_CHAR)
110
self.label_box.pack_start(self.description_label, True, True, 0)
111
112
113
class ProviderExceptionDialog(Gtk.Dialog):
114
def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
115
super().__init__(title=_("{provider} raised a {exception}".format(provider=get_parent_package(provider), exception=type(exception).__name__)), transient_for=parent)
116
self.add_button(_("Open config"), 0)
117
self.add_button(_("Reset config"), 1)
118
self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
119
self.set_default_response(Gtk.ResponseType.CLOSE)
120
self.set_default_size(360, 240)
121
self.set_resizable(True)
122
123
self.provider = provider
124
self.exception = exception
125
126
traceback_text = "\n".join(traceback.format_exception(type(exception), exception, tb))
127
128
self.label = Gtk.Label(label=traceback_text)
129
130
monospaced_font = Pango.FontDescription()
131
monospaced_font.set_family("monospace")
132
self.label.override_font(monospaced_font)
133
self.label.set_line_wrap(True)
134
self.label.set_justify(Gtk.Justification.LEFT)
135
self.label.set_halign(Gtk.Align.START)
136
self.label.set_selectable(True)
137
138
self.get_content_area().add(self.label)
139
self.label.show()
140
141
class Izvor(Gtk.Application):
142
USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
143
USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
144
USER_PROVIDERS = USER_DATA / "providers"
145
USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
146
SYSTEM_CONFIGS = Path("/etc/izvor")
147
SYSTEM_DATA = Path("/usr/share/izvor")
148
SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
149
SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
150
ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
151
APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"
152
153
def __init__(self):
154
super().__init__(application_id="com.roundabout-host.roundabout.izvor")
155
asyncio.set_event_loop_policy(GLibEventLoopPolicy())
156
self.loop = asyncio.get_event_loop()
157
self.builder = Gtk.Builder()
158
self.builder.add_from_file(str(PACKAGE_DIRECTORY / "izvor.ui"))
159
self.window = self.builder.get_object("root")
160
self.window.connect("destroy", self.kill)
161
self.window.connect("key-press-event", self.check_escape)
162
self.window.set_title(_("Launcher"))
163
164
if not os.getenv("XDG_DATA_HOME"):
165
print("XDG_DATA_HOME is not set. Using default path.")
166
if not os.getenv("XDG_CONFIG_HOME"):
167
print("XDG_CONFIG_HOME is not set. Using default path.")
168
if not self.USER_PROVIDER_CONFIGS.exists():
169
print("Creating user config directory.")
170
self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
171
if not self.USER_PROVIDERS.exists():
172
print("Creating user data directory.")
173
self.USER_PROVIDERS.mkdir(parents=True)
174
175
self.available_providers = {}
176
177
if self.SYSTEM_PROVIDERS.exists():
178
for provider in self.SYSTEM_PROVIDERS.iterdir():
179
if not (provider / "__init__.py").exists():
180
continue
181
self.available_providers[provider.stem] = provider
182
if self.USER_PROVIDERS.exists():
183
for provider in self.USER_PROVIDERS.iterdir():
184
if not (provider / "__init__.py").exists():
185
continue
186
self.available_providers[provider.stem] = provider
187
188
print(f"Available providers: {list(self.available_providers.keys())}")
189
190
for provider, path in self.available_providers.items():
191
if not (self.USER_PROVIDER_CONFIGS / f"{provider}.json").exists():
192
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "w") as f:
193
spec = importlib.util.spec_from_file_location(provider, path / "__init__.py")
194
module = importlib.util.module_from_spec(spec)
195
spec.loader.exec_module(module)
196
if hasattr(module, "config_template"):
197
json.dump(module.config_template, f)
198
else:
199
json.dump({}, f)
200
201
if not self.ENABLED_PROVIDERS_FILE.exists():
202
self.ENABLED_PROVIDERS_FILE.touch()
203
204
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
205
for provider in self.available_providers:
206
f.write(provider + "\n")
207
208
if not self.APPEARANCE_CONFIG_FILE.exists():
209
self.APPEARANCE_CONFIG_FILE.touch()
210
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
211
json.dump({"icon_size": 32, "show_result_descriptions": True}, f)
212
213
with open(self.APPEARANCE_CONFIG_FILE, "r") as f:
214
appearance_config = json.load(f)
215
self.icon_size = appearance_config.get("icon_size", 32)
216
self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)
217
218
self.permanently_enabled_providers = {}
219
220
with open(self.ENABLED_PROVIDERS_FILE, "r") as f:
221
for line in f:
222
provider = line.strip()
223
if provider in self.available_providers:
224
self.permanently_enabled_providers[provider] = self.available_providers[provider]
225
226
self.enabled_providers = self.permanently_enabled_providers.copy()
227
228
providers_menu = self.builder.get_object("providers-menu")
229
230
self.providers = []
231
self.provider_checkboxes = {}
232
233
for provider in self.permanently_enabled_providers.values():
234
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
235
module = importlib.util.module_from_spec(spec)
236
spec.loader.exec_module(module)
237
with open(self.USER_PROVIDER_CONFIGS / f"{provider.stem}.json", "r") as f:
238
provider_config = json.load(f)
239
loaded_provider = module.Provider(provider_config)
240
self.providers.append(loaded_provider)
241
self.provider_checkboxes[provider.stem] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
242
providers_menu.append(self.provider_checkboxes[provider.stem])
243
self.provider_checkboxes[provider.stem].set_active(True)
244
self.provider_checkboxes[provider.stem].show()
245
self.provider_checkboxes[provider.stem].connect("toggled", self.update_enabled_providers)
246
247
self.update_task = None
248
249
def call_update_results(widget):
250
if self.update_task is not None:
251
self.update_task.cancel()
252
self.update_task = asyncio.create_task(self.update_results(widget))
253
254
def execute_result(widget, row):
255
row.execute()
256
self.kill()
257
258
self.builder.connect_signals(
259
{
260
"update-search": call_update_results,
261
"result-activated": execute_result,
262
"providers-menu-all-toggled": self.update_enabled_providers_all,
263
"menu-about": self.about,
264
"menu-providers": self.provider_menu,
265
"menu-preferences": self.preferences,
266
}
267
)
268
269
GLib.idle_add(self.update_enabled_providers, None)
270
271
def update_enabled_providers(self, widget=None):
272
self.providers = []
273
for provider, checkbox in self.provider_checkboxes.items():
274
if checkbox.get_active():
275
self.enabled_providers[provider] = self.permanently_enabled_providers[provider]
276
spec = importlib.util.spec_from_file_location(provider, self.enabled_providers[provider] / "__init__.py")
277
module = importlib.util.module_from_spec(spec)
278
spec.loader.exec_module(module)
279
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "r") as f:
280
provider_config = json.load(f)
281
loaded_provider = module.Provider(provider_config)
282
self.providers.append(loaded_provider)
283
else:
284
self.enabled_providers.pop(provider, None)
285
286
if self.update_task is not None:
287
self.update_task.cancel()
288
self.update_task = asyncio.create_task(self.update_results(widget))
289
290
def update_enabled_providers_all(self, widget):
291
for checkbox in self.provider_checkboxes.values():
292
checkbox.set_active(widget.get_active())
293
self.update_enabled_providers()
294
295
def kill(self, widget=None):
296
self.loop.stop()
297
self.quit()
298
299
def check_escape(self, widget, event):
300
results_list = self.builder.get_object("results-list")
301
search_entry = self.builder.get_object("search-query")
302
rows = results_list.get_children()
303
current_row = results_list.get_selected_row()
304
305
if event.keyval == Gdk.KEY_Escape:
306
self.kill()
307
return True
308
309
if event.keyval == Gdk.KEY_Down:
310
# Move to the next row
311
if current_row:
312
current_index = rows.index(current_row)
313
next_index = (current_index + 1) % len(rows) # Wrap to the beginning if at the end
314
else:
315
next_index = 0
316
317
results_list.select_row(rows[next_index])
318
search_entry.grab_focus() # Refocus the search entry
319
self.scroll_to_row(rows[next_index], results_list.get_adjustment())
320
return True
321
322
if event.keyval == Gdk.KEY_Up:
323
# Move to the previous row
324
if current_row:
325
current_index = rows.index(current_row)
326
prev_index = (current_index - 1) % len(rows) # Wrap to the end if at the beginning
327
else:
328
prev_index = len(rows) - 1
329
330
results_list.select_row(rows[prev_index])
331
search_entry.grab_focus() # Refocus the search entry
332
self.scroll_to_row(rows[prev_index], results_list.get_adjustment())
333
return True
334
335
if event.keyval == Gdk.KEY_Return:
336
if current_row:
337
# Execute the selected row
338
current_row.activate()
339
return True
340
elif len(rows) > 0:
341
# Execute the first row
342
rows[0].activate()
343
return True
344
345
return False
346
347
def scroll_to_row(self, row, adjustment):
348
row_geometry = row.get_allocation()
349
row_y = row_geometry.y
350
row_height = row_geometry.height
351
adjustment_value = adjustment.get_value()
352
adjustment_upper = adjustment.get_upper() - adjustment.get_page_size()
353
354
if row_y + row_height > adjustment_value + adjustment.get_page_size():
355
adjustment.set_value(min(row_y + row_height - adjustment.get_page_size(), adjustment_upper))
356
elif row_y < adjustment_value:
357
adjustment.set_value(max(row_y, 0))
358
359
def about(self, widget):
360
about_builder = Gtk.Builder()
361
about_builder.add_from_file(str(PACKAGE_DIRECTORY / "about.ui"))
362
about_window = about_builder.get_object("about-dialog")
363
about_window.connect("destroy", lambda _: about_window.destroy())
364
about_window.connect("close", lambda _: about_window.destroy())
365
about_window.connect("response", lambda _, __: about_window.destroy())
366
about_window.show_all()
367
368
def provider_menu(self, widget):
369
providers_builder = Gtk.Builder()
370
providers_builder.add_from_file(str(PACKAGE_DIRECTORY / "providers.ui"))
371
providers_window = providers_builder.get_object("providers-window")
372
providers_window.connect("destroy", lambda _: providers_window.destroy())
373
374
providers_table = providers_builder.get_object("providers-table") # actually Gtk.Grid
375
rows = 0
376
for provider in self.available_providers.values():
377
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
378
module = importlib.util.module_from_spec(spec)
379
spec.loader.exec_module(module)
380
loaded_provider = module.Provider({})
381
382
provider_label = Gtk.Label(label=loaded_provider.name)
383
provider_label.set_halign(Gtk.Align.START)
384
provider_label.set_valign(Gtk.Align.CENTER)
385
font_desc = Pango.FontDescription()
386
font_desc.set_weight(Pango.Weight.BOLD)
387
provider_label.override_font(font_desc)
388
provider_label.set_line_wrap(True)
389
provider_label.set_justify(Gtk.Justification.LEFT)
390
391
provider_description = Gtk.Label(label=loaded_provider.description)
392
provider_description.set_halign(Gtk.Align.START)
393
provider_description.set_valign(Gtk.Align.CENTER)
394
provider_description.set_justify(Gtk.Justification.LEFT)
395
provider_description.set_line_wrap(True)
396
397
icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
398
icon.set_pixel_size(self.icon_size)
399
400
switch = Gtk.Switch()
401
switch.set_valign(Gtk.Align.CENTER)
402
switch.set_active(provider.stem in self.permanently_enabled_providers)
403
switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)
404
405
config_button = Gtk.Button.new_with_label(_("Configure"))
406
def open_config(widget, stem=provider.stem):
407
# User-accessible provider config, not Flatpak sandboxed
408
config_path = str(self.USER_PROVIDER_CONFIGS / f"{stem}.json")
409
if os.getenv("FLATPAK_SANDBOX_DIR"):
410
subprocess.Popen(["flatpak-spawn", "--host", "xdg-open", config_path])
411
else:
412
subprocess.Popen(["xdg-open", config_path])
413
config_button.connect("clicked", open_config)
414
config_button.set_relief(Gtk.ReliefStyle.NONE)
415
416
providers_table.attach(icon, 0, rows, 1, 1)
417
providers_table.attach(provider_label, 1, rows, 1, 1)
418
providers_table.attach(provider_description, 2, rows, 1, 1)
419
providers_table.attach(switch, 3, rows, 1, 1)
420
providers_table.attach(config_button, 4, rows, 1, 1)
421
422
provider_label.show()
423
provider_description.show()
424
425
rows += 1
426
427
def open_provider_directory(widget):
428
# User-accessible provider directory, not Flatpak sandboxed
429
provider_directory = str(self.USER_PROVIDERS)
430
if os.getenv("FLATPAK_SANDBOX_DIR"):
431
subprocess.Popen(["flatpak-spawn", "--host", "xdg-open", provider_directory])
432
else:
433
subprocess.Popen(["xdg-open", provider_directory])
434
435
providers_builder.connect_signals(
436
{
437
"open-provider-directory": open_provider_directory,
438
}
439
)
440
441
providers_window.show_all()
442
443
def preferences(self, widget):
444
preferences_builder = Gtk.Builder()
445
preferences_builder.add_from_file(str(PACKAGE_DIRECTORY / "preferences.ui"))
446
preferences_window = preferences_builder.get_object("preferences-window")
447
preferences_window.connect("destroy", lambda _: preferences_window.destroy())
448
449
def update_appearance_preferences(widget, *args):
450
self.icon_size = preferences_builder.get_object("icon-size").get_value()
451
self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()
452
453
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
454
json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)
455
456
# Regenerate the results to reflect the changes
457
if self.update_task is not None:
458
self.update_task.cancel()
459
self.update_task = asyncio.create_task(self.update_results(widget))
460
461
preferences_builder.connect_signals(
462
{
463
"update-appearance": update_appearance_preferences,
464
}
465
)
466
467
preferences_window.show_all()
468
469
def update_permanently_enabled_providers(self, widget, param, provider):
470
if widget.get_active():
471
self.permanently_enabled_providers[provider] = self.available_providers[provider]
472
else:
473
self.permanently_enabled_providers.pop(provider, None)
474
475
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
476
f.write("\n".join(self.permanently_enabled_providers.keys()))
477
478
479
async def update_results(self, widget):
480
spinner = self.builder.get_object("spinner")
481
spinner.start()
482
generators = []
483
query = self.builder.get_object("search-query-buffer").get_text()
484
for provider in self.providers:
485
generators.append(provider.search)
486
487
# Clear the results list
488
results_list = self.builder.get_object("results-list")
489
for row in results_list.get_children():
490
results_list.remove(row)
491
492
try:
493
async for result in merge_generators_with_params(generators, query):
494
result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"], self.icon_size, self.show_result_descriptions)
495
results_list.add(result_box)
496
result_box.show_all()
497
except Exception as e:
498
exception_type, exception, tb = sys.exc_info()
499
filename, line_number, function_name, line = traceback.extract_tb(tb)[-1]
500
print(f"{filename} raised an exception!")
501
print(f"{type(e).__name__}: {str(e)}")
502
dialog = ProviderExceptionDialog(Path(filename), e, self.window, tb=tb)
503
dialog.show_all()
504
response = dialog.run()
505
if response == 0:
506
# Open config
507
if os.getenv("FLATPAK_SANDBOX_DIR"):
508
subprocess.Popen(["flatpak-spawn", "--host", "xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json")])
509
else:
510
subprocess.Popen(["xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json")])
511
512
self.kill()
513
elif response == 1:
514
# Reset config
515
spec = importlib.util.spec_from_file_location(get_parent_package(filename).stem, Path(filename))
516
module = importlib.util.module_from_spec(spec)
517
spec.loader.exec_module(module)
518
519
if hasattr(module, "config_template"):
520
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
521
json.dump(module.config_template, f)
522
else:
523
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
524
json.dump({}, f)
525
526
self.kill()
527
elif response == Gtk.ResponseType.CLOSE:
528
self.kill()
529
530
results_list.show_all()
531
spinner.stop()
532
533
534
if __name__ == "__main__":
535
izvor = Izvor()
536
izvor.window.show_all()
537
izvor.window.present()
538
try:
539
izvor.loop.run_forever()
540
finally:
541
izvor.loop.close()
542