By using this site, you agree to have cookies stored on your device, strictly for functional purposes, such as storing your session and preferences.

Dismiss

 main.py

View raw Download
text/plain • 7.97 kiB
Python script, ASCII text executable
        
            
1
"""
2
Izvor - modular, GTK+ desktop search and launcher
3
Copyright (C) 2024 <root@roundabout-host.com>
4
5
This program is free software: you can redistribute it and/or modify
6
it under the terms of the GNU General Public License as published by
7
the Free Software Foundation, either version 3 of the License, or
8
(at your option) any later version.
9
10
This program is distributed in the hope that it will be useful,
11
but WITHOUT ANY WARRANTY; without even the implied warranty of
12
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13
GNU General Public License for more details.
14
15
You should have received a copy of the GNU General Public License
16
along with this program. If not, see <https://www.gnu.org/licenses/>.
17
"""
18
19
import os
20
import gi
21
import asyncio
22
import importlib
23
import gbulb
24
from pathlib import Path
25
from typing import AsyncIterable, AsyncIterator, Collection, TypeVar, Iterable, Callable, Any
26
27
gi.require_version("Gtk", "3.0")
28
from gi.repository import Gtk, Gdk, Pango
29
30
31
_T = TypeVar("_T")
32
33
ICON_SIZE = 32
34
35
36
async def merge_generators_with_params(generator_funcs: list[Callable[..., AsyncIterable]],
37
*args, **kwargs):
38
iterators = [gen_func(*args, **kwargs).__aiter__() for gen_func in generator_funcs]
39
tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}
40
41
while tasks:
42
done, _ = await asyncio.wait(tasks.keys(), return_when=asyncio.FIRST_COMPLETED)
43
44
for task in done:
45
iterator = tasks.pop(task)
46
try:
47
result = task.result()
48
yield result
49
tasks[asyncio.create_task(iterator.__anext__())] = iterator
50
except StopAsyncIteration:
51
pass
52
53
54
class ResultInfoWidget(Gtk.ListBoxRow):
55
def __init__(self, name: str, description: str, image: tuple[str, Any], execute: Callable[[], None]):
56
super().__init__()
57
self.name = name
58
self.description = description
59
self.image = image
60
self.execute = execute
61
62
self.box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL, spacing=6)
63
self.add(self.box)
64
65
if image[0] == "logo":
66
self.image = Gtk.Image.new_from_icon_name(self.image[1], Gtk.IconSize.LARGE_TOOLBAR)
67
self.image.set_pixel_size(ICON_SIZE)
68
self.box.pack_start(self.image, False, False, 0)
69
70
self.label_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
71
self.box.pack_start(self.label_box, True, True, 0)
72
73
self.name_label = Gtk.Label(label=self.name)
74
self.name_label.set_line_wrap(True)
75
self.name_label.set_justify(Gtk.Justification.LEFT)
76
self.name_label.set_halign(Gtk.Align.START)
77
attributes = Pango.AttrList()
78
attributes.insert(Pango.AttrSize.new(16 * Pango.SCALE))
79
self.name_label.set_attributes(attributes)
80
self.label_box.pack_start(self.name_label, True, True, 0)
81
82
self.description_label = Gtk.Label(label=self.description)
83
self.description_label.set_line_wrap(True)
84
self.description_label.set_justify(Gtk.Justification.LEFT)
85
self.description_label.set_halign(Gtk.Align.START)
86
self.label_box.pack_start(self.description_label, True, True, 0)
87
88
89
class Izvor:
90
USER_CONFIGS = Path(os.getenv("XDG_CONFIG_HOME", Path.home() / ".config")) / "izvor"
91
USER_DATA = Path(os.getenv("XDG_DATA_HOME", Path.home() / ".local" / "share")) / "izvor"
92
USER_PROVIDERS = USER_DATA / "providers"
93
USER_PROVIDER_CONFIGS = USER_CONFIGS / "providers"
94
SYSTEM_CONFIGS = Path("/etc/izvor")
95
SYSTEM_DATA = Path("/usr/share/izvor")
96
SYSTEM_PROVIDERS = SYSTEM_DATA / "providers"
97
SYSTEM_PROVIDER_CONFIGS = SYSTEM_CONFIGS / "providers"
98
99
def __init__(self):
100
gbulb.install(gtk=True)
101
self.loop = asyncio.get_event_loop()
102
self.builder = Gtk.Builder()
103
self.builder.add_from_file("izvor.ui")
104
self.window = self.builder.get_object("root")
105
self.window.connect("destroy", self.kill)
106
self.window.connect("key-press-event", self.check_escape)
107
108
if not os.getenv("XDG_DATA_HOME"):
109
print("XDG_DATA_HOME is not set. Using default path.")
110
if not os.getenv("XDG_CONFIG_HOME"):
111
print("XDG_CONFIG_HOME is not set. Using default path.")
112
if not self.USER_PROVIDER_CONFIGS.exists():
113
print("Creating user config directory.")
114
self.USER_PROVIDER_CONFIGS.mkdir(parents=True)
115
if not self.USER_PROVIDERS.exists():
116
print("Creating user data directory.")
117
self.USER_PROVIDERS.mkdir(parents=True)
118
119
self.available_providers = {}
120
121
if self.SYSTEM_PROVIDERS.exists():
122
for provider in self.SYSTEM_PROVIDERS.iterdir():
123
if not (provider / "__init__.py").exists():
124
continue
125
self.available_providers[provider.stem] = provider
126
if self.USER_PROVIDERS.exists():
127
for provider in self.USER_PROVIDERS.iterdir():
128
if not (provider / "__init__.py").exists():
129
continue
130
self.available_providers[provider.stem] = provider
131
132
print(f"Available providers: {list(self.available_providers.keys())}")
133
134
self.enabled_providers = self.available_providers.copy()
135
136
providers_menu = self.builder.get_object("providers-menu")
137
138
self.providers = []
139
self.provider_checkboxes = {}
140
141
for provider in self.enabled_providers.values():
142
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
143
module = importlib.util.module_from_spec(spec)
144
spec.loader.exec_module(module)
145
loaded_provider = module.Provider({})
146
self.providers.append(loaded_provider)
147
print(f"Loaded provider: {loaded_provider.name}")
148
print(loaded_provider.description)
149
self.provider_checkboxes[loaded_provider.name] = Gtk.CheckMenuItem.new_with_label(loaded_provider.name)
150
providers_menu.append(self.provider_checkboxes[loaded_provider.name])
151
self.provider_checkboxes[loaded_provider.name].set_active(True)
152
self.provider_checkboxes[loaded_provider.name].show()
153
154
def call_update_results(widget):
155
asyncio.create_task(self.update_results(widget))
156
157
def execute_result(widget, row):
158
row.execute()
159
self.kill()
160
161
self.builder.connect_signals(
162
{
163
"update-search": call_update_results,
164
"result-activated": execute_result,
165
}
166
)
167
168
def kill(self, widget=None):
169
self.loop.stop()
170
self.window.destroy()
171
172
def check_escape(self, widget, event):
173
if event.keyval == 65307:
174
self.kill()
175
176
async def update_results(self, widget):
177
print("Updating results...")
178
generators = []
179
query = self.builder.get_object("search-query-buffer").get_text()
180
# print(f"Query: {query}")
181
for provider in self.enabled_providers.values():
182
spec = importlib.util.spec_from_file_location(provider.stem, provider / "__init__.py")
183
module = importlib.util.module_from_spec(spec)
184
spec.loader.exec_module(module)
185
loaded_provider = module.Provider({})
186
# print(f"Searching with provider: {loaded_provider.name}")
187
188
generators.append(loaded_provider.search)
189
190
# Clear the results list
191
results_list = self.builder.get_object("results-list")
192
for row in results_list.get_children():
193
results_list.remove(row)
194
195
async for result in merge_generators_with_params(generators, query):
196
# print(result)
197
result_box = ResultInfoWidget(result["name"], result["description"], result["image"], result["execute"])
198
results_list.add(result_box)
199
result_box.show_all()
200
201
results_list.show_all()
202
203
204
if __name__ == "__main__":
205
izvor = Izvor()
206
izvor.window.show_all()
207
try:
208
izvor.loop.run_forever()
209
finally:
210
izvor.loop.close()
211