forked from uncloud/uncloud
195 lines
5.4 KiB
Python
195 lines
5.4 KiB
Python
import uuid
|
|
import ipaddress
|
|
|
|
from django.db import models
|
|
from django.contrib.auth import get_user_model
|
|
from django.core.validators import MinValueValidator, MaxValueValidator
|
|
from django.core.exceptions import FieldError, ValidationError
|
|
|
|
from uncloud_pay.models import Order
|
|
|
|
class WireGuardVPNPool(models.Model):
    """
    Network address pools from which VPNs can be created

    A pool is described by a supernet (``network`` / ``network_mask``), the
    prefix length of the subnets handed out from it (``subnetwork_mask``),
    and the WireGuard interface, host name and key pair of the server.
    """

    # Linux limits interface names to at most 15 characters
    wg_name = models.CharField(max_length=15)

    network = models.GenericIPAddressField(unique=True)
    network_mask = models.IntegerField(
        validators=[MinValueValidator(0), MaxValueValidator(128)],
    )

    subnetwork_mask = models.IntegerField(
        validators=[MinValueValidator(0), MaxValueValidator(128)],
    )

    vpn_server_hostname = models.CharField(max_length=256)
    wireguard_private_key = models.CharField(max_length=48)
    wireguard_public_key = models.CharField(max_length=48)

    class Meta:
        # An interface name may be used only once per VPN server host.
        constraints = [
            models.UniqueConstraint(
                fields=['wg_name', 'vpn_server_hostname'],
                name='unique_interface_name_per_host',
            ),
        ]

    @property
    def max_pool_index(self):
        """
        Return the highest possible network / last network id

        This is 2**(subnetwork_mask - network_mask) - 1.
        """
        return 2 ** (self.subnetwork_mask - self.network_mask) - 1

    @property
    def ip_network(self):
        """
        Return the pool's supernet as an ``ipaddress`` network object,
        built from ``network`` and ``network_mask``.
        """
        cidr = f"{self.network}/{self.network_mask}"
        return ipaddress.ip_network(cidr)

    def __str__(self):
        supernet = self.ip_network
        return f"{supernet} (subnets: /{self.subnetwork_mask})"

    @property
    def wireguard_config(self):
        """
        Return the server-side WireGuard configuration as one string.

        The text consists of an ``[Interface]`` section followed by one
        ``[Peer]`` section for every VPN related to this pool.
        """
        sections = [
            f"[Interface]\nListenPort = 51820\nPrivateKey = {self.wireguard_private_key}\n",
        ]

        for peer in self.wireguardvpn_set.all():
            key = peer.wireguard_public_key
            allowed_ips = f"{peer.address}/{self.subnetwork_mask}"
            sections.append(
                f"# Owner: {peer.owner}\n[Peer]\nPublicKey = {key}\nAllowedIPs = {allowed_ips}\n\n"
            )

        return "\n".join(sections)
|
|
|
|
|
|
class WireGuardVPN(models.Model):
    """
    Created VPNNetworks

    Each VPN belongs to an owner and to a pool, and is identified inside the
    pool by ``pool_index``, from which its network address is derived.
    """
    owner = models.ForeignKey(get_user_model(),
                              on_delete=models.CASCADE)
    vpnpool = models.ForeignKey(WireGuardVPNPool,
                                on_delete=models.CASCADE)

    # NOTE(review): unique=True makes the index unique across ALL pools,
    # not just within one pool -- confirm this is intended.
    pool_index = models.IntegerField(unique=True)

    wireguard_public_key = models.CharField(max_length=48, unique=True)

    class Meta:
        constraints = [
            models.UniqueConstraint(fields=['vpnpool', 'wireguard_public_key'],
                                    name='wg_key_unique_per_pool')
        ]

    @property
    def network_mask(self):
        """Return the prefix length of this VPN, i.e. the pool's subnetwork mask."""
        return self.vpnpool.subnetwork_mask

    @property
    def vpn_server(self):
        """Return the host name of the pool's VPN server."""
        return self.vpnpool.vpn_server_hostname

    @property
    def vpn_server_public_key(self):
        """Return the WireGuard public key stored on the pool."""
        return self.vpnpool.wireguard_public_key

    @property
    def address(self):
        """
        Locate the correct subnet in the supernet

        Returns the first address of subnet number ``pool_index`` inside the
        pool's network, as a string.

        Indexing the pool network raises ``IndexError`` when the computed
        offset lies outside the pool.
        """
        net = self.vpnpool.ip_network

        # Size of one subnet in addresses. The address width comes from the
        # network itself (32 for IPv4, 128 for IPv6) instead of a hard-coded
        # 128, so IPv4 pools resolve correctly as well.
        subnet_size = 2 ** (net.max_prefixlen - self.vpnpool.subnetwork_mask)

        return str(net[subnet_size * self.pool_index])

    def __str__(self):
        return f"{self.address} ({self.pool_index})"
|
|
|
|
|
|
class WireGuardVPNFreeLeases(models.Model):
    """
    Previously used VPNNetworks

    Every row stores a pool together with one pool index.
    NOTE(review): presumably these indices are available for reuse --
    confirm against the code that creates and consumes these rows.
    """

    vpnpool = models.ForeignKey(WireGuardVPNPool, on_delete=models.CASCADE)
    pool_index = models.IntegerField(unique=True)
|
|
|
|
################################################################################
|
|
|
|
class MACAdress(models.Model):
    """
    Model that currently declares no database fields of its own, only a
    class-level constant.
    """

    # 48-bit value whose top byte is 0x42.
    # NOTE(review): presumably the prefix from which MAC addresses are
    # allocated -- confirm with the code that uses it.
    default_prefix = 0x42_00_00_00_00_00
|
|
|
|
|
|
class ReverseDNSEntry(models.Model):
    """
    A reverse DNS entry

    Maps one IP address to a name on behalf of an owner. Saving is only
    permitted when the owner has an order whose config contains a
    ``reverse_dns_network`` parameter covering the address (see ``save``).
    """
    owner = models.ForeignKey(get_user_model(),
                              on_delete=models.CASCADE)

    ip_address = models.GenericIPAddressField(null=False, unique=True)

    name = models.CharField(max_length=253, null=False)

    @property
    def reverse_pointer(self):
        """Return the ``reverse_pointer`` of ``ip_address`` as computed by ``ipaddress``."""
        return ipaddress.ip_address(self.ip_address).reverse_pointer

    def implement(self):
        """
        The implement function implements the change

        Currently a placeholder that does nothing.
        """

        # Get all DNS entries (?) / update this DNS entry
        # convert to DNS name
        #
        pass

    def save(self, *args, **kwargs):
        """
        Save the entry after checking that the owner may manage the address.

        The owner's orders that carry a ``reverse_dns_network`` parameter are
        scanned; the entry is saved as soon as one of those networks contains
        ``ip_address``.

        Raises:
            ValidationError: if none of the owner's orders covers the address.
            ValueError: propagated from ``ipaddress`` when a stored network
                or the address cannot be parsed.
        """
        # Product.objects.filter(config__parameters__contains='reverse_dns_network')
        # FIXME: check if order is still active / not replaced

        orders = Order.objects.filter(config__parameters__reverse_dns_network__isnull=False,
                                      owner=self.owner)

        for order in orders:
            net = ipaddress.ip_network(order.config['parameters']['reverse_dns_network'])
            addr = ipaddress.ip_address(self.ip_address)

            if addr in net:
                # First covering order is enough; nothing else about the
                # order is needed, so no further attributes are fetched.
                break
        else:
            # Loop finished without a match (or there were no orders at all).
            raise ValidationError(f"User {self.owner} does not have the right to create reverse DNS entry for {self.ip_address}")

        super().save(*args, **kwargs)

    def __str__(self):
        return f"{self.ip_address} - {self.name}"
|