diff options
| author | David Vazgenovich Shakaryan <dvshakaryan@gmail.com> | 2025-12-16 19:31:29 -0800 |
|---|---|---|
| committer | David Vazgenovich Shakaryan <dvshakaryan@gmail.com> | 2025-12-16 19:31:29 -0800 |
| commit | edd9adcb4c1a805543e66deb600c6811628c347d (patch) | |
| tree | 2455bb35bf485708c2e34d7339121886dab7e9af | |
| parent | 603bb358285bf9b5fc6cc50e7b25d32571e68eaa (diff) | |
| download | wg-genconf-master.tar.gz wg-genconf-master.tar.xz | |
| -rwxr-xr-x | wg-genconf.py | 57 |
1 files changed, 41 insertions, 16 deletions
diff --git a/wg-genconf.py b/wg-genconf.py index a4656ca..3d410a6 100755 --- a/wg-genconf.py +++ b/wg-genconf.py @@ -10,8 +10,8 @@ import io import ipaddress import os import re -import string import shutil +import string import sys import tomllib @@ -43,7 +43,8 @@ class Peer(dict): def ip_interfaces(self, version=None): return [ ipaddress.ip_interface( - f'{x[int(self['id']) + f'{x[ + int(self['id']) if x.version == 4 else int(str(self['id']), 16)]}' f'/{x.prefixlen}') @@ -71,16 +72,17 @@ class Interface(collections.UserDict): self.peer.add_interface(self) def __contains__(self, key): - return (key in self.data or + return ( + key in self.data or (key in self._INHERITABLE and key in self.peer)) def __getitem__(self, key): try: return self.data[key] - except KeyError as e: + except KeyError: if key in self._INHERITABLE: return self.peer[key] - raise e + raise @functools.cached_property def network(self): @@ -95,7 +97,7 @@ class Interface(collections.UserDict): 'network': self.network.name, 'peer': self.peer.name, 'if': self.name, - '_if' : f'-{self.name}' if self.name else ''}) + '_if': f'-{self.name}' if self.name else ''}) def deep_merge(d, src): @@ -106,6 +108,7 @@ def deep_merge(d, src): d[k] = v return d + # given peer with ips 10.0.0.20, fc00:ff:ff:dead:beef:a1:b2:c3 # {peer4/24} = 10.0.0.0/24 # {peer4/28} = 10.0.0.16/28 @@ -120,26 +123,30 @@ def deep_merge(d, src): # # by default, this returns network addresses with host bits removed. # passing interface=True will maintain the host bits. -def ipspec_to_ips(peer, ipspec, interface=False): +def ipspec_to_ips(peer, ipspec, *, interface=False): if not (m := re.fullmatch(r'\{peer([46])?(/.+)?\}', ipspec)): return [ipspec] version = m[1] subnet = m[2] - f = (ipaddress.ip_interface if interface else - lambda x: ipaddress.ip_network(x, False)) + f = ( + ipaddress.ip_interface + if interface + else lambda x: ipaddress.ip_network(x, strict=False)) return [ str(f(x if subnet == '/-' else f'{x.ip}{subnet or ''}')) for x in peer.ip_interfaces(version)] -def ipspecs_to_ips(peer, ipspecs, interface=False): + +def ipspecs_to_ips(peer, ipspecs, *, interface=False): return [ ip for ipspec in ipspecs for ip in ipspec_to_ips(peer, ipspec, interface=interface)] + def auto_peerspecs(peer, peerspec): return [ peerspec | {'name': x} @@ -150,6 +157,7 @@ def auto_peerspecs(peer, peerspec): for if_peer in if_['peers'] if if_peer.get('name') == peer.name))] + def expand_peerspecs(peer, peerspecs): return [ x @@ -159,6 +167,7 @@ def expand_peerspecs(peer, peerspecs): if peerspec.get('auto') else [peerspec])] + def gc_if_data(if_): if_data = { 'name': if_.formatted_name, @@ -177,8 +186,10 @@ def gc_if_data(if_): 'pubkey': peer['pubkey'], 'ips': ipspecs_to_ips(peer, peerspec.get('ips', ['{peer}'])), 'endpoint': None, - 'keepalive': (peerspec['keepalive'] if 'keepalive' in peerspec else - if_.get('keepalive')), + 'keepalive': ( + peerspec['keepalive'] + if 'keepalive' in peerspec + else if_.get('keepalive')), } if (host := peer.get('host')): peer_data['endpoint'] = f'{host}:{peer.get('port', 51820)}' @@ -186,6 +197,7 @@ def gc_if_data(if_): return if_data + def gc_if_wgquick(if_data): buf = io.StringIO() buf.write( @@ -213,6 +225,7 @@ def gc_if_wgquick(if_data): return buf + def gc_if_systemd_network(if_data): buf = io.StringIO() buf.write( @@ -226,6 +239,7 @@ def gc_if_systemd_network(if_data): return buf + def gc_if_systemd_netdev(if_data): buf = io.StringIO() buf.write( @@ -256,7 +270,8 @@ def gc_if_systemd_netdev(if_data): return buf -def buf_to_file(buf, path, mode=None): + +def buf_to_file(buf, path, *, mode=None): print(f'Creating file {path}') if mode: @@ -265,6 +280,7 @@ def buf_to_file(buf, path, mode=None): buf.seek(0) shutil.copyfileobj(buf, f) + def create_if_files(if_, output_dir): if_data = gc_if_data(if_) prefix = os.path.join(output_dir, if_.get('file-prefix', '')) @@ -282,6 +298,7 @@ def create_if_files(if_, output_dir): gc_if_wgquick(if_data), f'{prefix}{if_.formatted_name}.conf') + def load_network_peer_interfaces(config, peer): ifs = peer.pop('if', {}) @@ -298,13 +315,15 @@ def load_network_peer_interfaces(config, peer): Interface(if_conf, if_name, peer) + def load_network_peers(config, network): for peer_name, peer_conf in copy.deepcopy(config['peer']).items(): if (peer_net_conf := peer_conf.pop('net', {}).get(network.name)): - peer_conf = deep_merge(peer_conf, peer_net_conf) + deep_merge(peer_conf, peer_net_conf) peer = Peer(peer_conf, peer_name, network) load_network_peer_interfaces(config, peer) + def load_networks(config): nets = [] for net_name, net_conf in copy.deepcopy(config['net']).items(): @@ -315,18 +334,21 @@ def load_networks(config): load_network_peers(config, net) return nets + @functools.cache def read_privkey(path): try: - with open(path, 'r') as f: + with open(path) as f: return f.read().rstrip() except OSError: return 'FIXME' + def get_privkey(if_): return read_privkey(if_.format_string(os.path.expanduser( if_.get('privkey-path', '~/.wg-genconf/${network}.privkey')))) + def argparse_dir(s): if not os.path.exists(s): raise argparse.ArgumentTypeError(f'{s}: no such file or directory') @@ -334,6 +356,7 @@ def argparse_dir(s): raise argparse.ArgumentTypeError(f'{s}: not a directory') return s + def main(): parser = argparse.ArgumentParser() parser.add_argument( @@ -359,4 +382,6 @@ def main(): if if_.get('file', True): create_if_files(if_, args.output_dir) -main() + +if __name__ == '__main__': + main() |
