#!/usr/bin/env python3 # # Copyright 2024 David Vazgenovich Shakaryan import glob import os import queue import subprocess import threading import time from datetime import datetime, timezone from zoneinfo import ZoneInfo def fmt_label(s): return f'%{{F#7b51ca}}{s}%{{F-}}' def spacing(n = 1): return f'%{{O{n * 7}}}' class Mod: def __init__(self): self.cv = None self.t = None self.out = None def run(self): if callable(getattr(self, 'work', None)): self.t = threading.Thread(target=self.work, daemon=True) self.t.start() def process_cmd(self, cmd): pass class ModRight(Mod): def __init__(self): self.out = '%{r}' class ModSpacer(Mod): def __init__(self, n=1): self.out = spacing(n) class ModDate(Mod): def __init__(self, fmts=None, tzs=None): super().__init__() self.e = threading.Event() self.fmts = fmts or ('%H:%M',) self.fmt = 0 self.tzs = tzs or ({'id': None},) self.tz = 0 def work(self): while True: fmt = self.fmts[self.fmt] tz = self.tzs[self.tz] tz_id = tz['id'] tz_label = tz.get('label') dt = datetime.now().astimezone(ZoneInfo(tz_id) if tz_id else None) label = tz_label or dt.strftime('%Z') buf = (f'%{{A4:{id(self)} tz +1:}}%{{A5:{id(self)} tz -1:}}' f'%{{A:{id(self)} tz:}}{fmt_label(label)}%{{A}}{spacing()}' f'%{{A:{id(self)} fmt:}}{dt.strftime(fmt)}%{{A}}' '%{A}%{A}') with self.cv: self.out = buf self.cv.notify() self.e.wait(1 - (dt.microsecond / 1000000)) self.e.clear() def process_cmd(self, cmd): if cmd == 'fmt': self.fmt = (self.fmt + 1) % len(self.fmts) self.e.set() elif cmd.startswith('tz'): self.tz = (self.tz + int(cmd[3:] or 1)) % len(self.tzs) self.e.set() class HLWMClient(): def __init__(self): self.t = None self.lock = threading.Lock() self.listeners = {} def run(self): with self.lock: if self.t is None: self.t = threading.Thread(target=self.work, daemon=True) self.t.start() def work(self): p = subprocess.Popen( ('herbstclient', '--idle'), stdout=subprocess.PIPE, text=True) for line in iter(p.stdout.readline, ''): for hook, arr in self.listeners.items(): if line.startswith(hook): for q, cb_index in arr: q.put((cb_index, line)) def listen(self, hook, q, cb_index): d = (q, cb_index) if hook in self.listeners: self.listeners[hook].append(d) else: self.listeners[hook] = [d] def exec(self, *args): res = subprocess.run( ('herbstclient', *args), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True) return res.stdout.rstrip() class ModHLWMBase(Mod): client = HLWMClient() def __init__(self): super().__init__() self.q = queue.Queue() self.cbs = [] def post_start(self): pass def listen(self, hook, cb): self.client.listen(hook, self.q, len(self.cbs)) self.cbs.append(cb) def work(self): self.client.run() self.post_start() while True: cb_index, line = self.q.get() self.cbs[cb_index]() def hc(self, *args): return self.client.exec(*args); class ModHLWMTags(ModHLWMBase): def __init__(self): super().__init__() self.listen('tag_', self.refresh) def post_start(self): self.refresh() def fmt_tag(self, sym, tag, tagstr): buf = f'%{{A:{id(self)} use {tag}:}}' match sym: case '.': buf += f'%{{F#777777}}%{{O7}}{tag}%{{O7}}%{{F-}}' case '#': buf += (f'%{{B#333333}}%{{U#7b51ca}}%{{+u}}%{{O7}}%{{T2}}' f'{tag}%{{T-}}%{{O7}}%{{-u}}%{{U-}}%{{B-}}') case '!': if '#' in tagstr: buf += (f'%{{B#a03000}}%{{U#000000}}%{{+u}}%{{O7}}' f'{tag}%{{O7}}%{{-u}}%{{U-}}%{{B-}}') else: buf += (f'%{{B#a03000}}%{{U#7b51ca}}%{{+u}}%{{O7}}%{{T2}}' f'{tag}%{{T-}}%{{O7}}%{{-u}}%{{U-}}%{{B-}}') case ':': buf += f'%{{O7}}{tag}%{{O7}}' buf += '%{A}' return buf def refresh(self): tagstr = self.hc('tag_status') tags = tagstr.lstrip('\t').split('\t') buf = (f'%{{A4:{id(self)} use_index +1:}}' f'%{{A5:{id(self)} use_index -1:}}') for tag in tags: buf += self.fmt_tag(tag[0], tag[1:], tagstr) buf += '%{A}%{A}' with self.cv: self.out = buf self.cv.notify() def process_cmd(self, cmd): subprocess.run(('herbstclient', *cmd.split())) class ModHLWMTitle(ModHLWMBase): def __init__(self): super().__init__() self.listen('focus_changed', self.refresh) self.listen('window_title_changed', self.refresh) def post_start(self): self.refresh() def refresh(self): title = self.hc('attr', 'clients.focus.title') buf = title if len(title) < 65 else title[0:63] + '…' with self.cv: self.out = buf self.cv.notify() class ModInputUnavail(Mod): def __init__(self, path, label=''): super().__init__() self.path = path self.label = label def work(self): p = subprocess.Popen( ('udevadm', 'monitor', '-ups', 'input'), stdout=subprocess.PIPE, text=True) self._update_state(os.path.exists(self.path)) building = False for line in iter(p.stdout.readline, ''): line = line.rstrip() if not building and line.startswith('UDEV'): building = True e = {} elif building: if '=' in line: k, v = line.split('=', 1) e[k] = v else: building = False if e: self._process_event(e) def _process_event(self, e): if (action := e.get('ACTION')) and (paths := e.get('DEVLINKS')): if self.path in paths.split(' '): if action == 'add': self._update_state(True) elif action == 'remove': self._update_state(False) def _update_state(self, avail): if avail: buf = None else: buf = f'%{{B#a03000}}{spacing()}{self.label}{spacing()}%{{B-}}' with self.cv: self.out = buf self.cv.notify() class Bar: def __init__(self, *mods): self.cv = threading.Condition() self.mods = mods for m in self.mods: m.cv = self.cv self.mod_by_id = {id(m): m for m in self.mods} def process_cmds(self, pipe): while True: line = pipe.readline() if not line: break mod_id, cmd = line.rstrip().split(' ', 1) if mod := self.mod_by_id.get(int(mod_id)): mod.process_cmd(cmd) def run(self): for mod in self.mods: mod.run() self.bar = subprocess.Popen( ('/home/david/lemonbar-xft/lemonbar', '-bf', 'Monospace:size=10:dpi=96', '-f', 'Monospace:size=10:dpi=96:bold', '-u', '2', '-g', '1920x22', '-a', '100'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True) threading.Thread( target=self.process_cmds, args=(self.bar.stdout,), daemon=True ).start() while True: with self.cv: self.cv.wait() print(''.join([m.out for m in self.mods if m.out]), file=self.bar.stdin, flush=True) Bar( ModHLWMTags(), ModSpacer(), ModHLWMTitle(), ModRight(), ModInputUnavail( '/dev/input/by-id/usb-HID_Keyboard_HID_Keyboard-event-kbd', label='NO KEYBOARD'), ModSpacer(n=2), ModDate( fmts=('%Y-%m-%d %H:%M:%S', '%H:%M'), tzs=({'id': None}, {'id': 'UTC'}, {'id': 'Asia/Yerevan', 'label': 'AMT'})), ModSpacer() ).run()