summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rwxr-xr-xwg-genconf.py57
1 files changed, 41 insertions, 16 deletions
diff --git a/wg-genconf.py b/wg-genconf.py
index a4656ca..3d410a6 100755
--- a/wg-genconf.py
+++ b/wg-genconf.py
@@ -10,8 +10,8 @@ import io
import ipaddress
import os
import re
-import string
import shutil
+import string
import sys
import tomllib
@@ -43,7 +43,8 @@ class Peer(dict):
def ip_interfaces(self, version=None):
return [
ipaddress.ip_interface(
- f'{x[int(self['id'])
+ f'{x[
+ int(self['id'])
if x.version == 4
else int(str(self['id']), 16)]}'
f'/{x.prefixlen}')
@@ -71,16 +72,17 @@ class Interface(collections.UserDict):
self.peer.add_interface(self)
def __contains__(self, key):
- return (key in self.data or
+ return (
+ key in self.data or
(key in self._INHERITABLE and key in self.peer))
def __getitem__(self, key):
try:
return self.data[key]
- except KeyError as e:
+ except KeyError:
if key in self._INHERITABLE:
return self.peer[key]
- raise e
+ raise
@functools.cached_property
def network(self):
@@ -95,7 +97,7 @@ class Interface(collections.UserDict):
'network': self.network.name,
'peer': self.peer.name,
'if': self.name,
- '_if' : f'-{self.name}' if self.name else ''})
+ '_if': f'-{self.name}' if self.name else ''})
def deep_merge(d, src):
@@ -106,6 +108,7 @@ def deep_merge(d, src):
d[k] = v
return d
+
# given peer with ips 10.0.0.20, fc00:ff:ff:dead:beef:a1:b2:c3
# {peer4/24} = 10.0.0.0/24
# {peer4/28} = 10.0.0.16/28
@@ -120,26 +123,30 @@ def deep_merge(d, src):
#
# by default, this returns network addresses with host bits removed.
# passing interface=True will maintain the host bits.
-def ipspec_to_ips(peer, ipspec, interface=False):
+def ipspec_to_ips(peer, ipspec, *, interface=False):
if not (m := re.fullmatch(r'\{peer([46])?(/.+)?\}', ipspec)):
return [ipspec]
version = m[1]
subnet = m[2]
- f = (ipaddress.ip_interface if interface else
- lambda x: ipaddress.ip_network(x, False))
+ f = (
+ ipaddress.ip_interface
+ if interface
+ else lambda x: ipaddress.ip_network(x, strict=False))
return [
str(f(x if subnet == '/-' else f'{x.ip}{subnet or ''}'))
for x in peer.ip_interfaces(version)]
-def ipspecs_to_ips(peer, ipspecs, interface=False):
+
+def ipspecs_to_ips(peer, ipspecs, *, interface=False):
return [
ip
for ipspec in ipspecs
for ip in ipspec_to_ips(peer, ipspec, interface=interface)]
+
def auto_peerspecs(peer, peerspec):
return [
peerspec | {'name': x}
@@ -150,6 +157,7 @@ def auto_peerspecs(peer, peerspec):
for if_peer in if_['peers']
if if_peer.get('name') == peer.name))]
+
def expand_peerspecs(peer, peerspecs):
return [
x
@@ -159,6 +167,7 @@ def expand_peerspecs(peer, peerspecs):
if peerspec.get('auto')
else [peerspec])]
+
def gc_if_data(if_):
if_data = {
'name': if_.formatted_name,
@@ -177,8 +186,10 @@ def gc_if_data(if_):
'pubkey': peer['pubkey'],
'ips': ipspecs_to_ips(peer, peerspec.get('ips', ['{peer}'])),
'endpoint': None,
- 'keepalive': (peerspec['keepalive'] if 'keepalive' in peerspec else
- if_.get('keepalive')),
+ 'keepalive': (
+ peerspec['keepalive']
+ if 'keepalive' in peerspec
+ else if_.get('keepalive')),
}
if (host := peer.get('host')):
peer_data['endpoint'] = f'{host}:{peer.get('port', 51820)}'
@@ -186,6 +197,7 @@ def gc_if_data(if_):
return if_data
+
def gc_if_wgquick(if_data):
buf = io.StringIO()
buf.write(
@@ -213,6 +225,7 @@ def gc_if_wgquick(if_data):
return buf
+
def gc_if_systemd_network(if_data):
buf = io.StringIO()
buf.write(
@@ -226,6 +239,7 @@ def gc_if_systemd_network(if_data):
return buf
+
def gc_if_systemd_netdev(if_data):
buf = io.StringIO()
buf.write(
@@ -256,7 +270,8 @@ def gc_if_systemd_netdev(if_data):
return buf
-def buf_to_file(buf, path, mode=None):
+
+def buf_to_file(buf, path, *, mode=None):
print(f'Creating file {path}')
if mode:
@@ -265,6 +280,7 @@ def buf_to_file(buf, path, mode=None):
buf.seek(0)
shutil.copyfileobj(buf, f)
+
def create_if_files(if_, output_dir):
if_data = gc_if_data(if_)
prefix = os.path.join(output_dir, if_.get('file-prefix', ''))
@@ -282,6 +298,7 @@ def create_if_files(if_, output_dir):
gc_if_wgquick(if_data),
f'{prefix}{if_.formatted_name}.conf')
+
def load_network_peer_interfaces(config, peer):
ifs = peer.pop('if', {})
@@ -298,13 +315,15 @@ def load_network_peer_interfaces(config, peer):
Interface(if_conf, if_name, peer)
+
def load_network_peers(config, network):
for peer_name, peer_conf in copy.deepcopy(config['peer']).items():
if (peer_net_conf := peer_conf.pop('net', {}).get(network.name)):
- peer_conf = deep_merge(peer_conf, peer_net_conf)
+ deep_merge(peer_conf, peer_net_conf)
peer = Peer(peer_conf, peer_name, network)
load_network_peer_interfaces(config, peer)
+
def load_networks(config):
nets = []
for net_name, net_conf in copy.deepcopy(config['net']).items():
@@ -315,18 +334,21 @@ def load_networks(config):
load_network_peers(config, net)
return nets
+
@functools.cache
def read_privkey(path):
try:
- with open(path, 'r') as f:
+ with open(path) as f:
return f.read().rstrip()
except OSError:
return 'FIXME'
+
def get_privkey(if_):
return read_privkey(if_.format_string(os.path.expanduser(
if_.get('privkey-path', '~/.wg-genconf/${network}.privkey'))))
+
def argparse_dir(s):
if not os.path.exists(s):
raise argparse.ArgumentTypeError(f'{s}: no such file or directory')
@@ -334,6 +356,7 @@ def argparse_dir(s):
raise argparse.ArgumentTypeError(f'{s}: not a directory')
return s
+
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
@@ -359,4 +382,6 @@ def main():
if if_.get('file', True):
create_if_files(if_, args.output_dir)
-main()
+
+if __name__ == '__main__':
+ main()