import uuid
import ipaddress

from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import MinValueValidator, MaxValueValidator
from django.core.exceptions import FieldError


class UncloudNetwork(models.Model):
    """
    Storing IP networks

    A network is stored as a (unique) address plus a prefix length
    (``network_mask``) and a free-text description.
    """

    network_address = models.GenericIPAddressField(null=False, unique=True)
    network_mask = models.IntegerField(
        null=False,
        validators=[MinValueValidator(0), MaxValueValidator(128)],
    )
    description = models.CharField(max_length=256)

    @classmethod
    def populate_db_defaults(cls):
        """
        Create the default networks if they do not exist yet.

        Existing rows (matched by ``network_address``) are left untouched.
        """
        defaults = [
            ("2a0a:e5c0:11::", "uncloud Billing"),
            ("2a0a:e5c0:11:1::", "uncloud Referral"),
            ("2a0a:e5c0:11:2::", "uncloud Coupon"),
        ]

        for net, desc in defaults:
            cls.objects.get_or_create(
                network_address=net,
                defaults={
                    'network_mask': 64,
                    'description': desc,
                },
            )

    def save(self, *args, **kwargs):
        """
        Save the network, rejecting IPv4 addresses with a mask above 32.

        Raises:
            FieldError: if the address contains no ':' (i.e. is treated as
                IPv4) and ``network_mask`` is greater than 32.
        """
        # The field validators allow up to 128 (IPv6); addresses without a
        # colon are treated as IPv4 and need the tighter limit.
        if ':' not in self.network_address and self.network_mask > 32:
            raise FieldError("Mask cannot exceed 32 for IPv4")

        super().save(*args, **kwargs)

    def __str__(self):
        return f"{self.network_address}/{self.network_mask} {self.description}"


class MACAdress(models.Model):
    """
    Model that currently declares no database fields of its own.
    """

    # NOTE(review): presumably the base value (42:00:00:00:00:00) from which
    # MAC addresses are derived -- nothing in this module uses it; confirm.
    default_prefix = 0x420000000000


class VPNPool(models.Model):
    """
    Network address pools from which VPNs can be created

    A pool is the network ``network``/``network_size``; it is handed out in
    subnetworks of prefix length ``subnetwork_size``.
    """

    uuid = models.UUIDField(primary_key=True,
                            default=uuid.uuid4,
                            editable=False)

    network = models.GenericIPAddressField(unique=True)
    network_size = models.IntegerField(
        validators=[MinValueValidator(0), MaxValueValidator(128)])
    subnetwork_size = models.IntegerField(
        validators=[MinValueValidator(0), MaxValueValidator(128)])
    vpn_hostname = models.CharField(max_length=256)

    wireguard_private_key = models.CharField(max_length=48)

    @property
    def num_maximum_networks(self):
        """
        Number of subnetworks that fit into the pool.

        sample:
        network_size = 40
        subnetwork_size = 48
        maximum_networks = 2^(48-40)

        2nd sample:
        network_size = 8
        subnetwork_size = 24
        maximum_networks = 2^(24-8)
        """
        return 2**(self.subnetwork_size - self.network_size)

    @property
    def used_networks(self):
        """Queryset of this pool's reservations with status 'used'."""
        # The reverse manager is already restricted to this pool.
        return self.vpnnetworkreservation_set.filter(status='used')

    @property
    def free_networks(self):
        """Queryset of this pool's reservations with status 'free'."""
        return self.vpnnetworkreservation_set.filter(status='free')

    @property
    def num_used_networks(self):
        """Number of reservations of this pool with status 'used'."""
        # COUNT(*) in the database instead of loading every row.
        return self.used_networks.count()

    @property
    def num_free_networks(self):
        """
        Number of subnetworks of this pool that are not in use.

        Reservations with status 'free' are not counted separately: they are
        not 'used', so ``maximum - used`` already includes them.  Adding them
        again would count every released network twice.
        """
        return self.num_maximum_networks - self.num_used_networks

    @property
    def next_free_network(self):
        """
        Address (string) of the next subnetwork to hand out.

        Order of preference:
        1. a reservation with status 'free' (a released network),
        2. the subnetwork following the last 'used' reservation,
        3. the pool's own network address if nothing was reserved yet.

        Raises:
            Exception: if the pool has no free networks left.
        """
        if self.num_free_networks == 0:
            # FIXME: use right exception
            raise Exception("No free networks")

        released = self.free_networks.first()
        if released is not None:
            return released.address

        # last() on an unordered queryset orders by primary key, which is
        # the reservation address.
        # NOTE(review): this relies on the database ordering addresses the
        # same way they compare numerically -- confirm for the backend used.
        last_used = self.used_networks.last()
        if last_used is None:
            # first network to be created
            return self.network

        # sample:
        #   pool = 2a0a:e5c1:200::/40
        #   last_used = 2a0a:e5c1:204::/48
        #   next = last_used + 2^(128 - 48)
        last_net_ip = ipaddress.ip_address(last_used.address)

        # max_prefixlen is 32 for IPv4 and 128 for IPv6, so this is the
        # number of addresses in one subnetwork for either family.
        offset_to_next = 2**(last_net_ip.max_prefixlen - self.subnetwork_size)

        return str(last_net_ip + offset_to_next)

    @property
    def wireguard_config_filename(self):
        """Path of the wireguard configuration file for this pool."""
        return '/etc/wireguard/{}.conf'.format(self.network)

    @property
    def wireguard_config(self):
        """
        Render the wireguard server configuration for this pool.

        The result consists of one [Interface] section followed by one
        [Peer] section per 'used' reservation that has a VPNNetwork attached.
        """
        sections = [
            "\n"
            "[Interface]\n"
            "ListenPort = 51820\n"
            f"PrivateKey = {self.wireguard_private_key}\n"
        ]

        for reservation in self.used_networks:
            # Look the VPN up once; a reservation without a VPNNetwork has no
            # public key, so no peer entry can be written for it.
            vpn = reservation.vpnnetwork_set.first()
            if vpn is None:
                continue

            # VPNNetwork as declared in this module has no ``owner`` field;
            # read it defensively so rendering does not fail.
            # NOTE(review): confirm where the owner is meant to come from.
            owner = getattr(vpn, 'owner', None)

            sections.append(
                "\n"
                f"# Owner: {owner}\n"
                "[Peer]\n"
                f"PublicKey = {vpn.wireguard_public_key}\n"
                f"AllowedIPs = {reservation.address}/{self.subnetwork_size}\n"
            )

        return "\n".join(sections)

    def configure_wireguard_vpnserver(self):
        """
        This method is designed to run as a celery task and should
        not be called directly from the web
        """

        # subprocess, ssh

        pass


class VPNNetworkReservation(models.Model):
    """
    This class tracks the used VPN networks. It will be deleted, when the product is cancelled.
    """
    vpnpool = models.ForeignKey(VPNPool,
                                on_delete=models.CASCADE)

    address = models.GenericIPAddressField(primary_key=True)

    status = models.CharField(max_length=256,
                              default='used',
                              choices=(
                                  ('used', 'used'),
                                  ('free', 'free'),
                              ))


class VPNNetwork(models.Model):
    """
    A selected network. Used for tracking reservations / used networks
    """
    network = models.ForeignKey(VPNNetworkReservation,
                                on_delete=models.CASCADE,
                                editable=False)

    wireguard_public_key = models.CharField(max_length=48)

    # default_recurring_period = RecurringPeriod.PER_365D

    @property
    def recurring_price(self):
        """Recurring price of a VPN network (fixed value)."""
        return 120

    def delete(self, *args, **kwargs):
        """
        Mark the reserved network as free and delete this VPN network.

        Returns:
            Whatever ``models.Model.delete`` returns.
        """
        self.network.status = 'free'
        self.network.save()

        # Django clears the primary key on the instance once it is deleted,
        # so build the log text first.
        description = "{}".format(self)

        # This used to call super().save(), which re-saved the row instead of
        # removing it.
        result = super().delete(*args, **kwargs)

        print("deleted {}".format(description))
        return result