#!/usr/bin/env python # # Copyright 2025 David Vazgenovich Shakaryan import argparse import collections import copy import functools import io import ipaddress import os import re import shutil import string import sys import tomllib class Network(dict): def __init__(self, data, name): super().__init__(data) self.name = name self.peers = {} def add_peer(self, peer): self.peers[peer.name] = peer class Peer(dict): def __init__(self, data, name, network): super().__init__(data) self.name = name self.network = network self.interfaces = {} self.network.add_peer(self) def add_interface(self, interface): self.interfaces[interface.name] = interface def ip_interfaces(self, version=None): return [ ipaddress.ip_interface( f'{x[ int(self['id']) if x.version == 4 else int(str(self['id']), 16)]}' f'/{x.prefixlen}') for x in self.network['subnets'] if not version or x.version == int(version)] class Interface(collections.UserDict): _INHERITABLE = [ 'port', 'fwmark', 'keepalive', 'file-prefix', 'name-format', 'privkey-path', ] def __init__(self, data, name, peer): super().__init__(data) self.name = name self.peer = peer self.peer.add_interface(self) def __contains__(self, key): return ( key in self.data or (key in self._INHERITABLE and key in self.peer)) def __getitem__(self, key): try: return self.data[key] except KeyError: if key in self._INHERITABLE: return self.peer[key] raise @functools.cached_property def network(self): return self.peer.network @functools.cached_property def formatted_name(self): return self.format_string(self.get('name-format', '$network$_if')) def format_string(self, s): return string.Template(s).substitute({ 'network': self.network.name, 'peer': self.peer.name, 'if': self.name, '_if': f'-{self.name}' if self.name else ''}) def deep_merge(d, src): for k, v in src.items(): if isinstance(v, dict) and (dv := d.get(k)): deep_merge(dv, v) else: d[k] = v return d # given peer with ips 10.0.0.20, fc00:ff:ff:dead:beef:a1:b2:c3 # {peer4/24} = 10.0.0.0/24 # {peer4/28} = 10.0.0.16/28 # {peer4} = 10.0.0.20/32 # {peer6/64} = fc00:ff:ff:dead::/64 # {peer6/96} = fc00:ff:ff:dead:beef:a1::/96 # {peer6} = fc00:ff:ff:dead:beef:a1:b2:c3/128 # {peer} = 10.0.0.20/32, fc00:ff:ff:dead:beef:a1:b2:c3/128 # # a subnet size of '-', e.g. {peer/-}, will take the subnet sizes from the # network configuration. # # by default, this returns network addresses with host bits removed. # passing interface=True will maintain the host bits. def ipspec_to_ips(peer, ipspec, *, interface=False): if not (m := re.fullmatch(r'\{peer([46])?(/.+)?\}', ipspec)): return [ipspec] version = m[1] subnet = m[2] f = ( ipaddress.ip_interface if interface else lambda x: ipaddress.ip_network(x, strict=False)) return [ str(f(x if subnet == '/-' else f'{x.ip}{subnet or ''}')) for x in peer.ip_interfaces(version)] def ipspecs_to_ips(peer, ipspecs, *, interface=False): return [ ip for ipspec in ipspecs for ip in ipspec_to_ips(peer, ipspec, interface=interface)] def auto_peerspecs(peer, peerspec): return [ peerspec | {'name': x} for x in list(dict.fromkeys( net_peer_name for net_peer_name, net_peer in peer.network.peers.items() for if_ in net_peer.interfaces.values() for if_peer in if_['peers'] if if_peer.get('name') == peer.name))] def expand_peerspecs(peer, peerspecs): return [ x for peerspec in peerspecs for x in ( auto_peerspecs(peer, peerspec) if peerspec.get('auto') else [peerspec])] def gc_if_data(if_): if_data = { 'name': if_.formatted_name, 'privkey': get_privkey(if_), 'addrs': ipspecs_to_ips( if_.peer, if_.get('ips', ['{peer/-}']), interface=True), 'port': if_.get('port'), 'fwmark': if_.get('fwmark'), 'peers': [], } for peerspec in expand_peerspecs(if_.peer, if_['peers']): peer = if_.network.peers[peerspec['name']] peer_data = { 'name': peer.name, 'pubkey': peer['pubkey'], 'ips': ipspecs_to_ips(peer, peerspec.get('ips', ['{peer}'])), 'endpoint': None, 'keepalive': ( peerspec['keepalive'] if 'keepalive' in peerspec else if_.get('keepalive')), } if (host := peer.get('host')): peer_data['endpoint'] = f'{host}:{peer.get('port', 51820)}' if_data['peers'].append(peer_data) return if_data def gc_if_wgquick(if_data): buf = io.StringIO() buf.write( '[Interface]\n' f'PrivateKey = {if_data['privkey']}\n') for addr in if_data['addrs']: buf.write(f'Address = {addr}\n') if (port := if_data['port']): buf.write(f'ListenPort = {port}\n') if (fwmark := if_data['fwmark']): buf.write(f'FwMark = {fwmark}\n') for peer_data in if_data['peers']: buf.write( '\n' f'# {peer_data['name']}\n' '[Peer]\n' f'PublicKey = {peer_data['pubkey']}\n') for ip in peer_data['ips']: buf.write(f'AllowedIPs = {ip}\n') if (endpoint := peer_data.get('endpoint')): buf.write(f'Endpoint = {endpoint}\n') if (keepalive := peer_data.get('keepalive')): buf.write(f'PersistentKeepalive = {keepalive}\n') return buf def gc_if_systemd_network(if_data): buf = io.StringIO() buf.write( '[Match]\n' f'Name={if_data['name']}\n' '\n' '[Network]\n' 'IPMasquerade=both\n') for addr in if_data['addrs']: buf.write(f'Address={addr}\n') return buf def gc_if_systemd_netdev(if_data): buf = io.StringIO() buf.write( '[NetDev]\n' f'Name={if_data['name']}\n' 'Kind=wireguard\n' f'Description=WireGuard tunnel {if_data['name']}\n' '\n' '[WireGuard]\n' f'PrivateKey={if_data['privkey']}\n') if (port := if_data['port']): buf.write(f'ListenPort={port}\n') if (fwmark := if_data['fwmark']): buf.write(f'FirewallMark={fwmark}\n') for peer_data in if_data['peers']: buf.write( '\n' f'# {peer_data['name']}\n' '[WireGuardPeer]\n' f'PublicKey={peer_data['pubkey']}\n') for ip in peer_data['ips']: buf.write(f'AllowedIPs={ip}\n') if (endpoint := peer_data.get('endpoint')): buf.write(f'Endpoint={endpoint}\n') if (keepalive := peer_data.get('keepalive')): buf.write(f'PersistentKeepalive={keepalive}\n') return buf def buf_to_file(buf, path, *, mode=None): print(f'Creating file {path}') if mode: path = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=mode) with open(path, 'w') as f: buf.seek(0) shutil.copyfileobj(buf, f) def create_if_files(if_, output_dir): if_data = gc_if_data(if_) prefix = os.path.join(output_dir, if_.get('file-prefix', '')) if if_.get('type') == 'systemd': buf_to_file( gc_if_systemd_netdev(if_data), f'{prefix}{if_.formatted_name}.netdev', mode=0o640) buf_to_file( gc_if_systemd_network(if_data), f'{prefix}{if_.formatted_name}.network') else: buf_to_file( gc_if_wgquick(if_data), f'{prefix}{if_.formatted_name}.conf') def load_network_peer_interfaces(config, peer): ifs = peer.pop('if', {}) for ifset_name in peer.pop('ifsets', []): for if_name, if_conf in config['ifset'][ifset_name]['if'].items(): ifs.setdefault(if_name, copy.deepcopy(if_conf)) for if_name, if_conf in ifs.items(): # remove self from interface peers peers = [x for x in if_conf['peers'] if x.get('name') != peer.name] if not peers: continue if_conf['peers'] = peers Interface(if_conf, if_name, peer) def load_network_peers(config, network): for peer_name, peer_conf in copy.deepcopy(config['peer']).items(): if (peer_net_conf := peer_conf.pop('net', {}).get(network.name)): deep_merge(peer_conf, peer_net_conf) peer = Peer(peer_conf, peer_name, network) load_network_peer_interfaces(config, peer) def load_networks(config): nets = [] for net_name, net_conf in copy.deepcopy(config['net']).items(): net_conf['subnets'] = [ ipaddress.ip_network(x) for x in net_conf['subnets']] net = Network(net_conf, net_name) nets.append(net) load_network_peers(config, net) return nets @functools.cache def read_privkey(path): try: with open(path) as f: return f.read().rstrip() except OSError: return 'FIXME' def get_privkey(if_): return read_privkey(if_.format_string(os.path.expanduser( if_.get('privkey-path', '~/.wg-genconf/${network}.privkey')))) def argparse_dir(s): if not os.path.exists(s): raise argparse.ArgumentTypeError(f'{s}: no such file or directory') if not os.path.isdir(s): raise argparse.ArgumentTypeError(f'{s}: not a directory') return s def main(): parser = argparse.ArgumentParser() parser.add_argument( '-c', '--config', dest='config_file', default='~/.wg-genconf/config.toml') parser.add_argument('-o', '--output-dir', default='.', type=argparse_dir) parser.add_argument('peer_name') args = parser.parse_args() with open(os.path.expanduser(args.config_file), 'rb') as f: config = tomllib.load(f) networks = load_networks(config) if not config['peer'].get(args.peer_name): sys.exit(f"peer '{args.peer_name}' not configured") for network in networks: peer = network.peers[args.peer_name] for if_ in peer.interfaces.values(): # file=false allows associating peers without creating interface # config files, useful when an auto-peer interface is being # separately created if if_.get('file', True): create_if_files(if_, args.output_dir) if __name__ == '__main__': main()