#!/usr/bin/env python3 import os import sys import json import pathlib import argparse import subprocess import ipaddress from ipaddress import IPv4Address, IPv4Network import dataclasses from dataclasses import dataclass UDP_PORT_START = 5000 UDP_PORT_ALLOC_INC = 100 NODE_TYPE_HOST = "host" NODE_TYPE_ROUTER = "router" NETWORK_PREFIX_FMT = "10.{}.0.0/24" SESSION_PREFIX = "vnet-" START_SHELL = "/bin/bash" VHOST_BINARY_NAME = "vhost" VROUTER_BINARY_NAME = "vrouter" class PortAllocator(): UDP_PORT_HEAD = UDP_PORT_START def __init__(self): self.next_port = PortAllocator.UDP_PORT_HEAD PortAllocator.UDP_PORT_HEAD += UDP_PORT_ALLOC_INC def get_next(self): ret = self.next_port self.next_port += 1 return ret class IPAllocator(): IP_PREFIX_HEAD = 0 def __init__(self, prefix: IPv4Network): self.prefix = prefix self.hosts = self.prefix.hosts() # Generator def next_ip(self) -> IPv4Address: try: next_addr = next(self.hosts) return next_addr except StopIteration: raise ValueError(f"Out of addresses in prefix {self.prefix}") @classmethod def _next_prefix(cls): prefix = IPv4Network(NETWORK_PREFIX_FMT.format(cls.IP_PREFIX_HEAD)) cls.IP_PREFIX_HEAD += 1 return prefix @classmethod def make_next(cls): return cls(cls._next_prefix()) @dataclass class Interface(): name: str addr: IPv4Address network: 'Network' udp_addr: str = "" def prefix(self) -> IPv4Network: return self.network.alloc.prefix def ip_cidr_format(self): return "{}/{}".format(self.addr, self.prefix().prefixlen) @dataclass class Node(): def __init__(self, name: str, type: str, path: str = ""): self.name = name self.node_type = type self.path = path self.interfaces: dict[str, Interface] = {} self.if_index = 0 def _next_ifname(self): if_name = "if{}".format(self.if_index) self.if_index += 1 return if_name def add_iface(self, ip: IPv4Address, network: 'Network', udp_addr: str): if_name = self._next_ifname() iface = Interface(name=if_name, addr=ip, network=network, udp_addr=udp_addr) self.interfaces[if_name] = iface def is_router(self): return self.node_type == NODE_TYPE_ROUTER def get_ip_on_network(self, target: 'Network') -> tuple[IPv4Address, str]: for _, iface in self.interfaces.items(): if iface.network == target: return iface.addr, iface.udp_addr raise ValueError("No matching interface found") def get_neighbor_router_ips(self) -> list[IPv4Address]: ret = [] if not self.is_router(): return ret for _, iface in self.interfaces.items(): for node in iface.network.links: if node.is_router() and node != self: neigh_ip, _ = node.get_ip_on_network(iface.network) ret.append(neigh_ip) return ret def get_neighboring_routers(self) -> list[tuple['Node', 'Interface']]: routers = [] for _, iface in self.interfaces.items(): for node in iface.network.links: if node.is_router() and node != self: routers.append((node, iface)) return routers def write_config(self, output_file): with open(output_file, "w") as fd: fd.write(f"# Auto-generated configuration for {self.name}\n\n") for _, iface in self.interfaces.items(): this_net = iface.network fd.write("interface {} {} {} # to network {}\n" .format(iface.name, iface.ip_cidr_format(), iface.udp_addr, this_net.name)) for neighbor in this_net.links: if neighbor.name == self.name: continue neighbor_ip, neighbor_udp_addr = neighbor.get_ip_on_network(this_net) fd.write("neighbor {} at {} via {} # {}\n" .format(neighbor_ip, neighbor_udp_addr, iface.name, neighbor.name)) fd.write("\n") fd.write("\n") if self.node_type == NODE_TYPE_ROUTER: fd.write("routing rip\n\n") # prefixes = [i.prefix() for i in self.interfaces.values() \ # if i.network.should_advertise(self)] # if len(prefixes) > 0: # fd.write("# Prefixes this router should advertise\n") # for p in prefixes: # fd.write(f"rip originate prefix {p}\n") # fd.write("\n") neighbor_router_ips = self.get_neighbor_router_ips() if len(neighbor_router_ips) > 0: fd.write("# Neighbor routers that should be sent RIP messages\n") for nr in neighbor_router_ips: fd.write(f"rip advertise-to {nr}\n") elif self.node_type == NODE_TYPE_HOST: routers = self.get_neighboring_routers() if len(routers) == 0: raise ValueError(f"No neighboring router found for host {self.name}") elif len(routers) > 1: print(f"Warning: multiple routers found for host {self.name}, selecting one") default_router, default_iface = routers[0] default_ip, _ = default_router.get_ip_on_network(default_iface.network) fd.write("# Default route\n") fd.write("route 0.0.0.0/0 via {}\n".format(default_ip)) else: raise ValueError("Invalid node type") def __eq__(self, other: 'Node'): return self.name == other.name def __str__(self): return f"Node({self.name})" def __repr__(self): return f"Node({self.name})" @dataclass class Network(): name: str links: list[Node] advertise_from: list[Node] alloc: IPAllocator = dataclasses.field(default_factory=IPAllocator.make_next) def should_advertise(self, node: Node): return node in self.advertise_from @dataclass class NetConfig(): nodes: list[Node] networks: list[Network] def build(self): udp_port_alloc = PortAllocator() # Assign prefixes for all networks for net in self.networks: for node in net.links: assigned_ip = net.alloc.next_ip() udp_port = udp_port_alloc.get_next() udp_addr = "127.0.0.1:{}".format(udp_port) node.add_iface(assigned_ip, net, udp_addr) def write_links(self, output_dir): for node in self.nodes: target_file = f"{str(output_dir)}/{node.name}.lnx" print(f"Writing {target_file}") node.write_config(target_file) def write_device_files(self, output_dir): nodes_file = f"{str(output_dir)}/nodes.json" binaries_file = f"{str(output_dir)}/binaries.example.json" device_types = {n.name: n.node_type for n in self.nodes} binary_paths = {} for node in self.nodes: binary_paths[node.name] = { "binary_path": f"./{VHOST_BINARY_NAME}" \ if node.node_type == NODE_TYPE_HOST else f"./{VROUTER_BINARY_NAME}" if node.node_type == NODE_TYPE_ROUTER else "" } write_json(device_types, nodes_file) write_json(binary_paths, binaries_file) # Custom parsing because we don't want to require any non-native libraries @classmethod def from_json(cls, json_data): def _get(d, k): if k not in d: raise ValueError(f"Missing key {k} in {d}") return d[k] nodes = {_get(d, "name"): Node(**d) for d in _get(json_data, "nodes")} def _get_node(name): if name not in nodes: raise ValueError(f"No node definition found for {name}") return nodes[name] networks = [] for net in _get(json_data, "networks"): links = [_get_node(n) for n in _get(net, "links")] advertise_from = [_get_node(n) for n in _get(net, "advertise-routes-from")] \ if "advertise-routes-from" in json_data else [] network = Network(name=_get(net, "name"), links=links, advertise_from=advertise_from) networks.append(network) return NetConfig(nodes=list(nodes.values()), networks=networks) def load_json(input_file): with open(input_file, "r") as fd: json_data = json.load(fd) return json_data def write_json(d, target_file): with open(target_file, "w") as fd: json.dump(d, fd, indent=True, sort_keys=True) def main(input_args): parser = argparse.ArgumentParser() parser.add_argument("net_json_file") parser.add_argument("output_dir") args = parser.parse_args(input_args) input_file = args.net_json_file output_path = pathlib.Path(args.output_dir) json_data = load_json(input_file) nc = NetConfig.from_json(json_data) nc.build() if not output_path.exists(): print(f"Output directory {str(output_path)} does not exist, creating") os.mkdir(output_path) nc.write_links(output_path) nc.write_device_files(output_path) if __name__ == "__main__": main(sys.argv[1:])