By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 main.py

View raw Download
text/plain • 8.06 kiB
Python script, ASCII text executable
        
            
1
"""
2
Izvor - modular, GTK+ desktop search and launcher
3
Copyright (C) 2024 <root@roundabout-host.com>
4
5
This program is free software: you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation, either version 3 of the License, or
8
(at your option) any later version.
9
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
14
15
You should have received a copy of the GNU General Public License
16
along with this program. If not, see <https://www.gnu.org/licenses/>.
17
"""
18
19
import os
20
import gi
21
import asyncio
22
import importlib
23
import gbulb
24
from pathlib import Path
25
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
26
27
gi.require_version("Gtk", "3.0")
28
from gi.repository import Gtk, Gdk, Pango
29
30
31
_T = TypeVar("_T")
32
33
ICON_SIZE = 32
34
35
36
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
37
*args, **kwargs):
38
iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
39
tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}
40
41
while tasks:
42
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
43
44
for task in done:
45
iterator = tasks.pop(task)
46
try:
47
result = task.result()
48
yield result
49
tasks[asyncio.create_task(iterator.__anext__())] = iterator
50
except StopAsyncIteration:
51
pass
52
53
54
class ResultInfoWidget(Gtk.ListBoxRow):
55
def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None]):
56
super().__init__()
57
self.name = name
58
self.description = description
59
self.image = image
60
self.execute = execute
61
62
self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
63
self.add(self.box)
64
65
if image[0] == "logo":
66
self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
67
self.image.set_pixel_size(ICON_SIZE)
68
self.box.pack_start(self.image, False, False, 0)
69
70
self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
71
self.box.pack_start(self.label_box, True, True, 0)
72
73
self.name_label = Gtk.Label(label=self.name)
74
self.name_label.set_line_wrap(True)
75
self.name_label.set_justify(Gtk.Justification.LEFT)
76
self.name_label.set_halign(Gtk.Align.START)
77
attributes = Pango.AttrList()
78
attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
79
self.name_label.set_attributes(attributes)
80
self.label_box.pack_start(self.name_label, True, True, 0)
81
82
self.description_label = Gtk.Label(label=self.description)
83
self.description_label.set_line_wrap(True)
84
self.description_label.set_justify(Gtk.Justification.LEFT)
85
self.description_label.set_halign(Gtk.Align.START)
86
self.label_box.pack_start(self.description_label, True, True, 0)
87
88
89
class Izvor(Gtk.Application):
90
USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
91
USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
92
USER_PROVIDERS = USER_DATA / "providers"
93
USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
94
SYSTEM_CONFIGS = Path("/etc/izvor")
95
SYSTEM_DATA = Path("/usr/share/izvor")
96
SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
97
SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
98
99
def __init__(self):
100
super().__init__(application_id="com.roundabout-host.roundabout.izvor")
101
gbulb.install(gtk=True)
102
self.loop = asyncio.get_event_loop()
103
self.builder = Gtk.Builder()
104
self.builder.add_from_file("izvor.ui")
105
self.window = self.builder.get_object("root")
106
self.window.connect("destroy", self.kill)
107
self.window.connect("key-press-event", self.check_escape)
108
109
if not os.getenv("XDG_DATA_HOME"):
110
print("XDG_DATA_HOME is not set. Using default path.")
111
if not os.getenv("XDG_CONFIG_HOME"):
112
print("XDG_CONFIG_HOME is not set. Using default path.")
113
if not self.USER_PROVIDER_CONFIGS.exists():
114
print("Creating user config directory.")
115
self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
116
if not self.USER_PROVIDERS.exists():
117
print("Creating user data directory.")
118
self.USER_PROVIDERS.mkdir(parents=True)
119
120
self.available_providers = {}
121
122
if self.SYSTEM_PROVIDERS.exists():
123
for provider in self.SYSTEM_PROVIDERS.iterdir():
124
if not (provider / "__init__.py").exists():
125
continue
126
self.available_providers[provider.stem] = provider
127
if self.USER_PROVIDERS.exists():
128
for provider in self.USER_PROVIDERS.iterdir():
129
if not (provider / "__init__.py").exists():
130
continue
131
self.available_providers[provider.stem] = provider
132
133
print(f"Available providers: {list(self.available_providers.keys())}")
134
135
self.enabled_providers = self.available_providers.copy()
136
137
providers_menu = self.builder.get_object("providers-menu")
138
139
self.providers = []
140
self.provider_checkboxes = {}
141
142
for provider in self.enabled_providers.values():
143
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
144
module = importlib.util.module_from_spec(spec)
145
spec.loader.exec_module(module)
146
loaded_provider = module.Provider({})
147
self.providers.append(loaded_provider)
148
print(f"Loaded provider: {loaded_provider.name}")
149
print(loaded_provider.description)
150
self.provider_checkboxes[loaded_provider.name] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
151
providers_menu.append(self.provider_checkboxes[loaded_provider.name])
152
self.provider_checkboxes[loaded_provider.name].set_active(True)
153
self.provider_checkboxes[loaded_provider.name].show()
154
155
def call_update_results(widget):
156
asyncio.create_task(self.update_results(widget))
157
158
def execute_result(widget, row):
159
row.execute()
160
self.kill()
161
162
self.builder.connect_signals(
163
{
164
"update-search": call_update_results,
165
"result-activated": execute_result,
166
}
167
)
168
169
def kill(self, widget=None):
170
self.loop.stop()
171
self.quit()
172
173
def check_escape(self, widget, event):
174
if event.keyval == 65307:
175
self.kill()
176
177
async def update_results(self, widget):
178
print("Updating results...")
179
generators = []
180
query = self.builder.get_object("search-query-buffer").get_text()
181
# print(f"Query: {query}")
182
for provider in self.enabled_providers.values():
183
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
184
module = importlib.util.module_from_spec(spec)
185
spec.loader.exec_module(module)
186
loaded_provider = module.Provider({})
187
# print(f"Searching with provider: {loaded_provider.name}")
188
189
generators.append(loaded_provider.search)
190
191
# Clear the results list
192
results_list = self.builder.get_object("results-list")
193
for row in results_list.get_children():
194
results_list.remove(row)
195
196
async for result in merge_generators_with_params(generators, query):
197
# print(result)
198
result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"])
199
results_list.add(result_box)
200
result_box.show_all()
201
202
results_list.show_all()
203
204
205
if __name__ == "__main__":
206
izvor = Izvor()
207
izvor.window.show_all()
208
try:
209
izvor.loop.run_forever()
210
finally:
211
izvor.loop.close()
212