By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 __init__.py

View raw Download
text/x-script.python • 22.22 kiB
Python script, ASCII text executable
        
            
1
#!/usr/bin/env python3
2
3
"""
4
Izvor - modular, GTK+ desktop search and launcher
5
Copyright (C) 2024 <root@roundabout-host.com>
6
7
This program is free software: you can redistribute it and/or modify
8
it under the terms of the GNU General Public License as published by
9
the Free Software Foundation, either version 3 of the License, or
10
(at your option) any later version.
11
12
This program is distributed in the hope that it will be useful,
13
but WITHOUT ANY WARRANTY; without even the implied warranty of
14
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15
GNU General Public License for more details.
16
17
You should have received a copy of the GNU General Public License
18
along with this program. If not, see <https://www.gnu.org/licenses/>.
19
"""
20
21
import os
22
import subprocess
23
24
import gi
25
from gi.events import GLibEventLoopPolicy
26
import asyncio
27
import importlib
28
from pathlib import Path
29
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
30
31
gi.require_version("Gtk", "3.0")
32
from gi.repository import Gtk, Gdk, Pango, GLib
33
import gettext
34
import json
35
import traceback
36
import sys
37
38
# Conventional gettext alias: user-facing strings are wrapped in _() for translation.
_ = gettext.gettext

# Generic type variable for annotations in this module.
_T = TypeVar("_T")

# Absolute directory containing this module; the Gtk.Builder .ui files are
# loaded relative to it.
PACKAGE_DIRECTORY = Path(__file__).resolve().parents[0]
43
44
def get_parent_package(path):
    """Return the nearest ancestor directory of *path* that holds ``__init__.py``.

    The search starts at the directory containing *path* and walks upwards.
    The filesystem root itself is never reported as a package.

    Unlike a string comparison against ``"/"``, this also terminates for
    relative paths and for Windows drive roots, and a directory that does not
    exist is simply skipped instead of raising.

    :param path: File path (``str`` or path-like) to start from.
    :return: The package directory as a ``Path``, or ``None`` if no ancestor
        contains ``__init__.py``.
    """
    for candidate in Path(path).parents:
        # An absolute directory that is its own parent is the filesystem root.
        if candidate.is_absolute() and candidate == candidate.parent:
            break
        if (candidate / "__init__.py").exists():
            return candidate
    return None
51
52
53
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
                                       *args, **kwargs):
    """Merge several async generators, yielding items as soon as any is ready.

    Every callable in *generator_funcs* is called with ``*args, **kwargs`` and
    the resulting async iterables are advanced concurrently. Items are yielded
    in completion order, not in the order of *generator_funcs*.

    When this generator is closed or cancelled, or when one of the sources
    raises, the ``__anext__`` tasks that are still in flight are cancelled so
    that abandoned searches do not keep running in the background.

    :param generator_funcs: Callables returning async iterables.
    :param args: Positional arguments forwarded to every callable.
    :param kwargs: Keyword arguments forwarded to every callable.
    :raises Exception: Whatever a source iterable raises, other than
        ``StopAsyncIteration``.
    """
    iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
    # Maps each in-flight "next item" task to the iterator it is advancing.
    tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}

    try:
        while tasks:
            done, _pending = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                iterator = tasks.pop(task)
                try:
                    result = task.result()
                except StopAsyncIteration:
                    # This source is exhausted; do not schedule it again.
                    continue
                yield result
                tasks[asyncio.create_task(iterator.__anext__())] = iterator
    finally:
        # Reached on normal exhaustion (nothing left), on close/cancellation
        # and on a source error. cancel() is a no-op for finished tasks.
        for task in tasks:
            task.cancel()
69
70
71
class ResultInfoWidget(Gtk.ListBoxRow):
    """List box row showing one search result: optional icon, name and description."""

    def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
        """Build the row.

        :param name: Result title, rendered in a larger font.
        :param description: Secondary text, shown only if *show_description*.
        :param image: ``(kind, value)`` pair; only the kind ``"logo"`` is
            rendered, with *value* used as an icon name.
        :param execute: Callable stored on the row as ``self.execute``.
        :param icon_size: Icon size in pixels; a falsy value disables icons.
        :param show_description: Whether to add the description label.
        """
        super().__init__()
        self.name = name
        self.description = description
        self.image = image
        self.execute = execute

        self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.add(self.box)

        # An icon size of 0 means "no icons"; only "logo" images are supported.
        if icon_size and image[0] == "logo":
            icon_name = self.image[1]
            self.image = Gtk.Image.new_from_icon_name(icon_name, Gtk.IconSize.LARGE_TOOLBAR)
            self.image.set_pixel_size(icon_size)
            self.box.pack_start(self.image, False, False, 0)

        self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        self.box.pack_start(self.label_box, True, True, 0)

        self.name_label = self._left_aligned_label(self.name)
        title_attributes = Pango.AttrList()
        title_attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
        self.name_label.set_attributes(title_attributes)
        self.label_box.pack_start(self.name_label, True, True, 0)

        if not show_description:
            return

        self.description_label = self._left_aligned_label(self.description)
        self.label_box.pack_start(self.description_label, True, True, 0)

    @staticmethod
    def _left_aligned_label(text):
        """Create a wrapping, left-justified label aligned to the start."""
        label = Gtk.Label(label=text)
        label.set_line_wrap(True)
        label.set_justify(Gtk.Justification.LEFT)
        label.set_halign(Gtk.Align.START)
        return label
107
108
109
class ProviderExceptionDialog(Gtk.Dialog):
    """Dialog showing the traceback of an exception raised by a provider.

    It offers three buttons: "Open config" (response ``0``), "Reset config"
    (response ``1``) and "Quit app" (``Gtk.ResponseType.CLOSE``, the default).
    """

    def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
        """Build the dialog.

        :param provider: Path of the file the exception is attributed to; the
            title names its enclosing package, or the path itself when no
            package is found.
        :param exception: The exception to display.
        :param parent: Window the dialog is transient for.
        :param tb: Traceback to render; falls back to
            ``exception.__traceback__`` when omitted.
        """
        # Look up the translation of the *template* and format afterwards;
        # formatting first would make the lookup miss the catalogue entry.
        title = _("{provider} raised a {exception}").format(
            provider=get_parent_package(provider) or provider,
            exception=type(exception).__name__,
        )
        super().__init__(title=title, transient_for=parent)
        self.add_button(_("Open config"), 0)
        self.add_button(_("Reset config"), 1)
        self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
        self.set_default_response(Gtk.ResponseType.CLOSE)
        self.set_default_size(360, 240)
        self.set_resizable(True)

        self.provider = provider
        self.exception = exception

        if tb is None:
            tb = exception.__traceback__
        # format_exception() lines already end in a newline, so join with ""
        # to avoid a blank line after every entry.
        traceback_text = "".join(traceback.format_exception(type(exception), exception, tb))

        self.label = Gtk.Label(label=traceback_text)

        monospaced_font = Pango.FontDescription()
        monospaced_font.set_family("monospace")
        self.label.override_font(monospaced_font)
        self.label.set_line_wrap(True)
        self.label.set_justify(Gtk.Justification.LEFT)
        self.label.set_halign(Gtk.Align.START)
        # Selectable so the traceback can be copied.
        self.label.set_selectable(True)

        self.get_content_area().add(self.label)
        self.label.show()
136
137
class Izvor(Gtk.Application):
    """The launcher application.

    Discovers provider packages in the system and user provider directories,
    keeps one JSON config file per provider, and merges the results of every
    enabled provider's ``search`` generator into the results list.
    """

    # Per-user locations, following XDG_CONFIG_HOME / XDG_DATA_HOME.
    USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
    USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
    USER_PROVIDERS = USER_DATA / "providers"
    USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
    # System-wide locations.
    SYSTEM_CONFIGS = Path("/etc/izvor")
    SYSTEM_DATA = Path("/usr/share/izvor")
    SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
    SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
    # One provider name per line.
    ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
    # JSON object with "icon_size" and "show_result_descriptions".
    APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"

    def __init__(self):
        """Load the UI, discover providers, read the configs and wire up signals."""
        super().__init__(application_id="com.roundabout-host.roundabout.izvor")
        asyncio.set_event_loop_policy(GLibEventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.builder = Gtk.Builder()
        self.builder.add_from_file(str(PACKAGE_DIRECTORY / "izvor.ui"))
        self.window = self.builder.get_object("root")
        self.window.connect("destroy", self.kill)
        self.window.connect("key-press-event", self.check_escape)
        self.window.set_title(_("Launcher"))

        # The currently running search task, if any. Assigned before any
        # handler that reads it is connected.
        self.update_task = None

        if not os.getenv("XDG_DATA_HOME"):
            print("XDG_DATA_HOME is not set. Using default path.")
        if not os.getenv("XDG_CONFIG_HOME"):
            print("XDG_CONFIG_HOME is not set. Using default path.")
        if not self.USER_PROVIDER_CONFIGS.exists():
            print("Creating user config directory.")
            self.USER_PROVIDER_CONFIGS.mkdir(parents=True, exist_ok=True)
        if not self.USER_PROVIDERS.exists():
            print("Creating user data directory.")
            self.USER_PROVIDERS.mkdir(parents=True, exist_ok=True)

        # Provider name -> package directory. The user directory is scanned
        # last, so a user provider replaces a system provider of the same name.
        self.available_providers = {}
        for providers_root in (self.SYSTEM_PROVIDERS, self.USER_PROVIDERS):
            if not providers_root.exists():
                continue
            for provider in providers_root.iterdir():
                if (provider / "__init__.py").exists():
                    self.available_providers[provider.stem] = provider

        print(f"Available providers: {list(self.available_providers.keys())}")

        # Give every provider that has no config file yet its default config.
        for provider, path in self.available_providers.items():
            if not (self.USER_PROVIDER_CONFIGS / f"{provider}.json").exists():
                self._write_default_config(provider, path)

        # On first run, enable every available provider.
        if not self.ENABLED_PROVIDERS_FILE.exists():
            with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
                for provider in self.available_providers:
                    f.write(provider + "\n")

        if not self.APPEARANCE_CONFIG_FILE.exists():
            with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
                json.dump({"icon_size": 32, "show_result_descriptions": True}, f)

        with open(self.APPEARANCE_CONFIG_FILE, "r") as f:
            appearance_config = json.load(f)
        # The preferences window may have stored the size as a float (32.0);
        # Gtk.Image.set_pixel_size() takes an integer.
        self.icon_size = int(appearance_config.get("icon_size", 32))
        self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)

        # Providers listed in the enabled-providers file (and still installed).
        self.permanently_enabled_providers = {}

        with open(self.ENABLED_PROVIDERS_FILE, "r") as f:
            for line in f:
                provider = line.strip()
                if provider in self.available_providers:
                    self.permanently_enabled_providers[provider] = self.available_providers[provider]

        # Providers enabled for this session (toggled through the menu).
        self.enabled_providers = self.permanently_enabled_providers.copy()

        providers_menu = self.builder.get_object("providers-menu")

        self.providers = []
        self.provider_checkboxes = {}

        for name, directory in self.permanently_enabled_providers.items():
            loaded_provider = self._instantiate_provider(name, directory)
            self.providers.append(loaded_provider)
            checkbox = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
            self.provider_checkboxes[name] = checkbox
            providers_menu.append(checkbox)
            # Activated before connecting "toggled" so no update is triggered here.
            checkbox.set_active(True)
            checkbox.show()
            checkbox.connect("toggled", self.update_enabled_providers)

        def call_update_results(widget):
            self._schedule_update(widget)

        def execute_result(widget, row):
            row.execute()
            self.kill()

        self.builder.connect_signals(
            {
                "update-search": call_update_results,
                "result-activated": execute_result,
                "providers-menu-all-toggled": self.update_enabled_providers_all,
                "menu-about": self.about,
                "menu-providers": self.provider_menu,
                "menu-preferences": self.preferences,
            }
        )

        # Runs once: the callback returns None, which removes the idle source.
        GLib.idle_add(self.update_enabled_providers, None)

    @staticmethod
    def _load_provider_module(name, directory):
        """Import and return the provider package at *directory* under *name*."""
        # ``import importlib`` alone does not guarantee that the ``util``
        # submodule is loaded, so import it explicitly.
        import importlib.util

        spec = importlib.util.spec_from_file_location(name, directory / "__init__.py")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    def _instantiate_provider(self, name, directory):
        """Load provider *name* and construct its ``Provider`` with its saved config."""
        module = self._load_provider_module(name, directory)
        with open(self.USER_PROVIDER_CONFIGS / f"{name}.json", "r") as f:
            provider_config = json.load(f)
        return module.Provider(provider_config)

    def _write_default_config(self, name, directory):
        """Write the provider's ``config_template`` (or ``{}``) as its config file."""
        # Load the module *before* opening the file: if the import fails, no
        # empty (invalid JSON) config file is left behind.
        module = self._load_provider_module(name, directory)
        template = getattr(module, "config_template", {})
        with open(self.USER_PROVIDER_CONFIGS / f"{name}.json", "w") as f:
            json.dump(template, f)

    @staticmethod
    def _open_externally(target):
        """Open *target* with ``xdg-open``, on the host when ``FLATPAK_SANDBOX_DIR`` is set."""
        command = ["xdg-open", str(target)]
        if os.getenv("FLATPAK_SANDBOX_DIR"):
            command = ["flatpak-spawn", "--host", *command]
        subprocess.Popen(command)

    def _schedule_update(self, widget=None):
        """Cancel the running search task, if any, and start a new one."""
        if self.update_task is not None:
            self.update_task.cancel()
        self.update_task = asyncio.create_task(self.update_results(widget))

    def _identify_failing_provider(self, tb):
        """Work out which provider a traceback belongs to.

        The innermost frame located inside a known provider directory wins.
        If there is none, fall back to the package enclosing the innermost
        frame's file.

        :return: ``(filename, provider_name, provider_directory)``; the last
            two are ``None`` when no package could be determined.
        """
        frames = traceback.extract_tb(tb)
        for frame in reversed(frames):
            frame_parents = Path(frame.filename).parents
            for name, directory in self.available_providers.items():
                if directory in frame_parents:
                    return frame.filename, name, directory

        filename = frames[-1].filename
        package = get_parent_package(filename)
        if package is None:
            return filename, None, None
        return filename, package.stem, self.available_providers.get(package.stem, package)

    def update_enabled_providers(self, widget=None):
        """Rebuild ``self.providers`` from the menu checkboxes and refresh the results."""
        self.providers = []
        for provider, checkbox in self.provider_checkboxes.items():
            if checkbox.get_active():
                # Look the directory up in available_providers: a checkbox can
                # outlive its entry in permanently_enabled_providers when the
                # provider is switched off in the providers window.
                self.enabled_providers[provider] = self.available_providers[provider]
                self.providers.append(
                    self._instantiate_provider(provider, self.enabled_providers[provider])
                )
            else:
                self.enabled_providers.pop(provider, None)

        self._schedule_update(widget)

    def update_enabled_providers_all(self, widget):
        """Set every provider checkbox to the state of *widget*, then refresh."""
        for checkbox in self.provider_checkboxes.values():
            checkbox.set_active(widget.get_active())
        self.update_enabled_providers()

    def kill(self, widget=None):
        """Stop the event loop and quit the application."""
        self.loop.stop()
        self.quit()

    def check_escape(self, widget, event):
        """Handle Escape, Up, Down and Return on the main window.

        :return: ``True`` if the key press was handled, ``False`` otherwise.
        """
        results_list = self.builder.get_object("results-list")
        search_entry = self.builder.get_object("search-query")
        rows = results_list.get_children()
        current_row = results_list.get_selected_row()

        if event.keyval == Gdk.KEY_Escape:
            self.kill()
            return True

        if event.keyval in (Gdk.KEY_Down, Gdk.KEY_Up):
            if not rows:
                # Nothing to select; indexing an empty list would raise.
                return True

            step = 1 if event.keyval == Gdk.KEY_Down else -1
            if current_row:
                # Wrap around at either end of the list.
                target_index = (rows.index(current_row) + step) % len(rows)
            else:
                # No selection yet: Down starts at the top, Up at the bottom.
                target_index = 0 if step == 1 else len(rows) - 1

            results_list.select_row(rows[target_index])
            search_entry.grab_focus()  # Refocus the search entry
            self.scroll_to_row(rows[target_index], results_list.get_adjustment())
            return True

        if event.keyval == Gdk.KEY_Return:
            if current_row:
                # Execute the selected row
                current_row.activate()
                return True
            elif len(rows) > 0:
                # Execute the first row
                rows[0].activate()
                return True

        return False

    def scroll_to_row(self, row, adjustment):
        """Scroll *adjustment* just far enough to make *row* fully visible."""
        row_geometry = row.get_allocation()
        row_y = row_geometry.y
        row_height = row_geometry.height
        adjustment_value = adjustment.get_value()
        adjustment_upper = adjustment.get_upper() - adjustment.get_page_size()

        if row_y + row_height > adjustment_value + adjustment.get_page_size():
            # Row extends below the visible area.
            adjustment.set_value(min(row_y + row_height - adjustment.get_page_size(), adjustment_upper))
        elif row_y < adjustment_value:
            # Row starts above the visible area.
            adjustment.set_value(max(row_y, 0))

    def about(self, widget):
        """Show the about dialog."""
        about_builder = Gtk.Builder()
        about_builder.add_from_file(str(PACKAGE_DIRECTORY / "about.ui"))
        about_window = about_builder.get_object("about-dialog")
        about_window.connect("destroy", lambda _window: about_window.destroy())
        about_window.connect("close", lambda _window: about_window.destroy())
        about_window.connect("response", lambda _window, _response: about_window.destroy())
        about_window.show_all()

    def provider_menu(self, widget):
        """Show the providers window, with one row per available provider."""
        providers_builder = Gtk.Builder()
        providers_builder.add_from_file(str(PACKAGE_DIRECTORY / "providers.ui"))
        providers_window = providers_builder.get_object("providers-window")
        providers_window.connect("destroy", lambda _window: providers_window.destroy())

        providers_table = providers_builder.get_object("providers-table")  # actually Gtk.Grid
        for row_index, (name, directory) in enumerate(self.available_providers.items()):
            module = self._load_provider_module(name, directory)
            # Built with an empty config: only name, description and icon are read.
            loaded_provider = module.Provider({})

            provider_label = Gtk.Label(label=loaded_provider.name)
            provider_label.set_halign(Gtk.Align.START)
            provider_label.set_valign(Gtk.Align.CENTER)
            font_desc = Pango.FontDescription()
            font_desc.set_weight(Pango.Weight.BOLD)
            provider_label.override_font(font_desc)
            provider_label.set_line_wrap(True)
            provider_label.set_justify(Gtk.Justification.LEFT)

            provider_description = Gtk.Label(label=loaded_provider.description)
            provider_description.set_halign(Gtk.Align.START)
            provider_description.set_valign(Gtk.Align.CENTER)
            provider_description.set_justify(Gtk.Justification.LEFT)
            provider_description.set_line_wrap(True)

            icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
            icon.set_pixel_size(int(self.icon_size))

            switch = Gtk.Switch()
            switch.set_valign(Gtk.Align.CENTER)
            switch.set_active(name in self.permanently_enabled_providers)
            switch.connect("notify::active", self.update_permanently_enabled_providers, name)

            config_button = Gtk.Button.new_with_label(_("Configure"))
            # ``stem=name`` binds the current name; a plain closure would see
            # only the last loop value.
            config_button.connect(
                "clicked",
                lambda _button, stem=name: self._open_externally(
                    self.USER_PROVIDER_CONFIGS / f"{stem}.json"
                ),
            )
            config_button.set_relief(Gtk.ReliefStyle.NONE)

            providers_table.attach(icon, 0, row_index, 1, 1)
            providers_table.attach(provider_label, 1, row_index, 1, 1)
            providers_table.attach(provider_description, 2, row_index, 1, 1)
            providers_table.attach(switch, 3, row_index, 1, 1)
            providers_table.attach(config_button, 4, row_index, 1, 1)

            provider_label.show()
            provider_description.show()

        def open_provider_directory(widget):
            self._open_externally(self.USER_PROVIDERS)

        providers_builder.connect_signals(
            {
                "open-provider-directory": open_provider_directory,
            }
        )

        providers_window.show_all()

    def preferences(self, widget):
        """Show the preferences window and persist appearance changes."""
        preferences_builder = Gtk.Builder()
        preferences_builder.add_from_file(str(PACKAGE_DIRECTORY / "preferences.ui"))
        preferences_window = preferences_builder.get_object("preferences-window")
        preferences_window.connect("destroy", lambda _window: preferences_window.destroy())

        def update_appearance_preferences(widget, *args):
            # get_value() returns a float; the size is used as a pixel count.
            self.icon_size = int(preferences_builder.get_object("icon-size").get_value())
            self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()

            with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
                json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)

            # Regenerate the results to reflect the changes
            self._schedule_update(widget)

        preferences_builder.connect_signals(
            {
                "update-appearance": update_appearance_preferences,
            }
        )

        preferences_window.show_all()

    def update_permanently_enabled_providers(self, widget, param, provider):
        """Persist the state of a provider switch to the enabled-providers file."""
        if widget.get_active():
            self.permanently_enabled_providers[provider] = self.available_providers[provider]
        else:
            self.permanently_enabled_providers.pop(provider, None)

        with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
            f.write("\n".join(self.permanently_enabled_providers.keys()))

    async def update_results(self, widget):
        """Run the current query against every loaded provider and fill the list.

        If a provider raises, a ``ProviderExceptionDialog`` is shown; opening
        or resetting the config, or choosing to quit, terminates the app.
        """
        spinner = self.builder.get_object("spinner")
        spinner.start()
        query = self.builder.get_object("search-query-buffer").get_text()
        generators = [provider.search for provider in self.providers]

        # Clear the results list
        results_list = self.builder.get_object("results-list")
        for row in results_list.get_children():
            results_list.remove(row)

        try:
            async for result in merge_generators_with_params(generators, query):
                result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"], self.icon_size, self.show_result_descriptions)
                results_list.add(result_box)
                result_box.show_all()
        except Exception as e:
            # Cancellation is not an Exception subclass, so a superseded
            # search is not reported here.
            tb = e.__traceback__
            filename, provider_name, provider_directory = self._identify_failing_provider(tb)
            print(f"{filename} raised an exception!")
            print(f"{type(e).__name__}: {str(e)}")
            dialog = ProviderExceptionDialog(Path(filename), e, self.window, tb=tb)
            dialog.show_all()
            response = dialog.run()
            dialog.destroy()
            if response == 0:
                # Open config
                if provider_name is not None:
                    self._open_externally(self.USER_PROVIDER_CONFIGS / f"{provider_name}.json")

                self.kill()
            elif response == 1:
                # Reset config, using the template from the package's
                # __init__.py rather than whichever file happened to raise.
                if provider_name is not None:
                    self._write_default_config(provider_name, provider_directory)

                self.kill()
            elif response == Gtk.ResponseType.CLOSE:
                self.kill()

        results_list.show_all()
        spinner.stop()
520
521
522
if __name__ == "__main__":
    # Script entry point: build the application, show its window and run the
    # event loop until it is stopped.
    izvor = Izvor()
    main_window = izvor.window
    main_window.show_all()
    main_window.present()
    try:
        izvor.loop.run_forever()
    finally:
        # Always release the loop, even if run_forever() raised.
        izvor.loop.close()
530