summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Vazgenovich Shakaryan <dvshakaryan@gmail.com>2026-05-13 21:45:03 -0700
committerDavid Vazgenovich Shakaryan <dvshakaryan@gmail.com>2026-05-13 21:45:03 -0700
commit14bffc1a908856d91e514dcca3e1899a3d03b8b2 (patch)
treea92ffc82bdcfa30d84a4bb5d91a8cfdae335983b
parentce0dbc0a9627368b40ab178a6792cd0395f3300f (diff)
downloadpanel-14bffc1a908856d91e514dcca3e1899a3d03b8b2.tar.gz
panel-14bffc1a908856d91e514dcca3e1899a3d03b8b2.tar.xz
use pyudev instead of udevadm
-rwxr-xr-xpanel.py119
1 files changed, 63 insertions, 56 deletions
diff --git a/panel.py b/panel.py
index 14349cf..f2c346f 100755
--- a/panel.py
+++ b/panel.py
@@ -2,8 +2,8 @@
#
# Copyright 2024 David Vazgenovich Shakaryan
-import os
import psutil
+import pyudev
import queue
import re
import subprocess
@@ -283,7 +283,46 @@ class ModHLWMTitle(ModHLWMBase):
self.refreshing = False
self.repaint()
+class UdevClient():
+ def __init__(self):
+ self.t = None
+ self.lock = threading.Lock()
+ self.subsystem_cbs = {}
+
+ def register(self, subsystem, cb):
+ self.subsystem_cbs.setdefault(subsystem, []).append(cb)
+
+ def run(self):
+ with self.lock:
+ if self.t is None:
+ self.context = pyudev.Context()
+ self.monitor = pyudev.Monitor.from_netlink(self.context)
+ for subsystem in self.subsystem_cbs:
+ self.monitor.filter_by(subsystem)
+
+ self.t = threading.Thread(target=self.work, daemon=True)
+ self.t.start()
+
+ def work(self):
+ for e in iter(self.monitor.poll, None):
+ for cb in self.subsystem_cbs[e.subsystem]:
+ cb(e)
+
+ def device_from_file(self, path):
+ try:
+ return pyudev.Devices.from_device_file(self.context, path)
+ except pyudev.DeviceNotFoundByFileError:
+ return None
+
+ def device_from_name(self, subsystem, name):
+ try:
+ return pyudev.Devices.from_name(self.context, subsystem, name)
+ except pyudev.DeviceNotFoundByNameError:
+ return None
+
class ModInputAvail(Mod):
+ udev_client = UdevClient()
+
def __init__(
self, path,
avail_text=None, avail_bg='#207000',
@@ -296,36 +335,20 @@ class ModInputAvail(Mod):
self.unavail_text = unavail_text
self.unavail_bg = unavail_bg
- def work(self):
- p = subprocess.Popen(
- ('udevadm', 'monitor', '-ups', 'input'),
- stdout=subprocess.PIPE, text=True)
-
- self._update_state(os.path.exists(self.path))
+ self.udev_client.register('input', self._process_event)
- building = False
- for line in iter(p.stdout.readline, ''):
- line = line.rstrip()
+ def run(self):
+ self.udev_client.run()
- if not building and line.startswith('UDEV'):
- building = True
- e = {}
- elif building:
- if '=' in line:
- k, v = line.split('=', 1)
- e[k] = v
- else:
- building = False
- if e:
- self._process_event(e)
+ self._update_state(
+ self.udev_client.device_from_file(self.path) is not None)
def _process_event(self, e):
- if (action := e.get('ACTION')) and (paths := e.get('DEVLINKS')):
- if self.path in paths.split(' '):
- if action == 'add':
- self._update_state(True)
- elif action == 'remove':
- self._update_state(False)
+ if (action := e.action) and self.path in e.device_links:
+ if action == 'add':
+ self._update_state(True)
+ elif action == 'remove':
+ self._update_state(False)
def _update_state(self, avail):
self.refreshing = True
@@ -339,6 +362,8 @@ class ModInputAvail(Mod):
self.repaint()
class ModBattery(Mod):
+ udev_client = UdevClient()
+
def __init__(self, battery, adapter=None, **kwargs):
super().__init__(**kwargs)
self.battery = battery
@@ -347,38 +372,20 @@ class ModBattery(Mod):
self._percent = None
self._charging = None
- def work(self):
- # FIXME copied from ModInputAvail. should abstract udev code.
- p = subprocess.Popen(
- ('udevadm', 'monitor', '-ups', 'power_supply'),
- stdout=subprocess.PIPE, text=True)
-
- with open(f'/sys/class/power_supply/{self.battery}/capacity') as f:
- percent = f.read().rstrip('\n')
- charging = None
- if self.adapter:
- with open(f'/sys/class/power_supply/{self.adapter}/online') as f:
- charging = (f.read().rstrip('\n') == '1')
- self._update_state(percent, charging)
+ self.udev_client.register('power_supply', self._process_event)
- building = False
- for line in iter(p.stdout.readline, ''):
- line = line.rstrip()
+ def run(self):
+ self.udev_client.run()
- if not building and line.startswith('UDEV'):
- building = True
- e = {}
- elif building:
- if '=' in line:
- k, v = line.split('=', 1)
- e[k] = v
- else:
- building = False
- if e:
- self._process_event(e)
+ self._process_event(
+ self.udev_client.device_from_name('power_supply', self.battery),
+ True)
+ self._process_event(
+ self.udev_client.device_from_name('power_supply', self.adapter),
+ True)
- def _process_event(self, e):
- if (e.get('ACTION') == 'change' and
+ def _process_event(self, e, force=False):
+ if ((force or e.action == 'change') and
(name := e.get('POWER_SUPPLY_NAME'))):
if (name == self.battery and
(cap := e.get('POWER_SUPPLY_CAPACITY'))):