By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 main.py

View raw Download
text/plain • 8.94 kiB
Python script, ASCII text executable
        
            
1
"""
2
Izvor - modular, GTK+ desktop search and launcher
3
Copyright (C) 2024 <root@roundabout-host.com>
4
5
This program is free software: you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation, either version 3 of the License, or
8
(at your option) any later version.
9
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
14
15
You should have received a copy of the GNU General Public License
16
along with this program. If not, see <https://www.gnu.org/licenses/>.
17
"""
18
19
import os
20
import gi
21
import asyncio
22
import importlib
23
import gbulb
24
from pathlib import Path
25
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
26
27
gi.require_version("Gtk", "3.0")
28
from gi.repository import Gtk, Gdk, Pango, GLib
29
30
31
_T = TypeVar("_T")
32
33
ICON_SIZE = 32
34
35
36
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
                                       *args, **kwargs):
    """Interleave the items of several async generators as they become available.

    Every callable in ``generator_funcs`` is called once with ``*args`` and
    ``**kwargs``; the resulting async iterables are then advanced concurrently
    and each item is yielded as soon as its producer delivers it. Items of a
    single producer keep their relative order; no order is guaranteed between
    different producers.

    Args:
        generator_funcs: callables returning an async iterable.
        *args: positional arguments forwarded to every callable.
        **kwargs: keyword arguments forwarded to every callable.

    Yields:
        The items produced by the underlying async iterables.

    Raises:
        Any exception (other than ``StopAsyncIteration``) raised by one of the
        underlying iterables is propagated to the consumer.
    """
    iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
    # One in-flight "give me the next item" task per iterator that is still alive.
    tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}

    try:
        while tasks:
            done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)

            for task in done:
                iterator = tasks.pop(task)
                try:
                    result = task.result()
                except StopAsyncIteration:
                    # This iterator is exhausted; it is simply not rescheduled.
                    continue
                yield result
                tasks[asyncio.create_task(iterator.__anext__())] = iterator
    finally:
        # Reached on normal exhaustion (nothing left to cancel), but also when
        # the consumer stops iterating early, is cancelled, or a producer
        # raised. Without this the remaining tasks would keep running in the
        # background with nobody left to consume their results.
        for task in tasks:
            task.cancel()
52
53
54
class ResultInfoWidget(Gtk.ListBoxRow):
    """List box row presenting a single search result.

    The row holds an optional icon followed by a vertical stack made of the
    result name (rendered at 16pt) and its description. ``execute`` is stored
    on the row so that whoever handles the row activation can call it.
    """

    def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None]):
        super().__init__()
        self.name = name
        self.description = description
        self.image = image
        self.execute = execute

        def build_label(text):
            # Both labels share the same layout: wrapped, left-justified and
            # aligned to the start of the row.
            label = Gtk.Label(label=text)
            label.set_line_wrap(True)
            label.set_justify(Gtk.Justification.LEFT)
            label.set_halign(Gtk.Align.START)
            return label

        row_box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
        self.box = row_box
        self.add(row_box)

        # Only the "logo" image kind is rendered; its payload is used as an
        # icon name. For that kind ``self.image`` ends up holding the
        # Gtk.Image widget instead of the original tuple.
        if image[0] == "logo":
            icon = Gtk.Image.new_from_icon_name(image[1], Gtk.IconSize.LARGE_TOOLBAR)
            self.image = icon
            icon.set_pixel_size(ICON_SIZE)
            row_box.pack_start(icon, False, False, 0)

        text_column = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        self.label_box = text_column
        row_box.pack_start(text_column, True, True, 0)

        self.name_label = build_label(self.name)
        name_attributes = Pango.AttrList()
        name_attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
        self.name_label.set_attributes(name_attributes)
        text_column.pack_start(self.name_label, True, True, 0)

        self.description_label = build_label(self.description)
        text_column.pack_start(self.description_label, True, True, 0)
87
88
89
class Izvor(Gtk.Application):
    """GTK application that queries search providers and lists their results.

    A provider is a directory containing an ``__init__.py`` that defines a
    ``Provider`` class. Providers are discovered in ``SYSTEM_PROVIDERS`` and
    ``USER_PROVIDERS``; a user provider replaces a system provider with the
    same directory stem. Each provider gets a check menu item that toggles
    whether it takes part in searches.
    """

    USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
    USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
    USER_PROVIDERS = USER_DATA / "providers"
    USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
    SYSTEM_CONFIGS = Path("/etc/izvor")
    SYSTEM_DATA = Path("/usr/share/izvor")
    SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
    SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"

    def __init__(self):
        """Install the gbulb event loop, build the UI and load every provider."""
        super().__init__(application_id="com.roundabout-host.roundabout.izvor")
        gbulb.install(gtk=True)
        self.loop = asyncio.get_event_loop()
        self.builder = Gtk.Builder()
        # NOTE(review): this path is relative to the current working
        # directory, so the UI file is only found when the program is started
        # from the directory containing it - confirm this is intended.
        self.builder.add_from_file("izvor.ui")
        self.window = self.builder.get_object("root")
        self.window.connect("destroy", self.kill)
        self.window.connect("key-press-event", self.check_escape)

        if not os.getenv("XDG_DATA_HOME"):
            print("XDG_DATA_HOME is not set. Using default path.")
        if not os.getenv("XDG_CONFIG_HOME"):
            print("XDG_CONFIG_HOME is not set. Using default path.")
        # exist_ok guards against the directory appearing between the check
        # and the creation (e.g. two instances starting at the same time).
        if not self.USER_PROVIDER_CONFIGS.exists():
            print("Creating user config directory.")
            self.USER_PROVIDER_CONFIGS.mkdir(parents=True, exist_ok=True)
        if not self.USER_PROVIDERS.exists():
            print("Creating user data directory.")
            self.USER_PROVIDERS.mkdir(parents=True, exist_ok=True)

        # Maps provider stem -> provider directory. System providers come
        # first so that a user provider with the same stem overrides them.
        self.available_providers = {}
        for providers_dir in (self.SYSTEM_PROVIDERS, self.USER_PROVIDERS):
            self.available_providers.update(self._discover_providers(providers_dir))

        print(f"Available providers: {list(self.available_providers.keys())}")

        self.permanently_enabled_providers = self.available_providers.copy()
        self.enabled_providers = self.permanently_enabled_providers.copy()

        providers_menu = self.builder.get_object("providers-menu")

        self.providers = []
        self.provider_checkboxes = {}
        # Provider instances keyed by stem, reused by every search so that
        # provider modules are not re-executed on each query.
        self._provider_instances = {}
        # The search task currently in flight, if any.
        self._search_task = None

        for name, provider in self.enabled_providers.items():
            loaded_provider = self._load_provider(provider)
            self.providers.append(loaded_provider)
            self._provider_instances[name] = loaded_provider
            print(f"Loaded provider: {loaded_provider.name}")
            print(loaded_provider.description)

            checkbox = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
            self.provider_checkboxes[name] = checkbox
            providers_menu.append(checkbox)
            checkbox.set_active(True)
            checkbox.show()
            checkbox.connect("toggled", self.update_enabled_providers)

        def call_update_results(widget):
            self._schedule_search(widget)

        def execute_result(widget, row):
            # Run the action attached to the activated row, then exit.
            row.execute()
            self.kill()

        self.builder.connect_signals(
            {
                "update-search": call_update_results,
                "result-activated": execute_result,
                "providers-menu-all-toggled": self.update_enabled_providers_all,
            }
        )

        # The callback returns None, so GLib runs it once and then drops it.
        GLib.idle_add(self.update_enabled_providers, None)

    @staticmethod
    def _discover_providers(directory):
        """Return ``{stem: path}`` for each subdirectory of ``directory`` holding an ``__init__.py``."""
        if not directory.is_dir():
            return {}
        return {
            candidate.stem: candidate
            for candidate in directory.iterdir()
            if (candidate / "__init__.py").exists()
        }

    @staticmethod
    def _load_provider(path):
        """Execute the provider package at ``path`` and return a ``Provider`` instance.

        The provider is constructed with an empty dict (presumably its
        configuration - TODO confirm against the provider interface).
        """
        # ``import importlib`` alone does not guarantee that the ``util``
        # submodule is loaded, so import it explicitly.
        import importlib.util

        spec = importlib.util.spec_from_file_location(path.stem, path / "__init__.py")
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return module.Provider({})

    def _schedule_search(self, widget=None):
        """Start a new search, cancelling the one still in flight (if any).

        Without the cancellation an older, slower search would keep appending
        its rows to the list after a newer search has already cleared it.
        """
        previous = self._search_task
        if previous is not None and not previous.done():
            previous.cancel()
        self._search_task = asyncio.create_task(self.update_results(widget))

    def update_enabled_providers(self, widget=None):
        """Sync ``enabled_providers`` with the check menu items and refresh the results."""
        for provider, checkbox in self.provider_checkboxes.items():
            if checkbox.get_active():
                self.enabled_providers[provider] = self.permanently_enabled_providers[provider]
            else:
                self.enabled_providers.pop(provider, None)

        self._schedule_search()

    def update_enabled_providers_all(self, widget):
        """Set every provider check item to the state of ``widget`` and refresh."""
        for checkbox in self.provider_checkboxes.values():
            checkbox.set_active(widget.get_active())
        self.update_enabled_providers()

    def kill(self, widget=None):
        """Stop the event loop and quit the application."""
        self.loop.stop()
        self.quit()

    def check_escape(self, widget, event):
        """Key-press handler: quit when Escape is pressed."""
        if event.keyval == Gdk.KEY_Escape:
            self.kill()

    async def update_results(self, widget):
        """Run the current query against all enabled providers and rebuild the list.

        Each result is expected to be a mapping with the keys ``"name"``,
        ``"description"``, ``"image"`` and ``"execute"``, which are passed on
        to ``ResultInfoWidget``.
        """
        print("Updating results...")
        query = self.builder.get_object("search-query-buffer").get_text()

        generators = []
        for name, path in self.enabled_providers.items():
            loaded_provider = self._provider_instances.get(name)
            if loaded_provider is None:
                # Not loaded during start-up (should not normally happen):
                # load it now and keep it for later searches.
                loaded_provider = self._load_provider(path)
                self._provider_instances[name] = loaded_provider
            generators.append(loaded_provider.search)

        # Clear the results list
        results_list = self.builder.get_object("results-list")
        for row in results_list.get_children():
            results_list.remove(row)

        async for result in merge_generators_with_params(generators, query):
            result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"])
            results_list.add(result_box)
            result_box.show_all()

        results_list.show_all()
222
223
224
if __name__ == "__main__":
    # Constructing the application installs the gbulb event loop and builds
    # the window; the loop is then driven manually until it is stopped.
    izvor = Izvor()
    event_loop = izvor.loop
    izvor.window.show_all()
    try:
        event_loop.run_forever()
    finally:
        # Always release the loop, even if run_forever() exits with an error.
        event_loop.close()
231