main.py
Python script, ASCII text executable
1""" 2Izvor - modular, GTK+ desktop search and launcher 3Copyright (C) 2024 <root@roundabout-host.com> 4 5This program is free software: you can redistribute it and/or modify 6it under the terms of the GNU General Public License as published by 7the Free Software Foundation, either version 3 of the License, or 8(at your option) any later version. 9 10This program is distributed in the hope that it will be useful, 11but WITHOUT ANY WARRANTY; without even the implied warranty of 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13GNU General Public License for more details. 14 15You should have received a copy of the GNU General Public License 16along with this program. If not, see <https://www.gnu.org/licenses/>. 17""" 18 19import os 20import gi 21import asyncio 22import importlib 23import gbulb 24from pathlib import Path 25from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any 26 27gi.require_version("Gtk", "3.0") 28from gi.repository import Gtk, Gdk, Pango 29 30 31_T = TypeVar("_T") 32 33ICON_SIZE = 32 34 35 36async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]], 37*args, **kwargs): 38iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs] 39tasks = {asyncio.create_task(it.__anext__()): it for it in iterators} 40 41while tasks: 42done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED) 43 44for task in done: 45iterator = tasks.pop(task) 46try: 47result = task.result() 48yield result 49tasks[asyncio.create_task(iterator.__anext__())] = iterator 50except StopAsyncIteration: 51pass 52 53 54class ResultInfoWidget(Gtk.ListBoxRow): 55def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None]): 56super().__init__() 57self.name = name 58self.description = description 59self.image = image 60self.execute = execute 61 62self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6) 63self.add(self.box) 64 65if image[0] == "logo": 66self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR) 67self.image.set_pixel_size(ICON_SIZE) 68self.box.pack_start(self.image, False, False, 0) 69 70self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) 71self.box.pack_start(self.label_box, True, True, 0) 72 73self.name_label = Gtk.Label(label=self.name) 74self.name_label.set_line_wrap(True) 75self.name_label.set_justify(Gtk.Justification.LEFT) 76self.name_label.set_halign(Gtk.Align.START) 77attributes = Pango.AttrList() 78attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE)) 79self.name_label.set_attributes(attributes) 80self.label_box.pack_start(self.name_label, True, True, 0) 81 82self.description_label = Gtk.Label(label=self.description) 83self.description_label.set_line_wrap(True) 84self.description_label.set_justify(Gtk.Justification.LEFT) 85self.description_label.set_halign(Gtk.Align.START) 86self.label_box.pack_start(self.description_label, True, True, 0) 87 88 89class Izvor: 90USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor" 91USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor" 92USER_PROVIDERS = USER_DATA / "providers" 93USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers" 94SYSTEM_CONFIGS = Path("/etc/izvor") 95SYSTEM_DATA = Path("/usr/share/izvor") 96SYSTEM_PROVIDERS = SYSTEM_DATA / "providers" 97SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers" 98 99def __init__(self): 100gbulb.install(gtk=True) 101self.loop = asyncio.get_event_loop() 102self.builder = Gtk.Builder() 103self.builder.add_from_file("izvor.ui") 104self.window = self.builder.get_object("root") 105self.window.connect("destroy", self.kill) 106self.window.connect("key-press-event", self.check_escape) 107 108if not os.getenv("XDG_DATA_HOME"): 109print("XDG_DATA_HOME is not set. Using default path.") 110if not os.getenv("XDG_CONFIG_HOME"): 111print("XDG_CONFIG_HOME is not set. Using default path.") 112if not self.USER_PROVIDER_CONFIGS.exists(): 113print("Creating user config directory.") 114self.USER_PROVIDER_CONFIGS.mkdir(parents=True) 115if not self.USER_PROVIDERS.exists(): 116print("Creating user data directory.") 117self.USER_PROVIDERS.mkdir(parents=True) 118 119self.available_providers = {} 120 121if self.SYSTEM_PROVIDERS.exists(): 122for provider in self.SYSTEM_PROVIDERS.iterdir(): 123if not (provider / "__init__.py").exists(): 124continue 125self.available_providers[provider.stem] = provider 126if self.USER_PROVIDERS.exists(): 127for provider in self.USER_PROVIDERS.iterdir(): 128if not (provider / "__init__.py").exists(): 129continue 130self.available_providers[provider.stem] = provider 131 132print(f"Available providers: {list(self.available_providers.keys())}") 133 134self.enabled_providers = self.available_providers.copy() 135 136providers_menu = self.builder.get_object("providers-menu") 137 138self.providers = [] 139self.provider_checkboxes = {} 140 141for provider in self.enabled_providers.values(): 142spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py") 143module = importlib.util.module_from_spec(spec) 144spec.loader.exec_module(module) 145loaded_provider = module.Provider({}) 146self.providers.append(loaded_provider) 147print(f"Loaded provider: {loaded_provider.name}") 148print(loaded_provider.description) 149self.provider_checkboxes[loaded_provider.name] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name) 150providers_menu.append(self.provider_checkboxes[loaded_provider.name]) 151self.provider_checkboxes[loaded_provider.name].set_active(True) 152self.provider_checkboxes[loaded_provider.name].show() 153 154def call_update_results(widget): 155asyncio.create_task(self.update_results(widget)) 156 157def execute_result(widget, row): 158row.execute() 159self.kill() 160 161self.builder.connect_signals( 162{ 163"update-search": call_update_results, 164"result-activated": execute_result, 165} 166) 167 168def kill(self, widget=None): 169self.loop.stop() 170self.window.destroy() 171 172def check_escape(self, widget, event): 173if event.keyval == 65307: 174self.kill() 175 176async def update_results(self, widget): 177print("Updating results...") 178generators = [] 179query = self.builder.get_object("search-query-buffer").get_text() 180# print(f"Query: {query}") 181for provider in self.enabled_providers.values(): 182spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py") 183module = importlib.util.module_from_spec(spec) 184spec.loader.exec_module(module) 185loaded_provider = module.Provider({}) 186# print(f"Searching with provider: {loaded_provider.name}") 187 188generators.append(loaded_provider.search) 189 190# Clear the results list 191results_list = self.builder.get_object("results-list") 192for row in results_list.get_children(): 193results_list.remove(row) 194 195async for result in merge_generators_with_params(generators, query): 196# print(result) 197result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"]) 198results_list.add(result_box) 199result_box.show_all() 200 201results_list.show_all() 202 203 204if __name__ == "__main__": 205izvor = Izvor() 206izvor.window.show_all() 207try: 208izvor.loop.run_forever() 209finally: 210izvor.loop.close() 211