By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 main.py

View raw Download
text/plain • 7.8 kiB
Python script, ASCII text executable
        
            
1
"""
2
Izvor - modular, GTK+ desktop search and launcher
3
Copyright (C) 2024 <root@roundabout-host.com>
4
5
This program is free software: you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation, either version 3 of the License, or
8
(at your option) any later version.
9
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
14
15
You should have received a copy of the GNU General Public License
16
along with this program. If not, see <https://www.gnu.org/licenses/>.
17
"""
18
19
import os
20
import gi
21
import asyncio
22
import importlib
23
import gbulb
24
from pathlib import Path
25
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
26
27
gi.require_version("Gtk", "3.0")
28
from gi.repository import Gtk, Gdk, Pango
29
30
31
_T = TypeVar("_T")
32
33
ICON_SIZE = 32
34
35
36
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
37
*args, **kwargs):
38
iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
39
tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}
40
41
while tasks:
42
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
43
44
for task in done:
45
iterator = tasks.pop(task)
46
try:
47
result = task.result()
48
yield result
49
tasks[asyncio.create_task(iterator.__anext__())] = iterator
50
except StopAsyncIteration:
51
pass
52
53
54
class ResultInfoWidget(Gtk.ListBoxRow):
55
def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None]):
56
super().__init__()
57
self.name = name
58
self.description = description
59
self.image = image
60
self.execute = execute
61
62
self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
63
self.add(self.box)
64
65
if image[0] == "logo":
66
self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
67
self.image.set_pixel_size(ICON_SIZE)
68
self.box.pack_start(self.image, False, False, 0)
69
70
self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
71
self.box.pack_start(self.label_box, True, True, 0)
72
73
self.name_label = Gtk.Label(label=self.name)
74
self.name_label.set_line_wrap(True)
75
self.name_label.set_justify(Gtk.Justification.LEFT)
76
self.name_label.set_halign(Gtk.Align.START)
77
attributes = Pango.AttrList()
78
attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
79
self.name_label.set_attributes(attributes)
80
self.label_box.pack_start(self.name_label, True, True, 0)
81
82
self.description_label = Gtk.Label(label=self.description)
83
self.description_label.set_line_wrap(True)
84
self.description_label.set_justify(Gtk.Justification.LEFT)
85
self.description_label.set_halign(Gtk.Align.START)
86
self.label_box.pack_start(self.description_label, True, True, 0)
87
88
89
class Izvor:
90
USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
91
USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
92
USER_PROVIDERS = USER_DATA / "providers"
93
USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
94
SYSTEM_CONFIGS = Path("/etc/izvor")
95
SYSTEM_DATA = Path("/usr/share/izvor")
96
SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
97
SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
98
99
def __init__(self):
100
self.builder = Gtk.Builder()
101
self.builder.add_from_file("izvor.ui")
102
self.window = self.builder.get_object("root")
103
self.window.connect("destroy", Gtk.main_quit)
104
self.window.connect("key-press-event", self.check_escape)
105
106
if not os.getenv("XDG_DATA_HOME"):
107
print("XDG_DATA_HOME is not set. Using default path.")
108
if not os.getenv("XDG_CONFIG_HOME"):
109
print("XDG_CONFIG_HOME is not set. Using default path.")
110
if not self.USER_PROVIDER_CONFIGS.exists():
111
print("Creating user config directory.")
112
self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
113
if not self.USER_PROVIDERS.exists():
114
print("Creating user data directory.")
115
self.USER_PROVIDERS.mkdir(parents=True)
116
117
self.available_providers = {}
118
119
if self.SYSTEM_PROVIDERS.exists():
120
for provider in self.SYSTEM_PROVIDERS.iterdir():
121
if not (provider / "__init__.py").exists():
122
continue
123
self.available_providers[provider.stem] = provider
124
if self.USER_PROVIDERS.exists():
125
for provider in self.USER_PROVIDERS.iterdir():
126
if not (provider / "__init__.py").exists():
127
continue
128
self.available_providers[provider.stem] = provider
129
130
print(f"Available providers: {list(self.available_providers.keys())}")
131
132
self.enabled_providers = self.available_providers.copy()
133
134
providers_menu = self.builder.get_object("providers-menu")
135
136
self.providers = []
137
self.provider_checkboxes = {}
138
139
for provider in self.enabled_providers.values():
140
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
141
module = importlib.util.module_from_spec(spec)
142
spec.loader.exec_module(module)
143
loaded_provider = module.Provider({})
144
self.providers.append(loaded_provider)
145
print(f"Loaded provider: {loaded_provider.name}")
146
print(loaded_provider.description)
147
self.provider_checkboxes[loaded_provider.name] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
148
providers_menu.append(self.provider_checkboxes[loaded_provider.name])
149
self.provider_checkboxes[loaded_provider.name].set_active(True)
150
self.provider_checkboxes[loaded_provider.name].show()
151
152
def call_update_results(widget):
153
asyncio.create_task(self.update_results(widget))
154
155
def execute_result(widget, row):
156
row.execute()
157
158
self.builder.connect_signals(
159
{
160
"update-search": call_update_results,
161
"result-activated": execute_result,
162
}
163
)
164
165
def check_escape(self, widget, event):
166
if event.keyval == 65307:
167
self.window.destroy()
168
169
async def update_results(self, widget):
170
print("Updating results...")
171
generators = []
172
query = self.builder.get_object("search-query-buffer").get_text()
173
# print(f"Query: {query}")
174
for provider in self.enabled_providers.values():
175
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
176
module = importlib.util.module_from_spec(spec)
177
spec.loader.exec_module(module)
178
loaded_provider = module.Provider({})
179
# print(f"Searching with provider: {loaded_provider.name}")
180
181
generators.append(loaded_provider.search)
182
183
# Clear the results list
184
results_list = self.builder.get_object("results-list")
185
for row in results_list.get_children():
186
results_list.remove(row)
187
188
async for result in merge_generators_with_params(generators, query):
189
# print(result)
190
result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"])
191
results_list.add(result_box)
192
result_box.show_all()
193
194
results_list.show_all()
195
196
197
if __name__ == "__main__":
198
gbulb.install(gtk=True)
199
loop = asyncio.get_event_loop()
200
izvor = Izvor()
201
izvor.window.show_all()
202
loop.run_forever()
203