#!/usr/bin/env python3 # # Copyright 2024 David Vazgenovich Shakaryan import glob import subprocess import threading import time from datetime import datetime, timezone from zoneinfo import ZoneInfo def fmt_label(s): return f'%{{F#7b51ca}}{s}%{{F-}}' def spacing(n = 1): return f'%{{O{n * 7}}}' class Mod: def __init__(self): self.cv = None self.t = None self.out = None def run(self): if callable(getattr(self, 'work', None)): self.t = threading.Thread(target=self.work, daemon=True) self.t.start() def process_cmd(self, cmd): pass class ModRight(Mod): def __init__(self): self.out = '%{r}' class ModSpacer(Mod): def __init__(self, n=1): self.out = spacing(n) class ModDate(Mod): def __init__(self, fmts=None, tzs=None): super().__init__() self.e = threading.Event() self.fmts = fmts or ('%H:%M',) self.fmt = 0 self.tzs = tzs or ({'id': None},) self.tz = 0 def work(self): while True: fmt = self.fmts[self.fmt] tz = self.tzs[self.tz] tz_id = tz['id'] tz_label = tz.get('label') dt = datetime.now().astimezone(ZoneInfo(tz_id) if tz_id else None) label = tz_label or dt.strftime('%Z') buf = ('%{A4:ModDate tz +1:}%{A5:ModDate tz -1:}' f'%{{A:ModDate tz:}}{fmt_label(label)}%{{A}}{spacing()}' f'%{{A:ModDate fmt:}}{dt.strftime(fmt)}%{{A}}' '%{A}%{A}') with self.cv: self.out = buf self.cv.notify() self.e.wait(1 - (dt.microsecond / 1000000)) self.e.clear() def process_cmd(self, cmd): if cmd == 'fmt': self.fmt = (self.fmt + 1) % len(self.fmts) self.e.set() elif cmd.startswith('tz'): self.tz = (self.tz + int(cmd[3:] or 1)) % len(self.tzs) self.e.set() class ModHLWM(Mod): def __init__(self): super().__init__() self.out_tags = None self.out_title = None def flush(self): self.out = f'{self.out_tags}{spacing()}{self.out_title}' def fmt_tag(self, sym, tag, tagstr): buf = f'%{{A:ModHLWM use {tag}:}}' match sym: case '.': buf += f'%{{F#777777}}%{{O7}}{tag}%{{O7}}%{{F-}}' case '#': buf += (f'%{{B#333333}}%{{U#7b51ca}}%{{+u}}%{{O7}}%{{T2}}' f'{tag}%{{T-}}%{{O7}}%{{-u}}%{{U-}}%{{B-}}') case '!': if '#' in tagstr: buf += (f'%{{B#a03000}}%{{U#000000}}%{{+u}}%{{O7}}' f'{tag}%{{O7}}%{{-u}}%{{U-}}%{{B-}}') else: buf += (f'%{{B#a03000}}%{{U#7b51ca}}%{{+u}}%{{O7}}%{{T2}}' f'{tag}%{{T-}}%{{O7}}%{{-u}}%{{U-}}%{{B-}}') case ':': buf += f'%{{O7}}{tag}%{{O7}}' buf += '%{A}' return buf def refresh_tags(self): res = subprocess.run( ('herbstclient', 'tag_status'), stdout=subprocess.PIPE, text=True) tagstr = res.stdout.strip() tags = tagstr.split('\t') buf = '%{A4:ModHLWM use_index +1:}%{A5:ModHLWM use_index -1:}' for tag in tags: buf += self.fmt_tag(tag[0], tag[1:], tagstr) buf += '%{A}%{A}' with self.cv: self.out_tags = buf self.flush() self.cv.notify() def refresh_title(self): res = subprocess.run( ('herbstclient', 'attr', 'clients.focus.title'), stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True) title = res.stdout.strip() buf = title if len(title) < 65 else title[0:63] + '…' with self.cv: self.out_title = buf self.flush() self.cv.notify() def work(self): p = subprocess.Popen( ('herbstclient', '--idle'), stdout=subprocess.PIPE, text=True) self.refresh_tags() self.refresh_title() for line in iter(p.stdout.readline, ''): if line.startswith('tag_'): self.refresh_tags() elif (line.startswith('focus_changed') or line.startswith('window_title_changed')): self.refresh_title() def process_cmd(self, cmd): subprocess.run(('herbstclient', *cmd.split())) class ModNoKBD(Mod): def __init__(self): super().__init__() self.prev = True def work(self): while True: has_kbd = bool(glob.glob('/dev/input/by-id/*-kbd')) if has_kbd != self.prev: if has_kbd: buf = None else: buf = f'%{{B#a03000}}{spacing()}NO KBD{spacing()}%{{B-}}' self.prev = has_kbd with self.cv: self.out = buf self.cv.notify() time.sleep(1) class Bar: def __init__(self, *mods): self.cv = threading.Condition() self.mods = mods for m in self.mods: m.cv = self.cv def process_cmds(self, pipe): while True: line = pipe.readline() if not line: break cmd_mod, cmd = line.rstrip().split(' ', 1) for mod in self.mods: if mod.__class__.__name__ == cmd_mod: mod.process_cmd(cmd) def run(self): for mod in self.mods: mod.run() self.bar = subprocess.Popen( ('/home/david/lemonbar-xft/lemonbar', '-bf', 'Monospace:size=10:dpi=96', '-f', 'Monospace:size=10:dpi=96:bold', '-u', '2', '-g', '1920x22', '-a', '100'), stdin=subprocess.PIPE, stdout=subprocess.PIPE, text=True) threading.Thread( target=self.process_cmds, args=(self.bar.stdout,), daemon=True ).start() while True: with self.cv: self.cv.wait() print(''.join([m.out for m in self.mods if m.out]), file=self.bar.stdin, flush=True) Bar( ModHLWM(), ModRight(), ModNoKBD(), ModSpacer(n=2), ModDate( fmts=('%Y-%m-%d %H:%M:%S', '%H:%M'), tzs=({'id': None}, {'id': 'UTC'}, {'id': 'Asia/Yerevan', 'label': 'AMT'})), ModSpacer() ).run()