diff options
| -rwxr-xr-x | wg-genconf.py | 342 |
1 files changed, 190 insertions, 152 deletions
diff --git a/wg-genconf.py b/wg-genconf.py index 604f0c0..1a1fd99 100755 --- a/wg-genconf.py +++ b/wg-genconf.py @@ -4,15 +4,93 @@ import collections import copy +import functools import io import ipaddress -import pathlib import os +import pathlib import re import shutil import sys import tomllib + +class Network(dict): + def __init__(self, data, name): + super().__init__(data) + + self.name = name + self.peers = {} + + def add_peer(self, peer): + self.peers[peer.name] = peer + + +class Peer(dict): + def __init__(self, data, name, network): + super().__init__(data) + + self.name = name + self.network = network + self.interfaces = {} + + self.network.add_peer(self) + + def add_interface(self, interface): + self.interfaces[interface.name] = interface + + def ip_interfaces(self, version=None): + return [ + ipaddress.ip_interface( + f'{x[int(self['id']) + if x.version == 4 + else int(str(self['id']), 16)]}' + f'/{x.prefixlen}') + for x in self.network['subnets'] + if not version or x.version == int(version)] + + +class Interface(collections.UserDict): + _INHERITABLE = [ + 'port', + 'fwmark', + + 'prefix', + 'file-prefix', + ] + + def __init__(self, data, name, peer): + super().__init__(data) + + self.name = name + self.peer = peer + + self.peer.add_interface(self) + + def __contains__(self, key): + return (key in self.data or + (key in self._INHERITABLE and key in self.peer)) + + def __getitem__(self, key): + try: + return self.data[key] + except KeyError as e: + if key in self._INHERITABLE: + return self.peer[key] + raise e + + @functools.cached_property + def network(self): + return self.peer.network + + @functools.cached_property + def qualified_name(self): + name = f'{self.get('prefix', '')}{self.network.name}' + if self.name: + name += f'-{self.name}' + return name + + def deep_merge(d, src): for k, v in src.items(): if isinstance(v, dict) and (dv := d.get(k)): @@ -21,14 +99,6 @@ def deep_merge(d, src): d[k] = v return d -def peer_ip_interfaces(network, id_, version=None): - return [ - ipaddress.ip_interface( - f'{x[int(id_) if x.version == 4 else int(str(id_), 16)]}' - f'/{x.prefixlen}') - for x in config['net'][network] - if not version or x.version == int(version)] - # given peer with ips 10.0.0.20, fc00:ff:ff:dead:beef:a1:b2:c3 # {peer4/24} = 10.0.0.0/24 # {peer4/28} = 10.0.0.16/28 @@ -43,7 +113,7 @@ def peer_ip_interfaces(network, id_, version=None): # # by default, this returns network addresses with host bits removed. # passing interface=True will maintain the host bits. -def ipspec_to_ips(network, ipspec, peer, interface=False): +def ipspec_to_ips(peer, ipspec, interface=False): if not (m := re.fullmatch(r'\{peer(.)?(/.*)?\}', ipspec)): return [ipspec] @@ -55,115 +125,108 @@ def ipspec_to_ips(network, ipspec, peer, interface=False): return [ str(f(x if subnet == '/-' else f'{x.ip}{subnet if subnet else ''}')) - for x in peer_ip_interfaces(network, peer['id'], version)] + for x in peer.ip_interfaces(version)] -def ipspecs_to_ips(network, ipspecs, peer, interface=False): - return [ip +def ipspecs_to_ips(peer, ipspecs, interface=False): + return [ + ip for ipspec in ipspecs - for ip in ipspec_to_ips(net_name, ipspec, peer, interface=interface)] - -def expand_auto_peer(net_name, local_name): - clients = list(dict.fromkeys( - peer_name - for peer_name in config['peer'] - for peer_if_conf in peer_conf(net_name, peer_name)['if'].values() - for peer_if_peer in peer_if_conf['peers'] - if peer_if_peer.get('name') == local_name)) + for ip in ipspec_to_ips(peer, ipspec, interface=interface)] - return [{'name': x, 'ips': ['{peer}']} for x in clients] +def auto_peerspecs(peer): + return [ + {'name': x, 'ips': ['{peer}']} + for x in list(dict.fromkeys( + net_peer_name + for net_peer_name, net_peer in peer.network.peers.items() + for if_ in net_peer.interfaces.values() + for if_peer in if_['peers'] + if if_peer.get('name') == peer.name))] + +def expand_peerspecs(peer, peerspecs): + return [ + x + for peerspec in peerspecs + for x in ( + auto_peerspecs(peer) if peerspec.get('auto') else [peerspec])] -def gc_if_wgquick_add_peer(buf, net_name, peerspec): - peer = peer_conf(net_name, peerspec['name']) +def gc_if_wgquick_add_peer(buf, network, peerspec): + peer = network.peers[peerspec['name']] buf.write( '\n' - f'# {peer['name']}\n' + f'# {peer.name}\n' '[Peer]\n' f'PublicKey = {peer['pubkey']}\n') - for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer): + for ip in ipspecs_to_ips(peer, peerspec.get('ips', ['{peer}'])): buf.write(f'AllowedIPs = {ip}\n') if (host := peer.get('host')): port = peer.get('port', 51820) buf.write(f'Endpoint = {host}:{port}\n') -def gc_if_wgquick(local, net_name, if_conf): - cm = collections.ChainMap(if_conf, local) +def gc_if_wgquick(if_, privkeys): buf = io.StringIO() - privkey = config['privkey'].get(net_name, 'FIXME') - buf.write( '[Interface]\n' - f'PrivateKey = {privkey}\n') + f'PrivateKey = {privkeys.get(if_.network.name, 'FIXME')}\n') for addr in ipspecs_to_ips( - net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True): + if_.peer, if_.get('ips', ['{peer/-}']), interface=True): buf.write(f'Address = {addr}\n') - if (port := cm.get('port')) != 'auto': + if (port := if_.get('port')): buf.write(f'ListenPort = {port}\n') - if (fwmark := cm.get('fwmark')): + if (fwmark := if_.get('fwmark')): buf.write(f'FwMark = {fwmark}\n') - for peerspec in if_conf['peers']: - if peerspec.get('auto'): - for auto_spec in expand_auto_peer(net_name, local['name']): - gc_if_wgquick_add_peer(buf, net_name, auto_spec) - else: - gc_if_wgquick_add_peer(buf, net_name, peerspec) + for peerspec in expand_peerspecs(if_.peer, if_['peers']): + gc_if_wgquick_add_peer(buf, if_.network, peerspec) return buf -def gc_if_systemd_network(local, net_name, netif_name): +def gc_if_systemd_network(if_): buf = io.StringIO() - buf.write( '[Match]\n' - f'Name={netif_name}\n' + f'Name={if_.qualified_name}\n' '\n' '[Network]\n' 'IPMasquerade=both\n') for addr in ipspecs_to_ips( - net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True): + if_.peer, if_.get('ips', ['{peer/-}']), interface=True): buf.write(f'Address={addr}\n') return buf -def gc_if_systemd_netdev_add_peer(buf, net_name, peerspec): - peer = peer_conf(net_name, peerspec['name']) +def gc_if_systemd_netdev_add_peer(buf, network, peerspec): + peer = network.peers[peerspec['name']] buf.write( '\n' - f'# {peer['name']}\n' + f'# {peer.name}\n' '[WireGuardPeer]\n' f'PublicKey={peer['pubkey']}\n') - for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer): + for ip in ipspecs_to_ips(peer, peerspec.get('ips', ['{peer}'])): buf.write(f'AllowedIPs={ip}\n') if (host := peer.get('host')): port = peer.get('port', 51820) buf.write(f'Endpoint={host}:{port}\n') -def gc_if_systemd_netdev(local, net_name, if_conf, netif_name): - cm = collections.ChainMap(if_conf, local) +def gc_if_systemd_netdev(if_, privkeys): buf = io.StringIO() - privkey = config['privkey'].get(net_name, 'FIXME') - buf.write( '[NetDev]\n' - f'Name={netif_name}\n' + f'Name={if_.qualified_name}\n' 'Kind=wireguard\n' - f'Description=WireGuard tunnel {netif_name}\n' + f'Description=WireGuard tunnel {if_.qualified_name}\n' '\n' '[WireGuard]\n' - f'PrivateKey={privkey}\n') - if (port := cm.get('port')) != 'auto': + f'PrivateKey={privkeys.get(if_.network.name, 'FIXME')}\n') + if (port := if_.get('port')): buf.write(f'ListenPort={port}\n') - if (fwmark := cm.get('fwmark')): + if (fwmark := if_.get('fwmark')): buf.write(f'FirewallMark={fwmark}\n') - for peerspec in if_conf['peers']: - if peerspec.get('auto'): - for auto_spec in expand_auto_peer(net_name, local['name']): - gc_if_systemd_netdev_add_peer(buf, net_name, auto_spec) - else: - gc_if_systemd_netdev_add_peer(buf, net_name, peerspec) + for peerspec in expand_peerspecs(if_.peer, if_['peers']): + gc_if_systemd_netdev_add_peer(buf, if_.network, peerspec) return buf @@ -176,106 +239,81 @@ def buf_to_file(buf, path, mode=None): buf.seek(0) shutil.copyfileobj(buf, f) -def create_if_files(local, net_name, if_name, if_conf): - cm = collections.ChainMap(if_conf, local) - netif_name = f'{cm.get('prefix', '')}{net_name}' - if if_name: - netif_name += f'-{if_name}' - file_prefix = f'out/{cm.get('file-prefix', '')}' +def create_if_files(if_, privkeys): + file_prefix = f'out/{if_.get('file-prefix', '')}' - if if_conf.get('type') == 'systemd': + if if_.get('type') == 'systemd': buf_to_file( - gc_if_systemd_netdev(local, net_name, if_conf, netif_name), - f'{file_prefix}{netif_name}.netdev', + gc_if_systemd_netdev(if_, privkeys), + f'{file_prefix}{if_.qualified_name}.netdev', mode=0o640) buf_to_file( - gc_if_systemd_network(local, net_name, netif_name), - f'{file_prefix}{netif_name}.network') + gc_if_systemd_network(if_), + f'{file_prefix}{if_.qualified_name}.network') else: buf_to_file( - gc_if_wgquick(local, net_name, if_conf), - f'{file_prefix}{netif_name}.conf') - -def peer_conf(net_name, peer_name): - if not (peer := config['peer'].get(peer_name)): - return - - if (nets := peer.get('net')) and (net := nets.get(net_name)): - return net - - return peer['default'] - -def process_config_networks(raw_config): - return { - k: [ipaddress.ip_network(subnet) for subnet in v['subnets']] - for k, v in raw_config['net'].items()} - -def process_config_peer_net(raw_config, peer_conf): - peer_conf.setdefault('if', {}) - if (ifsets := peer_conf.get('ifsets')): - for ifset_name in ifsets: - for if_name, if_conf in ( - raw_config['ifset'][ifset_name]['if'].items()): - peer_conf['if'].setdefault(if_name, copy.deepcopy(if_conf)) - - # remove self from if peers - ifs = peer_conf['if'] - for if_name, if_conf in list(ifs.items()): - filt_peers = [ - x for x in if_conf['peers'] if x.get('name') != peer_conf['name']] - if filt_peers: - if_conf['peers'] = filt_peers - else: - del ifs[if_name] - - return peer_conf - -def process_config_peers(raw_config): - peers = {} - for peer_name, peer_conf in copy.deepcopy(raw_config['peer']).items(): - peer_conf['name'] = peer_name - peer_conf_net = peer_conf.pop('net', {}) - - nets = { - net_name: process_config_peer_net( - raw_config, - deep_merge(copy.deepcopy(peer_conf), net_conf)) - for net_name, net_conf in peer_conf_net.items()} - # do this last because it mutates peer_conf - default = process_config_peer_net(raw_config, peer_conf) - - peers[peer_name] = { 'default': default, 'net': nets } - - return peers - -def process_config(raw_config): - return { - 'net': process_config_networks(raw_config), - 'peer': process_config_peers(raw_config), - } - -def get_privkeys(): + gc_if_wgquick(if_, privkeys), + f'{file_prefix}{if_.qualified_name}.conf') + +def load_network_peer_interfaces(config, peer): + ifs = peer.pop('if', {}) + + for ifset_name in peer.pop('ifsets', []): + for if_name, if_conf in config['ifset'][ifset_name]['if'].items(): + ifs.setdefault(if_name, copy.deepcopy(if_conf)) + + for if_name, if_conf in ifs.items(): + # remove self from interface peers + peers = [x for x in if_conf['peers'] if x.get('name') != peer.name] + if not peers: + continue + if_conf['peers'] = peers + + Interface(if_conf, if_name, peer) + +def load_network_peers(config, network): + for peer_name, peer_conf in copy.deepcopy(config['peer']).items(): + if (peer_net_conf := peer_conf.pop('net', {}).get(network.name)): + peer_conf = deep_merge(peer_conf, peer_net_conf) + peer = Peer(peer_conf, peer_name, network) + load_network_peer_interfaces(config, peer) + +def load_networks(config): + nets = [] + for net_name, net_conf in copy.deepcopy(config['net']).items(): + net_conf['subnets'] = [ + ipaddress.ip_network(x) for x in net_conf['subnets']] + net = Network(net_conf, net_name) + nets.append(net) + load_network_peers(config, net) + return nets + +def load_privkeys(): privkeys = {} for path in pathlib.Path.home().joinpath('.wg-genconf').glob('*.privkey'): with open(path, 'r') as f: privkeys[path.stem] = f.read().rstrip() return privkeys -conf_file = sys.argv[1] -local_name = sys.argv[2] +def main(): + conf_file = sys.argv[1] + peer_name = sys.argv[2] + + with open(conf_file, 'rb') as f: + config = tomllib.load(f) + networks = load_networks(config) + privkeys = load_privkeys() -with open(conf_file, 'rb') as f: - raw_config = tomllib.load(f) -config = process_config(raw_config) -config['privkey'] = get_privkeys() + if not config['peer'].get(peer_name): + sys.exit(f"peer '{peer_name}' not configured") -if not (local_peer := config['peer'].get(local_name)): - sys.exit(f"peer '{local_name}' not configured") + for network in networks: + peer = network.peers[peer_name] + for if_ in peer.interfaces.values(): + # file=false allows associating peers without creating interface + # config files, useful when an auto-peer interface is being + # separately created + if if_.get('file', True): + create_if_files(if_, privkeys) -for net_name in local_peer['net']: - local = peer_conf(net_name, local_name) - for if_name, if_conf in local['if'].items(): - # file=false allows associating peers without creating a config file, - # useful when an auto-peer interface is being separately created - if if_conf.get('file', True): - create_if_files(local, net_name, if_name, if_conf) +main() |
