import uuid import ipaddress from django.db import models from django.contrib.auth import get_user_model from django.core.validators import MinValueValidator, MaxValueValidator from django.core.exceptions import FieldError, ValidationError from uncloud_pay.models import Order class MACAdress(models.Model): default_prefix = 0x420000000000 class VPNPool(models.Model): """ Network address pools from which VPNs can be created """ uuid = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) network = models.GenericIPAddressField(unique=True) network_size = models.IntegerField(validators=[MinValueValidator(0), MaxValueValidator(128)]) subnetwork_size = models.IntegerField(validators=[ MinValueValidator(0), MaxValueValidator(128) ]) vpn_hostname = models.CharField(max_length=256) wireguard_private_key = models.CharField(max_length=48) @property def num_maximum_networks(self): """ sample: network_size = 40 subnetwork_size = 48 maximum_networks = 2^(48-40) 2nd sample: network_size = 8 subnetwork_size = 24 maximum_networks = 2^(24-8) """ return 2**(self.subnetwork_size - self.network_size) @property def used_networks(self): return self.vpnnetworkreservation_set.filter(vpnpool=self, status='used') @property def free_networks(self): return self.vpnnetworkreservation_set.filter(vpnpool=self, status='free') @property def num_used_networks(self): return len(self.used_networks) @property def num_free_networks(self): return self.num_maximum_networks - self.num_used_networks + len(self.free_networks) @property def next_free_network(self): if self.num_free_networks == 0: # FIXME: use right exception raise Exception("No free networks") if len(self.free_networks) > 0: return self.free_networks[0].address if len(self.used_networks) > 0: """ sample: pool = 2a0a:e5c1:200::/40 last_used = 2a0a:e5c1:204::/48 next: """ last_net = ipaddress.ip_network(self.used_networks.last().address) last_net_ip = last_net[0] if last_net_ip.version == 6: offset_to_next = 2**(128 - self.subnetwork_size) elif last_net_ip.version == 4: offset_to_next = 2**(32 - self.subnetwork_size) next_net_ip = last_net_ip + offset_to_next return str(next_net_ip) else: # first network to be created return self.network @property def wireguard_config_filename(self): return '/etc/wireguard/{}.conf'.format(self.network) @property def wireguard_config(self): wireguard_config = [ """ [Interface] ListenPort = 51820 PrivateKey = {privatekey} """.format(privatekey=self.wireguard_private_key) ] peers = [] for reservation in self.vpnnetworkreservation_set.filter(status='used'): public_key = reservation.vpnnetwork_set.first().wireguard_public_key peer_network = "{}/{}".format(reservation.address, self.subnetwork_size) owner = reservation.vpnnetwork_set.first().owner peers.append(""" # Owner: {owner} [Peer] PublicKey = {public_key} AllowedIPs = {peer_network} """.format( owner=owner, public_key=public_key, peer_network=peer_network)) wireguard_config.extend(peers) return "\n".join(wireguard_config) def configure_wireguard_vpnserver(self): """ This method is designed to run as a celery task and should not be called directly from the web """ # subprocess, ssh pass class VPNNetworkReservation(models.Model): """ This class tracks the used VPN networks. It will be deleted, when the product is cancelled. """ vpnpool = models.ForeignKey(VPNPool, on_delete=models.CASCADE) address = models.GenericIPAddressField(primary_key=True) status = models.CharField(max_length=256, default='used', choices = ( ('used', 'used'), ('free', 'free') ) ) class VPNNetwork(models.Model): """ A selected network. Used for tracking reservations / used networks """ network = models.ForeignKey(VPNNetworkReservation, on_delete=models.CASCADE, editable=False) wireguard_public_key = models.CharField(max_length=48) # default_recurring_period = RecurringPeriod.PER_365D @property def recurring_price(self): return 120 def delete(self, *args, **kwargs): self.network.status = 'free' self.network.save() super().save(*args, **kwargs) print("deleted {}".format(self)) class ReverseDNSEntry(models.Model): """ A reverse DNS entry """ owner = models.ForeignKey(get_user_model(), on_delete=models.CASCADE) ip_address = models.GenericIPAddressField(null=False, unique=True) name = models.CharField(max_length=253, null=False) def save(self, *args, **kwargs): # Product.objects.filter(config__parameters__contains='reverse_dns_network') # FIXME: check if order is still active / not replaced allowed = False for order in Order.objects.filter(config__parameters__reverse_dns_network__isnull=False, owner=self.owner): network = order.config['parameters']['reverse_dns_network'] net = ipaddress.ip_network(network) addr = ipaddress.ip_address(self.ip_address) if addr in net: allowed = True break if not allowed: raise ValidationError(f"User {self.owner} does not have the right to create reverse DNS entry for {self.ip_address}") super().save(*args, **kwargs) def __str__(self): return f"{self.ip_address} - {self.name}"