uncloud-mravi/uncloud_net/models.py
2020-10-11 22:32:08 +02:00

220 lines
6.5 KiB
Python

import uuid
import ipaddress
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import MinValueValidator, MaxValueValidator
from django.core.exceptions import FieldError
class UncloudNetwork(models.Model):
    """
    An IP network stored as base address, prefix length and description.
    """

    network_address = models.GenericIPAddressField(null=False, unique=True)
    # Prefix length. The validator range 0..128 covers IPv6; the tighter
    # IPv4 limit is checked in save().
    network_mask = models.IntegerField(
        null=False,
        validators=[MinValueValidator(0), MaxValueValidator(128)],
    )
    description = models.CharField(max_length=256)

    @classmethod
    def populate_db_defaults(cls):
        """
        Create the built-in networks that do not exist yet.

        Rows are looked up by network_address; an existing row is left
        untouched, a missing one is created with a /64 mask.
        """
        default_networks = (
            ("2a0a:e5c0:11::", "uncloud Billing"),
            ("2a0a:e5c0:11:1::", "uncloud Referral"),
        )
        for address, label in default_networks:
            cls.objects.get_or_create(
                network_address=address,
                defaults={
                    'network_mask': 64,
                    'description': label,
                },
            )

    def save(self, *args, **kwargs):
        """
        Store the network after checking the mask against the address family.

        Raises:
            FieldError: the address contains no ':' (i.e. it is treated as
                IPv4) and the mask is larger than 32.
        """
        treated_as_ipv4 = ':' not in self.network_address
        if treated_as_ipv4 and self.network_mask > 32:
            raise FieldError("Mask cannot exceed 32 for IPv4")

        super().save(*args, **kwargs)

    def __str__(self):
        return "{}/{} {}".format(
            self.network_address, self.network_mask, self.description
        )
class MACAdress(models.Model):
    """
    Holder of the default MAC address prefix.

    No database fields are declared in this class body.
    """

    # NOTE(review): the class name is misspelled ("Adress"); it is kept
    # because renaming a model affects migrations and every importer.

    # 48-bit value whose first octet is 0x42 and whose remaining five octets
    # are zero. 0x42 has the locally-administered bit (0x02) set and the
    # multicast bit (0x01) clear.
    default_prefix = 0x42_00_00_00_00_00
class VPNPool(models.Model):
    """
    Network address pools from which VPNs can be created.

    A pool is the network ``network``/``network_size``. It is handed out in
    subnetworks of prefix length ``subnetwork_size``; every handed-out
    subnetwork is tracked by a VPNNetworkReservation row pointing at the pool.
    """

    uuid = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    network = models.GenericIPAddressField(unique=True)
    network_size = models.IntegerField(
        validators=[MinValueValidator(0), MaxValueValidator(128)]
    )
    subnetwork_size = models.IntegerField(
        validators=[MinValueValidator(0), MaxValueValidator(128)]
    )
    vpn_hostname = models.CharField(max_length=256)
    wireguard_private_key = models.CharField(max_length=48)

    @property
    def num_maximum_networks(self):
        """
        Number of subnetworks that fit into the pool.

        sample:
        network_size = 40
        subnetwork_size = 48
        maximum_networks = 2^(48-40)

        2nd sample:
        network_size = 8
        subnetwork_size = 24
        maximum_networks = 2^(24-8)
        """
        return 2 ** (self.subnetwork_size - self.network_size)

    @property
    def used_networks(self):
        """QuerySet of this pool's reservations with status 'used'."""
        return self.vpnnetworkreservation_set.filter(vpnpool=self, status='used')

    @property
    def free_networks(self):
        """QuerySet of this pool's reservations with status 'free'."""
        return self.vpnnetworkreservation_set.filter(vpnpool=self, status='free')

    @property
    def num_used_networks(self):
        """Number of reservations with status 'used' (counted in the database)."""
        return self.used_networks.count()

    @property
    def num_free_networks(self):
        """
        Number of subnetworks that can still be handed out.

        Reservations with status 'free' are not part of used_networks, so
        they are already contained in ``maximum - used``; adding them again
        would count them twice.
        """
        return self.num_maximum_networks - self.num_used_networks

    @property
    def next_free_network(self):
        """
        Address (as a string) of the next subnetwork to hand out.

        Order of preference:
        1. a reservation that is marked 'free' is reused,
        2. otherwise the subnetwork following the numerically highest used
           one,
        3. otherwise (nothing reserved yet) the pool's own network address.

        Raises:
            Exception: no subnetwork is left in the pool.
        """
        if self.num_free_networks <= 0:
            # FIXME: use right exception
            raise Exception("No free networks")

        recycled = self.free_networks.first()
        if recycled is not None:
            return recycled.address

        # Compare the addresses numerically in Python. Relying on the
        # database's primary-key order would sort textually on backends that
        # store the address as a string (e.g. "10.0.9.0" after "10.0.10.0").
        used_addresses = [
            ipaddress.ip_address(address)
            for address in self.used_networks.values_list('address', flat=True)
        ]
        if not used_addresses:
            # first network to be created
            return self.network

        # sample:
        #   pool      = 2a0a:e5c1:200::/40
        #   last_used = 2a0a:e5c1:204::/48
        #   next      = 2a0a:e5c1:205::
        highest = max(used_addresses)
        # max_prefixlen is 32 for IPv4 and 128 for IPv6.
        offset_to_next = 2 ** (highest.max_prefixlen - self.subnetwork_size)
        # NOTE(review): the result is not checked against the pool's upper
        # bound; if reservation rows were removed instead of being marked
        # 'free', this can step past the end of the pool. Confirm whether
        # rows are ever deleted.
        return str(highest + offset_to_next)

    @property
    def wireguard_config_filename(self):
        """Path of the wireguard configuration file, named after the pool network."""
        return '/etc/wireguard/{}.conf'.format(self.network)

    @property
    def wireguard_config(self):
        """
        Wireguard configuration text for this pool.

        Consists of one [Interface] section followed by one [Peer] section
        per used reservation that has a VPNNetwork attached.
        """
        sections = [
            "\n"
            "[Interface]\n"
            "ListenPort = 51820\n"
            f"PrivateKey = {self.wireguard_private_key}\n"
        ]

        for reservation in self.used_networks:
            # Fetch the attached VPNNetwork once instead of once per attribute.
            vpn = reservation.vpnnetwork_set.first()
            if vpn is None:
                # A reservation without a VPNNetwork has no public key and
                # therefore cannot be written as a peer.
                continue

            # NOTE(review): VPNNetwork as declared in this file has no
            # 'owner' attribute; fall back to None so the comment line does
            # not abort config generation. Confirm where owner should come
            # from.
            owner = getattr(vpn, 'owner', None)
            peer_network = "{}/{}".format(reservation.address, self.subnetwork_size)

            sections.append(
                "\n"
                f"# Owner: {owner}\n"
                "[Peer]\n"
                f"PublicKey = {vpn.wireguard_public_key}\n"
                f"AllowedIPs = {peer_network}\n"
            )

        return "\n".join(sections)

    def configure_wireguard_vpnserver(self):
        """
        This method is designed to run as a celery task and should
        not be called directly from the web
        """
        # subprocess, ssh
        pass
class VPNNetworkReservation(models.Model):
    """
    A single network address taken from a VPNPool, with a status flag.

    The address itself is the primary key. The status is either 'used'
    (the default) or 'free'.
    """

    # Removing the pool removes its reservations as well (CASCADE).
    vpnpool = models.ForeignKey(VPNPool, on_delete=models.CASCADE)
    address = models.GenericIPAddressField(primary_key=True)
    status = models.CharField(
        max_length=256,
        default='used',
        choices=(
            ('used', 'used'),
            ('free', 'free'),
        ),
    )
class VPNNetwork(models.Model):
    """
    A selected network. Used for tracking reservations / used networks
    """

    # Removing the reservation removes the VPNNetwork as well (CASCADE).
    network = models.ForeignKey(
        VPNNetworkReservation,
        on_delete=models.CASCADE,
        editable=False,
    )
    wireguard_public_key = models.CharField(max_length=48)

    # Disabled; RecurringPeriod is not among the imports visible in this file.
    # default_recurring_period = RecurringPeriod.PER_365D

    @property
    def recurring_price(self):
        """Fixed recurring price of a VPN network."""
        return 120

    def delete(self, *args, **kwargs):
        """
        Mark the reserved network as free again, then delete this row.

        Arguments are passed through to Model.delete(), whose return value
        is returned unchanged.
        """
        self.network.status = 'free'
        self.network.save()

        # Build the message first: deleting resets the primary key, which
        # would otherwise be missing from the default string representation.
        message = "deleted {}".format(self)

        # The previous code called super().save() here, so the row was
        # written back instead of being removed.
        result = super().delete(*args, **kwargs)
        print(message)
        return result