summaryrefslogtreecommitdiff
path: root/wg-genconf.py
diff options
context:
space:
mode:
Diffstat (limited to 'wg-genconf.py')
-rwxr-xr-xwg-genconf.py342
1 files changed, 190 insertions, 152 deletions
diff --git a/wg-genconf.py b/wg-genconf.py
index 604f0c0..1a1fd99 100755
--- a/wg-genconf.py
+++ b/wg-genconf.py
@@ -4,15 +4,93 @@
import collections
import copy
+import functools
import io
import ipaddress
-import pathlib
import os
+import pathlib
import re
import shutil
import sys
import tomllib
+
+class Network(dict):
+ def __init__(self, data, name):
+ super().__init__(data)
+
+ self.name = name
+ self.peers = {}
+
+ def add_peer(self, peer):
+ self.peers[peer.name] = peer
+
+
+class Peer(dict):
+ def __init__(self, data, name, network):
+ super().__init__(data)
+
+ self.name = name
+ self.network = network
+ self.interfaces = {}
+
+ self.network.add_peer(self)
+
+ def add_interface(self, interface):
+ self.interfaces[interface.name] = interface
+
+ def ip_interfaces(self, version=None):
+ return [
+ ipaddress.ip_interface(
+ f'{x[int(self['id'])
+ if x.version == 4
+ else int(str(self['id']), 16)]}'
+ f'/{x.prefixlen}')
+ for x in self.network['subnets']
+ if not version or x.version == int(version)]
+
+
+class Interface(collections.UserDict):
+ _INHERITABLE = [
+ 'port',
+ 'fwmark',
+
+ 'prefix',
+ 'file-prefix',
+ ]
+
+ def __init__(self, data, name, peer):
+ super().__init__(data)
+
+ self.name = name
+ self.peer = peer
+
+ self.peer.add_interface(self)
+
+ def __contains__(self, key):
+ return (key in self.data or
+ (key in self._INHERITABLE and key in self.peer))
+
+ def __getitem__(self, key):
+ try:
+ return self.data[key]
+ except KeyError as e:
+ if key in self._INHERITABLE:
+ return self.peer[key]
+ raise e
+
+ @functools.cached_property
+ def network(self):
+ return self.peer.network
+
+ @functools.cached_property
+ def qualified_name(self):
+ name = f'{self.get('prefix', '')}{self.network.name}'
+ if self.name:
+ name += f'-{self.name}'
+ return name
+
+
def deep_merge(d, src):
for k, v in src.items():
if isinstance(v, dict) and (dv := d.get(k)):
@@ -21,14 +99,6 @@ def deep_merge(d, src):
d[k] = v
return d
-def peer_ip_interfaces(network, id_, version=None):
- return [
- ipaddress.ip_interface(
- f'{x[int(id_) if x.version == 4 else int(str(id_), 16)]}'
- f'/{x.prefixlen}')
- for x in config['net'][network]
- if not version or x.version == int(version)]
-
# given peer with ips 10.0.0.20, fc00:ff:ff:dead:beef:a1:b2:c3
# {peer4/24} = 10.0.0.0/24
# {peer4/28} = 10.0.0.16/28
@@ -43,7 +113,7 @@ def peer_ip_interfaces(network, id_, version=None):
#
# by default, this returns network addresses with host bits removed.
# passing interface=True will maintain the host bits.
-def ipspec_to_ips(network, ipspec, peer, interface=False):
+def ipspec_to_ips(peer, ipspec, interface=False):
if not (m := re.fullmatch(r'\{peer(.)?(/.*)?\}', ipspec)):
return [ipspec]
@@ -55,115 +125,108 @@ def ipspec_to_ips(network, ipspec, peer, interface=False):
return [
str(f(x if subnet == '/-' else f'{x.ip}{subnet if subnet else ''}'))
- for x in peer_ip_interfaces(network, peer['id'], version)]
+ for x in peer.ip_interfaces(version)]
-def ipspecs_to_ips(network, ipspecs, peer, interface=False):
- return [ip
+def ipspecs_to_ips(peer, ipspecs, interface=False):
+ return [
+ ip
for ipspec in ipspecs
- for ip in ipspec_to_ips(net_name, ipspec, peer, interface=interface)]
-
-def expand_auto_peer(net_name, local_name):
- clients = list(dict.fromkeys(
- peer_name
- for peer_name in config['peer']
- for peer_if_conf in peer_conf(net_name, peer_name)['if'].values()
- for peer_if_peer in peer_if_conf['peers']
- if peer_if_peer.get('name') == local_name))
+ for ip in ipspec_to_ips(peer, ipspec, interface=interface)]
- return [{'name': x, 'ips': ['{peer}']} for x in clients]
+def auto_peerspecs(peer):
+ return [
+ {'name': x, 'ips': ['{peer}']}
+ for x in list(dict.fromkeys(
+ net_peer_name
+ for net_peer_name, net_peer in peer.network.peers.items()
+ for if_ in net_peer.interfaces.values()
+ for if_peer in if_['peers']
+ if if_peer.get('name') == peer.name))]
+
+def expand_peerspecs(peer, peerspecs):
+ return [
+ x
+ for peerspec in peerspecs
+ for x in (
+ auto_peerspecs(peer) if peerspec.get('auto') else [peerspec])]
-def gc_if_wgquick_add_peer(buf, net_name, peerspec):
- peer = peer_conf(net_name, peerspec['name'])
+def gc_if_wgquick_add_peer(buf, network, peerspec):
+ peer = network.peers[peerspec['name']]
buf.write(
'\n'
- f'# {peer['name']}\n'
+ f'# {peer.name}\n'
'[Peer]\n'
f'PublicKey = {peer['pubkey']}\n')
- for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer):
+ for ip in ipspecs_to_ips(peer, peerspec.get('ips', ['{peer}'])):
buf.write(f'AllowedIPs = {ip}\n')
if (host := peer.get('host')):
port = peer.get('port', 51820)
buf.write(f'Endpoint = {host}:{port}\n')
-def gc_if_wgquick(local, net_name, if_conf):
- cm = collections.ChainMap(if_conf, local)
+def gc_if_wgquick(if_, privkeys):
buf = io.StringIO()
- privkey = config['privkey'].get(net_name, 'FIXME')
-
buf.write(
'[Interface]\n'
- f'PrivateKey = {privkey}\n')
+ f'PrivateKey = {privkeys.get(if_.network.name, 'FIXME')}\n')
for addr in ipspecs_to_ips(
- net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True):
+ if_.peer, if_.get('ips', ['{peer/-}']), interface=True):
buf.write(f'Address = {addr}\n')
- if (port := cm.get('port')) != 'auto':
+ if (port := if_.get('port')):
buf.write(f'ListenPort = {port}\n')
- if (fwmark := cm.get('fwmark')):
+ if (fwmark := if_.get('fwmark')):
buf.write(f'FwMark = {fwmark}\n')
- for peerspec in if_conf['peers']:
- if peerspec.get('auto'):
- for auto_spec in expand_auto_peer(net_name, local['name']):
- gc_if_wgquick_add_peer(buf, net_name, auto_spec)
- else:
- gc_if_wgquick_add_peer(buf, net_name, peerspec)
+ for peerspec in expand_peerspecs(if_.peer, if_['peers']):
+ gc_if_wgquick_add_peer(buf, if_.network, peerspec)
return buf
-def gc_if_systemd_network(local, net_name, netif_name):
+def gc_if_systemd_network(if_):
buf = io.StringIO()
-
buf.write(
'[Match]\n'
- f'Name={netif_name}\n'
+ f'Name={if_.qualified_name}\n'
'\n'
'[Network]\n'
'IPMasquerade=both\n')
for addr in ipspecs_to_ips(
- net_name, if_conf.get('ips', ['{peer/-}']), local, interface=True):
+ if_.peer, if_.get('ips', ['{peer/-}']), interface=True):
buf.write(f'Address={addr}\n')
return buf
-def gc_if_systemd_netdev_add_peer(buf, net_name, peerspec):
- peer = peer_conf(net_name, peerspec['name'])
+def gc_if_systemd_netdev_add_peer(buf, network, peerspec):
+ peer = network.peers[peerspec['name']]
buf.write(
'\n'
- f'# {peer['name']}\n'
+ f'# {peer.name}\n'
'[WireGuardPeer]\n'
f'PublicKey={peer['pubkey']}\n')
- for ip in ipspecs_to_ips(net_name, peerspec.get('ips', ['{peer}']), peer):
+ for ip in ipspecs_to_ips(peer, peerspec.get('ips', ['{peer}'])):
buf.write(f'AllowedIPs={ip}\n')
if (host := peer.get('host')):
port = peer.get('port', 51820)
buf.write(f'Endpoint={host}:{port}\n')
-def gc_if_systemd_netdev(local, net_name, if_conf, netif_name):
- cm = collections.ChainMap(if_conf, local)
+def gc_if_systemd_netdev(if_, privkeys):
buf = io.StringIO()
- privkey = config['privkey'].get(net_name, 'FIXME')
-
buf.write(
'[NetDev]\n'
- f'Name={netif_name}\n'
+ f'Name={if_.qualified_name}\n'
'Kind=wireguard\n'
- f'Description=WireGuard tunnel {netif_name}\n'
+ f'Description=WireGuard tunnel {if_.qualified_name}\n'
'\n'
'[WireGuard]\n'
- f'PrivateKey={privkey}\n')
- if (port := cm.get('port')) != 'auto':
+ f'PrivateKey={privkeys.get(if_.network.name, 'FIXME')}\n')
+ if (port := if_.get('port')):
buf.write(f'ListenPort={port}\n')
- if (fwmark := cm.get('fwmark')):
+ if (fwmark := if_.get('fwmark')):
buf.write(f'FirewallMark={fwmark}\n')
- for peerspec in if_conf['peers']:
- if peerspec.get('auto'):
- for auto_spec in expand_auto_peer(net_name, local['name']):
- gc_if_systemd_netdev_add_peer(buf, net_name, auto_spec)
- else:
- gc_if_systemd_netdev_add_peer(buf, net_name, peerspec)
+ for peerspec in expand_peerspecs(if_.peer, if_['peers']):
+ gc_if_systemd_netdev_add_peer(buf, if_.network, peerspec)
return buf
@@ -176,106 +239,81 @@ def buf_to_file(buf, path, mode=None):
buf.seek(0)
shutil.copyfileobj(buf, f)
-def create_if_files(local, net_name, if_name, if_conf):
- cm = collections.ChainMap(if_conf, local)
- netif_name = f'{cm.get('prefix', '')}{net_name}'
- if if_name:
- netif_name += f'-{if_name}'
- file_prefix = f'out/{cm.get('file-prefix', '')}'
+def create_if_files(if_, privkeys):
+ file_prefix = f'out/{if_.get('file-prefix', '')}'
- if if_conf.get('type') == 'systemd':
+ if if_.get('type') == 'systemd':
buf_to_file(
- gc_if_systemd_netdev(local, net_name, if_conf, netif_name),
- f'{file_prefix}{netif_name}.netdev',
+ gc_if_systemd_netdev(if_, privkeys),
+ f'{file_prefix}{if_.qualified_name}.netdev',
mode=0o640)
buf_to_file(
- gc_if_systemd_network(local, net_name, netif_name),
- f'{file_prefix}{netif_name}.network')
+ gc_if_systemd_network(if_),
+ f'{file_prefix}{if_.qualified_name}.network')
else:
buf_to_file(
- gc_if_wgquick(local, net_name, if_conf),
- f'{file_prefix}{netif_name}.conf')
-
-def peer_conf(net_name, peer_name):
- if not (peer := config['peer'].get(peer_name)):
- return
-
- if (nets := peer.get('net')) and (net := nets.get(net_name)):
- return net
-
- return peer['default']
-
-def process_config_networks(raw_config):
- return {
- k: [ipaddress.ip_network(subnet) for subnet in v['subnets']]
- for k, v in raw_config['net'].items()}
-
-def process_config_peer_net(raw_config, peer_conf):
- peer_conf.setdefault('if', {})
- if (ifsets := peer_conf.get('ifsets')):
- for ifset_name in ifsets:
- for if_name, if_conf in (
- raw_config['ifset'][ifset_name]['if'].items()):
- peer_conf['if'].setdefault(if_name, copy.deepcopy(if_conf))
-
- # remove self from if peers
- ifs = peer_conf['if']
- for if_name, if_conf in list(ifs.items()):
- filt_peers = [
- x for x in if_conf['peers'] if x.get('name') != peer_conf['name']]
- if filt_peers:
- if_conf['peers'] = filt_peers
- else:
- del ifs[if_name]
-
- return peer_conf
-
-def process_config_peers(raw_config):
- peers = {}
- for peer_name, peer_conf in copy.deepcopy(raw_config['peer']).items():
- peer_conf['name'] = peer_name
- peer_conf_net = peer_conf.pop('net', {})
-
- nets = {
- net_name: process_config_peer_net(
- raw_config,
- deep_merge(copy.deepcopy(peer_conf), net_conf))
- for net_name, net_conf in peer_conf_net.items()}
- # do this last because it mutates peer_conf
- default = process_config_peer_net(raw_config, peer_conf)
-
- peers[peer_name] = { 'default': default, 'net': nets }
-
- return peers
-
-def process_config(raw_config):
- return {
- 'net': process_config_networks(raw_config),
- 'peer': process_config_peers(raw_config),
- }
-
-def get_privkeys():
+ gc_if_wgquick(if_, privkeys),
+ f'{file_prefix}{if_.qualified_name}.conf')
+
+def load_network_peer_interfaces(config, peer):
+ ifs = peer.pop('if', {})
+
+ for ifset_name in peer.pop('ifsets', []):
+ for if_name, if_conf in config['ifset'][ifset_name]['if'].items():
+ ifs.setdefault(if_name, copy.deepcopy(if_conf))
+
+ for if_name, if_conf in ifs.items():
+ # remove self from interface peers
+ peers = [x for x in if_conf['peers'] if x.get('name') != peer.name]
+ if not peers:
+ continue
+ if_conf['peers'] = peers
+
+ Interface(if_conf, if_name, peer)
+
+def load_network_peers(config, network):
+ for peer_name, peer_conf in copy.deepcopy(config['peer']).items():
+ if (peer_net_conf := peer_conf.pop('net', {}).get(network.name)):
+ peer_conf = deep_merge(peer_conf, peer_net_conf)
+ peer = Peer(peer_conf, peer_name, network)
+ load_network_peer_interfaces(config, peer)
+
+def load_networks(config):
+ nets = []
+ for net_name, net_conf in copy.deepcopy(config['net']).items():
+ net_conf['subnets'] = [
+ ipaddress.ip_network(x) for x in net_conf['subnets']]
+ net = Network(net_conf, net_name)
+ nets.append(net)
+ load_network_peers(config, net)
+ return nets
+
+def load_privkeys():
privkeys = {}
for path in pathlib.Path.home().joinpath('.wg-genconf').glob('*.privkey'):
with open(path, 'r') as f:
privkeys[path.stem] = f.read().rstrip()
return privkeys
-conf_file = sys.argv[1]
-local_name = sys.argv[2]
+def main():
+ conf_file = sys.argv[1]
+ peer_name = sys.argv[2]
+
+ with open(conf_file, 'rb') as f:
+ config = tomllib.load(f)
+ networks = load_networks(config)
+ privkeys = load_privkeys()
-with open(conf_file, 'rb') as f:
- raw_config = tomllib.load(f)
-config = process_config(raw_config)
-config['privkey'] = get_privkeys()
+ if not config['peer'].get(peer_name):
+ sys.exit(f"peer '{peer_name}' not configured")
-if not (local_peer := config['peer'].get(local_name)):
- sys.exit(f"peer '{local_name}' not configured")
+ for network in networks:
+ peer = network.peers[peer_name]
+ for if_ in peer.interfaces.values():
+ # file=false allows associating peers without creating interface
+ # config files, useful when an auto-peer interface is being
+ # separately created
+ if if_.get('file', True):
+ create_if_files(if_, privkeys)
-for net_name in local_peer['net']:
- local = peer_conf(net_name, local_name)
- for if_name, if_conf in local['if'].items():
- # file=false allows associating peers without creating a config file,
- # useful when an auto-peer interface is being separately created
- if if_conf.get('file', True):
- create_if_files(local, net_name, if_name, if_conf)
+main()