# main.py
# Python script, ASCII text executable
"""
Izvor - modular, GTK+ desktop search and launcher
Copyright (C) 2024 <root@roundabout-host.com>

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""

import asyncio
import importlib
# The submodule must be imported explicitly: ``import importlib`` alone does
# not guarantee that ``importlib.util`` is available as an attribute.
import importlib.util
import os
from pathlib import Path
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any

import gbulb
import gi

gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, Gdk, Pango


_T = TypeVar("_T")

ICON_SIZE = 32


async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
                                       *args, **kwargs):
    """Interleave the items of several async iterables as they become available.

    Every callable in ``generator_funcs`` is called with ``*args, **kwargs``
    and the resulting async iterables are consumed concurrently. Items are
    yielded in completion order, not in the order of ``generator_funcs``.

    An exception raised by any iterable (other than its normal exhaustion)
    propagates to the consumer. Whenever the merge stops early -- such an
    exception, cancellation of the consuming task, or the generator being
    closed -- the still-pending ``__anext__`` tasks are cancelled so they do
    not keep running in the background.
    """
    iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
    # Maps each in-flight ``__anext__`` task to the iterator it belongs to.
    tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}

    try:
        while tasks:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                iterator = tasks.pop(task)
                # Keep the ``try`` narrow: only exhaustion of *this* iterator
                # may be swallowed, not an exception thrown in at ``yield``.
                try:
                    result = task.result()
                except StopAsyncIteration:
                    continue
                yield result
                tasks[asyncio.create_task(iterator.__anext__())] = iterator
    finally:
        # Empty on normal completion; otherwise these are the leftovers.
        for task in tasks:
            task.cancel()


class ResultInfoWidget(Gtk.ListBoxRow):
    """A list row showing one search result: optional icon, name and description.

    ``image`` is a ``(kind, value)`` pair. Only the kind ``"logo"`` is
    rendered; its value is used as an icon name. ``execute`` is stored on the
    row so that the activation handler can call it.
    """

    def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None]):
        super().__init__()
        self.name = name
        self.description = description
        # NOTE: for "logo" images this attribute is replaced below by the
        # Gtk.Image widget; for any other kind it stays the original tuple.
        self.image = image
        self.execute = execute

        self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.add(self.box)

        if image[0] == "logo":
            self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
            self.image.set_pixel_size(ICON_SIZE)
            self.box.pack_start(self.image, False, False, 0)

        self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        self.box.pack_start(self.label_box, True, True, 0)

        self.name_label = Gtk.Label(label=self.name)
        self.name_label.set_line_wrap(True)
        self.name_label.set_justify(Gtk.Justification.LEFT)
        self.name_label.set_halign(Gtk.Align.START)
        # Render the result name at 16 pt.
        attributes = Pango.AttrList()
        attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
        self.name_label.set_attributes(attributes)
        self.label_box.pack_start(self.name_label, True, True, 0)

        self.description_label = Gtk.Label(label=self.description)
        self.description_label.set_line_wrap(True)
        self.description_label.set_justify(Gtk.Justification.LEFT)
        self.description_label.set_halign(Gtk.Align.START)
        self.label_box.pack_start(self.description_label, True, True, 0)


class Izvor(Gtk.Application):
    """The launcher application: discovers providers, loads them and runs searches.

    A provider is a directory containing an ``__init__.py`` that defines a
    ``Provider`` class. Instances expose ``name``, ``description`` and an
    async-iterable ``search(query)`` whose items are mappings with the keys
    ``name``, ``description``, ``image`` and ``execute``.
    """

    # ``or`` instead of a getenv default: an XDG variable that is set but
    # empty must fall back to the default too, otherwise the paths become
    # relative to the working directory.
    USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME") or Path.home() / ".config") / "izvor"
    USER_DATA = Path(os.getenv("XDG_DATA_HOME") or Path.home() / ".local" / "share") / "izvor"
    USER_PROVIDERS = USER_DATA / "providers"
    USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
    SYSTEM_CONFIGS = Path("/etc/izvor")
    SYSTEM_DATA = Path("/usr/share/izvor")
    SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
    SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"

    def __init__(self):
        super().__init__(application_id="com.roundabout-host.roundabout.izvor")
        gbulb.install(gtk=True)
        self.loop = asyncio.get_event_loop()
        self.builder = Gtk.Builder()
        self.builder.add_from_file("izvor.ui")
        self.window = self.builder.get_object("root")
        self.window.connect("destroy", self.kill)
        self.window.connect("key-press-event", self.check_escape)

        # The search currently in flight, if any. Keeping the reference stops
        # the task from being garbage-collected and lets a newer search
        # cancel it.
        self._search_task = None

        if not os.getenv("XDG_DATA_HOME"):
            print("XDG_DATA_HOME is not set. Using default path.")
        if not os.getenv("XDG_CONFIG_HOME"):
            print("XDG_CONFIG_HOME is not set. Using default path.")
        if not self.USER_PROVIDER_CONFIGS.exists():
            print("Creating user config directory.")
            # exist_ok: the directory may appear between the check and mkdir.
            self.USER_PROVIDER_CONFIGS.mkdir(parents=True, exist_ok=True)
        if not self.USER_PROVIDERS.exists():
            print("Creating user data directory.")
            self.USER_PROVIDERS.mkdir(parents=True, exist_ok=True)

        self.available_providers = self._discover_providers()

        print(f"Available providers: {list(self.available_providers.keys())}")

        self.enabled_providers = self.available_providers.copy()

        providers_menu = self.builder.get_object("providers-menu")

        self.providers = []
        self.provider_checkboxes = {}

        for provider in self.enabled_providers.values():
            loaded_provider = self._load_provider(provider)
            self.providers.append(loaded_provider)
            print(f"Loaded provider: {loaded_provider.name}")
            print(loaded_provider.description)
            checkbox = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
            self.provider_checkboxes[loaded_provider.name] = checkbox
            providers_menu.append(checkbox)
            checkbox.set_active(True)
            checkbox.show()

        def call_update_results(widget):
            # Drop the previous search first so that results for an outdated
            # query cannot be appended to the freshly cleared list.
            if self._search_task is not None and not self._search_task.done():
                self._search_task.cancel()
            self._search_task = asyncio.create_task(self.update_results(widget))

        def execute_result(widget, row):
            row.execute()
            self.kill()

        self.builder.connect_signals(
            {
                "update-search": call_update_results,
                "result-activated": execute_result,
            }
        )

    def _discover_providers(self):
        """Return a ``{name: directory}`` mapping of installed providers.

        System providers are scanned first, so a user provider with the same
        name replaces the system one.
        """
        found = {}
        for directory in (self.SYSTEM_PROVIDERS, self.USER_PROVIDERS):
            if not directory.exists():
                continue
            for provider in directory.iterdir():
                if (provider / "__init__.py").exists():
                    found[provider.stem] = provider
        return found

    @staticmethod
    def _load_provider(provider):
        """Import the provider package at ``provider`` and instantiate its ``Provider``."""
        spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module.Provider({})

    def kill(self, widget=None):
        """Stop the event loop and quit the application."""
        self.loop.stop()
        self.quit()

    def check_escape(self, widget, event):
        """Close the launcher when Escape is pressed."""
        if event.keyval == Gdk.KEY_Escape:
            self.kill()

    async def update_results(self, widget):
        """Clear the result list and refill it with results for the current query.

        Only providers whose menu checkbox is active are queried. The
        provider instances created at start-up are reused instead of
        re-importing every provider module on each keystroke.
        """
        print("Updating results...")
        query = self.builder.get_object("search-query-buffer").get_text()
        generators = [
            provider.search
            for provider in self.providers
            if self.provider_checkboxes[provider.name].get_active()
        ]

        # Clear the results list
        results_list = self.builder.get_object("results-list")
        for row in results_list.get_children():
            results_list.remove(row)

        async for result in merge_generators_with_params(generators, query):
            result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"])
            results_list.add(result_box)
            result_box.show_all()

        results_list.show_all()


if __name__ == "__main__":
    izvor = Izvor()
    izvor.window.show_all()
    try:
        izvor.loop.run_forever()
    finally:
        izvor.loop.close()