uncloud-mravi/api/helper.py

166 lines
5 KiB
Python
Raw Normal View History

import binascii
import requests
import random
import subprocess as sp
import ipaddress
from decouple import config
from pyotp import TOTP
from config import VM_POOL, etcd_client, IMAGE_PREFIX
def check_otp(name, realm, token):
try:
data = {
"auth_name": config("AUTH_NAME", ""),
"auth_token": TOTP(config("AUTH_SEED", "")).now(),
"auth_realm": config("AUTH_REALM", ""),
"name": name,
"realm": realm,
"token": token,
}
except binascii.Error:
return 400
response = requests.get(
"{OTP_SERVER}{OTP_VERIFY_ENDPOINT}".format(
OTP_SERVER=config("OTP_SERVER", ""),
OTP_VERIFY_ENDPOINT=config("OTP_VERIFY_ENDPOINT", "verify"),
),
json=data,
)
return response.status_code
def resolve_vm_name(name, owner):
"""Return UUID of Virtual Machine of name == name and owner == owner
Input: name of vm, owner of vm.
Output: uuid of vm if found otherwise None
"""
result = next(
filter(
lambda vm: vm.value["owner"] == owner and vm.value["name"] == name,
VM_POOL.vms,
),
None,
)
if result:
return result.key.split("/")[-1]
return None
def resolve_image_name(name, etcd_client):
"""Return image uuid given its name and its store
* If the provided name is not in correct format
i.e {store_name}:{image_name} return ValueError
* If no such image found then return KeyError
"""
seperator = ":"
# Ensure, user/program passed valid name that is of type string
try:
store_name_and_image_name = name.split(seperator)
"""
Examples, where it would work and where it would raise exception
"images:alpine" --> ["images", "alpine"]
"images" --> ["images"] it would raise Exception as non enough value to unpack
"images:alpine:meow" --> ["images", "alpine", "meow"] it would raise Exception
as too many values to unpack
"""
store_name, image_name = store_name_and_image_name
except Exception:
raise ValueError("Image name not in correct format i.e {store_name}:{image_name}")
images = etcd_client.get_prefix(IMAGE_PREFIX, value_in_json=True)
# Try to find image with name == image_name and store_name == store_name
try:
image = next(filter(lambda im: im.value['name'] == image_name \
and im.value['store_name'] == store_name, images))
except StopIteration:
raise KeyError("No image with name {} found.".format(name))
else:
image_uuid = image.key.split('/')[-1]
return image_uuid
def random_bytes(num=6):
return [random.randrange(256) for _ in range(num)]
def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'):
mac = random_bytes()
if oui:
if type(oui) == str:
oui = [int(chunk) for chunk in oui.split(separator)]
mac = oui + random_bytes(num=6-len(oui))
else:
if multicast:
mac[0] |= 1 # set bit 0
else:
mac[0] &= ~1 # clear bit 0
if uaa:
mac[0] &= ~(1 << 1) # clear bit 1
else:
mac[0] |= 1 << 1 # set bit 1
return separator.join(byte_fmt % b for b in mac)
def get_ip_addr(mac_address, device):
"""Return IP address of a device provided its mac address / link local address
and the device with which it is connected.
For Example, if we call get_ip_addr(mac_address="52:54:00:12:34:56", device="br0")
the following two scenarios can happen
1. It would return None if we can't be able to find device whose mac_address is equal
to the arg:mac_address or the mentioned arg:device does not exists or the ip address
we found is local.
2. It would return ip_address of device whose mac_address is equal to arg:mac_address
and is connected/neighbor of arg:device
"""
try:
output = sp.check_output(['ip','-6','neigh', 'show', 'dev', device], stderr=sp.PIPE)
except sp.CalledProcessError:
return None
else:
result = []
output = output.strip().decode("utf-8")
output = output.split("\n")
for entry in output:
entry = entry.split()
if entry:
ip = ipaddress.ip_address(entry[0])
mac = entry[2]
if ip.is_global and mac_address == mac:
result.append(ip)
return result
def increment_etcd_counter(etcd_client, key):
kv = etcd_client.get(key)
if kv:
counter = int(kv.value)
counter = counter + 1
else:
counter = 1
etcd_client.put(key, str(counter))
return counter
def get_etcd_counter(etcd_client, key):
kv = etcd_client.get(key)
if kv:
return int(kv.value)
return None