uncloud-mravi/ucloud/api/helper.py

151 lines
4.1 KiB
Python
Executable file

import binascii
import ipaddress
import random
import subprocess as sp
import logging
import requests
from pyotp import TOTP
from ucloud.shared import shared
from ucloud.settings import settings
logger = logging.getLogger(__name__)
def check_otp(name, realm, token):
    """Verify an OTP token against the OTP verification controller.

    Builds a JSON payload holding this service's own credentials taken from
    ``settings["otp"]`` (``auth_name``, ``auth_realm`` and a TOTP computed
    from ``auth_seed``) together with the caller-supplied ``name``,
    ``realm`` and ``token``, and POSTs it to
    ``settings["otp"]["verification_controller_url"]``.

    Returns:
        The HTTP status code of the controller's response, or 400 when the
        service's own TOTP cannot be computed (``binascii.Error``).
    """
    otp_settings = settings["otp"]
    try:
        data = {
            "auth_name": otp_settings["auth_name"],
            "auth_token": TOTP(otp_settings["auth_seed"]).now(),
            "auth_realm": otp_settings["auth_realm"],
            "name": name,
            "realm": realm,
            "token": token,
        }
    except binascii.Error as err:
        # The seed is a shared secret: report the decoding problem only,
        # never the seed value itself.
        logger.error(
            "Cannot compute OTP: configured auth_seed is invalid (%s)", err
        )
        return 400

    # NOTE(review): no timeout is passed, so this call can block for as long
    # as the controller keeps the connection open -- confirm whether callers
    # can cope with a timeout exception before adding one.
    response = requests.post(
        otp_settings["verification_controller_url"], json=data
    )
    return response.status_code
def resolve_vm_name(name, owner):
    """Look up the UUID of the virtual machine called *name* owned by *owner*.

    Scans ``shared.vm_pool.vms`` and takes the first entry whose value has
    matching ``"owner"`` and ``"name"`` fields.

    Returns:
        The last ``/``-separated component of the matching entry's key,
        or None when no entry matches.
    """
    match = None
    for vm in shared.vm_pool.vms:
        if vm.value["owner"] == owner and vm.value["name"] == name:
            match = vm
            break

    if not match:
        return None

    key_parts = match.key.split("/")
    return key_parts[-1]
def resolve_image_name(name, etcd_client):
    """Resolve ``{store_name}:{image_name}`` to the UUID of that image.

    The image entries are read from *etcd_client* under
    ``settings["etcd"]["image_prefix"]``; the UUID is the last
    ``/``-separated component of the matching entry's key.

    Raises:
        ValueError: *name* is not of the form ``{store_name}:{image_name}``
            (this includes values that are not strings).
        KeyError: no image with that name exists in that store.
    """
    # Splitting and unpacking in one step validates the format:
    #   "images:alpine"      -> ("images", "alpine")
    #   "images"             -> too few values to unpack
    #   "images:alpine:meow" -> too many values to unpack
    # A non-string name fails on .split() itself and is reported the same way.
    try:
        store_name, image_name = name.split(":")
    except Exception:
        raise ValueError(
            "Image name not in correct format i.e {store_name}:{image_name}"
        )

    images = etcd_client.get_prefix(
        settings["etcd"]["image_prefix"], value_in_json=True
    )

    # The first entry matching both the image name and the store wins.
    for candidate in images:
        if (
            candidate.value["name"] == image_name
            and candidate.value["store_name"] == store_name
        ):
            return candidate.key.split("/")[-1]

    raise KeyError("No image with name {} found.".format(name))
def random_bytes(num=6):
    """Return a list of *num* random integers, each in the range 0..255."""
    octets = []
    for _ in range(num):
        octets.append(random.randrange(256))
    return octets
def generate_mac(
    uaa=False, multicast=False, oui=None, separator=":", byte_fmt="%02x"
):
    """Generate a random MAC address string.

    Args:
        uaa: When true, clear bit 1 of the first octet; otherwise set it.
            Ignored when *oui* is given.
        multicast: When true, set bit 0 of the first octet; otherwise clear
            it. Ignored when *oui* is given.
        oui: Optional leading octets, either a sequence of integers or a
            string of hexadecimal octets joined by *separator*
            (e.g. ``"52:54:00"``). The remaining octets are random.
        separator: String placed between the formatted octets; also used to
            split a string *oui*.
        byte_fmt: ``%``-format applied to every octet.

    Returns:
        The formatted octets joined by *separator*.

    Raises:
        ValueError: a string *oui* contains a chunk that is not hexadecimal.
    """
    if oui:
        if isinstance(oui, str):
            # OUIs are written in hexadecimal, like the output produced with
            # the default byte_fmt; parsing them as decimal would turn
            # "52:54:00" into 34:36:00 and reject "aa:bb:cc" outright.
            oui = [int(chunk, 16) for chunk in oui.split(separator)]
        else:
            # Accept any sequence of ints (e.g. a tuple), not only lists.
            oui = list(oui)
        mac = oui + random_bytes(num=6 - len(oui))
    else:
        mac = random_bytes()
        if multicast:
            mac[0] |= 1  # set bit 0
        else:
            mac[0] &= ~1  # clear bit 0
        if uaa:
            mac[0] &= ~(1 << 1)  # clear bit 1
        else:
            mac[0] |= 1 << 1  # set bit 1
    return separator.join(byte_fmt % b for b in mac)
def mac2ipv6(mac, prefix):
    """Derive an IPv6 address from a MAC address and an IPv6 prefix.

    The six MAC octets are turned into a 64-bit interface identifier by
    flipping bit 1 of the first octet and inserting ``ff:fe`` between the
    third and fourth octets. That identifier is then added to *prefix*.

    Args:
        mac: MAC address as six hexadecimal octets separated by colons.
            Octets need not be zero-padded (``"2:0:0:0:0:1"`` is accepted).
        prefix: Anything ``ipaddress.IPv6Address`` accepts, e.g.
            ``"2001:db8::"``. The identifier is *added* to it, so its lower
            64 bits are expected to be zero.

    Returns:
        The resulting IPv6 address as a string.

    Raises:
        ValueError: *mac* does not consist of exactly six octets in the
            range 0x00-0xff, or *prefix* is not a valid IPv6 address.
    """
    # only accept MACs separated by a colon
    octets = [int(part, 16) for part in mac.split(":")]
    if len(octets) != 6 or any(not 0 <= octet <= 0xFF for octet in octets):
        raise ValueError(
            "MAC address must be six colon-separated hex octets: {!r}".format(
                mac
            )
        )

    # Work on integers rather than concatenating hex strings, so octets
    # written without a leading zero still land in the right bit positions.
    octets[0] ^= 2
    eui64 = octets[:3] + [0xFF, 0xFE] + octets[3:]

    interface_id = 0
    for octet in eui64:
        interface_id = (interface_id << 8) | octet

    return str(ipaddress.IPv6Address(prefix) + interface_id)