<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;">#!/usr/bin/env python3

"""
Izvor - modular, GTK+ desktop search and launcher
Copyright (C) 2024 &lt;root@roundabout-host.com&gt;

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program.  If not, see &lt;https://www.gnu.org/licenses/&gt;.
"""

import os
import subprocess

import gi
from gi.events import GLibEventLoopPolicy
import asyncio
import importlib
from pathlib import Path
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any

gi.require_version("Gtk", "3.0")
gi.require_version("Pango", "1.0")
from gi.repository import Gtk, Gdk, Pango, GLib
import gettext
import json
import traceback
import sys

_ = gettext.gettext

_T = TypeVar("_T")

PACKAGE_DIRECTORY = Path(__file__).resolve().parent

def get_parent_package(path):
    parent_path = os.path.split(path)[0]
    while parent_path != "/":
        if "__init__.py" in os.listdir(parent_path):
            return Path(parent_path)
        parent_path = os.path.split(parent_path)[0]
    return None


async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
                                       *args, **kwargs):
    iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
    tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}

    while tasks:
        done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)

        for task in done:
            iterator = tasks.pop(task)
            try:
                result = task.result()
                yield result
                tasks[asyncio.create_task(iterator.__anext__())] = iterator
            except StopAsyncIteration:
                pass


class ResultInfoWidget(Gtk.ListBoxRow):
    def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
        super().__init__()
        self.name = name
        self.description = description
        self.image = image
        self.execute = execute

        self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.add(self.box)

        if icon_size:
            # If the icon size is 0, have no icons
            if image[0] == "logo":
                self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
                self.image.set_pixel_size(icon_size)
                self.box.pack_start(self.image, False, False, 0)

        self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        self.label_box.set_valign(Gtk.Align.START)
        self.box.pack_start(self.label_box, True, True, 0)

        self.name_label = Gtk.Label(label=self.name)
        self.name_label.set_line_wrap(True)
        self.name_label.set_justify(Gtk.Justification.LEFT)
        self.name_label.set_halign(Gtk.Align.START)
        self.name_label.set_line_wrap_mode(Pango.WrapMode.WORD_CHAR)
        attributes = Pango.AttrList()
        attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
        self.name_label.set_attributes(attributes)
        self.label_box.pack_start(self.name_label, True, True, 0)

        if show_description:
            self.description_label = Gtk.Label(label=self.description)
            self.description_label.set_line_wrap(True)
            self.description_label.set_justify(Gtk.Justification.LEFT)
            self.description_label.set_halign(Gtk.Align.START)
            self.description_label.set_line_wrap_mode(Pango.WrapMode.WORD_CHAR)
            self.label_box.pack_start(self.description_label, True, True, 0)


class ProviderExceptionDialog(Gtk.Dialog):
    def __init__(self, provider: Path, exception: Exception, parent=None, tb=None):
        super().__init__(title=_("{provider} raised a {exception}".format(provider=get_parent_package(provider), exception=type(exception).__name__)), transient_for=parent)
        self.add_button(_("Open config"), 0)
        self.add_button(_("Reset config"), 1)
        self.add_button(_("Quit app"), Gtk.ResponseType.CLOSE)
        self.set_default_response(Gtk.ResponseType.CLOSE)
        self.set_default_size(360, 240)
        self.set_resizable(True)

        self.provider = provider
        self.exception = exception

        traceback_text = "\n".join(traceback.format_exception(type(exception), exception, tb))

        self.label = Gtk.Label(label=traceback_text)

        monospaced_font = Pango.FontDescription()
        monospaced_font.set_family("monospace")
        self.label.override_font(monospaced_font)
        self.label.set_line_wrap(True)
        self.label.set_justify(Gtk.Justification.LEFT)
        self.label.set_halign(Gtk.Align.START)
        self.label.set_selectable(True)

        self.get_content_area().add(self.label)
        self.label.show()

class Izvor(Gtk.Application):
    USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
    USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
    USER_PROVIDERS = USER_DATA / "providers"
    USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
    SYSTEM_CONFIGS = Path("/etc/izvor")
    SYSTEM_DATA = Path("/usr/share/izvor")
    SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
    SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
    ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
    APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"

    def __init__(self):
        super().__init__(application_id="com.roundabout-host.roundabout.izvor")
        asyncio.set_event_loop_policy(GLibEventLoopPolicy())
        self.loop = asyncio.get_event_loop()
        self.builder = Gtk.Builder()
        self.builder.add_from_file(str(PACKAGE_DIRECTORY / "izvor.ui"))
        self.window = self.builder.get_object("root")
        self.window.connect("destroy", self.kill)
        self.window.connect("key-press-event", self.check_escape)
        self.window.set_title(_("Launcher"))

        if not os.getenv("XDG_DATA_HOME"):
            print("XDG_DATA_HOME is not set. Using default path.")
        if not os.getenv("XDG_CONFIG_HOME"):
            print("XDG_CONFIG_HOME is not set. Using default path.")
        if not self.USER_PROVIDER_CONFIGS.exists():
            print("Creating user config directory.")
            self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
        if not self.USER_PROVIDERS.exists():
            print("Creating user data directory.")
            self.USER_PROVIDERS.mkdir(parents=True)

        self.available_providers = {}

        if self.SYSTEM_PROVIDERS.exists():
            for provider in self.SYSTEM_PROVIDERS.iterdir():
                if not (provider / "__init__.py").exists():
                    continue
                self.available_providers[provider.stem] = provider
        if self.USER_PROVIDERS.exists():
            for provider in self.USER_PROVIDERS.iterdir():
                if not (provider / "__init__.py").exists():
                    continue
                self.available_providers[provider.stem] = provider

        print(f"Available providers: {list(self.available_providers.keys())}")

        for provider, path in self.available_providers.items():
            if not (self.USER_PROVIDER_CONFIGS / f"{provider}.json").exists():
                with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "w") as f:
                    spec = importlib.util.spec_from_file_location(provider, path / "__init__.py")
                    module = importlib.util.module_from_spec(spec)
                    spec.loader.exec_module(module)
                    if hasattr(module, "config_template"):
                        json.dump(module.config_template, f)
                    else:
                        json.dump({}, f)

        if not self.ENABLED_PROVIDERS_FILE.exists():
            self.ENABLED_PROVIDERS_FILE.touch()

            with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
                for provider in self.available_providers:
                    f.write(provider + "\n")

        if not self.APPEARANCE_CONFIG_FILE.exists():
            self.APPEARANCE_CONFIG_FILE.touch()
            with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
                json.dump({"icon_size": 32, "show_result_descriptions": True}, f)

        with open(self.APPEARANCE_CONFIG_FILE, "r") as f:
            appearance_config = json.load(f)
            self.icon_size = appearance_config.get("icon_size", 32)
            self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)

        self.permanently_enabled_providers = {}

        with open(self.ENABLED_PROVIDERS_FILE, "r") as f:
            for line in f:
                provider = line.strip()
                if provider in self.available_providers:
                    self.permanently_enabled_providers[provider] = self.available_providers[provider]

        self.enabled_providers = self.permanently_enabled_providers.copy()

        providers_menu = self.builder.get_object("providers-menu")

        self.providers = []
        self.provider_checkboxes = {}

        for provider in self.permanently_enabled_providers.values():
            spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            with open(self.USER_PROVIDER_CONFIGS / f"{provider.stem}.json", "r") as f:
                provider_config = json.load(f)
            loaded_provider = module.Provider(provider_config)
            self.providers.append(loaded_provider)
            self.provider_checkboxes[provider.stem] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
            providers_menu.append(self.provider_checkboxes[provider.stem])
            self.provider_checkboxes[provider.stem].set_active(True)
            self.provider_checkboxes[provider.stem].show()
            self.provider_checkboxes[provider.stem].connect("toggled", self.update_enabled_providers)

        self.update_task = None

        def call_update_results(widget):
            if self.update_task is not None:
                self.update_task.cancel()
            self.update_task = asyncio.create_task(self.update_results(widget))

        def execute_result(widget, row):
            row.execute()
            self.kill()

        self.builder.connect_signals(
            {
                "update-search": call_update_results,
                "result-activated": execute_result,
                "providers-menu-all-toggled": self.update_enabled_providers_all,
                "menu-about": self.about,
                "menu-providers": self.provider_menu,
                "menu-preferences": self.preferences,
            }
        )

        GLib.idle_add(self.update_enabled_providers, None)

    def update_enabled_providers(self, widget=None):
        self.providers = []
        for provider, checkbox in self.provider_checkboxes.items():
            if checkbox.get_active():
                self.enabled_providers[provider] = self.permanently_enabled_providers[provider]
                spec = importlib.util.spec_from_file_location(provider, self.enabled_providers[provider] / "__init__.py")
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)
                with open(self.USER_PROVIDER_CONFIGS / f"{provider}.json", "r") as f:
                    provider_config = json.load(f)
                loaded_provider = module.Provider(provider_config)
                self.providers.append(loaded_provider)
            else:
                self.enabled_providers.pop(provider, None)

        if self.update_task is not None:
            self.update_task.cancel()
        self.update_task = asyncio.create_task(self.update_results(widget))

    def update_enabled_providers_all(self, widget):
        for checkbox in self.provider_checkboxes.values():
            checkbox.set_active(widget.get_active())
        self.update_enabled_providers()

    def kill(self, widget=None):
        self.loop.stop()
        self.quit()

    def check_escape(self, widget, event):
        results_list = self.builder.get_object("results-list")
        search_entry = self.builder.get_object("search-query")
        rows = results_list.get_children()
        current_row = results_list.get_selected_row()

        if event.keyval == Gdk.KEY_Escape:
            self.kill()
            return True

        if event.keyval == Gdk.KEY_Down:
            # Move to the next row
            if current_row:
                current_index = rows.index(current_row)
                next_index = (current_index + 1) % len(rows)  # Wrap to the beginning if at the end
            else:
                next_index = 0

            results_list.select_row(rows[next_index])
            search_entry.grab_focus()  # Refocus the search entry
            self.scroll_to_row(rows[next_index], results_list.get_adjustment())
            return True

        if event.keyval == Gdk.KEY_Up:
            # Move to the previous row
            if current_row:
                current_index = rows.index(current_row)
                prev_index = (current_index - 1) % len(rows)  # Wrap to the end if at the beginning
            else:
                prev_index = len(rows) - 1

            results_list.select_row(rows[prev_index])
            search_entry.grab_focus()  # Refocus the search entry
            self.scroll_to_row(rows[prev_index], results_list.get_adjustment())
            return True

        if event.keyval == Gdk.KEY_Return:
            if current_row:
                # Execute the selected row
                current_row.activate()
                return True
            elif len(rows) &gt; 0:
                # Execute the first row
                rows[0].activate()
                return True

        return False

    def scroll_to_row(self, row, adjustment):
        row_geometry = row.get_allocation()
        row_y = row_geometry.y
        row_height = row_geometry.height
        adjustment_value = adjustment.get_value()
        adjustment_upper = adjustment.get_upper() - adjustment.get_page_size()

        if row_y + row_height &gt; adjustment_value + adjustment.get_page_size():
            adjustment.set_value(min(row_y + row_height - adjustment.get_page_size(), adjustment_upper))
        elif row_y &lt; adjustment_value:
            adjustment.set_value(max(row_y, 0))

    def about(self, widget):
        about_builder = Gtk.Builder()
        about_builder.add_from_file(str(PACKAGE_DIRECTORY / "about.ui"))
        about_window = about_builder.get_object("about-dialog")
        about_window.connect("destroy", lambda _: about_window.destroy())
        about_window.connect("close", lambda _: about_window.destroy())
        about_window.connect("response", lambda _, __: about_window.destroy())
        about_window.show_all()

    def provider_menu(self, widget):
        providers_builder = Gtk.Builder()
        providers_builder.add_from_file(str(PACKAGE_DIRECTORY / "providers.ui"))
        providers_window = providers_builder.get_object("providers-window")
        providers_window.connect("destroy", lambda _: providers_window.destroy())

        providers_table = providers_builder.get_object("providers-table")   # actually Gtk.Grid
        rows = 0
        for provider in self.available_providers.values():
            spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            loaded_provider = module.Provider({})

            provider_label = Gtk.Label(label=loaded_provider.name)
            provider_label.set_halign(Gtk.Align.START)
            provider_label.set_valign(Gtk.Align.CENTER)
            font_desc = Pango.FontDescription()
            font_desc.set_weight(Pango.Weight.BOLD)
            provider_label.override_font(font_desc)
            provider_label.set_line_wrap(True)
            provider_label.set_justify(Gtk.Justification.LEFT)

            provider_description = Gtk.Label(label=loaded_provider.description)
            provider_description.set_halign(Gtk.Align.START)
            provider_description.set_valign(Gtk.Align.CENTER)
            provider_description.set_justify(Gtk.Justification.LEFT)
            provider_description.set_line_wrap(True)

            icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
            icon.set_pixel_size(self.icon_size)

            switch = Gtk.Switch()
            switch.set_valign(Gtk.Align.CENTER)
            switch.set_active(provider.stem in self.permanently_enabled_providers)
            switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)

            config_button = Gtk.Button.new_with_label(_("Configure"))
            def open_config(widget, stem=provider.stem):
                # User-accessible provider config, not Flatpak sandboxed
                config_path = str(self.USER_PROVIDER_CONFIGS / f"{stem}.json")
                if os.getenv("FLATPAK_SANDBOX_DIR"):
                    subprocess.Popen(["flatpak-spawn", "--host", "xdg-open", config_path])
                else:
                    subprocess.Popen(["xdg-open", config_path])
            config_button.connect("clicked", open_config)
            config_button.set_relief(Gtk.ReliefStyle.NONE)

            providers_table.attach(icon, 0, rows, 1, 1)
            providers_table.attach(provider_label, 1, rows, 1, 1)
            providers_table.attach(provider_description, 2, rows, 1, 1)
            providers_table.attach(switch, 3, rows, 1, 1)
            providers_table.attach(config_button, 4, rows, 1, 1)

            provider_label.show()
            provider_description.show()

            rows += 1

        def open_provider_directory(widget):
            # User-accessible provider directory, not Flatpak sandboxed
            provider_directory = str(self.USER_PROVIDERS)
            if os.getenv("FLATPAK_SANDBOX_DIR"):
                subprocess.Popen(["flatpak-spawn", "--host", "xdg-open", provider_directory])
            else:
                subprocess.Popen(["xdg-open", provider_directory])

        providers_builder.connect_signals(
            {
                "open-provider-directory": open_provider_directory,
            }
        )

        providers_window.show_all()

    def preferences(self, widget):
        preferences_builder = Gtk.Builder()
        preferences_builder.add_from_file(str(PACKAGE_DIRECTORY / "preferences.ui"))
        preferences_window = preferences_builder.get_object("preferences-window")
        preferences_window.connect("destroy", lambda _: preferences_window.destroy())

        def update_appearance_preferences(widget, *args):
            self.icon_size = preferences_builder.get_object("icon-size").get_value()
            self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()

            with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
                json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)

            # Regenerate the results to reflect the changes
            if self.update_task is not None:
                self.update_task.cancel()
            self.update_task = asyncio.create_task(self.update_results(widget))

        preferences_builder.connect_signals(
            {
                "update-appearance": update_appearance_preferences,
            }
        )

        preferences_window.show_all()

    def update_permanently_enabled_providers(self, widget, param, provider):
        if widget.get_active():
            self.permanently_enabled_providers[provider] = self.available_providers[provider]
        else:
            self.permanently_enabled_providers.pop(provider, None)

        with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
            f.write("\n".join(self.permanently_enabled_providers.keys()))


    async def update_results(self, widget):
        spinner = self.builder.get_object("spinner")
        spinner.start()
        generators = []
        query = self.builder.get_object("search-query-buffer").get_text()
        for provider in self.providers:
            generators.append(provider.search)

        # Clear the results list
        results_list = self.builder.get_object("results-list")
        for row in results_list.get_children():
            results_list.remove(row)

        try:
            async for result in merge_generators_with_params(generators, query):
                result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"], self.icon_size, self.show_result_descriptions)
                results_list.add(result_box)
                result_box.show_all()
        except Exception as e:
            exception_type, exception, tb = sys.exc_info()
            filename, line_number, function_name, line = traceback.extract_tb(tb)[-1]
            print(f"{filename} raised an exception!")
            print(f"{type(e).__name__}: {str(e)}")
            dialog = ProviderExceptionDialog(Path(filename), e, self.window, tb=tb)
            dialog.show_all()
            response = dialog.run()
            if response == 0:
                # Open config
                if os.getenv("FLATPAK_SANDBOX_DIR"):
                    subprocess.Popen(["flatpak-spawn", "--host", "xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json")])
                else:
                    subprocess.Popen(["xdg-open", str(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json")])

                self.kill()
            elif response == 1:
                # Reset config
                spec = importlib.util.spec_from_file_location(get_parent_package(filename).stem, Path(filename))
                module = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(module)

                if hasattr(module, "config_template"):
                    with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
                        json.dump(module.config_template, f)
                else:
                    with open(self.USER_PROVIDER_CONFIGS / f"{get_parent_package(filename).stem}.json", "w") as f:
                        json.dump({}, f)

                self.kill()
            elif response == Gtk.ResponseType.CLOSE:
                self.kill()

        results_list.show_all()
        spinner.stop()


if __name__ == "__main__":
    izvor = Izvor()
    izvor.window.show_all()
    izvor.window.present()
    try:
        izvor.loop.run_forever()
    finally:
        izvor.loop.close()
</pre></body></html>