uncloud/archive/uncloud_etcd_based/uncloud/api/helper.py

149 lines
4.0 KiB
Python
Executable File

import binascii
import ipaddress
import random
import logging
import requests
from pyotp import TOTP
from uncloud.common.shared import shared
logger = logging.getLogger(__name__)
def check_otp(name, realm, token):
try:
data = {
"auth_name": shared.settings["otp"]["auth_name"],
"auth_token": TOTP(shared.settings["otp"]["auth_seed"]).now(),
"auth_realm": shared.settings["otp"]["auth_realm"],
"name": name,
"realm": realm,
"token": token,
}
except binascii.Error as err:
logger.error(
"Cannot compute OTP for seed: {}".format(
shared.settings["otp"]["auth_seed"]
)
)
return 400
response = requests.post(
shared.settings["otp"]["verification_controller_url"], json=data
)
return response.status_code
def resolve_vm_name(name, owner):
"""Return UUID of Virtual Machine of name == name and owner == owner
Input: name of vm, owner of vm.
Output: uuid of vm if found otherwise None
"""
result = next(
filter(
lambda vm: vm.value["owner"] == owner
and vm.value["name"] == name,
shared.vm_pool.vms,
),
None,
)
if result:
return result.key.split("/")[-1]
return None
def resolve_image_name(name, etcd_client):
"""Return image uuid given its name and its store
* If the provided name is not in correct format
i.e {store_name}:{image_name} return ValueError
* If no such image found then return KeyError
"""
seperator = ":"
# Ensure, user/program passed valid name that is of type string
try:
store_name_and_image_name = name.split(seperator)
"""
Examples, where it would work and where it would raise exception
"images:alpine" --> ["images", "alpine"]
"images" --> ["images"] it would raise Exception as non enough value to unpack
"images:alpine:meow" --> ["images", "alpine", "meow"] it would raise Exception
as too many values to unpack
"""
store_name, image_name = store_name_and_image_name
except Exception:
raise ValueError(
"Image name not in correct format i.e {store_name}:{image_name}"
)
images = etcd_client.get_prefix(
shared.settings["etcd"]["image_prefix"], value_in_json=True
)
# Try to find image with name == image_name and store_name == store_name
try:
image = next(
filter(
lambda im: im.value["name"] == image_name
and im.value["store_name"] == store_name,
images,
)
)
except StopIteration:
raise KeyError("No image with name {} found.".format(name))
else:
image_uuid = image.key.split("/")[-1]
return image_uuid
def random_bytes(num=6):
return [random.randrange(256) for _ in range(num)]
def generate_mac(uaa=False, multicast=False, oui=None, separator=":", byte_fmt="%02x"):
mac = random_bytes()
if oui:
if type(oui) == str:
oui = [int(chunk) for chunk in oui.split(separator)]
mac = oui + random_bytes(num=6 - len(oui))
else:
if multicast:
mac[0] |= 1 # set bit 0
else:
mac[0] &= ~1 # clear bit 0
if uaa:
mac[0] &= ~(1 << 1) # clear bit 1
else:
mac[0] |= 1 << 1 # set bit 1
return separator.join(byte_fmt % b for b in mac)
def mac2ipv6(mac, prefix):
# only accept MACs separated by a colon
parts = mac.split(":")
# modify parts to match IPv6 value
parts.insert(3, "ff")
parts.insert(4, "fe")
parts[0] = "%x" % (int(parts[0], 16) ^ 2)
# format output
ipv6_parts = [str(0)] * 4
for i in range(0, len(parts), 2):
ipv6_parts.append("".join(parts[i : i + 2]))
lower_part = ipaddress.IPv6Address(":".join(ipv6_parts))
prefix = ipaddress.IPv6Address(prefix)
return str(prefix + int(lower_part))