By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 main.py

View raw Download
text/plain • 9.4 kiB
Python script, ASCII text executable
        
            
1
"""
2
Izvor - modular, GTK+ desktop search and launcher
3
Copyright (C) 2024 <root@roundabout-host.com>
4
5
This program is free software: you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation, either version 3 of the License, or
8
(at your option) any later version.
9
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
14
15
You should have received a copy of the GNU General Public License
16
along with this program. If not, see <https://www.gnu.org/licenses/>.
17
"""
18
19
import os
20
import gi
21
import asyncio
22
import importlib
23
import gbulb
24
from pathlib import Path
25
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
26
27
gi.require_version("Gtk", "3.0")
28
from gi.repository import Gtk, Gdk, Pango, GLib
29
30
31
# Generic type variable; nothing in the code visible here references it.
_T = TypeVar("_T")
32
33
# Pixel size passed to Gtk.Image.set_pixel_size() for result icons.
ICON_SIZE = 32
34
35
36
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
                                       *args, **kwargs):
    """Interleave the output of several async generators as items become ready.

    Every callable in ``generator_funcs`` is invoked with ``*args, **kwargs``
    and must return an async iterable.  Items are yielded in completion order,
    so a slow source never holds back a fast one.

    If a source raises anything other than ``StopAsyncIteration`` the error
    propagates to the consumer.  Whenever this generator stops early (such an
    error, cancellation, or ``aclose()``), the outstanding ``__anext__`` tasks
    are cancelled and awaited so that nothing keeps running in the background
    and no task exception is left unretrieved.

    :param generator_funcs: callables producing the async iterables to merge.
    :param args: positional arguments forwarded to every callable.
    :param kwargs: keyword arguments forwarded to every callable.
    """
    iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
    # Maps each in-flight ``__anext__`` task to the iterator it is advancing.
    tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}

    try:
        while tasks:
            done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                iterator = tasks.pop(task)
                try:
                    result = task.result()
                except StopAsyncIteration:
                    # This source is exhausted; do not schedule it again.
                    continue
                yield result
                tasks[asyncio.create_task(iterator.__anext__())] = iterator
    finally:
        # Empty on normal completion; otherwise holds the tasks that were
        # still pending (or finished but unprocessed) when we stopped.
        leftovers = list(tasks)
        for task in leftovers:
            task.cancel()
        if leftovers:
            # Reap the tasks so their cancellation/exception is consumed.
            await asyncio.gather(*leftovers, return_exceptions=True)
52
53
54
class ResultInfoWidget(Gtk.ListBoxRow):
    """A list box row showing one search result.

    The row is laid out horizontally: an optional icon followed by a vertical
    stack holding the result's name (rendered larger) and its description.
    """

    def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None]):
        """Build the row.

        :param name: text of the larger first line.
        :param description: text of the second line.
        :param image: pair whose first item is the image kind; only ``"logo"``
            is rendered, using the second item as an icon name.
        :param execute: callable stored on the row as ``self.execute``.
        """
        super().__init__()
        self.name = name
        self.description = description
        self.image = image
        self.execute = execute

        def left_aligned_label(text):
            # Both text lines share the same wrapping and alignment settings.
            label = Gtk.Label(label=text)
            label.set_line_wrap(True)
            label.set_justify(Gtk.Justification.LEFT)
            label.set_halign(Gtk.Align.START)
            return label

        self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.add(self.box)

        image_kind = image[0]
        if image_kind == "logo":
            # From here on ``self.image`` is the Gtk.Image, not the input tuple.
            icon_name = self.image[1]
            self.image = Gtk.Image.new_from_icon_name(icon_name, Gtk.IconSize.LARGE_TOOLBAR)
            self.image.set_pixel_size(ICON_SIZE)
            self.box.pack_start(self.image, False, False, 0)

        self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        self.box.pack_start(self.label_box, True, True, 0)

        self.name_label = left_aligned_label(self.name)
        title_attributes = Pango.AttrList()
        title_attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
        self.name_label.set_attributes(title_attributes)
        self.label_box.pack_start(self.name_label, True, True, 0)

        self.description_label = left_aligned_label(self.description)
        self.label_box.pack_start(self.description_label, True, True, 0)
87
88
89
class Izvor(Gtk.Application):
    """The launcher application: search window, provider discovery and result display.

    Providers are directories containing an ``__init__.py`` that defines a
    ``Provider`` class.  That class is instantiated with a configuration dict
    and is expected to offer ``name``, ``description`` and an async generator
    method ``search(query)`` yielding dicts with the keys ``"name"``,
    ``"description"``, ``"image"`` and ``"execute"``.

    Each provider is loaded once and its instance is reused for every search.
    Only one search runs at a time: starting a new one cancels the previous
    one, so stale results are never appended to the list.
    """

    # Per the XDG Base Directory specification an *empty* variable counts as
    # unset, hence ``or`` rather than a ``getenv`` default.
    USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME") or Path.home() / ".config") / "izvor"
    USER_DATA = Path(os.getenv("XDG_DATA_HOME") or Path.home() / ".local" / "share") / "izvor"
    USER_PROVIDERS = USER_DATA / "providers"
    USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
    SYSTEM_CONFIGS = Path("/etc/izvor")
    SYSTEM_DATA = Path("/usr/share/izvor")
    SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
    SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"

    def __init__(self):
        """Load the UI, discover and load providers, and wire up the signals."""
        super().__init__(application_id="com.roundabout-host.roundabout.izvor")
        gbulb.install(gtk=True)
        self.loop = asyncio.get_event_loop()
        self.builder = Gtk.Builder()
        self.builder.add_from_file("izvor.ui")
        self.window = self.builder.get_object("root")
        self.window.connect("destroy", self.kill)
        self.window.connect("key-press-event", self.check_escape)

        # The search currently in flight, if any.  Holding the reference also
        # keeps the task from being garbage-collected while it runs.
        self._search_task = None

        if not os.getenv("XDG_DATA_HOME"):
            print("XDG_DATA_HOME is not set. Using default path.")
        if not os.getenv("XDG_CONFIG_HOME"):
            print("XDG_CONFIG_HOME is not set. Using default path.")
        self._ensure_directory(self.USER_PROVIDER_CONFIGS, "Creating user config directory.")
        self._ensure_directory(self.USER_PROVIDERS, "Creating user data directory.")

        self.available_providers = {}

        # System providers are scanned first, so a user provider with the
        # same name replaces the system one.
        for providers_dir in (self.SYSTEM_PROVIDERS, self.USER_PROVIDERS):
            if not providers_dir.exists():
                continue
            for provider in providers_dir.iterdir():
                if (provider / "__init__.py").exists():
                    self.available_providers[provider.stem] = provider

        print(f"Available providers: {list(self.available_providers.keys())}")

        self.permanently_enabled_providers = self.available_providers.copy()
        self.enabled_providers = self.permanently_enabled_providers.copy()

        providers_menu = self.builder.get_object("providers-menu")

        self.providers = []
        # Provider instances keyed like ``enabled_providers``; reused by
        # ``update_results`` instead of re-executing the module per search.
        self.loaded_providers = {}
        self.provider_checkboxes = {}

        for provider in self.enabled_providers.values():
            loaded_provider = self._load_provider(provider)
            self.providers.append(loaded_provider)
            self.loaded_providers[provider.stem] = loaded_provider
            print(f"Loaded provider: {loaded_provider.name}")
            print(loaded_provider.description)

            checkbox = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
            self.provider_checkboxes[provider.stem] = checkbox
            providers_menu.append(checkbox)
            checkbox.set_active(True)
            checkbox.show()
            checkbox.connect("toggled", self.update_enabled_providers)

        self.builder.connect_signals(
            {
                "update-search": self._schedule_search,
                "result-activated": self._execute_result,
                "providers-menu-all-toggled": self.update_enabled_providers_all,
                "menu-about": self.about,
            }
        )

        # ``update_enabled_providers`` returns None, so this idle callback
        # runs a single time.
        GLib.idle_add(self.update_enabled_providers, None)

    @staticmethod
    def _ensure_directory(path, message):
        """Create ``path`` (with parents) if it is missing, printing ``message`` first."""
        if not path.exists():
            print(message)
            # exist_ok guards against the directory appearing between the
            # check above and this call.
            path.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _load_provider(provider_path):
        """Execute ``provider_path/__init__.py`` and return ``Provider({})`` from it."""
        # ``import importlib`` alone does not guarantee that the ``util``
        # submodule is loaded, so import it explicitly here.
        import importlib.util

        spec = importlib.util.spec_from_file_location(provider_path.stem,
                                                      provider_path / "__init__.py")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module.Provider({})

    def _get_provider(self, key, provider_path):
        """Return the cached provider for ``key``, loading it on first use."""
        try:
            return self.loaded_providers[key]
        except KeyError:
            loaded_provider = self._load_provider(provider_path)
            self.loaded_providers[key] = loaded_provider
            return loaded_provider

    def _schedule_search(self, widget=None):
        """Start a fresh search task, cancelling the one still running (if any)."""
        previous = self._search_task
        if previous is not None and not previous.done():
            previous.cancel()
        self._search_task = asyncio.create_task(self.update_results(widget))

    def _execute_result(self, widget, row):
        """Run the activated row's action and then close the application."""
        row.execute()
        self.kill()

    def update_enabled_providers(self, widget=None):
        """Sync ``enabled_providers`` with the menu checkboxes and refresh the results."""
        for provider, checkbox in self.provider_checkboxes.items():
            if checkbox.get_active():
                self.enabled_providers[provider] = self.permanently_enabled_providers[provider]
            else:
                self.enabled_providers.pop(provider, None)

        self._schedule_search(None)

    def update_enabled_providers_all(self, widget):
        """Set every provider checkbox to the state of the "all" menu item."""
        for checkbox in self.provider_checkboxes.values():
            checkbox.set_active(widget.get_active())
        self.update_enabled_providers()

    def kill(self, widget=None):
        """Stop the event loop and quit the application."""
        self.loop.stop()
        self.quit()

    def check_escape(self, widget, event):
        """Close the application when Escape is pressed."""
        if event.keyval == Gdk.KEY_Escape:
            self.kill()
        # Falsy return: the key press keeps propagating to other handlers.
        return False

    def about(self, widget):
        """Show the about dialog defined in ``about.ui``."""
        about_builder = Gtk.Builder()
        about_builder.add_from_file("about.ui")
        about_window = about_builder.get_object("about-dialog")
        # No "destroy" handler: calling destroy() from inside the destroy
        # signal would only re-destroy a widget that is already going away.
        about_window.connect("close", lambda _: about_window.destroy())
        about_window.connect("response", lambda _, __: about_window.destroy())
        about_window.show_all()

    async def update_results(self, widget):
        """Run the current query against every enabled provider and fill the list.

        The list is cleared first; rows are then added as results arrive.
        """
        print("Updating results...")
        query = self.builder.get_object("search-query-buffer").get_text()
        generators = [
            self._get_provider(key, provider).search
            for key, provider in self.enabled_providers.items()
        ]

        # Clear the results list
        results_list = self.builder.get_object("results-list")
        for row in results_list.get_children():
            results_list.remove(row)

        merged = merge_generators_with_params(generators, query)
        try:
            async for result in merged:
                result_box = ResultInfoWidget(result["name"], result["description"],
                                              result["image"], result["execute"])
                results_list.add(result_box)
                result_box.show_all()
        finally:
            # Close the merged generator right away when this search is
            # cancelled or fails, instead of waiting for it to be finalized.
            await merged.aclose()

        results_list.show_all()
232
233
234
if __name__ == "__main__":
    # Build the application, present its window, then hand control to the
    # event loop until something stops it.
    izvor = Izvor()
    izvor.window.show_all()
    event_loop = izvor.loop
    try:
        event_loop.run_forever()
    finally:
        # Close the loop however run_forever() ended.
        event_loop.close()
241