forked from uncloud/uncloud
		
	
		
			
				
	
	
		
			151 lines
		
	
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			151 lines
		
	
	
	
		
			4.1 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
import binascii
 | 
						|
import ipaddress
 | 
						|
import random
 | 
						|
import subprocess as sp
 | 
						|
import logging
 | 
						|
import requests
 | 
						|
 | 
						|
from pyotp import TOTP
 | 
						|
 | 
						|
from uncloud.shared import shared
 | 
						|
from uncloud.settings import settings
 | 
						|
 | 
						|
logger = logging.getLogger(__name__)
 | 
						|
 | 
						|
 | 
						|
def check_otp(name, realm, token):
    """Ask the OTP verification service whether a token is valid.

    The request carries this service's own credentials (auth_name,
    auth_realm and a TOTP freshly computed from auth_seed, all read from
    settings["otp"]) together with the credentials to be verified, and is
    POSTed as JSON to settings["otp"]["verification_controller_url"].

    Input: name, realm and token to verify.
    Output: HTTP status code of the verification response, or 400 if
            computing the TOTP from the configured seed raises
            binascii.Error.
    """
    otp_conf = settings["otp"]

    auth_name = otp_conf["auth_name"]
    # Only the TOTP computation is expected to raise binascii.Error, so it is
    # the only statement guarded here.
    try:
        auth_token = TOTP(otp_conf["auth_seed"]).now()
    except binascii.Error:
        # NOTE(review): this writes the configured seed to the log; confirm
        # that exposing it there is acceptable.
        logger.error(
            "Cannot compute OTP for seed: {}".format(otp_conf["auth_seed"])
        )
        return 400

    payload = {
        "auth_name": auth_name,
        "auth_token": auth_token,
        "auth_realm": otp_conf["auth_realm"],
        "name": name,
        "realm": realm,
        "token": token,
    }
    verification_url = otp_conf["verification_controller_url"]
    return requests.post(verification_url, json=payload).status_code
 | 
						|
 | 
						|
 | 
						|
def resolve_vm_name(name, owner):
    """Find the UUID of the virtual machine called name that belongs to owner.

    Input: name of vm, owner of vm.
    Output: last "/"-separated component of the key of the first entry in
            shared.vm_pool.vms whose owner and name both match, or None
            when no entry matches.
    """
    match = None
    for vm in shared.vm_pool.vms:
        if vm.value["owner"] == owner and vm.value["name"] == name:
            match = vm
            break

    if not match:
        return None

    # The UUID is the final path component of the entry's key.
    return match.key.split("/")[-1]
 | 
						|
 | 
						|
 | 
						|
def resolve_image_name(name, etcd_client):
    """Return the UUID of the image identified by "{store_name}:{image_name}".

    Input: qualified image name, and the etcd client used to list the
           entries stored under settings["etcd"]["image_prefix"].
    Output: last "/"-separated component of the matching entry's key.

       * Raises ValueError if name is not in the format
         {store_name}:{image_name}
       * Raises KeyError if no entry matches both the image name and the
         store name

    """
    # The split must yield exactly two fields:
    #   "images:alpine"      --> ["images", "alpine"]           accepted
    #   "images"             --> ["images"]                     too few values
    #   "images:alpine:meow" --> ["images", "alpine", "meow"]   too many values
    # A name that is not a string fails on .split() and is rejected the
    # same way.
    try:
        store_name, image_name = name.split(":")
    except Exception:
        raise ValueError(
            "Image name not in correct format i.e {store_name}:{image_name}"
        )

    candidates = etcd_client.get_prefix(
        settings["etcd"]["image_prefix"], value_in_json=True
    )

    # Take the first entry whose image name and store name both match.
    try:
        match = next(
            entry
            for entry in candidates
            if entry.value["name"] == image_name
            and entry.value["store_name"] == store_name
        )
    except StopIteration:
        raise KeyError("No image with name {} found.".format(name))

    return match.key.split("/")[-1]
 | 
						|
 | 
						|
 | 
						|
def random_bytes(num=6):
    """Return a list of num random integers, each in the range 0..255.

    Values are drawn one at a time with random.randrange(256).
    """
    octets = []
    for _ in range(num):
        octets.append(random.randrange(256))
    return octets
 | 
						|
 | 
						|
 | 
						|
def generate_mac(
    uaa=False, multicast=False, oui=None, separator=":", byte_fmt="%02x"
):
    """Return a random 6-octet MAC address formatted as a string.

    Input:
      uaa        when True, bit 1 of the first octet is cleared; when False
                 (default) it is set.
      multicast  when True, bit 0 of the first octet is set; when False
                 (default) it is cleared.
      oui        optional leading octets used verbatim: either a sequence of
                 ints or a string of hexadecimal octets joined by separator
                 (e.g. "52:54:00"). The remaining octets are random. When an
                 OUI is given, uaa and multicast are not applied.
      separator  string placed between octets in the result (also used to
                 split a string oui).
      byte_fmt   %-format applied to each octet.
    Output: the octets formatted with byte_fmt and joined by separator.

    Raises ValueError if a string oui contains a chunk that is not valid
    hexadecimal.
    """
    if oui:
        if isinstance(oui, str):
            # Octets of a textual MAC are hexadecimal; parsing them as
            # decimal would turn "52:54:00" into 34:36:00 on output and
            # reject chunks such as "aa".
            oui = [int(chunk, 16) for chunk in oui.split(separator)]
        else:
            # Accept any sequence of ints (e.g. a tuple), not only lists.
            oui = list(oui)
        mac = oui + random_bytes(num=6 - len(oui))
    else:
        mac = random_bytes()
        if multicast:
            mac[0] |= 1  # set bit 0
        else:
            mac[0] &= ~1  # clear bit 0
        if uaa:
            mac[0] &= ~(1 << 1)  # clear bit 1
        else:
            mac[0] |= 1 << 1  # set bit 1
    return separator.join(byte_fmt % b for b in mac)
 | 
						|
 | 
						|
 | 
						|
def mac2ipv6(mac, prefix):
    """Return prefix combined with the modified EUI-64 identifier of mac.

    The 64-bit interface identifier is built from the MAC by inverting
    bit 1 of the first octet and inserting the octets ff:fe between the
    third and fourth octets. It is then added to prefix.

    Input:
      mac     MAC address as six hexadecimal octets separated by colons;
              octets need not be zero-padded ("0:1c:b3:9:85:15" is fine).
      prefix  IPv6 address (anything ipaddress.IPv6Address accepts) to which
              the interface identifier is added, e.g. "2001:db8::".
    Output: the resulting IPv6 address as a string.

    Raises ValueError if mac does not consist of exactly six octets in the
    range 00..ff, or if prefix is not a valid IPv6 address.
    """
    # only accept MACs separated by a colon
    octets = [int(part, 16) for part in mac.split(":")]
    if len(octets) != 6 or not all(0 <= octet <= 0xFF for octet in octets):
        raise ValueError("Invalid MAC address: {!r}".format(mac))

    # Work on integers rather than concatenating hex strings, so octets
    # written without a leading zero still land in the right bit positions.
    octets[0] ^= 2
    eui64 = octets[:3] + [0xFF, 0xFE] + octets[3:]
    interface_id = int.from_bytes(bytes(eui64), "big")

    return str(ipaddress.IPv6Address(prefix) + interface_id)
 |