import binascii import ipaddress import random import subprocess as sp import logging import requests from pyotp import TOTP from ucloud.config import vm_pool, config logger = logging.getLogger("ucloud.api.helper") def check_otp(name, realm, token): try: data = { "auth_name": config['otp']['auth_name'], "auth_token": TOTP(config['otp']['auth_seed']).now(), "auth_realm": config['otp']['auth_realm'], "name": name, "realm": realm, "token": token, } except binascii.Error as err: logger.error( "Cannot compute OTP for seed: {}".format(config['otp']['auth_seed']) ) return 400 response = requests.post( "{OTP_SERVER}{OTP_VERIFY_ENDPOINT}".format( OTP_SERVER=config['otp']['server'], OTP_VERIFY_ENDPOINT=config['otp']['verify_endpoint'] ), json=data, ) return response.status_code def resolve_vm_name(name, owner): """Return UUID of Virtual Machine of name == name and owner == owner Input: name of vm, owner of vm. Output: uuid of vm if found otherwise None """ result = next( filter( lambda vm: vm.value["owner"] == owner and vm.value["name"] == name, vm_pool.vms, ), None, ) if result: return result.key.split("/")[-1] return None def resolve_image_name(name, etcd_client): """Return image uuid given its name and its store * If the provided name is not in correct format i.e {store_name}:{image_name} return ValueError * If no such image found then return KeyError """ seperator = ":" # Ensure, user/program passed valid name that is of type string try: store_name_and_image_name = name.split(seperator) """ Examples, where it would work and where it would raise exception "images:alpine" --> ["images", "alpine"] "images" --> ["images"] it would raise Exception as non enough value to unpack "images:alpine:meow" --> ["images", "alpine", "meow"] it would raise Exception as too many values to unpack """ store_name, image_name = store_name_and_image_name except Exception: raise ValueError("Image name not in correct format i.e {store_name}:{image_name}") images = etcd_client.get_prefix(config['etcd']['image_prefix'], value_in_json=True) # Try to find image with name == image_name and store_name == store_name try: image = next(filter(lambda im: im.value['name'] == image_name and im.value['store_name'] == store_name, images)) except StopIteration: raise KeyError("No image with name {} found.".format(name)) else: image_uuid = image.key.split('/')[-1] return image_uuid def random_bytes(num=6): return [random.randrange(256) for _ in range(num)] def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'): mac = random_bytes() if oui: if type(oui) == str: oui = [int(chunk) for chunk in oui.split(separator)] mac = oui + random_bytes(num=6 - len(oui)) else: if multicast: mac[0] |= 1 # set bit 0 else: mac[0] &= ~1 # clear bit 0 if uaa: mac[0] &= ~(1 << 1) # clear bit 1 else: mac[0] |= 1 << 1 # set bit 1 return separator.join(byte_fmt % b for b in mac) def get_ip_addr(mac_address, device): """Return IP address of a device provided its mac address / link local address and the device with which it is connected. For Example, if we call get_ip_addr(mac_address="52:54:00:12:34:56", device="br0") the following two scenarios can happen 1. It would return None if we can't be able to find device whose mac_address is equal to the arg:mac_address or the mentioned arg:device does not exists or the ip address we found is local. 2. It would return ip_address of device whose mac_address is equal to arg:mac_address and is connected/neighbor of arg:device """ try: output = sp.check_output(['ip', '-6', 'neigh', 'show', 'dev', device], stderr=sp.PIPE) except sp.CalledProcessError: return None else: result = [] output = output.strip().decode("utf-8") output = output.split("\n") for entry in output: entry = entry.split() if entry: ip = ipaddress.ip_address(entry[0]) mac = entry[2] if ip.is_global and mac_address == mac: result.append(ip) return result def mac2ipv6(mac, prefix): # only accept MACs separated by a colon parts = mac.split(":") # modify parts to match IPv6 value parts.insert(3, "ff") parts.insert(4, "fe") parts[0] = "%x" % (int(parts[0], 16) ^ 2) # format output ipv6_parts = [str(0)] * 4 for i in range(0, len(parts), 2): ipv6_parts.append("".join(parts[i:i + 2])) lower_part = ipaddress.IPv6Address(":".join(ipv6_parts)) prefix = ipaddress.IPv6Address(prefix) return str(prefix + int(lower_part))