diff options
| -rwxr-xr-x | panel.py | 119 |
1 files changed, 63 insertions, 56 deletions
@@ -2,8 +2,8 @@ # # Copyright 2024 David Vazgenovich Shakaryan -import os import psutil +import pyudev import queue import re import subprocess @@ -283,7 +283,46 @@ class ModHLWMTitle(ModHLWMBase): self.refreshing = False self.repaint() +class UdevClient(): + def __init__(self): + self.t = None + self.lock = threading.Lock() + self.subsystem_cbs = {} + + def register(self, subsystem, cb): + self.subsystem_cbs.setdefault(subsystem, []).append(cb) + + def run(self): + with self.lock: + if self.t is None: + self.context = pyudev.Context() + self.monitor = pyudev.Monitor.from_netlink(self.context) + for subsystem in self.subsystem_cbs: + self.monitor.filter_by(subsystem) + + self.t = threading.Thread(target=self.work, daemon=True) + self.t.start() + + def work(self): + for e in iter(self.monitor.poll, None): + for cb in self.subsystem_cbs[e.subsystem]: + cb(e) + + def device_from_file(self, path): + try: + return pyudev.Devices.from_device_file(self.context, path) + except pyudev.DeviceNotFoundByFileError: + return None + + def device_from_name(self, subsystem, name): + try: + return pyudev.Devices.from_name(self.context, subsystem, name) + except pyudev.DeviceNotFoundByNameError: + return None + class ModInputAvail(Mod): + udev_client = UdevClient() + def __init__( self, path, avail_text=None, avail_bg='#207000', @@ -296,36 +335,20 @@ class ModInputAvail(Mod): self.unavail_text = unavail_text self.unavail_bg = unavail_bg - def work(self): - p = subprocess.Popen( - ('udevadm', 'monitor', '-ups', 'input'), - stdout=subprocess.PIPE, text=True) - - self._update_state(os.path.exists(self.path)) + self.udev_client.register('input', self._process_event) - building = False - for line in iter(p.stdout.readline, ''): - line = line.rstrip() + def run(self): + self.udev_client.run() - if not building and line.startswith('UDEV'): - building = True - e = {} - elif building: - if '=' in line: - k, v = line.split('=', 1) - e[k] = v - else: - building = False - if e: - self._process_event(e) + self._update_state( + self.udev_client.device_from_file(self.path) is not None) def _process_event(self, e): - if (action := e.get('ACTION')) and (paths := e.get('DEVLINKS')): - if self.path in paths.split(' '): - if action == 'add': - self._update_state(True) - elif action == 'remove': - self._update_state(False) + if (action := e.action) and self.path in e.device_links: + if action == 'add': + self._update_state(True) + elif action == 'remove': + self._update_state(False) def _update_state(self, avail): self.refreshing = True @@ -339,6 +362,8 @@ class ModInputAvail(Mod): self.repaint() class ModBattery(Mod): + udev_client = UdevClient() + def __init__(self, battery, adapter=None, **kwargs): super().__init__(**kwargs) self.battery = battery @@ -347,38 +372,20 @@ class ModBattery(Mod): self._percent = None self._charging = None - def work(self): - # FIXME copied from ModInputAvail. should abstract udev code. - p = subprocess.Popen( - ('udevadm', 'monitor', '-ups', 'power_supply'), - stdout=subprocess.PIPE, text=True) - - with open(f'/sys/class/power_supply/{self.battery}/capacity') as f: - percent = f.read().rstrip('\n') - charging = None - if self.adapter: - with open(f'/sys/class/power_supply/{self.adapter}/online') as f: - charging = (f.read().rstrip('\n') == '1') - self._update_state(percent, charging) + self.udev_client.register('power_supply', self._process_event) - building = False - for line in iter(p.stdout.readline, ''): - line = line.rstrip() + def run(self): + self.udev_client.run() - if not building and line.startswith('UDEV'): - building = True - e = {} - elif building: - if '=' in line: - k, v = line.split('=', 1) - e[k] = v - else: - building = False - if e: - self._process_event(e) + self._process_event( + self.udev_client.device_from_name('power_supply', self.battery), + True) + self._process_event( + self.udev_client.device_from_name('power_supply', self.adapter), + True) - def _process_event(self, e): - if (e.get('ACTION') == 'change' and + def _process_event(self, e, force=False): + if ((force or e.action == 'change') and (name := e.get('POWER_SUPPLY_NAME'))): if (name == self.battery and (cap := e.get('POWER_SUPPLY_CAPACITY'))): |
