By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 main.py

View raw Download
text/plain • 21.5 kiB
Python script, ASCII text executable
        
            
1
"""
2
Izvor - modular, GTK+ desktop search and launcher
3
Copyright (C) 2024 <root@roundabout-host.com>
4
5
This program is free software: you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation, either version 3 of the License, or
8
(at your option) any later version.
9
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
14
15
You should have received a copy of the GNU General Public License
16
along with this program. If not, see <https://www.gnu.org/licenses/>.
17
"""
18
19
import os
20
import subprocess
21
22
import gi
23
import asyncio
24
import importlib
25
import gbulb
26
from pathlib import Path
27
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
28
29
from xdg.Config import icon_size
30
31
gi.require_version("Gtk", "3.0")
32
from gi.repository import Gtk, Gdk, Pango, GLib
33
import gettext
34
import json
35
import traceback
36
import sys
37
38
# Short alias for gettext.gettext; user-facing strings in this file are wrapped in _().
_ = gettext.gettext
39
40
# Generic type variable; NOTE(review): not referenced in the visible part of this file.
_T = TypeVar("_T")
41
42
43
def get_parent_package(path):
    """Return the nearest ancestor directory of *path* that contains ``__init__.py``.

    The ancestors are inspected from the innermost outwards. The filesystem
    root itself is never inspected, matching the previous behaviour.

    :param path: a file path, given as ``str`` or ``os.PathLike``.
    :return: the package directory as a :class:`pathlib.Path`, or ``None`` when
        no ancestor contains ``__init__.py``.
    """
    for ancestor in Path(path).parents:
        if ancestor.anchor and ancestor == Path(ancestor.anchor):
            # Reached the filesystem root (or drive root): stop without checking it.
            break
        # exists() simply reports False for missing directories, so relative
        # paths and paths with non-existent components no longer raise.
        if (ancestor / "__init__.py").exists():
            return ancestor
    return None
50
51
52
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
                                       *args, **kwargs):
    """Interleave the items of several async generators as they become available.

    Every callable in *generator_funcs* is called with ``*args, **kwargs`` and
    must return an async iterable. One ``__anext__()`` task per iterator is
    kept in flight; whenever one completes its item is yielded and the next
    item of that iterator is requested.

    An exception raised by any source (other than ``StopAsyncIteration``)
    propagates to the consumer. When that happens, or when the consumer stops
    iterating or is cancelled, all still-pending ``__anext__()`` tasks are
    cancelled so that no source keeps running in the background.
    """
    iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
    tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}

    try:
        while tasks:
            done, _pending = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                iterator = tasks.pop(task)
                try:
                    result = task.result()
                except StopAsyncIteration:
                    # This source is exhausted; do not reschedule it.
                    continue
                yield result
                tasks[asyncio.create_task(iterator.__anext__())] = iterator
    finally:
        # Reached on normal exhaustion (nothing left), on an error from a
        # source, and when the generator is closed or cancelled mid-stream.
        leftovers = list(tasks)
        for task in leftovers:
            task.cancel()
        if leftovers:
            # Let the cancellations settle and retrieve any stored exceptions.
            await asyncio.gather(*leftovers, return_exceptions=True)
68
69
70
class ResultInfoWidget(Gtk.ListBoxRow):
    """List box row showing one search result: optional icon, name and description.

    NOTE(review): the source this was recovered from had lost its indentation;
    the icon sizing/packing is assumed to be nested under the ``"logo"`` check.
    """

    def __init__(self, name: str, description: str, image: tuple[str, Any],
                 execute: Callable[[], None], icon_size=32, show_description=True):
        """Build the row.

        :param name: text of the large name label.
        :param description: text of the description label.
        :param image: ``(kind, value)`` pair; only kind ``"logo"`` is rendered,
            with ``value`` used as an icon name.
        :param execute: callable stored on the row as ``self.execute``.
        :param icon_size: icon pixel size; a falsy value disables the icon.
        :param show_description: whether the description label is created.
        """
        super().__init__()
        self.name = name
        self.description = description
        self.image = image
        self.execute = execute

        self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.add(self.box)

        # If the icon size is 0, have no icons
        if icon_size and image[0] == "logo":
            icon_widget = Gtk.Image.new_from_icon_name(image[1], Gtk.IconSize.LARGE_TOOLBAR)
            icon_widget.set_pixel_size(icon_size)
            self.box.pack_start(icon_widget, False, False, 0)
            # The attribute now refers to the widget instead of the (kind, value) pair.
            self.image = icon_widget

        self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        self.box.pack_start(self.label_box, True, True, 0)

        self.name_label = self._left_aligned_label(self.name)
        name_attributes = Pango.AttrList()
        name_attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
        self.name_label.set_attributes(name_attributes)
        self.label_box.pack_start(self.name_label, True, True, 0)

        if not show_description:
            return

        self.description_label = self._left_aligned_label(self.description)
        self.label_box.pack_start(self.description_label, True, True, 0)

    @staticmethod
    def _left_aligned_label(text):
        """Create a wrapping, left-justified, start-aligned label for *text*."""
        label = Gtk.Label(label=text)
        label.set_line_wrap(True)
        label.set_justify(Gtk.Justification.LEFT)
        label.set_halign(Gtk.Align.START)
        return label
106
107
108
class ProviderExceptionDialog(Gtk.Dialog):
    """Dialog that shows the traceback of an exception raised while searching.

    Buttons and their response ids: "Open config" (0), "Reset config" (1) and
    "Quit app" (``Gtk.ResponseType.CLOSE``, the default response).
    """

    def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
        """Build the dialog.

        :param provider: path of the file that raised; the title shows the
            result of ``get_parent_package(provider)``.
        :param exception: the exception instance to display.
        :param parent: window the dialog is transient for.
        :param tb: traceback object passed to ``traceback.format_exception``.
        """
        # Translate the template first and substitute afterwards; formatting
        # before the lookup would make the message impossible to find in a catalogue.
        title = _("{provider} raised a {exception}").format(
            provider=get_parent_package(provider),
            exception=type(exception).__name__,
        )
        super().__init__(title=title, transient_for=parent)
        self.add_button(_("Open config"), 0)
        self.add_button(_("Reset config"), 1)
        self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
        self.set_default_response(Gtk.ResponseType.CLOSE)
        self.set_default_size(360, 240)
        self.set_resizable(True)

        self.provider = provider
        self.exception = exception

        # format_exception() returns lines that already end in a newline, so
        # they are concatenated without an extra separator.
        traceback_text = "".join(traceback.format_exception(type(exception), exception, tb))

        self.label = Gtk.Label(label=traceback_text)

        monospaced_font = Pango.FontDescription()
        monospaced_font.set_family("monospace")
        self.label.override_font(monospaced_font)
        self.label.set_line_wrap(True)
        self.label.set_justify(Gtk.Justification.LEFT)
        self.label.set_halign(Gtk.Align.START)
        self.label.set_selectable(True)

        self.get_content_area().add(self.label)
        self.label.show()
135
136
class Izvor(Gtk.Application):
    """The launcher application: discovers providers, runs searches, shows results.

    Providers are directories containing ``__init__.py`` found in the system
    and user provider directories. Each provider module is expected to expose
    a ``Provider`` class, constructed with the provider's JSON configuration,
    and may expose a ``config_template`` used as the default configuration.
    """

    # Per-user configuration and data directories (XDG base directories with fallbacks).
    USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
    USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
    USER_PROVIDERS = USER_DATA / "providers"
    USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
    # System-wide locations.
    SYSTEM_CONFIGS = Path("/etc/izvor")
    SYSTEM_DATA = Path("/usr/share/izvor")
    SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
    SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
    # One provider name per line.
    ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
    # JSON object with the keys "icon_size" and "show_result_descriptions".
    APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"

    def __init__(self):
        """Load the UI, discover and load providers, and wire up the signals.

        Missing user directories and configuration files are created with
        default contents along the way.
        """
        super().__init__(application_id="com.roundabout-host.roundabout.izvor")
        gbulb.install(gtk=True)
        self.loop = asyncio.get_event_loop()
        self.builder = Gtk.Builder()
        self.builder.add_from_file("izvor.ui")
        self.window = self.builder.get_object("root")
        self.window.connect("destroy", self.kill)
        self.window.connect("key-press-event", self.check_escape)
        self.window.set_title(_("Launcher"))

        # Task currently running update_results(), if any.
        self.update_task = None

        if not os.getenv("XDG_DATA_HOME"):
            print("XDG_DATA_HOME is not set. Using default path.")
        if not os.getenv("XDG_CONFIG_HOME"):
            print("XDG_CONFIG_HOME is not set. Using default path.")
        if not self.USER_PROVIDER_CONFIGS.exists():
            print("Creating user config directory.")
            self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
        if not self.USER_PROVIDERS.exists():
            print("Creating user data directory.")
            self.USER_PROVIDERS.mkdir(parents=True)

        # Provider name -> package directory. User providers are scanned last,
        # so they replace system providers with the same name.
        self.available_providers = {}
        for providers_dir in (self.SYSTEM_PROVIDERS, self.USER_PROVIDERS):
            if not providers_dir.exists():
                continue
            for provider in providers_dir.iterdir():
                if (provider / "__init__.py").exists():
                    self.available_providers[provider.stem] = provider

        print(f"Available providers: {list(self.available_providers.keys())}")

        # Write a default configuration for every provider that has none yet.
        for provider, path in self.available_providers.items():
            config_path = self.USER_PROVIDER_CONFIGS / f"{provider}.json"
            if config_path.exists():
                continue
            # Import the module BEFORE opening the file: a failing import must
            # not leave an empty (invalid JSON) configuration file behind.
            module = self._load_provider_module(provider, path)
            with open(config_path, "w") as f:
                json.dump(getattr(module, "config_template", {}), f)

        # On first run every available provider is enabled.
        if not self.ENABLED_PROVIDERS_FILE.exists():
            with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
                for provider in self.available_providers:
                    f.write(provider + "\n")

        if not self.APPEARANCE_CONFIG_FILE.exists():
            with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
                json.dump({"icon_size": 32, "show_result_descriptions": True}, f)

        with open(self.APPEARANCE_CONFIG_FILE, "r") as f:
            appearance_config = json.load(f)
        self.icon_size = appearance_config.get("icon_size", 32)
        self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)

        # Providers listed in the enabled-providers file (and still installed).
        self.permanently_enabled_providers = {}
        with open(self.ENABLED_PROVIDERS_FILE, "r") as f:
            for line in f:
                provider = line.strip()
                if provider in self.available_providers:
                    self.permanently_enabled_providers[provider] = self.available_providers[provider]

        # Providers enabled for the current session (toggled through the menu).
        self.enabled_providers = self.permanently_enabled_providers.copy()

        providers_menu = self.builder.get_object("providers-menu")

        self.providers = []
        self.provider_checkboxes = {}

        for provider in self.permanently_enabled_providers.values():
            loaded_provider = self._instantiate_provider(provider.stem, provider)
            self.providers.append(loaded_provider)
            print(f"Loaded provider: {loaded_provider.name}")
            print(loaded_provider.description)

            checkbox = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
            self.provider_checkboxes[provider.stem] = checkbox
            providers_menu.append(checkbox)
            # Activate before connecting so the handler is not fired here.
            checkbox.set_active(True)
            checkbox.show()
            checkbox.connect("toggled", self.update_enabled_providers)

        def call_update_results(widget):
            self._restart_search(widget)

        def execute_result(widget, row):
            row.execute()
            self.kill()

        self.builder.connect_signals(
            {
                "update-search": call_update_results,
                "result-activated": execute_result,
                "providers-menu-all-toggled": self.update_enabled_providers_all,
                "menu-about": self.about,
                "menu-providers": self.provider_menu,
                "menu-preferences": self.preferences,
            }
        )

        GLib.idle_add(self.update_enabled_providers, None)

    @staticmethod
    def _load_provider_module(name, package_dir):
        """Import and execute ``package_dir / "__init__.py"`` as module *name*."""
        # `import importlib` alone does not guarantee that the `util`
        # submodule has been loaded, so import it explicitly here.
        import importlib.util

        spec = importlib.util.spec_from_file_location(name, package_dir / "__init__.py")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module

    def _instantiate_provider(self, name, package_dir):
        """Load provider *name* and construct its ``Provider`` with the user's JSON config."""
        module = self._load_provider_module(name, package_dir)
        with open(self.USER_PROVIDER_CONFIGS / f"{name}.json", "r") as f:
            provider_config = json.load(f)
        return module.Provider(provider_config)

    def _restart_search(self, widget=None):
        """Cancel the search task in flight, if any, and start a new one."""
        if self.update_task is not None:
            self.update_task.cancel()
        self.update_task = asyncio.create_task(self.update_results(widget))

    def update_enabled_providers(self, widget=None):
        """Rebuild ``self.providers`` from the menu checkboxes and restart the search.

        Returns ``None``, so when scheduled through ``GLib.idle_add`` it runs once.
        """
        self.providers = []
        for provider, checkbox in self.provider_checkboxes.items():
            # A checkbox can outlive its provider being permanently disabled in
            # the providers window; such a provider is treated as disabled
            # instead of raising KeyError.
            package_dir = self.permanently_enabled_providers.get(provider)
            if checkbox.get_active() and package_dir is not None:
                self.enabled_providers[provider] = package_dir
                self.providers.append(self._instantiate_provider(provider, package_dir))
            else:
                self.enabled_providers.pop(provider, None)

        self._restart_search(widget)

    def update_enabled_providers_all(self, widget):
        """Set every provider checkbox to the state of *widget*, then refresh."""
        for checkbox in self.provider_checkboxes.values():
            checkbox.set_active(widget.get_active())
        self.update_enabled_providers()

    def kill(self, widget=None):
        """Stop the event loop and quit the application."""
        self.loop.stop()
        self.quit()

    def check_escape(self, widget, event):
        """Handle Escape, Up, Down and Return on the main window.

        :return: ``True`` when the key press was handled, ``False`` otherwise.
        """
        results_list = self.builder.get_object("results-list")
        search_entry = self.builder.get_object("search-query")
        rows = results_list.get_children()
        current_row = results_list.get_selected_row()

        if event.keyval == Gdk.KEY_Escape:
            self.kill()
            return True

        if event.keyval in (Gdk.KEY_Down, Gdk.KEY_Up):
            if not rows:
                # Nothing to select; indexing an empty list would raise IndexError.
                return True

            step = 1 if event.keyval == Gdk.KEY_Down else -1
            if current_row is not None:
                # Wrap around at either end of the list.
                target_index = (rows.index(current_row) + step) % len(rows)
            elif step == 1:
                target_index = 0
            else:
                target_index = len(rows) - 1

            results_list.select_row(rows[target_index])
            search_entry.grab_focus()  # Refocus the search entry
            self.scroll_to_row(rows[target_index], results_list.get_adjustment())
            return True

        if event.keyval == Gdk.KEY_Return:
            if current_row is not None:
                # Execute the selected row
                current_row.activate()
                return True
            if rows:
                # Execute the first row
                rows[0].activate()
                return True

        return False

    def scroll_to_row(self, row, adjustment):
        """Scroll *adjustment* just far enough to bring *row* fully into view."""
        row_geometry = row.get_allocation()
        row_top = row_geometry.y
        row_bottom = row_top + row_geometry.height
        page_size = adjustment.get_page_size()
        current_value = adjustment.get_value()
        max_value = adjustment.get_upper() - page_size

        if row_bottom > current_value + page_size:
            # Row extends below the visible page: align its bottom edge.
            adjustment.set_value(min(row_bottom - page_size, max_value))
        elif row_top < current_value:
            # Row starts above the visible page: align its top edge.
            adjustment.set_value(max(row_top, 0))

    def about(self, widget):
        """Show the about dialog defined in ``about.ui``."""
        about_builder = Gtk.Builder()
        about_builder.add_from_file("about.ui")
        about_window = about_builder.get_object("about-dialog")
        about_window.connect("destroy", lambda _dialog: about_window.destroy())
        about_window.connect("close", lambda _dialog: about_window.destroy())
        about_window.connect("response", lambda _dialog, _response: about_window.destroy())
        about_window.show_all()

    def provider_menu(self, widget):
        """Show the providers window: one grid row per available provider.

        Each row has an icon, name, description, a switch controlling whether
        the provider is permanently enabled, and a button that opens the
        provider's configuration file with ``xdg-open``.
        """
        providers_builder = Gtk.Builder()
        providers_builder.add_from_file("providers.ui")
        providers_window = providers_builder.get_object("providers-window")
        providers_window.connect("destroy", lambda _window: providers_window.destroy())

        providers_table = providers_builder.get_object("providers-table")  # actually Gtk.Grid
        for row_index, provider in enumerate(self.available_providers.values()):
            module = self._load_provider_module(provider.stem, provider)
            # Only name, description and icon are read here, so the provider
            # is built with an empty configuration.
            loaded_provider = module.Provider({})

            provider_label = Gtk.Label(label=loaded_provider.name)
            provider_label.set_halign(Gtk.Align.START)
            provider_label.set_valign(Gtk.Align.CENTER)
            font_desc = Pango.FontDescription()
            font_desc.set_weight(Pango.Weight.BOLD)
            provider_label.override_font(font_desc)
            provider_label.set_line_wrap(True)
            provider_label.set_justify(Gtk.Justification.LEFT)

            provider_description = Gtk.Label(label=loaded_provider.description)
            provider_description.set_halign(Gtk.Align.START)
            provider_description.set_valign(Gtk.Align.CENTER)
            provider_description.set_justify(Gtk.Justification.LEFT)
            provider_description.set_line_wrap(True)

            icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
            icon.set_pixel_size(self.icon_size)

            switch = Gtk.Switch()
            switch.set_valign(Gtk.Align.CENTER)
            switch.set_active(provider.stem in self.permanently_enabled_providers)
            switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)

            config_button = Gtk.Button.new_with_label(_("Configure"))
            # `stem` is bound as a default argument so each button keeps its own provider.
            config_button.connect(
                "clicked",
                lambda _button, stem=provider.stem: subprocess.Popen(
                    ["xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{stem}.json")]
                ),
            )
            config_button.set_relief(Gtk.ReliefStyle.NONE)

            providers_table.attach(icon, 0, row_index, 1, 1)
            providers_table.attach(provider_label, 1, row_index, 1, 1)
            providers_table.attach(provider_description, 2, row_index, 1, 1)
            providers_table.attach(switch, 3, row_index, 1, 1)
            providers_table.attach(config_button, 4, row_index, 1, 1)

            provider_label.show()
            provider_description.show()

        providers_window.show_all()

    def preferences(self, widget):
        """Show the preferences window defined in ``preferences.ui``."""
        preferences_builder = Gtk.Builder()
        preferences_builder.add_from_file("preferences.ui")
        preferences_window = preferences_builder.get_object("preferences-window")
        preferences_window.connect("destroy", lambda _window: preferences_window.destroy())

        def update_appearance_preferences(widget, *args):
            """Read the appearance widgets, persist their values and refresh the results."""
            self.icon_size = preferences_builder.get_object("icon-size").get_value()
            self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()

            with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
                json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)

            # Regenerate the results to reflect the changes
            self._restart_search(widget)

        preferences_builder.connect_signals(
            {
                "update-appearance": update_appearance_preferences,
            }
        )

        preferences_window.show_all()

    def update_permanently_enabled_providers(self, widget, param, provider):
        """Persist the state of a provider switch to the enabled-providers file."""
        if widget.get_active():
            self.permanently_enabled_providers[provider] = self.available_providers[provider]
        else:
            self.permanently_enabled_providers.pop(provider, None)

        # One name per line, newline-terminated like the file written on first run.
        with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
            f.writelines(f"{name}\n" for name in self.permanently_enabled_providers)

    async def update_results(self, widget):
        """Run the current query against every loaded provider and fill the results list.

        If a provider raises, a :class:`ProviderExceptionDialog` is shown. Its
        three buttons open the offending provider's configuration, reset it to
        the provider's ``config_template``, or do nothing further; in all three
        cases the application then quits.
        """
        print("Updating results...")
        spinner = self.builder.get_object("spinner")
        spinner.start()
        query = self.builder.get_object("search-query-buffer").get_text()
        generators = [provider.search for provider in self.providers]

        # Clear the results list
        results_list = self.builder.get_object("results-list")
        for row in results_list.get_children():
            results_list.remove(row)

        try:
            async for result in merge_generators_with_params(generators, query):
                result_box = ResultInfoWidget(
                    result["name"],
                    result["description"],
                    result["image"],
                    result["execute"],
                    self.icon_size,
                    self.show_result_descriptions,
                )
                results_list.add(result_box)
                result_box.show_all()
        except Exception as e:
            tb = e.__traceback__
            # The innermost frame identifies the file that raised.
            filename = traceback.extract_tb(tb)[-1].filename
            print(f"{filename} raised an exception!")
            print(f"{type(e).__name__}: {str(e)}")

            # None when the failing file is not inside any package.
            package = get_parent_package(filename)
            config_path = None
            if package is not None:
                config_path = self.USER_PROVIDER_CONFIGS / f"{package.stem}.json"

            dialog = ProviderExceptionDialog(Path(filename), e, self.window, tb=tb)
            dialog.show_all()
            response = dialog.run()
            dialog.destroy()

            if response == 0:
                # Open config
                if config_path is not None:
                    subprocess.Popen(["xdg-open", str(config_path)])
                self.kill()
            elif response == 1:
                # Reset config
                if package is not None:
                    # The template is defined by the package's __init__.py,
                    # which is not necessarily the file that raised. Import
                    # first so a failing import cannot truncate the config.
                    module = self._load_provider_module(package.stem, package)
                    with open(config_path, "w") as f:
                        json.dump(getattr(module, "config_template", {}), f)
                self.kill()
            elif response == Gtk.ResponseType.CLOSE:
                self.kill()

        results_list.show_all()
        spinner.stop()
508
509
510
if __name__ == "__main__":
    # Build the application, show its main window, then run the event loop
    # until Izvor.kill() stops it; the loop is closed even if it exits with
    # an exception.
    izvor = Izvor()
    event_loop = izvor.loop
    izvor.window.show_all()
    try:
        event_loop.run_forever()
    finally:
        event_loop.close()
517