By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 __init__.py

View raw Download
text/x-script.python • 22.68 kiB
Python script, ASCII text executable
        
            
1
#!/usr/bin/env python3
2
3
"""
4
Izvor - modular, GTK+ desktop search and launcher
5
Copyright (C) 2024 <root@roundabout-host.com>
6
7
This program is free software: you can redistribute it and/or modify
8
it under the terms of the GNU General Public License as published by
9
the Free Software Foundation, either version 3 of the License, or
10
(at your option) any later version.
11
12
This program is distributed in the hope that it will be useful,
13
but WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
GNU General Public License for more details.
16
17
You should have received a copy of the GNU General Public License
18
along with this program. If not, see <https://www.gnu.org/licenses/>.
19
"""
20
21
import os
22
import subprocess
23
24
import gi
25
from gi.events import GLibEventLoopPolicy
26
import asyncio
27
import importlib
28
from pathlib import Path
29
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
30
31
gi.require_version("Gtk", "3.0")
32
from gi.repository import Gtk, Gdk, Pango, GLib
33
import gettext
34
import json
35
import traceback
36
import sys
37
38
_ = gettext.gettext
39
40
_T = TypeVar("_T")
41
42
PACKAGE_DIRECTORY = Path(__file__).resolve().parent
43
44
def get_parent_package(path):
45
parent_path = os.path.split(path)[0]
46
while parent_path != "/":
47
if "__init__.py" in os.listdir(parent_path):
48
return Path(parent_path)
49
parent_path = os.path.split(parent_path)[0]
50
return None
51
52
53
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
54
*args, **kwargs):
55
iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
56
tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}
57
58
while tasks:
59
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
60
61
for task in done:
62
iterator = tasks.pop(task)
63
try:
64
result = task.result()
65
yield result
66
tasks[asyncio.create_task(iterator.__anext__())] = iterator
67
except StopAsyncIteration:
68
pass
69
70
71
class ResultInfoWidget(Gtk.ListBoxRow):
72
def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
73
super().__init__()
74
self.name = name
75
self.description = description
76
self.image = image
77
self.execute = execute
78
79
self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
80
self.add(self.box)
81
82
if icon_size:
83
# If the icon size is 0, have no icons
84
if image[0] == "logo":
85
self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
86
self.image.set_pixel_size(icon_size)
87
self.box.pack_start(self.image, False, False, 0)
88
89
self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
90
self.box.pack_start(self.label_box, True, True, 0)
91
92
self.name_label = Gtk.Label(label=self.name)
93
self.name_label.set_line_wrap(True)
94
self.name_label.set_justify(Gtk.Justification.LEFT)
95
self.name_label.set_halign(Gtk.Align.START)
96
attributes = Pango.AttrList()
97
attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
98
self.name_label.set_attributes(attributes)
99
self.label_box.pack_start(self.name_label, True, True, 0)
100
101
if show_description:
102
self.description_label = Gtk.Label(label=self.description)
103
self.description_label.set_line_wrap(True)
104
self.description_label.set_justify(Gtk.Justification.LEFT)
105
self.description_label.set_halign(Gtk.Align.START)
106
self.label_box.pack_start(self.description_label, True, True, 0)
107
108
109
class ProviderExceptionDialog(Gtk.Dialog):
110
def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
111
super().__init__(title=_("{provider} raised a {exception}".format(provider=get_parent_package(provider), exception=type(exception).__name__)), transient_for=parent)
112
self.add_button(_("Open config"), 0)
113
self.add_button(_("Reset config"), 1)
114
self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
115
self.set_default_response(Gtk.ResponseType.CLOSE)
116
self.set_default_size(360, 240)
117
self.set_resizable(True)
118
119
self.provider = provider
120
self.exception = exception
121
122
traceback_text = "\n".join(traceback.format_exception(type(exception), exception, tb))
123
124
self.label = Gtk.Label(label=traceback_text)
125
126
monospaced_font = Pango.FontDescription()
127
monospaced_font.set_family("monospace")
128
self.label.override_font(monospaced_font)
129
self.label.set_line_wrap(True)
130
self.label.set_justify(Gtk.Justification.LEFT)
131
self.label.set_halign(Gtk.Align.START)
132
self.label.set_selectable(True)
133
134
self.get_content_area().add(self.label)
135
self.label.show()
136
137
class Izvor(Gtk.Application):
138
USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
139
USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
140
USER_PROVIDERS = USER_DATA / "providers"
141
USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
142
SYSTEM_CONFIGS = Path("/etc/izvor")
143
SYSTEM_DATA = Path("/usr/share/izvor")
144
SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
145
SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
146
ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
147
APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"
148
149
def __init__(self):
150
super().__init__(application_id="com.roundabout-host.roundabout.izvor")
151
asyncio.set_event_loop_policy(GLibEventLoopPolicy())
152
self.loop = asyncio.get_event_loop()
153
self.builder = Gtk.Builder()
154
self.builder.add_from_file(str(PACKAGE_DIRECTORY / "izvor.ui"))
155
self.window = self.builder.get_object("root")
156
self.window.connect("destroy", self.kill)
157
self.window.connect("key-press-event", self.check_escape)
158
self.window.set_title(_("Launcher"))
159
160
if not os.getenv("XDG_DATA_HOME"):
161
print("XDG_DATA_HOME is not set. Using default path.")
162
if not os.getenv("XDG_CONFIG_HOME"):
163
print("XDG_CONFIG_HOME is not set. Using default path.")
164
if not self.USER_PROVIDER_CONFIGS.exists():
165
print("Creating user config directory.")
166
self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
167
if not self.USER_PROVIDERS.exists():
168
print("Creating user data directory.")
169
self.USER_PROVIDERS.mkdir(parents=True)
170
171
self.available_providers = {}
172
173
if self.SYSTEM_PROVIDERS.exists():
174
for provider in self.SYSTEM_PROVIDERS.iterdir():
175
if not (provider / "__init__.py").exists():
176
continue
177
self.available_providers[provider.stem] = provider
178
if self.USER_PROVIDERS.exists():
179
for provider in self.USER_PROVIDERS.iterdir():
180
if not (provider / "__init__.py").exists():
181
continue
182
self.available_providers[provider.stem] = provider
183
184
print(f"Available providers: {list(self.available_providers.keys())}")
185
186
for provider, path in self.available_providers.items():
187
if not (self.USER_PROVIDER_CONFIGS / f"{provider}.json").exists():
188
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "w") as f:
189
spec = importlib.util.spec_from_file_location(provider, path / "__init__.py")
190
module = importlib.util.module_from_spec(spec)
191
spec.loader.exec_module(module)
192
if hasattr(module, "config_template"):
193
json.dump(module.config_template, f)
194
else:
195
json.dump({}, f)
196
197
if not self.ENABLED_PROVIDERS_FILE.exists():
198
self.ENABLED_PROVIDERS_FILE.touch()
199
200
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
201
for provider in self.available_providers:
202
f.write(provider + "\n")
203
204
if not self.APPEARANCE_CONFIG_FILE.exists():
205
self.APPEARANCE_CONFIG_FILE.touch()
206
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
207
json.dump({"icon_size": 32, "show_result_descriptions": True}, f)
208
209
with open(self.APPEARANCE_CONFIG_FILE, "r") as f:
210
appearance_config = json.load(f)
211
self.icon_size = appearance_config.get("icon_size", 32)
212
self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)
213
214
self.permanently_enabled_providers = {}
215
216
with open(self.ENABLED_PROVIDERS_FILE, "r") as f:
217
for line in f:
218
provider = line.strip()
219
if provider in self.available_providers:
220
self.permanently_enabled_providers[provider] = self.available_providers[provider]
221
222
self.enabled_providers = self.permanently_enabled_providers.copy()
223
224
providers_menu = self.builder.get_object("providers-menu")
225
226
self.providers = []
227
self.provider_checkboxes = {}
228
229
for provider in self.permanently_enabled_providers.values():
230
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
231
module = importlib.util.module_from_spec(spec)
232
spec.loader.exec_module(module)
233
with open(self.USER_PROVIDER_CONFIGS / f"{provider.stem}.json", "r") as f:
234
provider_config = json.load(f)
235
loaded_provider = module.Provider(provider_config)
236
self.providers.append(loaded_provider)
237
self.provider_checkboxes[provider.stem] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
238
providers_menu.append(self.provider_checkboxes[provider.stem])
239
self.provider_checkboxes[provider.stem].set_active(True)
240
self.provider_checkboxes[provider.stem].show()
241
self.provider_checkboxes[provider.stem].connect("toggled", self.update_enabled_providers)
242
243
self.update_task = None
244
245
def call_update_results(widget):
246
if self.update_task is not None:
247
self.update_task.cancel()
248
self.update_task = asyncio.create_task(self.update_results(widget))
249
250
def execute_result(widget, row):
251
row.execute()
252
self.kill()
253
254
self.builder.connect_signals(
255
{
256
"update-search": call_update_results,
257
"result-activated": execute_result,
258
"providers-menu-all-toggled": self.update_enabled_providers_all,
259
"menu-about": self.about,
260
"menu-providers": self.provider_menu,
261
"menu-preferences": self.preferences,
262
}
263
)
264
265
GLib.idle_add(self.update_enabled_providers, None)
266
267
def update_enabled_providers(self, widget=None):
268
self.providers = []
269
for provider, checkbox in self.provider_checkboxes.items():
270
if checkbox.get_active():
271
self.enabled_providers[provider] = self.permanently_enabled_providers[provider]
272
spec = importlib.util.spec_from_file_location(provider, self.enabled_providers[provider] / "__init__.py")
273
module = importlib.util.module_from_spec(spec)
274
spec.loader.exec_module(module)
275
with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "r") as f:
276
provider_config = json.load(f)
277
loaded_provider = module.Provider(provider_config)
278
self.providers.append(loaded_provider)
279
else:
280
self.enabled_providers.pop(provider, None)
281
282
if self.update_task is not None:
283
self.update_task.cancel()
284
self.update_task = asyncio.create_task(self.update_results(widget))
285
286
def update_enabled_providers_all(self, widget):
287
for checkbox in self.provider_checkboxes.values():
288
checkbox.set_active(widget.get_active())
289
self.update_enabled_providers()
290
291
def kill(self, widget=None):
292
self.loop.stop()
293
self.quit()
294
295
def check_escape(self, widget, event):
296
results_list = self.builder.get_object("results-list")
297
search_entry = self.builder.get_object("search-query")
298
rows = results_list.get_children()
299
current_row = results_list.get_selected_row()
300
301
if event.keyval == Gdk.KEY_Escape:
302
self.kill()
303
return True
304
305
if event.keyval == Gdk.KEY_Down:
306
# Move to the next row
307
if current_row:
308
current_index = rows.index(current_row)
309
next_index = (current_index + 1) % len(rows) # Wrap to the beginning if at the end
310
else:
311
next_index = 0
312
313
results_list.select_row(rows[next_index])
314
search_entry.grab_focus() # Refocus the search entry
315
self.scroll_to_row(rows[next_index], results_list.get_adjustment())
316
return True
317
318
if event.keyval == Gdk.KEY_Up:
319
# Move to the previous row
320
if current_row:
321
current_index = rows.index(current_row)
322
prev_index = (current_index - 1) % len(rows) # Wrap to the end if at the beginning
323
else:
324
prev_index = len(rows) - 1
325
326
results_list.select_row(rows[prev_index])
327
search_entry.grab_focus() # Refocus the search entry
328
self.scroll_to_row(rows[prev_index], results_list.get_adjustment())
329
return True
330
331
if event.keyval == Gdk.KEY_Return:
332
if current_row:
333
# Execute the selected row
334
current_row.activate()
335
return True
336
elif len(rows) > 0:
337
# Execute the first row
338
rows[0].activate()
339
return True
340
341
return False
342
343
def scroll_to_row(self, row, adjustment):
344
row_geometry = row.get_allocation()
345
row_y = row_geometry.y
346
row_height = row_geometry.height
347
adjustment_value = adjustment.get_value()
348
adjustment_upper = adjustment.get_upper() - adjustment.get_page_size()
349
350
if row_y + row_height > adjustment_value + adjustment.get_page_size():
351
adjustment.set_value(min(row_y + row_height - adjustment.get_page_size(), adjustment_upper))
352
elif row_y < adjustment_value:
353
adjustment.set_value(max(row_y, 0))
354
355
def about(self, widget):
356
about_builder = Gtk.Builder()
357
about_builder.add_from_file(str(PACKAGE_DIRECTORY / "about.ui"))
358
about_window = about_builder.get_object("about-dialog")
359
about_window.connect("destroy", lambda _: about_window.destroy())
360
about_window.connect("close", lambda _: about_window.destroy())
361
about_window.connect("response", lambda _, __: about_window.destroy())
362
about_window.show_all()
363
364
def provider_menu(self, widget):
365
providers_builder = Gtk.Builder()
366
providers_builder.add_from_file(str(PACKAGE_DIRECTORY / "providers.ui"))
367
providers_window = providers_builder.get_object("providers-window")
368
providers_window.connect("destroy", lambda _: providers_window.destroy())
369
370
providers_table = providers_builder.get_object("providers-table") # actually Gtk.Grid
371
rows = 0
372
for provider in self.available_providers.values():
373
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
374
module = importlib.util.module_from_spec(spec)
375
spec.loader.exec_module(module)
376
loaded_provider = module.Provider({})
377
378
provider_label = Gtk.Label(label=loaded_provider.name)
379
provider_label.set_halign(Gtk.Align.START)
380
provider_label.set_valign(Gtk.Align.CENTER)
381
font_desc = Pango.FontDescription()
382
font_desc.set_weight(Pango.Weight.BOLD)
383
provider_label.override_font(font_desc)
384
provider_label.set_line_wrap(True)
385
provider_label.set_justify(Gtk.Justification.LEFT)
386
387
provider_description = Gtk.Label(label=loaded_provider.description)
388
provider_description.set_halign(Gtk.Align.START)
389
provider_description.set_valign(Gtk.Align.CENTER)
390
provider_description.set_justify(Gtk.Justification.LEFT)
391
provider_description.set_line_wrap(True)
392
393
icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
394
icon.set_pixel_size(self.icon_size)
395
396
switch = Gtk.Switch()
397
switch.set_valign(Gtk.Align.CENTER)
398
switch.set_active(provider.stem in self.permanently_enabled_providers)
399
switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)
400
401
config_button = Gtk.Button.new_with_label(_("Configure"))
402
def open_config(widget, stem=provider.stem):
403
# User-accessible provider config, not Flatpak sandboxed
404
config_path = str(Path.home() / ".config" / "izvor" / "providers" / f"{stem}.json")
405
if os.getenv("FLATPAK_SANDBOX_DIR"):
406
subprocess.Popen(["flatpak-spawn", "--host", "xdg-open", config_path])
407
else:
408
subprocess.Popen(["xdg-open", config_path])
409
config_button.connect("clicked", open_config)
410
config_button.set_relief(Gtk.ReliefStyle.NONE)
411
412
providers_table.attach(icon, 0, rows, 1, 1)
413
providers_table.attach(provider_label, 1, rows, 1, 1)
414
providers_table.attach(provider_description, 2, rows, 1, 1)
415
providers_table.attach(switch, 3, rows, 1, 1)
416
providers_table.attach(config_button, 4, rows, 1, 1)
417
418
provider_label.show()
419
provider_description.show()
420
421
rows += 1
422
423
def open_provider_directory(widget):
424
# User-accessible provider directory, not Flatpak sandboxed
425
provider_directory = str(Path.home() / ".local" / "share" / "izvor" / "providers")
426
if os.getenv("FLATPAK_SANDBOX_DIR"):
427
subprocess.Popen(["flatpak-spawn", "--host", "xdg-open", provider_directory])
428
else:
429
subprocess.Popen(["xdg-open", provider_directory])
430
431
providers_builder.connect_signals(
432
{
433
"open-provider-directory": open_provider_directory,
434
}
435
)
436
437
providers_window.show_all()
438
439
def preferences(self, widget):
440
preferences_builder = Gtk.Builder()
441
preferences_builder.add_from_file(str(PACKAGE_DIRECTORY / "preferences.ui"))
442
preferences_window = preferences_builder.get_object("preferences-window")
443
preferences_window.connect("destroy", lambda _: preferences_window.destroy())
444
445
def update_appearance_preferences(widget, *args):
446
self.icon_size = preferences_builder.get_object("icon-size").get_value()
447
self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()
448
449
with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
450
json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)
451
452
# Regenerate the results to reflect the changes
453
if self.update_task is not None:
454
self.update_task.cancel()
455
self.update_task = asyncio.create_task(self.update_results(widget))
456
457
preferences_builder.connect_signals(
458
{
459
"update-appearance": update_appearance_preferences,
460
}
461
)
462
463
preferences_window.show_all()
464
465
def update_permanently_enabled_providers(self, widget, param, provider):
466
if widget.get_active():
467
self.permanently_enabled_providers[provider] = self.available_providers[provider]
468
else:
469
self.permanently_enabled_providers.pop(provider, None)
470
471
with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
472
f.write("\n".join(self.permanently_enabled_providers.keys()))
473
474
475
async def update_results(self, widget):
476
spinner = self.builder.get_object("spinner")
477
spinner.start()
478
generators = []
479
query = self.builder.get_object("search-query-buffer").get_text()
480
for provider in self.providers:
481
generators.append(provider.search)
482
483
# Clear the results list
484
results_list = self.builder.get_object("results-list")
485
for row in results_list.get_children():
486
results_list.remove(row)
487
488
try:
489
async for result in merge_generators_with_params(generators, query):
490
result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"], self.icon_size, self.show_result_descriptions)
491
results_list.add(result_box)
492
result_box.show_all()
493
except Exception as e:
494
exception_type, exception, tb = sys.exc_info()
495
filename, line_number, function_name, line = traceback.extract_tb(tb)[-1]
496
print(f"{filename} raised an exception!")
497
print(f"{type(e).__name__}: {str(e)}")
498
dialog = ProviderExceptionDialog(Path(filename), e, self.window, tb=tb)
499
dialog.show_all()
500
response = dialog.run()
501
if response == 0:
502
# Open config
503
if os.getenv("FLATPAK_SANDBOX_DIR"):
504
subprocess.Popen(["flatpak-spawn", "--host", "xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json")])
505
else:
506
subprocess.Popen(["xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json")])
507
508
self.kill()
509
elif response == 1:
510
# Reset config
511
spec = importlib.util.spec_from_file_location(get_parent_package(filename).stem, Path(filename))
512
module = importlib.util.module_from_spec(spec)
513
spec.loader.exec_module(module)
514
515
if hasattr(module, "config_template"):
516
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
517
json.dump(module.config_template, f)
518
else:
519
with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
520
json.dump({}, f)
521
522
self.kill()
523
elif response == Gtk.ResponseType.CLOSE:
524
self.kill()
525
526
results_list.show_all()
527
spinner.stop()
528
529
530
if __name__ == "__main__":
531
izvor = Izvor()
532
izvor.window.show_all()
533
izvor.window.present()
534
try:
535
izvor.loop.run_forever()
536
finally:
537
izvor.loop.close()
538