#!/usr/bin/env python3 # # Copyright 2024 David Vazgenovich Shakaryan import os import psutil import queue import re import subprocess import threading import time from datetime import datetime from zoneinfo import ZoneInfo class Fmt: @classmethod def fg(cls, col, s): return f'%{{F{col}}}{s}%{{F-}}' @classmethod def bg(cls, col, s): return f'%{{B{col}}}{s}%{{B-}}' @classmethod def ul(cls, col, s): return f'%{{U{col}}}%{{+u}}{s}%{{-u}}%{{U-}}' @classmethod def bold(cls, s): return f'%{{T2}}{s}%{{T-}}' @classmethod def label(cls, s): return cls.fg('#7b51ca', s) @classmethod def labelled(cls, label, s): return cls.label(label) + cls.spacer() + s @classmethod def spacer(cls, n=1): return f'%{{O{n * 7}}}' @classmethod def pad(cls, s, n=1): sp = cls.spacer(n) return f'{sp}{s}{sp}' @classmethod def clickable(cls, btn, cmd, s): return f'%{{A{btn}:{cmd}:}}{s}%{{A}}' class Mod: def __init__(self, spacing=1, spacing_l=None, spacing_r=None): self.e_repaint = None self.e_flush = None self.refreshing = False self.spacing_l = spacing_l if spacing_l is not None else spacing self.spacing_r = spacing_r if spacing_r is not None else spacing # changes should be applied atomically, as the main thread may read # this at any time. 
self.out = None def repaint(self, flush=False): if flush: self.e_flush.set() self.e_repaint.set() def run(self): if callable(getattr(self, 'work', None)): threading.Thread(target=self.work, daemon=True).start() def to_paint(self): if (buf := self.out): if self.spacing_l: buf = Fmt.spacer(self.spacing_l) + buf if self.spacing_r: buf += Fmt.spacer(self.spacing_r) return buf def process_cmd(self, cmd): pass class ModText(Mod): def __init__(self, text, fg=None, bg=None, padding=0, **kwargs): super().__init__(**kwargs) buf = text if padding: buf = Fmt.pad(buf, padding) if fg: buf = Fmt.fg(fg, buf) if bg: buf = Fmt.bg(bg, buf) self.out = buf class ModCentre(Mod): def __init__(self, **kwargs): kwargs.setdefault('spacing', 0) super().__init__(**kwargs) self.out = '%{c}' class ModRight(Mod): def __init__(self, **kwargs): kwargs.setdefault('spacing', 0) super().__init__(**kwargs) self.out = '%{r}' class ModSpacer(Mod): def __init__(self, n=1, **kwargs): kwargs.setdefault('spacing', 0) super().__init__(**kwargs) self.out = Fmt.spacer(n) class ModDate(Mod): def __init__(self, fmts=None, tzs=None, **kwargs): super().__init__(**kwargs) self.e = threading.Event() self.fmts = fmts or ('%H:%M',) self.fmt = 0 self.tzs = tzs or ({'id': None},) self.tz = 0 def work(self): while True: self.refreshing = True fmt = self.fmts[self.fmt] tz = self.tzs[self.tz] tz_id = tz['id'] tz_label = tz.get('label') dt = datetime.now().astimezone(ZoneInfo(tz_id) if tz_id else None) label = tz_label or dt.strftime('%Z') self.out = Fmt.clickable( 4, f'{id(self)} tz +1', Fmt.clickable( 5, f'{id(self)} tz -1', Fmt.clickable('', f'{id(self)} tz', Fmt.label(label)) + Fmt.spacer() + Fmt.clickable( '', f'{id(self)} fmt', dt.strftime(fmt)))) self.refreshing = False self.repaint(flush=True) if (t := 1 - (dt.microsecond / 1000000)) > 0: self.e.wait(t) self.e.clear() def process_cmd(self, cmd): if cmd == 'fmt': self.fmt = (self.fmt + 1) % len(self.fmts) self.e.set() elif cmd.startswith('tz'): self.tz = (self.tz + 
int(cmd[3:] or 1)) % len(self.tzs) self.e.set() class HLWMClient(): def __init__(self): self.t = None self.lock = threading.Lock() self.listeners = {} def run(self): with self.lock: if self.t is None: self.t = threading.Thread(target=self.work, daemon=True) self.t.start() def work(self): p = subprocess.Popen( ('herbstclient', '--idle'), stdout=subprocess.PIPE, text=True) for line in iter(p.stdout.readline, ''): for hook, arr in self.listeners.items(): if line.startswith(hook): for q, cb_index in arr: q.put((cb_index, line)) def listen(self, hook, q, cb_index): d = (q, cb_index) if hook in self.listeners: self.listeners[hook].append(d) else: self.listeners[hook] = [d] @staticmethod def exec(*args): res = subprocess.run( ('herbstclient', *args), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True) return res.stdout.rstrip() class ModHLWMBase(Mod): client = HLWMClient() def __init__(self, **kwargs): super().__init__(**kwargs) self.q = queue.Queue() self.cbs = [] def post_start(self): pass def listen(self, hook, cb): self.client.listen(hook, self.q, len(self.cbs)) self.cbs.append(cb) def work(self): self.client.run() self.post_start() while True: cb_index, line = self.q.get() self.cbs[cb_index]() def hc(self, *args): return self.client.exec(*args); class ModHLWMTags(ModHLWMBase): def __init__(self, **kwargs): super().__init__(**kwargs) self.listen('tag_', self.refresh) def post_start(self): self.refresh() def fmt_tag(self, sym, tag, focused): disp = Fmt.pad(tag) match sym: case '.': buf = Fmt.fg('#777777', disp) case '#': buf = Fmt.bg('#333333', Fmt.ul('#7b51ca', Fmt.bold(disp))) case '!': buf = Fmt.bg( '#a03000', Fmt.ul('#7b51ca', Fmt.bold(disp)) if tag == focused else Fmt.ul('#000000', disp)) case ':': buf = disp return Fmt.clickable('', f'{id(self)} use {tag}', buf) def refresh(self): self.refreshing = True tags = self.hc('tag_status').lstrip('\t').split('\t') focused = self.hc('attr', 'tags.focus.name') self.out = Fmt.clickable( 4, f'{id(self)} use_index 
+1', Fmt.clickable( 5, f'{id(self)} use_index -1', ''.join( self.fmt_tag(tag[0], tag[1:], focused) for tag in tags))) self.refreshing = False self.repaint() def process_cmd(self, cmd): subprocess.run(('herbstclient', *cmd.split())) class ModHLWMTitle(ModHLWMBase): def __init__(self, max_len=None, **kwargs): super().__init__(**kwargs) self.max_len = max_len self.listen('focus_changed', self.refresh) self.listen('window_title_changed', self.refresh) def post_start(self): self.refresh() def refresh(self): self.refreshing = True title = self.hc('attr', 'clients.focus.title') self.out = ( title[0:self.max_len-1] + '…' if self.max_len and len(title) > self.max_len else title) self.refreshing = False self.repaint() class ModInputAvail(Mod): def __init__(self, path, avail_text=None, avail_bg='#207000', unavail_text=None, unavail_bg='#a03000', **kwargs): super().__init__(**kwargs) self.path = path self.avail_text = avail_text self.avail_bg = avail_bg self.unavail_text = unavail_text self.unavail_bg = unavail_bg def work(self): p = subprocess.Popen( ('udevadm', 'monitor', '-ups', 'input'), stdout=subprocess.PIPE, text=True) self._update_state(os.path.exists(self.path)) building = False for line in iter(p.stdout.readline, ''): line = line.rstrip() if not building and line.startswith('UDEV'): building = True e = {} elif building: if '=' in line: k, v = line.split('=', 1) e[k] = v else: building = False if e: self._process_event(e) def _process_event(self, e): if (action := e.get('ACTION')) and (paths := e.get('DEVLINKS')): if self.path in paths.split(' '): if action == 'add': self._update_state(True) elif action == 'remove': self._update_state(False) def _update_state(self, avail): self.refreshing = True if avail and self.avail_text is not None: self.out = Fmt.bg(self.avail_bg, Fmt.pad(self.avail_text)) elif not avail and self.unavail_text is not None: self.out = Fmt.bg(self.unavail_bg, Fmt.pad(self.unavail_text)) else: self.out = None self.refreshing = False self.repaint() 
class ModBattery(Mod):
    """Battery percentage with an optional charging indicator.

    Args:
        battery: name of the battery under ``/sys/class/power_supply``.
        adapter: optional name of the AC adapter under the same directory;
            when given, its ``online`` state drives the "(charging)" suffix.
    """

    def __init__(self, battery, adapter=None, **kwargs):
        super().__init__(**kwargs)
        self.battery = battery
        self.adapter = adapter
        self._percent = None
        self._charging = None

    def work(self):
        """Read the initial state from sysfs, then follow udev events."""
        # FIXME copied from ModInputAvail. should abstract udev code.
        p = subprocess.Popen(
            ('udevadm', 'monitor', '-ups', 'power_supply'),
            stdout=subprocess.PIPE, text=True)

        with open(f'/sys/class/power_supply/{self.battery}/capacity') as f:
            percent = f.read().rstrip('\n')

        charging = None
        if self.adapter:
            with open(f'/sys/class/power_supply/{self.adapter}/online') as f:
                charging = (f.read().rstrip('\n') == '1')

        self._update_state(percent, charging)

        # A line starting with 'UDEV' opens an event; the KEY=VALUE lines
        # after it are collected until the first line without '='.
        building = False
        for line in iter(p.stdout.readline, ''):
            line = line.rstrip()
            if not building and line.startswith('UDEV'):
                building = True
                e = {}
            elif building:
                if '=' in line:
                    k, v = line.split('=', 1)
                    e[k] = v
                else:
                    building = False
                    if e:
                        self._process_event(e)

    def _process_event(self, e):
        """Apply a ``change`` event for the watched battery or adapter."""
        if (e.get('ACTION') == 'change'
                and (name := e.get('POWER_SUPPLY_NAME'))):
            if (name == self.battery
                    and (cap := e.get('POWER_SUPPLY_CAPACITY'))):
                self._update_state(percent=cap)
            elif (name == self.adapter
                    and (online := e.get('POWER_SUPPLY_ONLINE'))):
                self._update_state(charging=(online == '1'))

    def _update_state(self, percent=None, charging=None):
        """Store whichever values are given, re-render and repaint."""
        self.refreshing = True

        if percent is not None:
            self._percent = percent
        if charging is not None:
            self._charging = charging

        # '%%' is deliberate: the doubled percent sign is sent to the bar.
        buf = f'{self._percent}%%'
        if self._charging:
            buf += ' (charging)'
        self.out = Fmt.labelled('BAT', buf)

        self.refreshing = False
        self.repaint()


class IntervalTimer():
    """Single thread that calls registered functions at fixed intervals.

    ``func_intervals`` maps each function to its interval in seconds; the
    calls are aligned to multiples of the interval on the ``time.time()``
    clock.
    """

    def __init__(self):
        self.t = None
        self.lock = threading.Lock()
        self.func_intervals = {}
        self.func_next_time = {}

    def run(self):
        """Start the timer thread once; later calls do nothing."""
        with self.lock:
            if self.t is None:
                self.t = threading.Thread(target=self.work, daemon=True)
                self.t.start()

    def work(self):
        """Call every due function, then sleep until the next deadline."""
        while True:
            now = time.time()
            for func, ival in self.func_intervals.items():
                # '>=' rather than '>': waking exactly on the deadline must
                # fire, otherwise the loop spins without sleeping until the
                # clock advances.
                if now >= self.func_next_time.get(func, 0):
                    func()
                    # Next multiple of `ival` strictly after `now`.
                    self.func_next_time[func] = now - (now % ival) + ival

            if (t := min(self.func_next_time.values()) - time.time()) > 0:
                time.sleep(t)


class ModInterval(Mod):
    """Base for modules polled by the shared ``IntervalTimer``.

    Subclasses implement ``refresh()``, which must assign ``self.out``.

    Args:
        interval: polling period in seconds.
    """

    # One timer thread shared by every interval module.
    timer = IntervalTimer()

    def __init__(self, interval=1, **kwargs):
        super().__init__(**kwargs)
        self.timer.func_intervals[self.work] = interval

    def run(self):
        """Start the shared timer instead of a per-module thread."""
        self.timer.run()

    def work(self):
        """Run one refresh and request a repaint."""
        self.refreshing = True
        self.refresh()
        self.refreshing = False
        self.repaint()


class ModNetIf(ModInterval):
    """Shows whether a network interface is up.

    Args:
        ifname: interface name, or a regular expression when `regex` is set.
        label: label to display; defaults to `ifname`.
        regex: treat `ifname` as a pattern that must match a whole interface
            name; the state is 'up' if any matching interface is up.
    """

    def __init__(self, ifname, label=None, regex=False, **kwargs):
        super().__init__(**kwargs)
        self.ifname = ifname
        self.regex = regex
        self.label = label or ifname

    def refresh(self):
        stats = psutil.net_if_stats()
        if self.regex:
            arr = [v for k, v in stats.items()
                   if re.fullmatch(self.ifname, k)]
        else:
            arr = [x] if (x := stats.get(self.ifname)) else []

        state = 'up' if any(x.isup for x in arr) else 'down'
        self.out = Fmt.labelled(self.label, state)


class ModCPU(ModInterval):
    """CPU utilisation as reported by ``psutil.cpu_percent()``."""

    def refresh(self):
        percent = psutil.cpu_percent()
        self.out = Fmt.labelled('CPU', f'{percent:.0f}%%')


class ModMem(ModInterval):
    """Memory utilisation from ``psutil.virtual_memory().percent``."""

    def refresh(self):
        percent = psutil.virtual_memory().percent
        self.out = Fmt.labelled('MEM', f'{percent:.0f}%%')


class ModPWVol(ModInterval):
    """Volume display and control through ``wpctl``.

    Args:
        sink: sink identifier passed to ``wpctl``.
        step: volume step used for the button 4/5 click commands.
    """

    def __init__(self, sink='@DEFAULT_AUDIO_SINK@', step='0.05', **kwargs):
        super().__init__(**kwargs)
        self.sink = sink
        self.step = step

    def refresh(self):
        """Query the current volume and rebuild the clickable output."""
        res = subprocess.run(
            ('wpctl', 'get-volume', self.sink),
            stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True)

        # Raw string: '\[' is a regex escape, not a Python string escape.
        if (m := re.match(r'Volume: ([0-9.]*)( \[MUTED\])?', res.stdout)):
            vol = float(m.group(1)) * 100
            buf = f'{vol:.0f}%%'
            if m.group(2):
                buf += ' [muted]'
        else:
            buf = Fmt.bg('#a03000', ' error ')

        self.out = Fmt.clickable(
            4, f'{id(self)} set {self.step}+', Fmt.clickable(
                5, f'{id(self)} set {self.step}-', Fmt.clickable(
                    '', f'{id(self)} toggle-mute',
                    Fmt.labelled('VOL', buf))))

    def process_cmd(self, cmd):
        """Handle ``toggle-mute`` and ``set <value>``, then refresh at once."""
        if cmd == 'toggle-mute':
            subprocess.run(('wpctl', 'set-mute', self.sink, 'toggle'))
        elif cmd.startswith('set'):
            subprocess.run(('wpctl', 'set-volume', self.sink, cmd[4:]))

        self.refresh()
        self.repaint()


class Panel:
    """Runs the modules and pipes their combined output to lemonbar.

    Args:
        *mods: modules, painted in the order given.
        max_paint_delay: when non-zero and no flush was requested, wait up
            to this many seconds while any module is still refreshing, so
            that several updates can share one repaint.
        width, height: bar geometry passed to lemonbar as ``-g WxH``.
    """

    def __init__(self, *mods, max_paint_delay=0, width=1920, height=22):
        self.e_repaint = threading.Event()
        self.e_flush = threading.Event()

        self.mods = mods
        for m in self.mods:
            m.e_repaint = self.e_repaint
            m.e_flush = self.e_flush
        # Click commands are addressed to modules by id().
        self.mod_by_id = {id(m): m for m in self.mods}

        self.max_paint_delay = max_paint_delay
        self.width = width
        self.height = height

    def process_cmds(self, pipe):
        """Dispatch ``<module id> <command>`` lines read from `pipe`.

        Lines whose first field is not an integer are skipped, so one
        malformed line cannot end the command thread.
        """
        for line in iter(pipe.readline, ''):
            mod_id, _, cmd = line.rstrip().partition(' ')
            try:
                key = int(mod_id)
            except ValueError:
                continue
            if mod := self.mod_by_id.get(key):
                mod.process_cmd(cmd)

    def run(self):
        """Start the modules and lemonbar, then repaint on demand forever."""
        for mod in self.mods:
            mod.run()

        self.panel = subprocess.Popen(
            ('/home/david/dev/lemonbar-xft/lemonbar',
             '-bf', 'Monospace:size=10:dpi=96',
             '-f', 'Monospace:size=10:dpi=96:bold',
             '-g', f'{self.width}x{self.height}',
             '-u', '2',
             '-a', '100'),
            stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True)

        threading.Thread(
            target=self.process_cmds,
            args=(self.panel.stdout,),
            daemon=True
        ).start()

        while True:
            self.e_repaint.wait()
            self.e_repaint.clear()

            if not self.e_flush.is_set() and self.max_paint_delay:
                # Give modules that are mid-refresh a short grace period,
                # cut short as soon as a flush is requested.
                max_t = time.monotonic() + self.max_paint_delay
                while (any(x.refreshing for x in self.mods)
                       and (timeout := max_t - time.monotonic()) > 0):
                    self.e_repaint.wait(timeout)
                    self.e_repaint.clear()
                    if self.e_flush.is_set():
                        break

            self.e_flush.clear()

            print(''.join([s for m in self.mods if (s := m.to_paint())]),
                  file=self.panel.stdin, flush=True)


Panel(
    ModHLWMTags(spacing=0),
    ModHLWMTitle(max_len=96),
    ModRight(),
    ModInputAvail(
        '/dev/input/by-id/usb-HID_Keyboard_HID_Keyboard-event-kbd',
        unavail_text='NO KEYBOARD'),
    ModNetIf('wg(?!0$).*', regex=True, label='wg*'),
    ModPWVol(),
    ModCPU(interval=0.5),
    ModMem(interval=0.5),
    ModDate(
        fmts=('%Y-%m-%d %H:%M:%S', '%H:%M'),
        tzs=({'id': None},
             {'id': 'UTC'},
             {'id': 'Asia/Yerevan', 'label': 'AMT'})),
    max_paint_delay=0.01,
    # The leading digits of the monitor geometry string are used as the
    # bar width.
    width=(re.match('[0-9]+', HLWMClient.exec('attr', 'monitors.0.geometry'))
           .group())
).run()