import ipaddress
import uuid

from django.contrib.auth import get_user_model
from django.core.exceptions import FieldError, ValidationError
from django.core.validators import MaxValueValidator, MinValueValidator
from django.db import models

from uncloud_pay.models import Order


class WireGuardVPNPool(models.Model):
    """
    Network address pools from which VPNs can be created.

    A pool is the supernet ``network/network_mask``. Each VPN taken from the
    pool is addressed as a ``/subnetwork_mask`` network inside that supernet.
    """

    class Meta:
        constraints = [
            models.UniqueConstraint(
                fields=['wg_name', 'vpn_server_hostname'],
                name='unique_interface_name_per_host',
            )
        ]

    # Linux restricts interface names to a maximum of 15 characters
    wg_name = models.CharField(max_length=15)

    network = models.GenericIPAddressField(unique=True)
    network_mask = models.IntegerField(
        validators=[MinValueValidator(0), MaxValueValidator(128)]
    )
    subnetwork_mask = models.IntegerField(
        validators=[MinValueValidator(0), MaxValueValidator(128)]
    )

    vpn_server_hostname = models.CharField(max_length=256)
    wireguard_private_key = models.CharField(max_length=48)
    wireguard_public_key = models.CharField(max_length=48)

    @property
    def max_pool_index(self):
        """
        Return the highest possible network / last network id.

        Computed as ``2 ** (subnetwork_mask - network_mask) - 1``.
        """
        # NOTE(review): nothing here enforces subnetwork_mask >= network_mask;
        # with a negative difference this evaluates to a float.
        bits = self.subnetwork_mask - self.network_mask
        return (2 ** bits) - 1

    @property
    def ip_network(self):
        """
        Return the pool as an ``ipaddress`` network object built from
        ``network`` and ``network_mask``.
        """
        return ipaddress.ip_network(f"{self.network}/{self.network_mask}")

    def __str__(self):
        return f"{self.ip_network} (subnets: /{self.subnetwork_mask})"

    @property
    def wireguard_config(self):
        """
        Return the WireGuard configuration text for this pool.

        The text consists of one ``[Interface]`` section followed by one
        ``[Peer]`` section per VPN that references this pool.
        """
        sections = [
            f"[Interface]\nListenPort = 51820\nPrivateKey = {self.wireguard_private_key}\n"
        ]

        for vpn in self.wireguardvpn_set.all():
            public_key = vpn.wireguard_public_key
            peer_network = f"{vpn.address}/{self.subnetwork_mask}"
            owner = vpn.owner

            sections.append(
                f"# Owner: {owner}\n[Peer]\nPublicKey = {public_key}\nAllowedIPs = {peer_network}\n\n"
            )

        return "\n".join(sections)


class WireGuardVPN(models.Model):
    """
    Created VPNNetworks
    """

    owner = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
    vpnpool = models.ForeignKey(WireGuardVPNPool, on_delete=models.CASCADE)

    # NOTE(review): unique=True makes the index unique across ALL pools, not
    # per pool -- confirm this is intended before adding a second pool.
    pool_index = models.IntegerField(unique=True)

    wireguard_public_key = models.CharField(max_length=48, unique=True)

    class Meta:
        constraints = [
            models.UniqueConstraint(
                fields=['vpnpool', 'wireguard_public_key'],
                name='wg_key_unique_per_pool',
            )
        ]

    @property
    def network_mask(self):
        """Prefix length of this VPN's network (the pool's subnetwork mask)."""
        return self.vpnpool.subnetwork_mask

    @property
    def vpn_server(self):
        """Hostname of the VPN server, taken from the pool."""
        return self.vpnpool.vpn_server_hostname

    @property
    def vpn_server_public_key(self):
        """WireGuard public key of the server, taken from the pool."""
        return self.vpnpool.wireguard_public_key

    @property
    def address(self):
        """
        Locate this VPN's subnet in the pool's supernet.

        Returns the network address, as a string, of subnet number
        ``pool_index`` inside the pool. Raises ``IndexError`` if that
        address lies outside the pool network.
        """
        net = self.vpnpool.ip_network

        # Size of one subnet depends on the address family: 32 bits for
        # IPv4, 128 for IPv6. Using max_prefixlen instead of a hard-coded
        # 128 keeps IPv6 results unchanged and makes IPv4 pools work.
        host_bits = net.max_prefixlen - self.vpnpool.subnetwork_mask
        subnet = net[(2 ** host_bits) * self.pool_index]

        return str(subnet)

    def __str__(self):
        return f"{self.address} ({self.pool_index})"


class WireGuardVPNFreeLeases(models.Model):
    """
    Previously used VPNNetworks
    """

    vpnpool = models.ForeignKey(WireGuardVPNPool, on_delete=models.CASCADE)

    # NOTE(review): as on WireGuardVPN, this is unique across all pools.
    pool_index = models.IntegerField(unique=True)


################################################################################

class MACAdress(models.Model):
    # NOTE(review): presumably the prefix used when generating MAC
    # addresses; nothing in this module reads it -- confirm against callers.
    default_prefix = 0x420000000000


class ReverseDNSEntry(models.Model):
    """
    A reverse DNS entry
    """

    owner = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)

    ip_address = models.GenericIPAddressField(null=False, unique=True)

    name = models.CharField(max_length=253, null=False)

    @property
    def reverse_pointer(self):
        """Return the reverse DNS pointer name for ``ip_address``."""
        return ipaddress.ip_address(self.ip_address).reverse_pointer

    def implement(self):
        """
        The implement function implements the change
        """

        # Not implemented yet. Planned steps:
        # Get all DNS entries (?) / update this DNS entry
        # convert to DNS name
        #
        pass

    def save(self, *args, **kwargs):
        """
        Save the entry if the owner holds an order covering ``ip_address``.

        Looks through the owner's orders that carry a
        ``reverse_dns_network`` parameter and saves only when ``ip_address``
        lies inside one of those networks.

        Raises:
            ValidationError: if none of the owner's orders covers the address.
        """
        # Product.objects.filter(config__parameters__contains='reverse_dns_network')
        # FIXME: check if order is still active / not replaced
        orders = Order.objects.filter(
            config__parameters__reverse_dns_network__isnull=False,
            owner=self.owner,
        )

        for order in orders:
            net = ipaddress.ip_network(
                order.config['parameters']['reverse_dns_network']
            )
            addr = ipaddress.ip_address(self.ip_address)

            if addr in net:
                break
        else:
            # Loop finished without a break: no order covers this address.
            raise ValidationError(f"User {self.owner} does not have the right to create reverse DNS entry for {self.ip_address}")

        super().save(*args, **kwargs)

    def __str__(self):
        return f"{self.ip_address} - {self.name}"