#!/usr/bin/env python # # Copyright 2025 David Vazgenovich Shakaryan import collections import copy import functools import io import ipaddress import os import pathlib import re import shutil import sys import tomllib class Network(dict): def __init__(self, data, name): super().__init__(data) self.name = name self.peers = {} def add_peer(self, peer): self.peers[peer.name] = peer class Peer(dict): def __init__(self, data, name, network): super().__init__(data) self.name = name self.network = network self.interfaces = {} self.network.add_peer(self) def add_interface(self, interface): self.interfaces[interface.name] = interface def ip_interfaces(self, version=None): return [ ipaddress.ip_interface( f'{x[int(self['id']) if x.version == 4 else int(str(self['id']), 16)]}' f'/{x.prefixlen}') for x in self.network['subnets'] if not version or x.version == int(version)] class Interface(collections.UserDict): _INHERITABLE = [ 'port', 'fwmark', 'prefix', 'file-prefix', ] def __init__(self, data, name, peer): super().__init__(data) self.name = name self.peer = peer self.peer.add_interface(self) def __contains__(self, key): return (key in self.data or (key in self._INHERITABLE and key in self.peer)) def __getitem__(self, key): try: return self.data[key] except KeyError as e: if key in self._INHERITABLE: return self.peer[key] raise e @functools.cached_property def network(self): return self.peer.network @functools.cached_property def qualified_name(self): name = f'{self.get('prefix', '')}{self.network.name}' if self.name: name += f'-{self.name}' return name def deep_merge(d, src): for k, v in src.items(): if isinstance(v, dict) and (dv := d.get(k)): deep_merge(dv, v) else: d[k] = v return d # given peer with ips 10.0.0.20, fc00:ff:ff:dead:beef:a1:b2:c3 # {peer4/24} = 10.0.0.0/24 # {peer4/28} = 10.0.0.16/28 # {peer4} = 10.0.0.20/32 # {peer6/64} = fc00:ff:ff:dead::/64 # {peer6/96} = fc00:ff:ff:dead:beef:a1::/96 # {peer6} = fc00:ff:ff:dead:beef:a1:b2:c3/128 # {peer} = 10.0.0.20/32, fc00:ff:ff:dead:beef:a1:b2:c3/128 # # a subnet size of '-', e.g. {peer/-}, will take the subnet sizes from the # network configuration. # # by default, this returns network addresses with host bits removed. # passing interface=True will maintain the host bits. def ipspec_to_ips(peer, ipspec, interface=False): if not (m := re.fullmatch(r'\{peer(.)?(/.*)?\}', ipspec)): return [ipspec] version = m[1] subnet = m[2] f = (ipaddress.ip_interface if interface else lambda x: ipaddress.ip_network(x, False)) return [ str(f(x if subnet == '/-' else f'{x.ip}{subnet if subnet else ''}')) for x in peer.ip_interfaces(version)] def ipspecs_to_ips(peer, ipspecs, interface=False): return [ ip for ipspec in ipspecs for ip in ipspec_to_ips(peer, ipspec, interface=interface)] def auto_peerspecs(peer): return [ {'name': x, 'ips': ['{peer}']} for x in list(dict.fromkeys( net_peer_name for net_peer_name, net_peer in peer.network.peers.items() for if_ in net_peer.interfaces.values() for if_peer in if_['peers'] if if_peer.get('name') == peer.name))] def expand_peerspecs(peer, peerspecs): return [ x for peerspec in peerspecs for x in ( auto_peerspecs(peer) if peerspec.get('auto') else [peerspec])] def gc_if_wgquick_add_peer(buf, network, peerspec): peer = network.peers[peerspec['name']] buf.write( '\n' f'# {peer.name}\n' '[Peer]\n' f'PublicKey = {peer['pubkey']}\n') for ip in ipspecs_to_ips(peer, peerspec.get('ips', ['{peer}'])): buf.write(f'AllowedIPs = {ip}\n') if (host := peer.get('host')): port = peer.get('port', 51820) buf.write(f'Endpoint = {host}:{port}\n') def gc_if_wgquick(if_, privkeys): buf = io.StringIO() buf.write( '[Interface]\n' f'PrivateKey = {privkeys.get(if_.network.name, 'FIXME')}\n') for addr in ipspecs_to_ips( if_.peer, if_.get('ips', ['{peer/-}']), interface=True): buf.write(f'Address = {addr}\n') if (port := if_.get('port')): buf.write(f'ListenPort = {port}\n') if (fwmark := if_.get('fwmark')): buf.write(f'FwMark = {fwmark}\n') for peerspec in expand_peerspecs(if_.peer, if_['peers']): gc_if_wgquick_add_peer(buf, if_.network, peerspec) return buf def gc_if_systemd_network(if_): buf = io.StringIO() buf.write( '[Match]\n' f'Name={if_.qualified_name}\n' '\n' '[Network]\n' 'IPMasquerade=both\n') for addr in ipspecs_to_ips( if_.peer, if_.get('ips', ['{peer/-}']), interface=True): buf.write(f'Address={addr}\n') return buf def gc_if_systemd_netdev_add_peer(buf, network, peerspec): peer = network.peers[peerspec['name']] buf.write( '\n' f'# {peer.name}\n' '[WireGuardPeer]\n' f'PublicKey={peer['pubkey']}\n') for ip in ipspecs_to_ips(peer, peerspec.get('ips', ['{peer}'])): buf.write(f'AllowedIPs={ip}\n') if (host := peer.get('host')): port = peer.get('port', 51820) buf.write(f'Endpoint={host}:{port}\n') def gc_if_systemd_netdev(if_, privkeys): buf = io.StringIO() buf.write( '[NetDev]\n' f'Name={if_.qualified_name}\n' 'Kind=wireguard\n' f'Description=WireGuard tunnel {if_.qualified_name}\n' '\n' '[WireGuard]\n' f'PrivateKey={privkeys.get(if_.network.name, 'FIXME')}\n') if (port := if_.get('port')): buf.write(f'ListenPort={port}\n') if (fwmark := if_.get('fwmark')): buf.write(f'FirewallMark={fwmark}\n') for peerspec in expand_peerspecs(if_.peer, if_['peers']): gc_if_systemd_netdev_add_peer(buf, if_.network, peerspec) return buf def buf_to_file(buf, path, mode=None): print(f'Creating file {path}') if mode: path = os.open(path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, mode=mode) with open(path, 'w') as f: buf.seek(0) shutil.copyfileobj(buf, f) def create_if_files(if_, privkeys): file_prefix = f'out/{if_.get('file-prefix', '')}' if if_.get('type') == 'systemd': buf_to_file( gc_if_systemd_netdev(if_, privkeys), f'{file_prefix}{if_.qualified_name}.netdev', mode=0o640) buf_to_file( gc_if_systemd_network(if_), f'{file_prefix}{if_.qualified_name}.network') else: buf_to_file( gc_if_wgquick(if_, privkeys), f'{file_prefix}{if_.qualified_name}.conf') def load_network_peer_interfaces(config, peer): ifs = peer.pop('if', {}) for ifset_name in peer.pop('ifsets', []): for if_name, if_conf in config['ifset'][ifset_name]['if'].items(): ifs.setdefault(if_name, copy.deepcopy(if_conf)) for if_name, if_conf in ifs.items(): # remove self from interface peers peers = [x for x in if_conf['peers'] if x.get('name') != peer.name] if not peers: continue if_conf['peers'] = peers Interface(if_conf, if_name, peer) def load_network_peers(config, network): for peer_name, peer_conf in copy.deepcopy(config['peer']).items(): if (peer_net_conf := peer_conf.pop('net', {}).get(network.name)): peer_conf = deep_merge(peer_conf, peer_net_conf) peer = Peer(peer_conf, peer_name, network) load_network_peer_interfaces(config, peer) def load_networks(config): nets = [] for net_name, net_conf in copy.deepcopy(config['net']).items(): net_conf['subnets'] = [ ipaddress.ip_network(x) for x in net_conf['subnets']] net = Network(net_conf, net_name) nets.append(net) load_network_peers(config, net) return nets def load_privkeys(): privkeys = {} for path in pathlib.Path.home().joinpath('.wg-genconf').glob('*.privkey'): with open(path, 'r') as f: privkeys[path.stem] = f.read().rstrip() return privkeys def main(): conf_file = sys.argv[1] peer_name = sys.argv[2] with open(conf_file, 'rb') as f: config = tomllib.load(f) networks = load_networks(config) privkeys = load_privkeys() if not config['peer'].get(peer_name): sys.exit(f"peer '{peer_name}' not configured") for network in networks: peer = network.peers[peer_name] for if_ in peer.interfaces.values(): # file=false allows associating peers without creating interface # config files, useful when an auto-peer interface is being # separately created if if_.get('file', True): create_if_files(if_, privkeys) main()