uncloud/uncloud_net/models.py

186 lines
5.1 KiB
Python

import uuid
import ipaddress
from django.db import models
from django.contrib.auth import get_user_model
from django.core.validators import MinValueValidator, MaxValueValidator
from django.core.exceptions import FieldError, ValidationError
from uncloud_pay.models import Order
class WireGuardVPNPool(models.Model):
"""
Network address pools from which VPNs can be created
"""
class Meta:
constraints = [
models.UniqueConstraint(fields=['wg_name', 'vpn_server_hostname' ],
name='unique_interface_name_per_host')
]
# Linux interface naming is restricing to max 15 characters
wg_name = models.CharField(max_length=15)
network = models.GenericIPAddressField(unique=True)
network_mask = models.IntegerField(validators=[MinValueValidator(0),
MaxValueValidator(128)])
subnetwork_mask = models.IntegerField(validators=[
MinValueValidator(0),
MaxValueValidator(128)
])
vpn_server_hostname = models.CharField(max_length=256)
wireguard_private_key = models.CharField(max_length=48)
wireguard_public_key = models.CharField(max_length=48)
@property
def max_pool_index(self):
"""
Return the highest possible network / last network id
"""
bits = self.subnetwork_mask - self.network_mask
return (2**bits)-1
@property
def ip_network(self):
return ipaddress.ip_network(f"{self.network}/{self.network_mask}")
def __str__(self):
return f"{self.ip_network} (subnets: /{self.subnetwork_mask})"
@property
def wireguard_config(self):
wireguard_config = [ f"[Interface]\nListenPort = 51820\nPrivateKey = {self.wireguard_private_key}\n" ]
peers = []
for vpn in self.wireguardvpn_set.all():
public_key = vpn.wireguard_public_key
peer_network = f"{vpn.address}/{self.subnetwork_mask}"
owner = vpn.owner
peers.append(f"# Owner: {owner}\n[Peer]\nPublicKey = {public_key}\nAllowedIPs = {peer_network}\n\n")
wireguard_config.extend(peers)
return "\n".join(wireguard_config)
class WireGuardVPN(models.Model):
"""
Created VPNNetworks
"""
owner = models.ForeignKey(get_user_model(),
on_delete=models.CASCADE)
vpnpool = models.ForeignKey(WireGuardVPNPool,
on_delete=models.CASCADE)
pool_index = models.IntegerField(unique=True)
wireguard_public_key = models.CharField(max_length=48, unique=True)
@property
def network_mask(self):
return self.vpnpool.subnetwork_mask
@property
def vpn_server(self):
return self.vpnpool.vpn_server_hostname
@property
def vpn_server_public_key(self):
return self.vpnpool.wireguard_public_key
@property
def address(self):
"""
Locate the correct subnet in the supernet
First get the network itself
"""
net = self.vpnpool.ip_network
subnet = net[(2**(128-self.vpnpool.subnetwork_mask)) * self.pool_index]
return str(subnet)
def __str__(self):
return f"{self.address} ({self.pool_index})"
class WireGuardVPNFreeLeases(models.Model):
"""
Previously used VPNNetworks
"""
vpnpool = models.ForeignKey(WireGuardVPNPool,
on_delete=models.CASCADE)
pool_index = models.IntegerField(unique=True)
################################################################################
class MACAdress(models.Model):
default_prefix = 0x420000000000
class ReverseDNSEntry(models.Model):
"""
A reverse DNS entry
"""
owner = models.ForeignKey(get_user_model(),
on_delete=models.CASCADE)
ip_address = models.GenericIPAddressField(null=False, unique=True)
name = models.CharField(max_length=253, null=False)
@property
def reverse_pointer(self):
return ipaddress.ip_address(self.ip_address).reverse_pointer
def implement(self):
"""
The implement function implements the change
"""
# Get all DNS entries (?) / update this DNS entry
# convert to DNS name
#
pass
def save(self, *args, **kwargs):
# Product.objects.filter(config__parameters__contains='reverse_dns_network')
# FIXME: check if order is still active / not replaced
allowed = False
product = None
for order in Order.objects.filter(config__parameters__reverse_dns_network__isnull=False,
owner=self.owner):
network = order.config['parameters']['reverse_dns_network']
net = ipaddress.ip_network(network)
addr = ipaddress.ip_address(self.ip_address)
if addr in net:
allowed = True
product = order.product
break
if not allowed:
raise ValidationError(f"User {self.owner} does not have the right to create reverse DNS entry for {self.ip_address}")
super().save(*args, **kwargs)
def __str__(self):
return f"{self.ip_address} - {self.name}"