#!/usr/bin/env python3 # # Copyright 2024 David Vazgenovich Shakaryan import os import psutil import queue import subprocess import threading import time from datetime import datetime from zoneinfo import ZoneInfo class Fmt: @classmethod def fg(cls, col, s): return f'%{{F{col}}}{s}%{{F-}}' @classmethod def bg(cls, col, s): return f'%{{B{col}}}{s}%{{B-}}' @classmethod def ul(cls, col, s): return f'%{{U{col}}}%{{+u}}{s}%{{-u}}%{{U-}}' @classmethod def bold(cls, s): return f'%{{T2}}{s}%{{T-}}' @classmethod def label(cls, s): return cls.fg('#7b51ca', s) @classmethod def labelled(cls, label, s): return cls.label(label) + cls.spacer() + s @classmethod def spacer(cls, n=1): return f'%{{O{n * 7}}}' @classmethod def pad(cls, s, n=1): sp = cls.spacer(n) return f'{sp}{s}{sp}' @classmethod def clickable(cls, btn, cmd, s): return f'%{{A{btn}:{cmd}:}}{s}%{{A}}' class Mod: def __init__(self, spacing=1, spacing_l=None, spacing_r=None): self.e_repaint = None self.spacing_l = spacing_l if spacing_l is not None else spacing self.spacing_r = spacing_r if spacing_r is not None else spacing # changes should be applied atomically, as the main thread may read # this at any time. self.out = None def repaint(self): self.e_repaint.set() def run(self): if callable(getattr(self, 'work', None)): threading.Thread(target=self.work, daemon=True).start() def to_paint(self): if (buf := self.out): if self.spacing_l: buf = Fmt.spacer(self.spacing_l) + buf if self.spacing_r: buf += Fmt.spacer(self.spacing_r) return buf def process_cmd(self, cmd): pass class ModRight(Mod): def __init__(self, **kwargs): kwargs.setdefault('spacing', 0) super().__init__(**kwargs) self.out = '%{r}' class ModSpacer(Mod): def __init__(self, n=1, **kwargs): kwargs.setdefault('spacing', 0) super().__init__(**kwargs) self.out = Fmt.spacer(n) class ModDate(Mod): def __init__(self, fmts=None, tzs=None, **kwargs): super().__init__(**kwargs) self.e = threading.Event() self.fmts = fmts or ('%H:%M',) self.fmt = 0 self.tzs = tzs or ({'id': None},) self.tz = 0 def work(self): while True: fmt = self.fmts[self.fmt] tz = self.tzs[self.tz] tz_id = tz['id'] tz_label = tz.get('label') dt = datetime.now().astimezone(ZoneInfo(tz_id) if tz_id else None) label = tz_label or dt.strftime('%Z') self.out = Fmt.clickable( 4, f'{id(self)} tz +1', Fmt.clickable( 5, f'{id(self)} tz -1', Fmt.clickable('', f'{id(self)} tz', Fmt.label(label)) + Fmt.spacer() + Fmt.clickable( '', f'{id(self)} fmt', dt.strftime(fmt)))) self.repaint() self.e.wait(1 - (dt.microsecond / 1000000)) self.e.clear() def process_cmd(self, cmd): if cmd == 'fmt': self.fmt = (self.fmt + 1) % len(self.fmts) self.e.set() elif cmd.startswith('tz'): self.tz = (self.tz + int(cmd[3:] or 1)) % len(self.tzs) self.e.set() class HLWMClient(): def __init__(self): self.t = None self.lock = threading.Lock() self.listeners = {} def run(self): with self.lock: if self.t is None: self.t = threading.Thread(target=self.work, daemon=True) self.t.start() def work(self): p = subprocess.Popen( ('herbstclient', '--idle'), stdout=subprocess.PIPE, text=True) for line in iter(p.stdout.readline, ''): for hook, arr in self.listeners.items(): if line.startswith(hook): for q, cb_index in arr: q.put((cb_index, line)) def listen(self, hook, q, cb_index): d = (q, cb_index) if hook in self.listeners: self.listeners[hook].append(d) else: self.listeners[hook] = [d] def exec(self, *args): res = subprocess.run( ('herbstclient', *args), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True) return res.stdout.rstrip() class ModHLWMBase(Mod): client = HLWMClient() def __init__(self, **kwargs): super().__init__(**kwargs) self.q = queue.Queue() self.cbs = [] def post_start(self): pass def listen(self, hook, cb): self.client.listen(hook, self.q, len(self.cbs)) self.cbs.append(cb) def work(self): self.client.run() self.post_start() while True: cb_index, line = self.q.get() self.cbs[cb_index]() def hc(self, *args): return self.client.exec(*args); class ModHLWMTags(ModHLWMBase): def __init__(self, **kwargs): super().__init__(**kwargs) self.listen('tag_', self.refresh) def post_start(self): self.refresh() def fmt_tag(self, sym, tag, focused): disp = Fmt.pad(tag) match sym: case '.': buf = Fmt.fg('#777777', disp) case '#': buf = Fmt.bg('#333333', Fmt.ul('#7b51ca', Fmt.bold(disp))) case '!': buf = Fmt.bg( '#a03000', Fmt.ul('#7b51ca', Fmt.bold(disp)) if tag == focused else Fmt.ul('#000000', disp)) case ':': buf = disp return Fmt.clickable('', f'{id(self)} use {tag}', buf) def refresh(self): tags = self.hc('tag_status').lstrip('\t').split('\t') focused = self.hc('attr', 'tags.focus.name') self.out = Fmt.clickable( 4, f'{id(self)} use_index +1', Fmt.clickable( 5, f'{id(self)} use_index -1', ''.join( self.fmt_tag(tag[0], tag[1:], focused) for tag in tags))) self.repaint() def process_cmd(self, cmd): subprocess.run(('herbstclient', *cmd.split())) class ModHLWMTitle(ModHLWMBase): def __init__(self, max_len=None, **kwargs): super().__init__(**kwargs) self.max_len = max_len self.listen('focus_changed', self.refresh) self.listen('window_title_changed', self.refresh) def post_start(self): self.refresh() def refresh(self): title = self.hc('attr', 'clients.focus.title') self.out = ( title[0:self.max_len-1] + '…' if self.max_len and len(title) > self.max_len else title) self.repaint() class ModInputAvail(Mod): def __init__(self, path, avail_text=None, avail_bg='#207000', unavail_text=None, unavail_bg='#a03000', **kwargs): super().__init__(**kwargs) self.path = path self.avail_text = avail_text self.avail_bg = avail_bg self.unavail_text = unavail_text self.unavail_bg = unavail_bg def work(self): p = subprocess.Popen( ('udevadm', 'monitor', '-ups', 'input'), stdout=subprocess.PIPE, text=True) self._update_state(os.path.exists(self.path)) building = False for line in iter(p.stdout.readline, ''): line = line.rstrip() if not building and line.startswith('UDEV'): building = True e = {} elif building: if '=' in line: k, v = line.split('=', 1) e[k] = v else: building = False if e: self._process_event(e) def _process_event(self, e): if (action := e.get('ACTION')) and (paths := e.get('DEVLINKS')): if self.path in paths.split(' '): if action == 'add': self._update_state(True) elif action == 'remove': self._update_state(False) def _update_state(self, avail): if avail and self.avail_text is not None: self.out = Fmt.bg(self.avail_bg, Fmt.pad(self.avail_text)) elif not avail and self.unavail_text is not None: self.out = Fmt.bg(self.unavail_bg, Fmt.pad(self.unavail_text)) else: self.out = None self.repaint() class IntervalTimer(): def __init__(self): self.t = None self.lock = threading.Lock() self.func_intervals = {} self.func_next_time = {} self.e_repaint = None def run(self): with self.lock: if self.t is None: self.t = threading.Thread(target=self.work, daemon=True) self.t.start() def work(self): while True: now = time.time() for func, ival in self.func_intervals.items(): if now > self.func_next_time.get(func, 0): func() self.func_next_time[func] = now - (now % ival) + ival self.e_repaint.set() time.sleep(min(self.func_next_time.values()) - time.time()) class ModInterval(Mod): timer = IntervalTimer() def __init__(self, interval=1, **kwargs): super().__init__(**kwargs) self.timer.func_intervals[self.refresh] = interval def run(self): if not self.timer.e_repaint: self.timer.e_repaint = self.e_repaint self.timer.run() class ModNetIf(ModInterval): def __init__(self, ifname, label=None, **kwargs): super().__init__(**kwargs) self.ifname = ifname self.label = label or ifname def refresh(self): stats = psutil.net_if_stats().get(self.ifname) state = 'up' if stats and stats.isup else 'down' self.out = Fmt.labelled(self.label, state) class ModCPU(ModInterval): def refresh(self): percent = psutil.cpu_percent() self.out = Fmt.labelled('CPU', f'{percent:.0f}%%') class ModMem(ModInterval): def refresh(self): percent = psutil.virtual_memory().percent self.out = Fmt.labelled('MEM', f'{percent:.0f}%%') class Panel: def __init__(self, *mods): self.e_repaint = threading.Event() self.mods = mods for m in self.mods: m.e_repaint = self.e_repaint self.mod_by_id = {id(m): m for m in self.mods} def process_cmds(self, pipe): while True: line = pipe.readline() if not line: break mod_id, cmd = line.rstrip().split(' ', 1) if mod := self.mod_by_id.get(int(mod_id)): mod.process_cmd(cmd) def run(self): for mod in self.mods: mod.run() self.panel = subprocess.Popen( ('/home/david/lemonbar-xft/lemonbar', '-bf', 'Monospace:size=10:dpi=96', '-f', 'Monospace:size=10:dpi=96:bold', '-u', '2', '-g', '1920x22', '-a', '100'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True) threading.Thread( target=self.process_cmds, args=(self.panel.stdout,), daemon=True ).start() while True: self.e_repaint.wait() self.e_repaint.clear() print(''.join([s for m in self.mods if (s := m.to_paint())]), file=self.panel.stdin, flush=True) Panel( ModHLWMTags(spacing=0), ModHLWMTitle(max_len=96), ModRight(), ModInputAvail( '/dev/input/by-id/usb-HID_Keyboard_HID_Keyboard-event-kbd', unavail_text='NO KEYBOARD'), ModNetIf('wg0', interval=1), ModCPU(interval=0.5), ModMem(interval=0.5), ModDate( fmts=('%Y-%m-%d %H:%M:%S', '%H:%M'), tzs=({'id': None}, {'id': 'UTC'}, {'id': 'Asia/Yerevan', 'label': 'AMT'})) ).run()