__init__.py
Python script, ASCII text executable
#!/usr/bin/env python3

"""
Izvor - modular, GTK+ desktop search and launcher
Copyright (C) 2024 <root@roundabout-host.com>

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

import os
import subprocess

import gi
from gi.events import GLibEventLoopPolicy
import asyncio
import importlib
# ``importlib.util`` is a submodule; it must be imported explicitly to be
# guaranteed available as an attribute of ``importlib``.
import importlib.util
from pathlib import Path
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any

gi.require_version("Gtk", "3.0")
gi.require_version("Pango", "1.0")
from gi.repository import Gtk, Gdk, Pango, GLib
import gettext
import json
import traceback
import sys

_ = gettext.gettext

_T = TypeVar("_T")

PACKAGE_DIRECTORY = Path(__file__).resolve().parent


def get_parent_package(path):
    """
    Return the closest ancestor directory of ``path`` that holds an ``__init__.py``.

    The filesystem root itself is never inspected. Relative paths are
    supported: the walk stops as soon as a directory is its own parent.

    :param path: a file path (``str`` or ``Path``).
    :return: the package directory as a ``Path``, or ``None`` if there is none.
    """
    directory = Path(path).parent
    # ``Path.parent`` is a fixed point at "/" (absolute) and "." (relative),
    # so this terminates for both kinds of path.
    while directory != directory.parent:
        if (directory / "__init__.py").exists():
            return directory
        directory = directory.parent
    return None


def _load_provider_module(name, init_file):
    """Import and execute the Python file ``init_file`` as a module named ``name``."""
    spec = importlib.util.spec_from_file_location(name, init_file)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


def _open_on_host(path):
    """
    Open ``path`` with ``xdg-open``.

    When ``FLATPAK_SANDBOX_DIR`` is set, the command is run through
    ``flatpak-spawn --host`` instead of directly.
    """
    command = ["xdg-open", str(path)]
    if os.getenv("FLATPAK_SANDBOX_DIR"):
        command = ["flatpak-spawn", "--host", *command]
    subprocess.Popen(command)


async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
                                       *args, **kwargs):
    """
    Interleave the output of several asynchronous generators.

    Every callable in ``generator_funcs`` is called with ``*args, **kwargs``;
    items are yielded in the order in which they become available.

    Any ``__anext__`` task still outstanding when this generator finishes, is
    closed, is cancelled, or fails because one source raised, is cancelled
    too, so no source keeps running in the background.
    """
    iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
    # Maps each in-flight ``__anext__`` task to the iterator it belongs to.
    tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}

    try:
        while tasks:
            # Not named ``_``: that would shadow the module-level gettext alias.
            done, _pending = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                iterator = tasks.pop(task)
                try:
                    result = task.result()
                except StopAsyncIteration:
                    # This source is exhausted; do not reschedule it.
                    continue
                yield result
                tasks[asyncio.create_task(iterator.__anext__())] = iterator
    finally:
        for task in tasks:
            task.cancel()


class ResultInfoWidget(Gtk.ListBoxRow):
    """A list row showing one search result: optional icon, name and optional description."""

    def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
        """
        :param name: text of the large name label.
        :param description: text of the description label.
        :param image: ``(kind, value)``; only kind ``"logo"`` is rendered, with
            ``value`` used as an icon name.
        :param execute: callable stored on the row as ``self.execute``.
        :param icon_size: icon size in pixels; a falsy value (0) disables icons.
        :param show_description: whether to add the description label.
        """
        super().__init__()
        self.name = name
        self.description = description
        self.image = image
        self.execute = execute

        self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.add(self.box)

        if icon_size:
            # If the icon size is 0, have no icons
            if image[0] == "logo":
                self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
                # The pixel size must be an integer; the stored preference may be a float.
                self.image.set_pixel_size(int(icon_size))
                self.box.pack_start(self.image, False, False, 0)

        self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        self.label_box.set_valign(Gtk.Align.START)
        self.box.pack_start(self.label_box, True, True, 0)

        self.name_label = Gtk.Label(label=self.name)
        self.name_label.set_line_wrap(True)
        self.name_label.set_justify(Gtk.Justification.LEFT)
        self.name_label.set_halign(Gtk.Align.START)
        self.name_label.set_line_wrap_mode(Pango.WrapMode.WORD_CHAR)
        attributes = Pango.AttrList()
        attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
        self.name_label.set_attributes(attributes)
        self.label_box.pack_start(self.name_label, True, True, 0)

        if show_description:
            self.description_label = Gtk.Label(label=self.description)
            self.description_label.set_line_wrap(True)
            self.description_label.set_justify(Gtk.Justification.LEFT)
            self.description_label.set_halign(Gtk.Align.START)
            self.description_label.set_line_wrap_mode(Pango.WrapMode.WORD_CHAR)
            self.label_box.pack_start(self.description_label, True, True, 0)


class ProviderExceptionDialog(Gtk.Dialog):
    """
    Dialog showing the traceback of an exception raised while searching.

    Response IDs: ``0`` for "Open config", ``1`` for "Reset config" and
    ``Gtk.ResponseType.CLOSE`` for "Quit app".
    """

    def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
        """
        :param provider: path of the file in which the exception occurred.
        :param exception: the exception instance.
        :param parent: window the dialog is transient for.
        :param tb: traceback object to render.
        """
        # Translate the template first and substitute afterwards, so the
        # message catalogue is looked up with the constant msgid.
        title = _("{provider} raised a {exception}").format(
            provider=get_parent_package(provider),
            exception=type(exception).__name__,
        )
        super().__init__(title=title, transient_for=parent)
        self.add_button(_("Open config"), 0)
        self.add_button(_("Reset config"), 1)
        self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
        self.set_default_response(Gtk.ResponseType.CLOSE)
        self.set_default_size(360, 240)
        self.set_resizable(True)

        self.provider = provider
        self.exception = exception

        # format_exception() returns strings that already end in a newline.
        traceback_text = "".join(traceback.format_exception(type(exception), exception, tb))

        self.label = Gtk.Label(label=traceback_text)

        monospaced_font = Pango.FontDescription()
        monospaced_font.set_family("monospace")
        self.label.override_font(monospaced_font)
        self.label.set_line_wrap(True)
        self.label.set_justify(Gtk.Justification.LEFT)
        self.label.set_halign(Gtk.Align.START)
        self.label.set_selectable(True)

        self.get_content_area().add(self.label)
        self.label.show()


class Izvor(Gtk.Application):
    """The launcher application: loads providers, builds the UI and runs searches."""

    # An empty XDG variable is treated like an unset one, which matches the
    # "Using default path" messages printed by __init__.
    USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME") or Path.home() / ".config") / "izvor"
    USER_DATA = Path(os.getenv("XDG_DATA_HOME") or Path.home() / ".local" / "share") / "izvor"
    USER_PROVIDERS = USER_DATA / "providers"
    USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
    SYSTEM_CONFIGS = Path("/etc/izvor")
    SYSTEM_DATA = Path("/usr/share/izvor")
    SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
    SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
    ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
    APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"

    def __init__(self):
        """Build the main window, discover providers and load the enabled ones."""
        super().__init__(application_id="com.roundabout-host.roundabout.izvor")
        asyncio.set_event_loop_policy(GLibEventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.builder = Gtk.Builder()
        self.builder.add_from_file(str(PACKAGE_DIRECTORY / "izvor.ui"))
        self.window = self.builder.get_object("root")
        self.window.connect("destroy", self.kill)
        self.window.connect("key-press-event", self.check_escape)
        self.window.set_title(_("Launcher"))

        if not os.getenv("XDG_DATA_HOME"):
            print("XDG_DATA_HOME is not set. Using default path.")
        if not os.getenv("XDG_CONFIG_HOME"):
            print("XDG_CONFIG_HOME is not set. Using default path.")
        if not self.USER_PROVIDER_CONFIGS.exists():
            print("Creating user config directory.")
            self.USER_PROVIDER_CONFIGS.mkdir(parents=True, exist_ok=True)
        if not self.USER_PROVIDERS.exists():
            print("Creating user data directory.")
            self.USER_PROVIDERS.mkdir(parents=True, exist_ok=True)

        # Provider name -> provider package directory. User providers are
        # scanned last, so they replace system providers of the same name.
        self.available_providers = {}

        for providers_directory in (self.SYSTEM_PROVIDERS, self.USER_PROVIDERS):
            if not providers_directory.exists():
                continue
            for provider in providers_directory.iterdir():
                if not (provider / "__init__.py").exists():
                    continue
                self.available_providers[provider.stem] = provider

        print(f"Available providers: {list(self.available_providers.keys())}")

        for provider, path in self.available_providers.items():
            if not (self.USER_PROVIDER_CONFIGS / f"{provider}.json").exists():
                # Import the module *before* opening the file: if the import
                # fails, no empty (invalid JSON) config file is left behind.
                module = _load_provider_module(provider, path / "__init__.py")
                self._write_default_config(provider, module)

        if not self.ENABLED_PROVIDERS_FILE.exists():
            # First run: enable every available provider.
            self._write_enabled_providers(self.available_providers)

        if not self.APPEARANCE_CONFIG_FILE.exists():
            with open(self.APPEARANCE_CONFIG_FILE, "w", encoding="utf-8") as f:
                json.dump({"icon_size": 32, "show_result_descriptions": True}, f)

        with open(self.APPEARANCE_CONFIG_FILE, "r", encoding="utf-8") as f:
            appearance_config = json.load(f)
        self.icon_size = appearance_config.get("icon_size", 32)
        self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)

        # Providers listed in ENABLED_PROVIDERS_FILE that are also available.
        self.permanently_enabled_providers = {}

        with open(self.ENABLED_PROVIDERS_FILE, "r", encoding="utf-8") as f:
            for line in f:
                provider = line.strip()
                if provider in self.available_providers:
                    self.permanently_enabled_providers[provider] = self.available_providers[provider]

        # Providers enabled for this session (toggled from the providers menu).
        self.enabled_providers = self.permanently_enabled_providers.copy()

        providers_menu = self.builder.get_object("providers-menu")

        self.providers = []
        self.provider_checkboxes = {}

        for name, path in self.permanently_enabled_providers.items():
            loaded_provider = self._instantiate_provider(name, path)
            self.providers.append(loaded_provider)
            checkbox = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
            self.provider_checkboxes[name] = checkbox
            providers_menu.append(checkbox)
            checkbox.set_active(True)
            checkbox.show()
            checkbox.connect("toggled", self.update_enabled_providers)

        self.update_task = None

        def call_update_results(widget):
            # Only the most recent search may populate the list.
            if self.update_task is not None:
                self.update_task.cancel()
            self.update_task = asyncio.create_task(self.update_results(widget))

        def execute_result(widget, row):
            row.execute()
            self.kill()

        self.builder.connect_signals(
            {
                "update-search": call_update_results,
                "result-activated": execute_result,
                "providers-menu-all-toggled": self.update_enabled_providers_all,
                "menu-about": self.about,
                "menu-providers": self.provider_menu,
                "menu-preferences": self.preferences,
            }
        )

        # The callback returns None (falsy), so GLib runs it only once.
        GLib.idle_add(self.update_enabled_providers, None)

    def _write_default_config(self, provider, module):
        """Write ``module.config_template`` (or ``{}`` if absent) as the user config of ``provider``."""
        with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "w", encoding="utf-8") as f:
            json.dump(getattr(module, "config_template", {}), f)

    def _write_enabled_providers(self, names):
        """Write the provider names to ENABLED_PROVIDERS_FILE, one per line."""
        with open(self.ENABLED_PROVIDERS_FILE, "w", encoding="utf-8") as f:
            f.writelines(f"{name}\n" for name in names)

    def _instantiate_provider(self, name, path):
        """Load the provider package at ``path`` and build its ``Provider`` from the user config."""
        module = _load_provider_module(name, path / "__init__.py")
        with open(self.USER_PROVIDER_CONFIGS / f"{name}.json", "r", encoding="utf-8") as f:
            provider_config = json.load(f)
        return module.Provider(provider_config)

    def update_enabled_providers(self, widget=None):
        """Rebuild ``self.providers`` from the menu checkboxes and restart the search."""
        self.providers = []
        for provider, checkbox in self.provider_checkboxes.items():
            if checkbox.get_active():
                # Look the path up in available_providers (same value): the
                # provider may have been removed from
                # permanently_enabled_providers since its checkbox was created.
                self.enabled_providers[provider] = self.available_providers[provider]
                self.providers.append(
                    self._instantiate_provider(provider, self.enabled_providers[provider])
                )
            else:
                self.enabled_providers.pop(provider, None)

        if self.update_task is not None:
            self.update_task.cancel()
        self.update_task = asyncio.create_task(self.update_results(widget))

    def update_enabled_providers_all(self, widget):
        """Set every provider checkbox to the state of ``widget``, then refresh."""
        for checkbox in self.provider_checkboxes.values():
            checkbox.set_active(widget.get_active())
        self.update_enabled_providers()

    def kill(self, widget=None):
        """Stop the event loop and quit the application."""
        self.loop.stop()
        self.quit()

    def check_escape(self, widget, event):
        """
        Key-press handler of the main window.

        Escape quits; Up/Down move the selection through the results with
        wrap-around while keeping focus on the search entry; Return activates
        the selected row, or the first row if none is selected.

        :return: ``True`` if the key was handled here, ``False`` otherwise.
        """
        results_list = self.builder.get_object("results-list")
        search_entry = self.builder.get_object("search-query")
        rows = results_list.get_children()
        current_row = results_list.get_selected_row()

        if event.keyval == Gdk.KEY_Escape:
            self.kill()
            return True

        if event.keyval in (Gdk.KEY_Down, Gdk.KEY_Up):
            if not rows:
                # Nothing to navigate; indexing an empty list would raise.
                return True

            step = 1 if event.keyval == Gdk.KEY_Down else -1
            if current_row:
                # Modulo wraps around at both ends of the list.
                target_index = (rows.index(current_row) + step) % len(rows)
            elif step == 1:
                target_index = 0
            else:
                target_index = len(rows) - 1

            results_list.select_row(rows[target_index])
            search_entry.grab_focus()  # Refocus the search entry
            self.scroll_to_row(rows[target_index], results_list.get_adjustment())
            return True

        if event.keyval == Gdk.KEY_Return:
            if current_row:
                # Execute the selected row
                current_row.activate()
                return True
            elif len(rows) > 0:
                # Execute the first row
                rows[0].activate()
                return True

        return False

    def scroll_to_row(self, row, adjustment):
        """Change ``adjustment`` just enough for ``row`` to be fully inside the visible page."""
        row_geometry = row.get_allocation()
        row_y = row_geometry.y
        row_height = row_geometry.height
        page_size = adjustment.get_page_size()
        adjustment_value = adjustment.get_value()
        # Largest value the adjustment can usefully take.
        adjustment_upper = adjustment.get_upper() - page_size

        if row_y + row_height > adjustment_value + page_size:
            # Row ends below the page: align its bottom with the page bottom.
            adjustment.set_value(min(row_y + row_height - page_size, adjustment_upper))
        elif row_y < adjustment_value:
            # Row starts above the page: align its top with the page top.
            adjustment.set_value(max(row_y, 0))

    def about(self, widget):
        """Show the about dialog defined in ``about.ui``."""
        about_builder = Gtk.Builder()
        about_builder.add_from_file(str(PACKAGE_DIRECTORY / "about.ui"))
        about_window = about_builder.get_object("about-dialog")
        about_window.connect("destroy", lambda *args: about_window.destroy())
        about_window.connect("close", lambda *args: about_window.destroy())
        about_window.connect("response", lambda *args: about_window.destroy())
        about_window.show_all()

    def provider_menu(self, widget):
        """Show the providers window: one row per available provider with a switch and a config button."""
        providers_builder = Gtk.Builder()
        providers_builder.add_from_file(str(PACKAGE_DIRECTORY / "providers.ui"))
        providers_window = providers_builder.get_object("providers-window")
        providers_window.connect("destroy", lambda *args: providers_window.destroy())

        providers_table = providers_builder.get_object("providers-table")  # actually Gtk.Grid
        for row_index, provider in enumerate(self.available_providers.values()):
            module = _load_provider_module(provider.stem, provider / "__init__.py")
            # Instantiated with an empty config; only name, description and
            # icon are read from it here.
            loaded_provider = module.Provider({})

            provider_label = Gtk.Label(label=loaded_provider.name)
            provider_label.set_halign(Gtk.Align.START)
            provider_label.set_valign(Gtk.Align.CENTER)
            font_desc = Pango.FontDescription()
            font_desc.set_weight(Pango.Weight.BOLD)
            provider_label.override_font(font_desc)
            provider_label.set_line_wrap(True)
            provider_label.set_justify(Gtk.Justification.LEFT)

            provider_description = Gtk.Label(label=loaded_provider.description)
            provider_description.set_halign(Gtk.Align.START)
            provider_description.set_valign(Gtk.Align.CENTER)
            provider_description.set_justify(Gtk.Justification.LEFT)
            provider_description.set_line_wrap(True)

            icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
            # The pixel size must be an integer; the stored preference may be a float.
            icon.set_pixel_size(int(self.icon_size))

            switch = Gtk.Switch()
            switch.set_valign(Gtk.Align.CENTER)
            switch.set_active(provider.stem in self.permanently_enabled_providers)
            switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)

            config_button = Gtk.Button.new_with_label(_("Configure"))

            # ``stem`` is bound as a default argument so each button keeps its
            # own provider instead of the loop's last one.
            def open_config(widget, stem=provider.stem):
                # User-accessible provider config, not Flatpak sandboxed
                _open_on_host(self.USER_PROVIDER_CONFIGS / f"{stem}.json")

            config_button.connect("clicked", open_config)
            config_button.set_relief(Gtk.ReliefStyle.NONE)

            providers_table.attach(icon, 0, row_index, 1, 1)
            providers_table.attach(provider_label, 1, row_index, 1, 1)
            providers_table.attach(provider_description, 2, row_index, 1, 1)
            providers_table.attach(switch, 3, row_index, 1, 1)
            providers_table.attach(config_button, 4, row_index, 1, 1)

            provider_label.show()
            provider_description.show()

        def open_provider_directory(widget):
            # User-accessible provider directory, not Flatpak sandboxed
            _open_on_host(self.USER_PROVIDERS)

        providers_builder.connect_signals(
            {
                "open-provider-directory": open_provider_directory,
            }
        )

        providers_window.show_all()

    def preferences(self, widget):
        """Show the preferences window; changes are saved to APPEARANCE_CONFIG_FILE and applied at once."""
        preferences_builder = Gtk.Builder()
        preferences_builder.add_from_file(str(PACKAGE_DIRECTORY / "preferences.ui"))
        preferences_window = preferences_builder.get_object("preferences-window")
        preferences_window.connect("destroy", lambda *args: preferences_window.destroy())

        def update_appearance_preferences(widget, *args):
            # NOTE(review): "icon-size" is presumably a spin button or scale
            # whose get_value() yields a float - confirm in preferences.ui.
            # Pixel sizes must be integers, so convert once here.
            self.icon_size = int(preferences_builder.get_object("icon-size").get_value())
            self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()

            with open(self.APPEARANCE_CONFIG_FILE, "w", encoding="utf-8") as f:
                json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)

            # Regenerate the results to reflect the changes
            if self.update_task is not None:
                self.update_task.cancel()
            self.update_task = asyncio.create_task(self.update_results(widget))

        preferences_builder.connect_signals(
            {
                "update-appearance": update_appearance_preferences,
            }
        )

        preferences_window.show_all()

    def update_permanently_enabled_providers(self, widget, param, provider):
        """
        Handler for a provider switch in the providers window.

        Adds ``provider`` to, or removes it from, the permanently enabled
        providers according to ``widget.get_active()`` and saves the list.
        """
        if widget.get_active():
            self.permanently_enabled_providers[provider] = self.available_providers[provider]
        else:
            self.permanently_enabled_providers.pop(provider, None)

        self._write_enabled_providers(self.permanently_enabled_providers)

    async def update_results(self, widget):
        """
        Run the current query against ``self.providers`` and fill the results list.

        If a provider raises, a ProviderExceptionDialog is shown; each of its
        three buttons ends by quitting the application.
        """
        spinner = self.builder.get_object("spinner")
        spinner.start()
        query = self.builder.get_object("search-query-buffer").get_text()
        generators = [provider.search for provider in self.providers]

        # Clear the results list
        results_list = self.builder.get_object("results-list")
        for row in results_list.get_children():
            results_list.remove(row)

        try:
            async for result in merge_generators_with_params(generators, query):
                result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"], self.icon_size, self.show_result_descriptions)
                results_list.add(result_box)
                result_box.show_all()
        except Exception as e:
            exception_type, exception, tb = sys.exc_info()
            # The innermost frame identifies the file that raised.
            filename = traceback.extract_tb(tb)[-1].filename
            print(f"{filename} raised an exception!")
            print(f"{type(e).__name__}: {str(e)}")
            dialog = ProviderExceptionDialog(Path(filename), e, self.window, tb=tb)
            dialog.show_all()
            response = dialog.run()
            dialog.destroy()

            # None when the failing file is not inside any package; in that
            # case there is no provider config to open or reset.
            package = get_parent_package(filename)

            if response == 0:
                # Open config
                if package is not None:
                    _open_on_host(self.USER_PROVIDER_CONFIGS / f"{package.stem}.json")
                self.kill()
            elif response == 1:
                # Reset config. The template is read from the package's
                # __init__.py, the same file __init__ reads it from.
                if package is not None:
                    module = _load_provider_module(package.stem, package / "__init__.py")
                    self._write_default_config(package.stem, module)
                self.kill()
            elif response == Gtk.ResponseType.CLOSE:
                self.kill()

        results_list.show_all()
        spinner.stop()


if __name__ == "__main__":
    izvor = Izvor()
    izvor.window.show_all()
    izvor.window.present()
    try:
        izvor.loop.run_forever()
    finally:
        izvor.loop.close()