By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 main.py

View raw Download
text/plain • 17.3 kiB
Python script, ASCII text executable
        
            
1
"""
Izvor - modular, GTK+ desktop search and launcher
Copyright (C) 2024 <root@roundabout-host.com>

This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
18
19
import asyncio
import gettext
import importlib
import importlib.util
import json
import os
from pathlib import Path
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any

import gbulb
import gi
from xdg.Config import icon_size

gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, Gdk, Pango, GLib
33
34
_ = gettext.gettext
35
36
_T = TypeVar("_T")
37
38
39
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
                                       *args, **kwargs) -> AsyncIterator[Any]:
    """Interleave the items of several async generators as they become available.

    Every callable in ``generator_funcs`` is invoked with the same ``*args`` and
    ``**kwargs``. One task per iterator waits for that iterator's next item;
    whenever a task finishes, its item is yielded and a new task is scheduled
    for the same iterator. Iteration ends once every iterator is exhausted.

    When the consumer stops early (it is cancelled, closes this generator, or
    an iterator raises), the tasks that are still waiting are cancelled so they
    do not keep running in the background.

    Args:
        generator_funcs: Callables returning async iterables.
        *args: Positional arguments forwarded to every callable.
        **kwargs: Keyword arguments forwarded to every callable.

    Yields:
        Items from the underlying iterators, in completion order.
    """
    iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
    # Maps each in-flight "next item" task to the iterator it belongs to.
    pending = {asyncio.create_task(it.__anext__()): it for it in iterators}

    try:
        while pending:
            done, _unfinished = await asyncio.wait(
                pending.keys(), return_when=asyncio.FIRST_COMPLETED
            )

            for task in done:
                iterator = pending.pop(task)
                try:
                    result = task.result()
                except StopAsyncIteration:
                    # This iterator is exhausted; do not reschedule it.
                    continue
                yield result
                pending[asyncio.create_task(iterator.__anext__())] = iterator
    finally:
        # Reached on normal completion (nothing left), on cancellation, on
        # early close and on errors: never leave orphaned tasks behind.
        for task in pending:
            task.cancel()
55
56
57
class ResultInfoWidget(Gtk.ListBoxRow):
    """List box row presenting a single search result.

    The row contains an optional icon followed by a vertical box holding the
    result name and, optionally, its description.

    Args:
        name: Text shown as the result title.
        description: Text shown below the title when ``show_description`` is true.
        image: ``(kind, value)`` pair. Only the ``"logo"`` kind is rendered,
            with ``value`` used as an icon name; any other kind shows no icon.
        execute: Callable stored on the row as ``execute``.
        icon_size: Icon size in pixels; ``0`` disables icons.
        show_description: Whether to add the description label.
    """

    def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None], icon_size=32, show_description=True):
        super().__init__()
        self.name = name
        self.description = description
        # Holds the raw tuple unless an icon widget is built below.
        self.image = image
        self.execute = execute

        self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.add(self.box)

        # If the icon size is 0, have no icons. Image kinds other than "logo"
        # are skipped instead of calling widget methods on the raw tuple.
        if icon_size and image and image[0] == "logo":
            self.image = Gtk.Image.new_from_icon_name(image[1], Gtk.IconSize.LARGE_TOOLBAR)
            self.image.set_pixel_size(int(icon_size))
            self.box.pack_start(self.image, False, False, 0)

        self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        self.box.pack_start(self.label_box, True, True, 0)

        self.name_label = Gtk.Label(label=self.name)
        self.name_label.set_line_wrap(True)
        self.name_label.set_justify(Gtk.Justification.LEFT)
        self.name_label.set_halign(Gtk.Align.START)
        # Render the title larger than the description.
        attributes = Pango.AttrList()
        attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
        self.name_label.set_attributes(attributes)
        self.label_box.pack_start(self.name_label, True, True, 0)

        if show_description:
            self.description_label = Gtk.Label(label=self.description)
            self.description_label.set_line_wrap(True)
            self.description_label.set_justify(Gtk.Justification.LEFT)
            self.description_label.set_halign(Gtk.Align.START)
            self.label_box.pack_start(self.description_label, True, True, 0)
93
94
class Izvor(Gtk.Application):
    """GTK application that forwards a search query to providers and lists results.

    Providers are directories containing an ``__init__.py``, discovered in the
    system and user provider directories. Each provider module is expected to
    expose a ``Provider`` class that is constructed with a dict and offers
    ``name``, ``description``, ``icon`` and an async-generator ``search(query)``
    yielding dicts with ``name``, ``description``, ``image`` and ``execute``
    keys (this is how the class uses them; the provider side is not visible
    here).
    """

    # An empty XDG variable is treated like an unset one, so the fallback is
    # chosen with ``or`` rather than with getenv's default argument.
    USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME") or Path.home() / ".config") / "izvor"
    USER_DATA = Path(os.getenv("XDG_DATA_HOME") or Path.home() / ".local" / "share") / "izvor"
    USER_PROVIDERS = USER_DATA / "providers"
    USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
    SYSTEM_CONFIGS = Path("/etc/izvor")
    SYSTEM_DATA = Path("/usr/share/izvor")
    SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
    SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
    ENABLED_PROVIDERS_FILE = USER_CONFIGS / "enabled_providers"
    APPEARANCE_CONFIG_FILE = USER_CONFIGS / "appearance.json"

    # Settings used when appearance.json is missing or unreadable.
    _DEFAULT_APPEARANCE = {"icon_size": 32, "show_result_descriptions": True}

    def __init__(self):
        """Build the main window, discover and load providers, connect signals."""
        super().__init__(application_id="com.roundabout-host.roundabout.izvor")
        gbulb.install(gtk=True)
        self.loop = asyncio.get_event_loop()
        self.builder = Gtk.Builder()
        self.builder.add_from_file("izvor.ui")
        self.window = self.builder.get_object("root")
        self.window.connect("destroy", self.kill)
        self.window.connect("key-press-event", self.check_escape)
        self.window.set_title(_("Launcher"))

        # Currently running search task, if any.
        self.update_task = None
        # Provider instances keyed by provider directory, filled on first use.
        self._loaded_providers = {}

        if not os.getenv("XDG_DATA_HOME"):
            print("XDG_DATA_HOME is not set. Using default path.")
        if not os.getenv("XDG_CONFIG_HOME"):
            print("XDG_CONFIG_HOME is not set. Using default path.")
        if not self.USER_PROVIDER_CONFIGS.exists():
            print("Creating user config directory.")
            self.USER_PROVIDER_CONFIGS.mkdir(parents=True, exist_ok=True)
        if not self.USER_PROVIDERS.exists():
            print("Creating user data directory.")
            self.USER_PROVIDERS.mkdir(parents=True, exist_ok=True)

        # System providers first so a user provider with the same name wins.
        self.available_providers = {}
        for directory in (self.SYSTEM_PROVIDERS, self.USER_PROVIDERS):
            if not directory.exists():
                continue
            for provider in directory.iterdir():
                if (provider / "__init__.py").exists():
                    self.available_providers[provider.stem] = provider

        print(f"Available providers: {list(self.available_providers)}")

        # On first run every available provider is enabled.
        if not self.ENABLED_PROVIDERS_FILE.exists():
            with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
                f.writelines(provider + "\n" for provider in self.available_providers)

        appearance_config = self._load_appearance()
        self.icon_size = appearance_config.get("icon_size", 32)
        self.show_result_descriptions = appearance_config.get("show_result_descriptions", True)

        self.permanently_enabled_providers = {}
        with open(self.ENABLED_PROVIDERS_FILE, "r") as f:
            for line in f:
                provider = line.strip()
                if provider in self.available_providers:
                    self.permanently_enabled_providers[provider] = self.available_providers[provider]

        # Providers enabled for this session; starts as the persistent set.
        self.enabled_providers = self.permanently_enabled_providers.copy()

        providers_menu = self.builder.get_object("providers-menu")

        self.providers = []
        self.provider_checkboxes = {}

        for provider in self.permanently_enabled_providers.values():
            loaded_provider = self._load_provider(provider)
            self.providers.append(loaded_provider)
            print(f"Loaded provider: {loaded_provider.name}")
            print(loaded_provider.description)

            checkbox = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
            self.provider_checkboxes[provider.stem] = checkbox
            providers_menu.append(checkbox)
            checkbox.set_active(True)
            checkbox.show()
            checkbox.connect("toggled", self.update_enabled_providers)

        self.builder.connect_signals(
            {
                "update-search": self._restart_search,
                "result-activated": self._execute_result,
                "providers-menu-all-toggled": self.update_enabled_providers_all,
                "menu-about": self.about,
                "menu-providers": self.provider_menu,
                "menu-preferences": self.preferences,
            }
        )

        # Runs once from the main loop (the callback returns None).
        GLib.idle_add(self.update_enabled_providers, None)

    def _load_appearance(self):
        """Return the appearance settings dict.

        A missing file is created with the defaults. A file that cannot be
        parsed, or does not contain a JSON object, is left untouched and the
        defaults are used for this session.
        """
        if not self.APPEARANCE_CONFIG_FILE.exists():
            with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
                json.dump(self._DEFAULT_APPEARANCE, f)

        try:
            with open(self.APPEARANCE_CONFIG_FILE, "r") as f:
                appearance_config = json.load(f)
        except json.JSONDecodeError:
            appearance_config = None

        if not isinstance(appearance_config, dict):
            print("Appearance configuration is invalid. Using defaults.")
            return dict(self._DEFAULT_APPEARANCE)
        return appearance_config

    def _load_provider(self, provider):
        """Return the ``Provider`` instance for a provider directory.

        The module is imported and instantiated the first time the directory
        is requested; later calls reuse the same instance instead of executing
        the module again.
        """
        loaded_provider = self._loaded_providers.get(provider)
        if loaded_provider is None:
            spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            loaded_provider = module.Provider({})
            self._loaded_providers[provider] = loaded_provider
        return loaded_provider

    def _restart_search(self, widget=None):
        """Cancel the running search, if any, and start a new one."""
        if self.update_task is not None:
            self.update_task.cancel()
        self.update_task = asyncio.create_task(self.update_results(widget))

    def _execute_result(self, widget, row):
        """Run the activated result row's action, then quit."""
        row.execute()
        self.kill()

    def update_enabled_providers(self, widget=None):
        """Sync the session's enabled providers with the menu checkboxes and search again.

        A provider counts as enabled only while its checkbox is active and it
        is still in the persistent set; one that was switched off in the
        providers window is dropped rather than looked up.
        """
        for provider, checkbox in self.provider_checkboxes.items():
            path = self.permanently_enabled_providers.get(provider)
            if checkbox.get_active() and path is not None:
                self.enabled_providers[provider] = path
            else:
                self.enabled_providers.pop(provider, None)

        self._restart_search(widget)

    def update_enabled_providers_all(self, widget):
        """Set every provider checkbox to the state of ``widget`` and refresh."""
        active = widget.get_active()
        for checkbox in self.provider_checkboxes.values():
            checkbox.set_active(active)
        self.update_enabled_providers()

    def kill(self, widget=None):
        """Stop the event loop and quit the application."""
        self.loop.stop()
        self.quit()

    def check_escape(self, widget, event):
        """Handle Escape, Up, Down and Return on the main window.

        Returns:
            True when the key press was handled here, False otherwise.
        """
        if event.keyval == Gdk.KEY_Escape:
            self.kill()
            return True

        results_list = self.builder.get_object("results-list")
        rows = results_list.get_children()
        current_row = results_list.get_selected_row()

        if event.keyval in (Gdk.KEY_Down, Gdk.KEY_Up):
            if not rows:
                # Nothing to select; leave the event to the default handlers.
                return False

            step = 1 if event.keyval == Gdk.KEY_Down else -1
            if current_row:
                # Wrap around at either end of the list.
                target_index = (rows.index(current_row) + step) % len(rows)
            else:
                # No selection yet: Down starts at the top, Up at the bottom.
                target_index = 0 if step == 1 else len(rows) - 1

            results_list.select_row(rows[target_index])
            # Keep typing possible while navigating the results.
            self.builder.get_object("search-query").grab_focus()
            self.scroll_to_row(rows[target_index], results_list.get_adjustment())
            return True

        if event.keyval == Gdk.KEY_Return:
            if current_row:
                # Execute the selected row
                current_row.activate()
                return True
            if rows:
                # Execute the first row
                rows[0].activate()
                return True

        return False

    def scroll_to_row(self, row, adjustment):
        """Scroll ``adjustment`` just enough to bring ``row`` fully into view."""
        allocation = row.get_allocation()
        row_top = allocation.y
        row_bottom = allocation.y + allocation.height
        page_size = adjustment.get_page_size()
        current_value = adjustment.get_value()
        max_value = adjustment.get_upper() - page_size

        if row_bottom > current_value + page_size:
            # Row extends below the visible area.
            adjustment.set_value(min(row_bottom - page_size, max_value))
        elif row_top < current_value:
            # Row starts above the visible area.
            adjustment.set_value(max(row_top, 0))

    def about(self, widget):
        """Show the about dialog loaded from ``about.ui``."""
        about_builder = Gtk.Builder()
        about_builder.add_from_file("about.ui")
        about_window = about_builder.get_object("about-dialog")
        about_window.connect("close", lambda *_args: about_window.destroy())
        about_window.connect("response", lambda *_args: about_window.destroy())
        about_window.show_all()

    def provider_menu(self, widget):
        """Show the providers window with one row per available provider.

        Each row has the provider's icon, name, description and a switch that
        controls whether the provider is persistently enabled.
        """
        providers_builder = Gtk.Builder()
        providers_builder.add_from_file("providers.ui")
        providers_window = providers_builder.get_object("providers-window")

        providers_table = providers_builder.get_object("providers-table")  # actually Gtk.Grid
        for row_index, provider in enumerate(self.available_providers.values()):
            loaded_provider = self._load_provider(provider)

            provider_label = Gtk.Label(label=loaded_provider.name)
            provider_label.set_halign(Gtk.Align.START)
            provider_label.set_valign(Gtk.Align.CENTER)
            font_desc = Pango.FontDescription()
            font_desc.set_weight(Pango.Weight.BOLD)
            provider_label.override_font(font_desc)
            provider_label.set_line_wrap(True)
            provider_label.set_justify(Gtk.Justification.LEFT)

            provider_description = Gtk.Label(label=loaded_provider.description)
            provider_description.set_halign(Gtk.Align.START)
            provider_description.set_valign(Gtk.Align.CENTER)
            provider_description.set_justify(Gtk.Justification.LEFT)
            provider_description.set_line_wrap(True)

            icon = Gtk.Image.new_from_icon_name(loaded_provider.icon, Gtk.IconSize.LARGE_TOOLBAR)
            icon.set_pixel_size(int(self.icon_size))

            switch = Gtk.Switch()
            switch.set_valign(Gtk.Align.CENTER)
            switch.set_active(provider.stem in self.permanently_enabled_providers)
            switch.connect("notify::active", self.update_permanently_enabled_providers, provider.stem)

            providers_table.attach(icon, 0, row_index, 1, 1)
            providers_table.attach(provider_label, 1, row_index, 1, 1)
            providers_table.attach(provider_description, 2, row_index, 1, 1)
            providers_table.attach(switch, 3, row_index, 1, 1)

        providers_window.show_all()

    def preferences(self, widget):
        """Show the preferences window and persist appearance changes."""
        preferences_builder = Gtk.Builder()
        preferences_builder.add_from_file("preferences.ui")
        preferences_window = preferences_builder.get_object("preferences-window")

        def update_appearance_preferences(widget, *args):
            # Stored as int so it is a valid pixel size and is written to
            # JSON as an integer.
            self.icon_size = int(preferences_builder.get_object("icon-size").get_value())
            self.show_result_descriptions = preferences_builder.get_object("show-result-descriptions").get_active()

            with open(self.APPEARANCE_CONFIG_FILE, "w") as f:
                json.dump({"icon_size": self.icon_size, "show_result_descriptions": self.show_result_descriptions}, f)

            # Regenerate the results to reflect the changes
            self._restart_search(widget)

        preferences_builder.connect_signals(
            {
                "update-appearance": update_appearance_preferences,
            }
        )

        preferences_window.show_all()

    def update_permanently_enabled_providers(self, widget, param, provider):
        """Persistently enable or disable ``provider`` according to ``widget``.

        The full list of persistently enabled providers is rewritten to
        ``ENABLED_PROVIDERS_FILE``, one name per line.
        """
        if widget.get_active():
            self.permanently_enabled_providers[provider] = self.available_providers[provider]
        else:
            self.permanently_enabled_providers.pop(provider, None)

        with open(self.ENABLED_PROVIDERS_FILE, "w") as f:
            f.write("\n".join(self.permanently_enabled_providers.keys()))

    async def update_results(self, widget):
        """Query every enabled provider and refill the results list.

        The spinner runs while results arrive. It is stopped when the search
        finishes or fails, but not when this search was cancelled in favour of
        a newer one, which has already restarted it.
        """
        print("Updating results...")
        spinner = self.builder.get_object("spinner")
        spinner.start()
        try:
            query = self.builder.get_object("search-query-buffer").get_text()
            generators = [
                self._load_provider(provider).search
                for provider in self.enabled_providers.values()
            ]

            # Clear the results list
            results_list = self.builder.get_object("results-list")
            for row in results_list.get_children():
                results_list.remove(row)

            async for result in merge_generators_with_params(generators, query):
                result_box = ResultInfoWidget(
                    result["name"],
                    result["description"],
                    result["image"],
                    result["execute"],
                    self.icon_size,
                    self.show_result_descriptions,
                )
                results_list.add(result_box)
                result_box.show_all()

            results_list.show_all()
        finally:
            # When a newer search replaced this one, update_task already
            # points at the replacement; leave its spinner running.
            if self.update_task is None or self.update_task is asyncio.current_task():
                spinner.stop()
416
417
418
if __name__ == "__main__":
    # Create the application and show its main window before the loop starts.
    izvor = Izvor()
    event_loop = izvor.loop
    izvor.window.show_all()
    try:
        # Blocks until the loop is stopped (see Izvor.kill).
        event_loop.run_forever()
    finally:
        # Release the loop's resources however run_forever() ended.
        event_loop.close()
425