diff --git a/.gitignore b/.gitignore index 690d98e..55adfaf 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,7 @@ .idea .vscode -.env __pycache__ docs/build -*/log.txt \ No newline at end of file +logs.txt \ No newline at end of file diff --git a/Pipfile b/Pipfile index 22120cd..ec5b001 100644 --- a/Pipfile +++ b/Pipfile @@ -5,6 +5,7 @@ verify_ssl = true [dev-packages] prospector = {extras = ["with_everything"],version = "*"} +pylama = "*" [packages] python-decouple = "*" @@ -12,7 +13,6 @@ requests = "*" flask = "*" flask-restful = "*" bitmath = "*" -ucloud-common = {editable = true,git = "git+https://code.ungleich.ch/ucloud/ucloud_common.git",ref = "wip"} etcd3-wrapper = {editable = true,git = "git+https://code.ungleich.ch/ungleich-public/etcd3_wrapper.git",ref = "wip"} python-etcd3 = {editable = true,git = "git+https://github.com/kragniz/python-etcd3.git"} pyotp = "*" diff --git a/Pipfile.lock b/Pipfile.lock index fa02525..b9373d5 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "5e4aa65086afdf9ac2f1479e9e35684f767dfbbd13877c4e4a23dd471aef6c13" + "sha256": "f43a93c020eb20212b437fcc62882db03bfa93f4678eb930e31343d687c805ed" }, "pipfile-spec": 6, "requires": { @@ -379,10 +379,10 @@ }, "pynetbox": { "hashes": [ - "sha256:e171380b36bedb7e0cd6a735fe8193d5809b373897b6905a2de43342761426c7" + "sha256:09525a29f1ac8c1a54772d6e2b94a55b1db6ba6a1c5b07f7af6a6ce232b1f7d5" ], "index": "pypi", - "version": "==4.0.8" + "version": "==4.1.0" }, "pyotp": { "hashes": [ @@ -401,10 +401,10 @@ }, "python-decouple": { "hashes": [ - "sha256:1317df14b43efee4337a4aa02914bf004f010cd56d6c4bd894e6474ec8c4fe2d" + "sha256:55c546b85b0c47a15a47a4312d451a437f7344a9be3e001660bccd93b637de95" ], "index": "pypi", - "version": "==3.1" + "version": "==3.3" }, "python-etcd3": { "editable": true, @@ -522,11 +522,6 @@ ], "version": "==6.0.0" }, - "ucloud-common": { - "editable": true, - "git": "https://code.ungleich.ch/ucloud/ucloud_common.git", - "ref": "eba92e5d6723093a3cc2999ae1f5c284e65dc809" - }, "urllib3": { "hashes": [ "sha256:a8a318824cc77d1fd4b2bec2ded92646630d7fe8619497b142c84a9e6f5a7293", @@ -775,6 +770,14 @@ ], "version": "==2.4.2" }, + "pylama": { + "hashes": [ + "sha256:9bae53ef9c1a431371d6a8dca406816a60d547147b60a4934721898f553b7d8f", + "sha256:fd61c11872d6256b019ef1235be37b77c922ef37ac9797df6bd489996dddeb15" + ], + "index": "pypi", + "version": "==7.7.1" + }, "pylint": { "hashes": [ "sha256:5d77031694a5fb97ea95e828c8d10fc770a1df6eb3906067aaed42201a8a6a09", diff --git a/__init__.py b/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/api/__init__.py b/api/__init__.py new file mode 100644 index 0000000..eea436a --- /dev/null +++ b/api/__init__.py @@ -0,0 +1,3 @@ +import logging + +logger = logging.getLogger(__name__) diff --git a/api/common_fields.py b/api/common_fields.py index c2152c9..6a68763 100755 --- a/api/common_fields.py +++ b/api/common_fields.py @@ -1,7 +1,6 @@ import os -from config import etcd_client as client -from config import VM_PREFIX +from config import etcd_client, env_vars class Optional: @@ -49,6 +48,6 @@ class VmUUIDField(Field): self.validation = self.vm_uuid_validation def vm_uuid_validation(self): - r = client.get(os.path.join(VM_PREFIX, self.uuid)) + r = etcd_client.get(os.path.join(env_vars.get('VM_PREFIX'), self.uuid)) if not r: self.add_error("VM with uuid {} does not exists".format(self.uuid)) diff --git a/api/config.py b/api/config.py deleted file mode 100644 index b9e7b82..0000000 --- a/api/config.py +++ /dev/null @@ 
-1,33 +0,0 @@ -import logging - -from etcd3_wrapper import Etcd3Wrapper -from decouple import config - - -from ucloud_common.vm import VmPool -from ucloud_common.host import HostPool -from ucloud_common.request import RequestPool - -logging.basicConfig( - level=logging.DEBUG, - filename="log.txt", - filemode="a", - format="%(asctime)s: %(levelname)s - %(message)s", - datefmt="%d-%b-%y %H:%M:%S", -) - - -WITHOUT_CEPH = config("WITHOUT_CEPH", False, cast=bool) -VM_PREFIX = config("VM_PREFIX") -HOST_PREFIX = config("HOST_PREFIX") -REQUEST_PREFIX = config("REQUEST_PREFIX") -FILE_PREFIX = config("FILE_PREFIX") -IMAGE_PREFIX = config("IMAGE_PREFIX") -IMAGE_STORE_PREFIX = config("IMAGE_STORE_PREFIX") -NETWORK_PREFIX = config("NETWORK_PREFIX") - -etcd_client = Etcd3Wrapper(host=config("ETCD_URL")) - -VM_POOL = VmPool(etcd_client, VM_PREFIX) -HOST_POOL = HostPool(etcd_client, HOST_PREFIX) -REQUEST_POOL = RequestPool(etcd_client, REQUEST_PREFIX) diff --git a/api/create_image_store.py b/api/create_image_store.py index 796cc43..cddbacb 100755 --- a/api/create_image_store.py +++ b/api/create_image_store.py @@ -1,10 +1,8 @@ import json import os - from uuid import uuid4 -from config import etcd_client as client -from config import IMAGE_STORE_PREFIX +from config import etcd_client, env_vars data = { "is_public": True, @@ -14,4 +12,4 @@ data = { "attributes": {"list": [], "key": [], "pool": "images"}, } -client.put(os.path.join(IMAGE_STORE_PREFIX, uuid4().hex), json.dumps(data)) +etcd_client.put(os.path.join(env_vars.get('IMAGE_STORE_PREFIX'), uuid4().hex), json.dumps(data)) diff --git a/api/helper.py b/api/helper.py index 5f27c22..a45bd16 100755 --- a/api/helper.py +++ b/api/helper.py @@ -1,20 +1,20 @@ import binascii -import requests +import ipaddress import random import subprocess as sp -import ipaddress -from decouple import config +import requests from pyotp import TOTP -from config import VM_POOL, etcd_client, IMAGE_PREFIX + +from config import vm_pool, env_vars def check_otp(name, realm, token): try: data = { - "auth_name": config("AUTH_NAME", ""), - "auth_token": TOTP(config("AUTH_SEED", "")).now(), - "auth_realm": config("AUTH_REALM", ""), + "auth_name": env_vars.get("AUTH_NAME"), + "auth_token": TOTP(env_vars.get("AUTH_SEED")).now(), + "auth_realm": env_vars.get("AUTH_REALM"), "name": name, "realm": realm, "token": token, @@ -24,8 +24,8 @@ def check_otp(name, realm, token): response = requests.get( "{OTP_SERVER}{OTP_VERIFY_ENDPOINT}".format( - OTP_SERVER=config("OTP_SERVER", ""), - OTP_VERIFY_ENDPOINT=config("OTP_VERIFY_ENDPOINT", "verify"), + OTP_SERVER=env_vars.get("OTP_SERVER", ""), + OTP_VERIFY_ENDPOINT=env_vars.get("OTP_VERIFY_ENDPOINT", "verify"), ), json=data, ) @@ -41,7 +41,7 @@ def resolve_vm_name(name, owner): result = next( filter( lambda vm: vm.value["owner"] == owner and vm.value["name"] == name, - VM_POOL.vms, + vm_pool.vms, ), None, ) @@ -61,7 +61,7 @@ def resolve_image_name(name, etcd_client): """ seperator = ":" - + # Ensure, user/program passed valid name that is of type string try: store_name_and_image_name = name.split(seperator) @@ -79,7 +79,7 @@ def resolve_image_name(name, etcd_client): except Exception: raise ValueError("Image name not in correct format i.e {store_name}:{image_name}") - images = etcd_client.get_prefix(IMAGE_PREFIX, value_in_json=True) + images = etcd_client.get_prefix(env_vars.get('IMAGE_PREFIX'), value_in_json=True) # Try to find image with name == image_name and store_name == store_name try: @@ -89,7 +89,7 @@ def resolve_image_name(name, etcd_client): 
raise KeyError("No image with name {} found.".format(name)) else: image_uuid = image.key.split('/')[-1] - + return image_uuid @@ -102,16 +102,16 @@ def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt=' if oui: if type(oui) == str: oui = [int(chunk) for chunk in oui.split(separator)] - mac = oui + random_bytes(num=6-len(oui)) + mac = oui + random_bytes(num=6 - len(oui)) else: if multicast: - mac[0] |= 1 # set bit 0 + mac[0] |= 1 # set bit 0 else: - mac[0] &= ~1 # clear bit 0 + mac[0] &= ~1 # clear bit 0 if uaa: - mac[0] &= ~(1 << 1) # clear bit 1 + mac[0] &= ~(1 << 1) # clear bit 1 else: - mac[0] |= 1 << 1 # set bit 1 + mac[0] |= 1 << 1 # set bit 1 return separator.join(byte_fmt % b for b in mac) @@ -128,7 +128,7 @@ def get_ip_addr(mac_address, device): and is connected/neighbor of arg:device """ try: - output = sp.check_output(['ip','-6','neigh', 'show', 'dev', device], stderr=sp.PIPE) + output = sp.check_output(['ip', '-6', 'neigh', 'show', 'dev', device], stderr=sp.PIPE) except sp.CalledProcessError: return None else: @@ -145,25 +145,6 @@ def get_ip_addr(mac_address, device): return result -def increment_etcd_counter(etcd_client, key): - kv = etcd_client.get(key) - - if kv: - counter = int(kv.value) - counter = counter + 1 - else: - counter = 1 - - etcd_client.put(key, str(counter)) - return counter - - -def get_etcd_counter(etcd_client, key): - kv = etcd_client.get(key) - if kv: - return int(kv.value) - return None - def mac2ipv6(mac, prefix): # only accept MACs separated by a colon parts = mac.split(":") @@ -174,10 +155,10 @@ def mac2ipv6(mac, prefix): parts[0] = "%x" % (int(parts[0], 16) ^ 2) # format output - ipv6Parts = [str(0)]*4 + ipv6Parts = [str(0)] * 4 for i in range(0, len(parts), 2): - ipv6Parts.append("".join(parts[i:i+2])) - + ipv6Parts.append("".join(parts[i:i + 2])) + lower_part = ipaddress.IPv6Address(":".join(ipv6Parts)) prefix = ipaddress.IPv6Address(prefix) return str(prefix + int(lower_part)) diff --git a/api/main.py b/api/main.py index a5258cc..e621ce1 100644 --- a/api/main.py +++ b/api/main.py @@ -1,35 +1,19 @@ import json -import subprocess import os -import pynetbox -import decouple - -import schemas - +import subprocess from uuid import uuid4 +import pynetbox from flask import Flask, request from flask_restful import Resource, Api -from ucloud_common.vm import VMStatus -from ucloud_common.request import RequestEntry, RequestType - -from helper import (generate_mac, get_ip_addr, get_etcd_counter, - increment_etcd_counter, mac2ipv6) - -from config import ( - etcd_client, - WITHOUT_CEPH, - VM_PREFIX, - HOST_PREFIX, - FILE_PREFIX, - IMAGE_PREFIX, - NETWORK_PREFIX, - logging, - REQUEST_POOL, - VM_POOL, - HOST_POOL, -) +from common import counters +from common.request import RequestEntry, RequestType +from common.vm import VMStatus +from config import (etcd_client, request_pool, vm_pool, host_pool, env_vars) +from . 
import schemas +from .helper import generate_mac, mac2ipv6 +from api import logger app = Flask(__name__) api = Api(app) @@ -42,12 +26,12 @@ class CreateVM(Resource): validator = schemas.CreateVMSchema(data) if validator.is_valid(): vm_uuid = uuid4().hex - vm_key = os.path.join(VM_PREFIX, vm_uuid) + vm_key = os.path.join(env_vars.get("VM_PREFIX"), vm_uuid) specs = { - 'cpu': validator.specs['cpu'], - 'ram': validator.specs['ram'], - 'os-ssd': validator.specs['os-ssd'], - 'hdd': validator.specs['hdd'] + "cpu": validator.specs["cpu"], + "ram": validator.specs["ram"], + "os-ssd": validator.specs["os-ssd"], + "hdd": validator.specs["hdd"], } macs = [generate_mac() for i in range(len(data["network"]))] vm_entry = { @@ -61,15 +45,16 @@ class CreateVM(Resource): "log": [], "vnc_socket": "", "network": list(zip(data["network"], macs)), - "metadata": { - "ssh-keys": [] - }, + "metadata": {"ssh-keys": []}, } etcd_client.put(vm_key, vm_entry, value_in_json=True) # Create ScheduleVM Request - r = RequestEntry.from_scratch(type=RequestType.ScheduleVM, uuid=vm_uuid) - REQUEST_POOL.put(r) + r = RequestEntry.from_scratch( + type=RequestType.ScheduleVM, uuid=vm_uuid, + request_prefix=env_vars.get("REQUEST_PREFIX") + ) + request_pool.put(r) return {"message": "VM Creation Queued"}, 200 return validator.get_errors(), 400 @@ -81,13 +66,21 @@ class VmStatus(Resource): data = request.json validator = schemas.VMStatusSchema(data) if validator.is_valid(): - vm = VM_POOL.get(os.path.join(VM_PREFIX, data["uuid"])) + vm = vm_pool.get( + os.path.join(env_vars.get("VM_PREFIX"), data["uuid"]) + ) vm_value = vm.value.copy() vm_value["ip"] = [] for network_and_mac in vm.network: network_name, mac = network_and_mac - network = etcd_client.get(os.path.join(NETWORK_PREFIX, data["name"], network_name), - value_in_json=True) + network = etcd_client.get( + os.path.join( + env_vars.get("NETWORK_PREFIX"), + data["name"], + network_name, + ), + value_in_json=True, + ) ipv6_addr = network.value.get("ipv6").split("::")[0] + "::" vm_value["ip"].append(mac2ipv6(mac, ipv6_addr)) vm.value = vm_value @@ -102,7 +95,9 @@ class CreateImage(Resource): data = request.json validator = schemas.CreateImageSchema(data) if validator.is_valid(): - file_entry = etcd_client.get(os.path.join(FILE_PREFIX, data["uuid"])) + file_entry = etcd_client.get( + os.path.join(env_vars.get("FILE_PREFIX"), data["uuid"]) + ) file_entry_value = json.loads(file_entry.value) image_entry_json = { @@ -114,7 +109,8 @@ class CreateImage(Resource): "visibility": "public", } etcd_client.put( - os.path.join(IMAGE_PREFIX, data["uuid"]), json.dumps(image_entry_json) + os.path.join(env_vars.get("IMAGE_PREFIX"), data["uuid"]), + json.dumps(image_entry_json), ) return {"message": "Image queued for creation."} @@ -124,15 +120,18 @@ class CreateImage(Resource): class ListPublicImages(Resource): @staticmethod def get(): - images = etcd_client.get_prefix(IMAGE_PREFIX, value_in_json=True) + images = etcd_client.get_prefix( + env_vars.get("IMAGE_PREFIX"), value_in_json=True + ) r = {} r["images"] = [] for image in images: - image_key = "{}:{}".format(image.value["store_name"], image.value["name"]) - r["images"].append({ - "name":image_key, - "status": image.value["status"] - }) + image_key = "{}:{}".format( + image.value["store_name"], image.value["name"] + ) + r["images"].append( + {"name": image_key, "status": image.value["status"]} + ) return r, 200 @@ -143,34 +142,47 @@ class VMAction(Resource): validator = schemas.VmActionSchema(data) if validator.is_valid(): - vm_entry = 
VM_POOL.get(os.path.join(VM_PREFIX, data["uuid"])) + vm_entry = vm_pool.get( + os.path.join(env_vars.get("VM_PREFIX"), data["uuid"]) + ) action = data["action"] if action == "start": vm_entry.status = VMStatus.requested_start - VM_POOL.put(vm_entry) + vm_pool.put(vm_entry) action = "schedule" if action == "delete" and vm_entry.hostname == "": try: - path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1 :] + path_without_protocol = vm_entry.path[ + vm_entry.path.find(":") + 1: + ] - if WITHOUT_CEPH: + if env_vars.get("WITHOUT_CEPH"): command_to_delete = [ - "rm", "-rf", + "rm", + "-rf", os.path.join("/var/vm", vm_entry.uuid), ] else: - command_to_delete = ["rbd", "rm", path_without_protocol] + command_to_delete = [ + "rbd", + "rm", + path_without_protocol, + ] - subprocess.check_output(command_to_delete, stderr=subprocess.PIPE) + subprocess.check_output( + command_to_delete, stderr=subprocess.PIPE + ) except subprocess.CalledProcessError as e: if "No such file" in e.stderr.decode("utf-8"): etcd_client.client.delete(vm_entry.key) return {"message": "VM successfully deleted"} else: - logging.exception(e) - return {"message": "Some error occurred while deleting VM"} + logger.exception(e) + return { + "message": "Some error occurred while deleting VM" + } else: etcd_client.client.delete(vm_entry.key) return {"message": "VM successfully deleted"} @@ -179,8 +191,9 @@ class VMAction(Resource): type="{}VM".format(action.title()), uuid=data["uuid"], hostname=vm_entry.hostname, + request_prefix=env_vars.get("REQUEST_PREFIX") ) - REQUEST_POOL.put(r) + request_pool.put(r) return {"message": "VM {} Queued".format(action.title())}, 200 else: return validator.get_errors(), 400 @@ -193,15 +206,18 @@ class VMMigration(Resource): validator = schemas.VmMigrationSchema(data) if validator.is_valid(): - vm = VM_POOL.get(data["uuid"]) + vm = vm_pool.get(data["uuid"]) r = RequestEntry.from_scratch( type=RequestType.ScheduleVM, uuid=vm.uuid, - destination=os.path.join(HOST_PREFIX, data["destination"]), + destination=os.path.join( + env_vars.get("HOST_PREFIX"), data["destination"] + ), migration=True, + request_prefix=env_vars.get("REQUEST_PREFIX") ) - REQUEST_POOL.put(r) + request_pool.put(r) return {"message": "VM Migration Initialization Queued"}, 200 else: return validator.get_errors(), 400 @@ -214,7 +230,9 @@ class ListUserVM(Resource): validator = schemas.OTPSchema(data) if validator.is_valid(): - vms = etcd_client.get_prefix(VM_PREFIX, value_in_json=True) + vms = etcd_client.get_prefix( + env_vars.get("VM_PREFIX"), value_in_json=True + ) return_vms = [] user_vms = filter(lambda v: v.value["owner"] == data["name"], vms) for vm in user_vms: @@ -246,9 +264,13 @@ class ListUserFiles(Resource): validator = schemas.OTPSchema(data) if validator.is_valid(): - files = etcd_client.get_prefix(FILE_PREFIX, value_in_json=True) + files = etcd_client.get_prefix( + env_vars.get("FILE_PREFIX"), value_in_json=True + ) return_files = [] - user_files = list(filter(lambda f: f.value["owner"] == data["name"], files)) + user_files = list( + filter(lambda f: f.value["owner"] == data["name"], files) + ) for file in user_files: return_files.append( { @@ -267,7 +289,7 @@ class CreateHost(Resource): data = request.json validator = schemas.CreateHostSchema(data) if validator.is_valid(): - host_key = os.path.join(HOST_PREFIX, uuid4().hex) + host_key = os.path.join(env_vars.get("HOST_PREFIX"), uuid4().hex) host_entry = { "specs": data["specs"], "hostname": data["hostname"], @@ -284,7 +306,7 @@ class CreateHost(Resource): class 
ListHost(Resource): @staticmethod def get(): - hosts = HOST_POOL.hosts + hosts = host_pool.hosts r = { host.key: { "status": host.status, @@ -305,21 +327,38 @@ class GetSSHKeys(Resource): if not validator.key_name.value: # {user_prefix}/{realm}/{name}/key/ - etcd_key = os.path.join(decouple.config("USER_PREFIX"), data["realm"], - data["name"], "key") - etcd_entry = etcd_client.get_prefix(etcd_key, value_in_json=True) - - keys = {key.key.split("/")[-1]: key.value for key in etcd_entry} + etcd_key = os.path.join( + env_vars.get('USER_PREFIX'), + data["realm"], + data["name"], + "key", + ) + etcd_entry = etcd_client.get_prefix( + etcd_key, value_in_json=True + ) + + keys = { + key.key.split("/")[-1]: key.value for key in etcd_entry + } return {"keys": keys} else: # {user_prefix}/{realm}/{name}/key/{key_name} - etcd_key = os.path.join(decouple.config("USER_PREFIX"), data["realm"], - data["name"], "key", data["key_name"]) + etcd_key = os.path.join( + env_vars.get('USER_PREFIX'), + data["realm"], + data["name"], + "key", + data["key_name"], + ) etcd_entry = etcd_client.get(etcd_key, value_in_json=True) - + if etcd_entry: - return {"keys": {etcd_entry.key.split("/")[-1]: etcd_entry.value}} + return { + "keys": { + etcd_entry.key.split("/")[-1]: etcd_entry.value + } + } else: return {"keys": {}} else: @@ -332,13 +371,22 @@ class AddSSHKey(Resource): data = request.json validator = schemas.AddSSHSchema(data) if validator.is_valid(): - + # {user_prefix}/{realm}/{name}/key/{key_name} - etcd_key = os.path.join(USER_PREFIX, data["realm"], data["name"], - "key", data["key_name"]) + etcd_key = os.path.join( + env_vars.get("USER_PREFIX"), + data["realm"], + data["name"], + "key", + data["key_name"], + ) etcd_entry = etcd_client.get(etcd_key, value_in_json=True) if etcd_entry: - return {"message": "Key with name '{}' already exists".format(data["key_name"])} + return { + "message": "Key with name '{}' already exists".format( + data["key_name"] + ) + } else: # Key Not Found. It implies user' haven't added any key yet. 
etcd_client.put(etcd_key, data["key"], value_in_json=True) @@ -353,16 +401,25 @@ class RemoveSSHKey(Resource): data = request.json validator = schemas.RemoveSSHSchema(data) if validator.is_valid(): - + # {user_prefix}/{realm}/{name}/key/{key_name} - etcd_key = os.path.join(USER_PREFIX, data["realm"], data["name"], - "key", data["key_name"]) + etcd_key = os.path.join( + env_vars.get("USER_PREFIX"), + data["realm"], + data["name"], + "key", + data["key_name"], + ) etcd_entry = etcd_client.get(etcd_key, value_in_json=True) if etcd_entry: etcd_client.client.delete(etcd_key) return {"message": "Key successfully removed."} else: - return {"message": "No Key with name '{}' Exists at all.".format(data["key_name"])} + return { + "message": "No Key with name '{}' Exists at all.".format( + data["key_name"] + ) + } else: return validator.get_errors(), 400 @@ -374,29 +431,42 @@ class CreateNetwork(Resource): validator = schemas.CreateNetwork(data) if validator.is_valid(): - + network_entry = { - "id": increment_etcd_counter(etcd_client, "/v1/counter/vxlan"), + "id": counters.increment_etcd_counter( + etcd_client, "/v1/counter/vxlan" + ), "type": data["type"], } if validator.user.value: - nb = pynetbox.api(url=decouple.config("NETBOX_URL"), - token=decouple.config("NETBOX_TOKEN")) - nb_prefix = nb.ipam.prefixes.get(prefix=decouple.config("PREFIX")) + nb = pynetbox.api( + url=env_vars.get("NETBOX_URL"), + token=env_vars.get("NETBOX_TOKEN"), + ) + nb_prefix = nb.ipam.prefixes.get( + prefix=env_vars.get("PREFIX") + ) - prefix = nb_prefix.available_prefixes.create(data= - { - "prefix_length": decouple.config("PREFIX_LENGTH", cast=int), - "description": "{}'s network \"{}\"".format(data["name"], - data["network_name"]), - "is_pool": True + prefix = nb_prefix.available_prefixes.create( + data={ + "prefix_length": env_vars.get( + "PREFIX_LENGTH", cast=int + ), + "description": '{}\'s network "{}"'.format( + data["name"], data["network_name"] + ), + "is_pool": True, } ) network_entry["ipv6"] = prefix["prefix"] else: network_entry["ipv6"] = "fd00::/64" - - network_key = os.path.join(NETWORK_PREFIX, data["name"], data["network_name"]) + + network_key = os.path.join( + env_vars.get("NETWORK_PREFIX"), + data["name"], + data["network_name"], + ) etcd_client.put(network_key, network_entry, value_in_json=True) return {"message": "Network successfully added."} else: @@ -410,7 +480,9 @@ class ListUserNetwork(Resource): validator = schemas.OTPSchema(data) if validator.is_valid(): - prefix = os.path.join(NETWORK_PREFIX, data["name"]) + prefix = os.path.join( + env_vars.get("NETWORK_PREFIX"), data["name"] + ) networks = etcd_client.get_prefix(prefix, value_in_json=True) user_networks = [] for net in networks: @@ -443,5 +515,20 @@ api.add_resource(ListHost, "/host/list") api.add_resource(CreateNetwork, "/network/create") -if __name__ == "__main__": + +def main(): + data = { + "is_public": True, + "type": "ceph", + "name": "images", + "description": "first ever public image-store", + "attributes": {"list": [], "key": [], "pool": "images"}, + } + + etcd_client.put(os.path.join(env_vars.get('IMAGE_STORE_PREFIX'), uuid4().hex), json.dumps(data)) + app.run(host="::", debug=True) + + +if __name__ == "__main__": + main() diff --git a/api/schemas.py b/api/schemas.py index 70aed2f..28a1bc1 100755 --- a/api/schemas.py +++ b/api/schemas.py @@ -16,21 +16,15 @@ inflexible for our purpose. 
import json import os + import bitmath -import helper - -from ucloud_common.host import HostPool, HostStatus -from ucloud_common.vm import VmPool, VMStatus - -from common_fields import Field, VmUUIDField, Optional -from helper import check_otp, resolve_vm_name -from config import etcd_client as client -from config import (HOST_PREFIX, VM_PREFIX, IMAGE_PREFIX, - FILE_PREFIX, IMAGE_STORE_PREFIX, NETWORK_PREFIX) - -HOST_POOL = HostPool(client, HOST_PREFIX) -VM_POOL = VmPool(client, VM_PREFIX) +from common.host import HostStatus +from common.vm import VMStatus +from config import etcd_client, env_vars, vm_pool, host_pool +from . import helper +from .common_fields import Field, VmUUIDField +from .helper import check_otp, resolve_vm_name class BaseSchema: @@ -108,14 +102,14 @@ class CreateImageSchema(BaseSchema): super().__init__(data, fields) def file_uuid_validation(self): - file_entry = client.get(os.path.join(FILE_PREFIX, self.uuid.value)) + file_entry = etcd_client.get(os.path.join(env_vars.get('FILE_PREFIX'), self.uuid.value)) if file_entry is None: self.add_error( "Image File with uuid '{}' Not Found".format(self.uuid.value) ) def image_store_name_validation(self): - image_stores = list(client.get_prefix(IMAGE_STORE_PREFIX)) + image_stores = list(etcd_client.get_prefix(env_vars.get('IMAGE_STORE_PREFIX'))) image_store = next( filter( @@ -205,7 +199,7 @@ class CreateHostSchema(OTPSchema): class CreateVMSchema(OTPSchema): def __init__(self, data): self.parsed_specs = {} - + # Fields self.specs = Field("specs", dict, data.get("specs", KeyError)) self.vm_name = Field("vm_name", str, data.get("vm_name", KeyError)) @@ -222,10 +216,9 @@ class CreateVMSchema(OTPSchema): super().__init__(data=data, fields=fields) - def image_validation(self): try: - image_uuid = helper.resolve_image_name(self.image.value, client) + image_uuid = helper.resolve_image_name(self.image.value, etcd_client) except Exception as e: self.add_error(str(e)) else: @@ -237,17 +230,17 @@ class CreateVMSchema(OTPSchema): 'VM with same name "{}" already exists'.format(self.vm_name.value) ) - def network_validation(self): + def network_validation(self): _network = self.network.value if _network: for net in _network: - network = client.get(os.path.join(NETWORK_PREFIX, - self.name.value, - net), value_in_json=True) + network = etcd_client.get(os.path.join(env_vars.get('NETWORK_PREFIX'), + self.name.value, + net), value_in_json=True) if not network: - self.add_error("Network with name {} does not exists"\ - .format(net)) + self.add_error("Network with name {} does not exists" \ + .format(net)) def specs_validation(self): ALLOWED_BASE = 10 @@ -316,7 +309,7 @@ class VMStatusSchema(OTPSchema): super().__init__(data, fields) def validation(self): - vm = VM_POOL.get(self.uuid.value) + vm = vm_pool.get(self.uuid.value) if not ( vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" ): @@ -349,7 +342,7 @@ class VmActionSchema(OTPSchema): ) def validation(self): - vm = VM_POOL.get(self.uuid.value) + vm = vm_pool.get(self.uuid.value) if not ( vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" ): @@ -389,14 +382,14 @@ class VmMigrationSchema(OTPSchema): def destination_validation(self): host_key = self.destination.value - host = HOST_POOL.get(host_key) + host = host_pool.get(host_key) if not host: self.add_error("No Such Host ({}) exists".format(self.destination.value)) elif host.status != HostStatus.alive: self.add_error("Destination Host is dead") def validation(self): - vm = 
VM_POOL.get(self.uuid.value) + vm = vm_pool.get(self.uuid.value) if not ( vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" ): @@ -405,7 +398,7 @@ class VmMigrationSchema(OTPSchema): if vm.status != VMStatus.running: self.add_error("Can't migrate non-running VM") - if vm.hostname == os.path.join(HOST_PREFIX, self.destination.value): + if vm.hostname == os.path.join(env_vars.get('HOST_PREFIX'), self.destination.value): self.add_error("Destination host couldn't be same as Source Host") @@ -445,15 +438,15 @@ class CreateNetwork(OTPSchema): fields = [self.network_name, self.type, self.user] super().__init__(data, fields=fields) - + def network_name_validation(self): - network = client.get(os.path.join(NETWORK_PREFIX, - self.name.value, - self.network_name.value), - value_in_json=True) + network = etcd_client.get(os.path.join(env_vars.get('NETWORK_PREFIX'), + self.name.value, + self.network_name.value), + value_in_json=True) if network: - self.add_error("Network with name {} already exists"\ - .format(self.network_name.value)) + self.add_error("Network with name {} already exists" \ + .format(self.network_name.value)) def network_type_validation(self): supported_network_types = ["vxlan"] diff --git a/common/__init__.py b/common/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/common/classes.py b/common/classes.py new file mode 100644 index 0000000..2cea033 --- /dev/null +++ b/common/classes.py @@ -0,0 +1,48 @@ +from decouple import Config, RepositoryEnv, UndefinedValueError +from etcd3_wrapper import EtcdEntry + + +class EnvironmentVariables: + def __init__(self, env_file): + try: + env_config = Config(RepositoryEnv(env_file)) + except FileNotFoundError: + print("{} does not exists".format(env_file)) + exit(1) + else: + self.config = env_config + + def get(self, *args, **kwargs): + """Return value of var from env_vars""" + try: + value = self.config.get(*args, **kwargs) + except UndefinedValueError as e: + print(e) + exit(1) + else: + return value + + +class SpecificEtcdEntryBase: + def __init__(self, e: EtcdEntry): + self.key = e.key + + for k in e.value.keys(): + self.__setattr__(k, e.value[k]) + + def original_keys(self): + r = dict(self.__dict__) + if "key" in r: + del r["key"] + return r + + @property + def value(self): + return self.original_keys() + + @value.setter + def value(self, v): + self.__dict__ = v + + def __repr__(self): + return str(dict(self.__dict__)) diff --git a/common/counters.py b/common/counters.py new file mode 100644 index 0000000..066a870 --- /dev/null +++ b/common/counters.py @@ -0,0 +1,21 @@ +from etcd3_wrapper import Etcd3Wrapper + + +def increment_etcd_counter(etcd_client: Etcd3Wrapper, key): + kv = etcd_client.get(key) + + if kv: + counter = int(kv.value) + counter = counter + 1 + else: + counter = 1 + + etcd_client.put(key, str(counter)) + return counter + + +def get_etcd_counter(etcd_client: Etcd3Wrapper, key): + kv = etcd_client.get(key) + if kv: + return int(kv.value) + return None diff --git a/common/helpers.py b/common/helpers.py new file mode 100644 index 0000000..c0d64e4 --- /dev/null +++ b/common/helpers.py @@ -0,0 +1,39 @@ +import logging +import socket + +from os.path import join as join_path + + +def create_package_loggers(packages, base_path, mode="a"): + loggers = {} + for pkg in packages: + logger = logging.getLogger(pkg) + logger_handler = logging.FileHandler( + join_path(base_path, "{}.txt".format(pkg)), + mode=mode + ) + logger.setLevel(logging.DEBUG) + 
logger_handler.setFormatter(logging.Formatter(fmt="%(asctime)s: %(levelname)s - %(message)s", + datefmt="%d-%b-%y %H:%M:%S")) + logger.addHandler(logger_handler) + loggers[pkg] = logger + + +# TODO: Should be removed as soon as migration +# mechanism is finalized inside ucloud +def get_ipv4_address(): + # If host is connected to internet + # Return IPv4 address of machine + # Otherwise, return 127.0.0.1 + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + try: + s.connect(("8.8.8.8", 80)) + except socket.timeout: + address = "127.0.0.1" + except Exception as e: + logging.getLogger().exception(e) + address = "127.0.0.1" + else: + address = s.getsockname()[0] + + return address diff --git a/common/host.py b/common/host.py new file mode 100644 index 0000000..ccbf7a8 --- /dev/null +++ b/common/host.py @@ -0,0 +1,67 @@ +import time +from datetime import datetime +from os.path import join +from typing import List + +from .classes import SpecificEtcdEntryBase + + +class HostStatus: + """Possible Statuses of ucloud host.""" + + alive = "ALIVE" + dead = "DEAD" + + +class HostEntry(SpecificEtcdEntryBase): + """Represents Host Entry Structure and its supporting methods.""" + + def __init__(self, e): + self.specs = None # type: dict + self.hostname = None # type: str + self.status = None # type: str + self.last_heartbeat = None # type: str + + super().__init__(e) + + def update_heartbeat(self): + self.status = HostStatus.alive + self.last_heartbeat = time.strftime("%Y-%m-%d %H:%M:%S") + + def is_alive(self): + last_heartbeat = datetime.strptime(self.last_heartbeat, "%Y-%m-%d %H:%M:%S") + delta = datetime.now() - last_heartbeat + if delta.total_seconds() > 60: + return False + return True + + def declare_dead(self): + self.status = HostStatus.dead + self.last_heartbeat = time.strftime("%Y-%m-%d %H:%M:%S") + + +class HostPool: + def __init__(self, etcd_client, host_prefix): + self.client = etcd_client + self.prefix = host_prefix + + @property + def hosts(self) -> List[HostEntry]: + _hosts = self.client.get_prefix(self.prefix, value_in_json=True) + return [HostEntry(host) for host in _hosts] + + def get(self, key): + if not key.startswith(self.prefix): + key = join(self.prefix, key) + v = self.client.get(key, value_in_json=True) + if v: + return HostEntry(v) + return None + + def put(self, obj: HostEntry): + self.client.put(obj.key, obj.value, value_in_json=True) + + def by_status(self, status, _hosts=None): + if _hosts is None: + _hosts = self.hosts + return list(filter(lambda x: x.status == status, _hosts)) diff --git a/common/request.py b/common/request.py new file mode 100644 index 0000000..cadac80 --- /dev/null +++ b/common/request.py @@ -0,0 +1,46 @@ +import json +from os.path import join +from uuid import uuid4 + +from etcd3_wrapper.etcd3_wrapper import PsuedoEtcdEntry + +from .classes import SpecificEtcdEntryBase + + +class RequestType: + CreateVM = "CreateVM" + ScheduleVM = "ScheduleVM" + StartVM = "StartVM" + StopVM = "StopVM" + InitVMMigration = "InitVMMigration" + TransferVM = "TransferVM" + DeleteVM = "DeleteVM" + + +class RequestEntry(SpecificEtcdEntryBase): + + def __init__(self, e): + self.type = None # type: str + self.migration = None # type: bool + self.destination = None # type: str + self.uuid = None # type: str + self.hostname = None # type: str + super().__init__(e) + + @classmethod + def from_scratch(cls, request_prefix, **kwargs): + e = PsuedoEtcdEntry(join(request_prefix, uuid4().hex), + value=json.dumps(kwargs).encode("utf-8"), value_in_json=True) + return cls(e) + + 
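# A minimal usage sketch, mirroring how api/main.py calls RequestEntry after this
# change: from_scratch() now receives the request prefix explicitly (typically from
# the shared env_vars) instead of the common package reading configuration at
# import time. The uuid below is only a placeholder for illustration.
from uuid import uuid4

from common.request import RequestEntry, RequestType
from config import env_vars, request_pool

r = RequestEntry.from_scratch(
    type=RequestType.ScheduleVM,
    uuid=uuid4().hex,  # placeholder: uuid of the VM to schedule
    request_prefix=env_vars.get("REQUEST_PREFIX"),
)
request_pool.put(r)  # stored under REQUEST_PREFIX, to be picked up by a host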
+class RequestPool: + def __init__(self, etcd_client, request_prefix): + self.client = etcd_client + self.prefix = request_prefix + + def put(self, obj: RequestEntry): + if not obj.key.startswith(self.prefix): + obj.key = join(self.prefix, obj.key) + + self.client.put(obj.key, obj.value, value_in_json=True) diff --git a/common/vm.py b/common/vm.py new file mode 100644 index 0000000..c778fac --- /dev/null +++ b/common/vm.py @@ -0,0 +1,110 @@ +from contextlib import contextmanager +from datetime import datetime +from os.path import join + +from .classes import SpecificEtcdEntryBase + + +class VMStatus: + # Must be only assigned to brand new VM + requested_new = "REQUESTED_NEW" + + # Only Assigned to already created vm + requested_start = "REQUESTED_START" + + # These all are for running vms + requested_shutdown = "REQUESTED_SHUTDOWN" + requested_migrate = "REQUESTED_MIGRATE" + requested_delete = "REQUESTED_DELETE" + # either its image is not found or user requested + # to delete it + deleted = "DELETED" + + stopped = "STOPPED" # After requested_shutdown + killed = "KILLED" # either host died or vm died itself + + running = "RUNNING" + + error = "ERROR" # An error occurred that cannot be resolved automatically + + +class VMEntry(SpecificEtcdEntryBase): + + def __init__(self, e): + self.owner = None # type: str + self.specs = None # type: dict + self.hostname = None # type: str + self.status = None # type: str + self.image_uuid = None # type: str + self.log = None # type: list + self.in_migration = None # type: bool + + super().__init__(e) + + @property + def uuid(self): + return self.key.split("/")[-1] + + def declare_killed(self): + self.hostname = "" + self.in_migration = False + if self.status == VMStatus.running: + self.status = VMStatus.killed + + def declare_stopped(self): + self.hostname = "" + self.in_migration = False + self.status = VMStatus.stopped + + def add_log(self, msg): + self.log = self.log[:5] + self.log.append("{} - {}".format(datetime.now().isoformat(), msg)) + + @property + def path(self): + return "rbd:uservms/{}".format(self.uuid) + + +class VmPool: + def __init__(self, etcd_client, vm_prefix): + self.client = etcd_client + self.prefix = vm_prefix + + @property + def vms(self): + _vms = self.client.get_prefix(self.prefix, value_in_json=True) + return [VMEntry(vm) for vm in _vms] + + def by_host(self, host, _vms=None): + if _vms is None: + _vms = self.vms + return list(filter(lambda x: x.hostname == host, _vms)) + + def by_status(self, status, _vms=None): + if _vms is None: + _vms = self.vms + return list(filter(lambda x: x.status == status, _vms)) + + def except_status(self, status, _vms=None): + if _vms is None: + _vms = self.vms + return list(filter(lambda x: x.status != status, _vms)) + + def get(self, key): + if not key.startswith(self.prefix): + key = join(self.prefix, key) + v = self.client.get(key, value_in_json=True) + if v: + return VMEntry(v) + return None + + def put(self, obj: VMEntry): + self.client.put(obj.key, obj.value, value_in_json=True) + + @contextmanager + def get_put(self, key) -> VMEntry: + # Updates object at key on exit + obj = self.get(key) + yield obj + if obj: + self.put(obj) diff --git a/config.py b/config.py new file mode 100644 index 0000000..5729fed --- /dev/null +++ b/config.py @@ -0,0 +1,19 @@ +from etcd3_wrapper import Etcd3Wrapper + +from common.classes import EnvironmentVariables +from common.host import HostPool +from common.request import RequestPool +from common.vm import VmPool + +env_vars = 
EnvironmentVariables('/etc/ucloud/ucloud.conf') + +etcd_wrapper_args = () +etcd_wrapper_kwargs = {"host": env_vars.get("ETCD_URL")} + +etcd_client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs) + +host_pool = HostPool(etcd_client, env_vars.get('HOST_PREFIX')) +vm_pool = VmPool(etcd_client, env_vars.get('VM_PREFIX')) +request_pool = RequestPool(etcd_client, env_vars.get('REQUEST_PREFIX')) + +running_vms = [] diff --git a/docs/__init__.py b/docs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/docs/source/__init__.py b/docs/source/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/docs/source/conf.py b/docs/source/conf.py index d08b5b4..64509c4 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -21,7 +21,6 @@ project = 'ucloud' copyright = '2019, ungleich' author = 'ungleich' - # -- General configuration --------------------------------------------------- # Add any Sphinx extension module names here, as strings. They can be @@ -39,7 +38,6 @@ templates_path = ['_templates'] # This pattern also affects html_static_path and html_extra_path. exclude_patterns = [] - # -- Options for HTML output ------------------------------------------------- # The theme to use for HTML and HTML Help pages. See the documentation for @@ -50,4 +48,4 @@ html_theme = 'alabaster' # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] \ No newline at end of file +html_static_path = ['_static'] diff --git a/docs/source/index.rst b/docs/source/index.rst index b4dd510..0307de8 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -1,7 +1,7 @@ .. ucloud documentation master file, created by - sphinx-quickstart on Mon Nov 11 19:08:16 2019. - You can adapt this file completely to your liking, but it should at least - contain the root `toctree` directive. +sphinx-quickstart on Mon Nov 11 19:08:16 2019. +You can adapt this file completely to your liking, but it should at least +contain the root `toctree` directive. Welcome to ucloud's documentation! ================================== @@ -15,7 +15,7 @@ Welcome to ucloud's documentation! usage/usage-for-admins usage/usage-for-users usage/how-to-create-an-os-image-for-ucloud - + misc/todo Indices and tables ================== diff --git a/docs/source/introduction/installation.rst b/docs/source/introduction/installation.rst index 3428f90..b271ab9 100644 --- a/docs/source/introduction/installation.rst +++ b/docs/source/introduction/installation.rst @@ -36,6 +36,9 @@ Enable Edge Repos, Update and Upgrade Install Dependencies ~~~~~~~~~~~~~~~~~~~~ +.. note:: + The installation and configuration of a production grade etcd cluster + is out of scope of this manual. So, we will install etcd with default configuration. .. code-block:: sh :linenos: diff --git a/docs/source/misc/todo.rst b/docs/source/misc/todo.rst new file mode 100644 index 0000000..3b85e89 --- /dev/null +++ b/docs/source/misc/todo.rst @@ -0,0 +1,7 @@ +TODO +==== + +* Check for :code:`etcd3.exceptions.ConnectionFailedError` when calling some etcd operation to + avoid crashing whole application. +* Throw KeyError instead of returning None when some key is not found in etcd. +* Expose more details in ListUserFiles. 
\ No newline at end of file diff --git a/docs/source/usage/usage-for-admins.rst b/docs/source/usage/usage-for-admins.rst index b5a46a4..3c20fb4 100644 --- a/docs/source/usage/usage-for-admins.rst +++ b/docs/source/usage/usage-for-admins.rst @@ -55,7 +55,28 @@ To start host we created earlier, execute the following command Create OS Image --------------- -First, we need to upload the file. + +Create ucloud-init ready OS image (Optional) +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +This step is optional if you just want to test ucloud. However, sooner or later +you want to create OS images with ucloud-init to properly +contexualize VMs. + +1. Start a VM with OS image on which you want to install ucloud-init +2. Execute the following command on the started VM + + .. code-block:: sh + + apk add git + git clone https://code.ungleich.ch/ucloud/ucloud-init.git + cd ucloud-init + sh ./install.sh +3. Congratulations. Your image is now ucloud-init ready. + + +Upload Sample OS Image +~~~~~~~~~~~~~~~~~~~~~~ +Execute the following to upload the sample OS image file. .. code-block:: sh @@ -63,7 +84,7 @@ First, we need to upload the file. (cd /var/www/admin && wget http://[2a0a:e5c0:2:12:0:f0ff:fea9:c3d9]/alpine-untouched.qcow2) Run File Scanner and Image Scanner ------------------------------------- +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Currently, our uploaded file *alpine-untouched.qcow2* is not tracked by ucloud. We can only make images from tracked files. So, we need to track the file by running File Scanner diff --git a/filescanner/__init__.py b/filescanner/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/filescanner/main.py b/filescanner/main.py index 6495886..d1ffa46 100755 --- a/filescanner/main.py +++ b/filescanner/main.py @@ -1,14 +1,13 @@ -import os import glob +import os import pathlib -import time -import hashlib import subprocess as sp - -from decouple import config -from etcd3_wrapper import Etcd3Wrapper +import time from uuid import uuid4 +from etcd3_wrapper import Etcd3Wrapper + +from config import env_vars def getxattr(file, attr): @@ -22,7 +21,7 @@ def getxattr(file, attr): value = value.decode("utf-8") except sp.CalledProcessError: value = None - + return value @@ -32,8 +31,8 @@ def setxattr(file, attr, value): attr = "user." 
+ attr sp.check_output(['setfattr', file, - '--name', attr, - '--value', str(value)]) + '--name', attr, + '--value', str(value)]) def sha512sum(file: str): @@ -68,12 +67,13 @@ except Exception as e: print('Make sure you have getfattr and setfattr available') exit(1) + def main(): - BASE_DIR = config("BASE_DIR") + BASE_DIR = env_vars.get("BASE_DIR") - FILE_PREFIX = config("FILE_PREFIX") + FILE_PREFIX = env_vars.get("FILE_PREFIX") - etcd_client = Etcd3Wrapper(host=config("ETCD_URL")) + etcd_client = Etcd3Wrapper(host=env_vars.get("ETCD_URL")) # Recursively Get All Files and Folder below BASE_DIR files = glob.glob("{}/**".format(BASE_DIR), recursive=True) @@ -125,4 +125,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/host/__init__.py b/host/__init__.py new file mode 100644 index 0000000..eea436a --- /dev/null +++ b/host/__init__.py @@ -0,0 +1,3 @@ +import logging + +logger = logging.getLogger(__name__) diff --git a/host/config.py b/host/config.py deleted file mode 100755 index c2dbb06..0000000 --- a/host/config.py +++ /dev/null @@ -1,36 +0,0 @@ -import logging - -from etcd3_wrapper import Etcd3Wrapper -from ucloud_common.vm import VmPool -from ucloud_common.host import HostPool -from ucloud_common.request import RequestPool -from decouple import config - -WITHOUT_CEPH = config("WITHOUT_CEPH", False, cast=bool) - -logging.basicConfig( - level=logging.DEBUG, - filename="log.txt", - filemode="a", - format="%(asctime)s: %(levelname)s - %(message)s", - datefmt="%d-%b-%y %H:%M:%S", -) - -etcd_wrapper_args = () -etcd_wrapper_kwargs = {"host": config("ETCD_URL")} - -etcd_client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs) - -HOST_PREFIX = config("HOST_PREFIX") -NETWORK_PREFIX = config("NETWORK_PREFIX") -VM_PREFIX = config("VM_PREFIX") -REQUEST_PREFIX = config("REQUEST_PREFIX") -VM_DIR = config("VM_DIR") -IMAGE_DIR = config("IMAGE_DIR") - - -host_pool = HostPool(etcd_client, HOST_PREFIX) -vm_pool = VmPool(etcd_client, VM_PREFIX) -request_pool = RequestPool(etcd_client, REQUEST_PREFIX) - -running_vms = [] diff --git a/host/main.py b/host/main.py index 9d9f396..5b5e620 100755 --- a/host/main.py +++ b/host/main.py @@ -1,31 +1,29 @@ import argparse -import time -import os -import sys -import virtualmachine import multiprocessing as mp +import os +import time -from ucloud_common.host import HostEntry -from ucloud_common.request import RequestEntry, RequestType - -from config import (vm_pool, host_pool, request_pool, - etcd_client, logging, running_vms, - etcd_wrapper_args, etcd_wrapper_kwargs, - REQUEST_PREFIX, HOST_PREFIX, - WITHOUT_CEPH, VM_DIR, HostPool) from etcd3_wrapper import Etcd3Wrapper -import etcd3 + +from common.request import RequestEntry, RequestType +from config import (vm_pool, request_pool, + etcd_client, running_vms, + etcd_wrapper_args, etcd_wrapper_kwargs, + HostPool, env_vars) +from . 
import virtualmachine +from host import logger def update_heartbeat(host): client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs) - host_pool = HostPool(client, HOST_PREFIX) + host_pool = HostPool(client, env_vars.get('HOST_PREFIX')) this_host = next(filter(lambda h: h.hostname == host, host_pool.hosts), None) - + while True: this_host.update_heartbeat() host_pool.put(this_host) time.sleep(10) + def maintenance(host): # To capture vm running according to running_vms list @@ -65,31 +63,25 @@ def maintenance(host): running_vms.remove(_vm) -def main(): - argparser = argparse.ArgumentParser() - argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1") - args = argparser.parse_args() +def main(hostname): + assert env_vars.get('WITHOUT_CEPH') and os.path.isdir(env_vars.get('VM_DIR')), ( + "You have set env_vars.get('WITHOUT_CEPH') to True. So, the vm directory mentioned" + " in .env file must exists. But, it don't.") - assert WITHOUT_CEPH and os.path.isdir(VM_DIR), ( - "You have set WITHOUT_CEPH to True. So, the vm directory mentioned" - " in .env file must exists. But, it don't." ) - - mp.set_start_method('spawn') - heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(args.hostname,)) + heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,)) - host_pool = HostPool(etcd_client, HOST_PREFIX) - host = next(filter(lambda h: h.hostname == args.hostname, host_pool.hosts), None) + host_pool = HostPool(etcd_client, env_vars.get('HOST_PREFIX')) + host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) assert host is not None, "No such host" try: heartbeat_updating_process.start() except Exception as e: - logging.info("No Need To Go Further. Our heartbeat updating mechanism is not working") - logging.exception(e) + logger.info("No Need To Go Further. 
Our heartbeat updating mechanism is not working") + logger.exception(e) exit(-1) - - logging.info("%s Session Started %s", '*' * 5, '*' * 5) + logger.info("%s Session Started %s", '*' * 5, '*' * 5) # It is seen that under heavy load, timeout event doesn't come # in a predictive manner (which is intentional because we give @@ -99,22 +91,21 @@ def main(): # update the heart beat in a predictive manner we start Heart # beat updating mechanism in separated thread - for events_iterator in [ - etcd_client.get_prefix(REQUEST_PREFIX, value_in_json=True), - etcd_client.watch_prefix(REQUEST_PREFIX, timeout=10, value_in_json=True), + etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True), + etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=10, value_in_json=True), ]: for request_event in events_iterator: request_event = RequestEntry(request_event) if request_event.type == "TIMEOUT": - logging.info("Timeout Event") + logger.info("Timeout Event") maintenance(host) continue # If the event is directed toward me OR I am destination of a InitVMMigration - if (request_event.hostname == host.key or request_event.destination == host.key): - logging.debug("EVENT: %s", request_event) + if request_event.hostname == host.key or request_event.destination == host.key: + logger.debug("EVENT: %s", request_event) request_pool.client.client.delete(request_event.key) vm_entry = vm_pool.get(request_event.uuid) @@ -135,11 +126,14 @@ def main(): elif request_event.type == RequestType.TransferVM: virtualmachine.transfer(request_event) else: - logging.info("VM Entry missing") - - logging.info("Running VMs %s", running_vms) + logger.info("VM Entry missing") + + logger.info("Running VMs %s", running_vms) if __name__ == "__main__": - main() - + argparser = argparse.ArgumentParser() + argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1") + args = argparser.parse_args() + mp.set_start_method('spawn') + main(args.hostname) diff --git a/host/qmp/__init__.py b/host/qmp/__init__.py index 97669bc..ba15838 100755 --- a/host/qmp/__init__.py +++ b/host/qmp/__init__.py @@ -15,24 +15,23 @@ import errno import logging import os -import subprocess -import re import shutil import socket +import subprocess import tempfile from . import qmp - LOG = logging.getLogger(__name__) # Mapping host architecture to any additional architectures it can # support which often includes its 32 bit cousin. ADDITIONAL_ARCHES = { - "x86_64" : "i386", - "aarch64" : "armhf" + "x86_64": "i386", + "aarch64": "armhf" } + def kvm_available(target_arch=None): host_arch = os.uname()[4] if target_arch and target_arch != host_arch: @@ -56,10 +55,12 @@ class QEMUMachineAddDeviceError(QEMUMachineError): failures reported by the QEMU binary itself. """ + class MonitorResponseError(qmp.QMPError): """ Represents erroneous QMP monitor reply """ + def __init__(self, reply): try: desc = reply["error"]["desc"] @@ -108,7 +109,7 @@ class QEMUMachine(object): self._qemu_log_file = None self._popen = None self._binary = binary - self._args = list(args) # Force copy args in case we modify them + self._args = list(args) # Force copy args in case we modify them self._wrapper = wrapper self._events = [] self._iolog = None @@ -453,10 +454,11 @@ class QEMUMachine(object): See event_match for details. timeout: QEMUMonitorProtocol.pull_event timeout parameter. 
""" + def _match(event): for name, match in events: if (event['event'] == name and - self.event_match(event, match)): + self.event_match(event, match)): return True return False diff --git a/host/qmp/__pycache__/__init__.cpython-37.pyc b/host/qmp/__pycache__/__init__.cpython-37.pyc deleted file mode 100755 index 3f11efc..0000000 Binary files a/host/qmp/__pycache__/__init__.cpython-37.pyc and /dev/null differ diff --git a/host/qmp/__pycache__/qmp.cpython-37.pyc b/host/qmp/__pycache__/qmp.cpython-37.pyc deleted file mode 100755 index e9f7c94..0000000 Binary files a/host/qmp/__pycache__/qmp.cpython-37.pyc and /dev/null differ diff --git a/host/qmp/qmp.py b/host/qmp/qmp.py index 5c8cf6a..bf35d71 100755 --- a/host/qmp/qmp.py +++ b/host/qmp/qmp.py @@ -8,10 +8,10 @@ # This work is licensed under the terms of the GNU GPL, version 2. See # the COPYING file in the top-level directory. -import json import errno -import socket +import json import logging +import socket class QMPError(Exception): @@ -31,7 +31,6 @@ class QMPTimeoutError(QMPError): class QEMUMonitorProtocol(object): - #: Logger object for debugging messages logger = logging.getLogger('QMP') #: Socket's error class diff --git a/host/virtualmachine.py b/host/virtualmachine.py index a989f3f..80e9846 100755 --- a/host/virtualmachine.py +++ b/host/virtualmachine.py @@ -6,30 +6,24 @@ import errno import os +import random import subprocess as sp import tempfile import time -import random -import ipaddress - from functools import wraps from os.path import join -from typing import Union from string import Template +from typing import Union import bitmath import sshtunnel -import qmp - -from decouple import config -from ucloud_common.helpers import get_ipv4_address -from ucloud_common.request import RequestEntry, RequestType -from ucloud_common.vm import VMEntry, VMStatus - -from config import (WITHOUT_CEPH, VM_PREFIX, VM_DIR, IMAGE_DIR, - NETWORK_PREFIX, etcd_client, logging, - request_pool, running_vms, vm_pool) +from common.helpers import get_ipv4_address +from common.request import RequestEntry, RequestType +from common.vm import VMEntry, VMStatus +from config import etcd_client, request_pool, running_vms, vm_pool, env_vars +from . 
import qmp +from host import logger class VM: def __init__(self, key, handle, vnc_socket_file): @@ -43,7 +37,7 @@ class VM: def create_dev(script, _id, dev, ip=None): command = [script, _id, dev] - if ip: + if ip: command.append(ip) try: output = sp.check_output(command, stderr=sp.PIPE) @@ -57,13 +51,13 @@ def create_dev(script, _id, dev, ip=None): def create_vxlan_br_tap(_id, _dev, ip=None): network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network') vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'), - _id=_id, dev=_dev) + _id=_id, dev=_dev) if vxlan: bridge = create_dev(script=os.path.join(network_script_base, 'create-bridge.sh'), _id=_id, dev=vxlan, ip=ip) if bridge: tap = create_dev(script=os.path.join(network_script_base, 'create-tap.sh'), - _id=str(random.randint(1, 100000)), dev=bridge) + _id=str(random.randint(1, 100000)), dev=bridge) if tap: return tap @@ -77,35 +71,35 @@ def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt=' if oui: if type(oui) == str: oui = [int(chunk) for chunk in oui.split(separator)] - mac = oui + random_bytes(num=6-len(oui)) + mac = oui + random_bytes(num=6 - len(oui)) else: if multicast: - mac[0] |= 1 # set bit 0 + mac[0] |= 1 # set bit 0 else: - mac[0] &= ~1 # clear bit 0 + mac[0] &= ~1 # clear bit 0 if uaa: - mac[0] &= ~(1 << 1) # clear bit 1 + mac[0] &= ~(1 << 1) # clear bit 1 else: - mac[0] |= 1 << 1 # set bit 1 + mac[0] |= 1 << 1 # set bit 1 return separator.join(byte_fmt % b for b in mac) def update_radvd_conf(etcd_client): network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network') - networks = { - net.value['ipv6']:net.value['id'] - for net in etcd_client.get_prefix('/v1/network/', value_in_json=True) - if net.value.get('ipv6') - } + networks = { + net.value['ipv6']: net.value['id'] + for net in etcd_client.get_prefix('/v1/network/', value_in_json=True) + if net.value.get('ipv6') + } radvd_template = open(os.path.join(network_script_base, 'radvd-template.conf'), 'r').read() radvd_template = Template(radvd_template) content = [radvd_template.safe_substitute(bridge='br{}'.format(networks[net]), - prefix=net) + prefix=net) for net in networks if networks.get(net)] - + with open('/etc/radvd.conf', 'w') as radvd_conf: radvd_conf.writelines(content) @@ -113,7 +107,7 @@ def update_radvd_conf(etcd_client): def get_start_command_args( - vm_entry, vnc_sock_filename: str, migration=False, migration_port=4444, + vm_entry, vnc_sock_filename: str, migration=False, migration_port=4444, ): threads_per_core = 1 vm_memory = int(bitmath.parse_string(vm_entry.specs["ram"]).to_MB()) @@ -121,9 +115,9 @@ def get_start_command_args( vm_uuid = vm_entry.uuid vm_networks = vm_entry.network - if WITHOUT_CEPH: + if env_vars.get('WITHOUT_CEPH'): command = "-drive file={},format=raw,if=virtio,cache=none".format( - os.path.join(VM_DIR, vm_uuid) + os.path.join(env_vars.get('VM_DIR'), vm_uuid) ) else: command = "-drive file=rbd:uservms/{},format=raw,if=virtio,cache=none".format( @@ -138,24 +132,24 @@ def get_start_command_args( if migration: command += " -incoming tcp:0:{}".format(migration_port) - + tap = None for network_and_mac in vm_networks: network_name, mac = network_and_mac - - _key = os.path.join(NETWORK_PREFIX, vm_entry.owner, network_name) + + _key = os.path.join(env_vars.get('NETWORK_PREFIX'), vm_entry.owner, network_name) network = etcd_client.get(_key, value_in_json=True) network_type = network.value["type"] network_id = str(network.value["id"]) 
network_ipv6 = network.value["ipv6"] if network_type == "vxlan": - tap = create_vxlan_br_tap(network_id, config("VXLAN_PHY_DEV"), network_ipv6) + tap = create_vxlan_br_tap(network_id, env_vars.get("VXLAN_PHY_DEV"), network_ipv6) update_radvd_conf(etcd_client) - - command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no"\ - " -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}"\ - .format(tap=tap, net_id=network_id, mac=mac) + + command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no" \ + " -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}" \ + .format(tap=tap, net_id=network_id, mac=mac) return command.split(" ") @@ -189,15 +183,15 @@ def need_running_vm(func): if vm: try: status = vm.handle.command("query-status") - logging.debug("VM Status Check - %s", status) + logger.debug("VM Status Check - %s", status) except Exception as exception: - logging.info("%s failed - VM %s %s", func.__name__, e, exception) + logger.info("%s failed - VM %s %s", func.__name__, e, exception) else: return func(e) return None else: - logging.info("%s failed because VM %s is not running", func.__name__, e.key) + logger.info("%s failed because VM %s is not running", func.__name__, e.key) return None return wrapper @@ -206,18 +200,18 @@ def need_running_vm(func): def create(vm_entry: VMEntry): vm_hdd = int(bitmath.parse_string(vm_entry.specs["os-ssd"]).to_MB()) - if WITHOUT_CEPH: + if env_vars.get('WITHOUT_CEPH'): _command_to_create = [ "cp", - os.path.join(IMAGE_DIR, vm_entry.image_uuid), - os.path.join(VM_DIR, vm_entry.uuid), + os.path.join(env_vars.get('IMAGE_DIR'), vm_entry.image_uuid), + os.path.join(env_vars.get('VM_DIR'), vm_entry.uuid), ] _command_to_extend = [ "qemu-img", "resize", "-f", "raw", - os.path.join(VM_DIR, vm_entry.uuid), + os.path.join(env_vars.get('VM_DIR'), vm_entry.uuid), "{}M".format(vm_hdd), ] else: @@ -240,22 +234,22 @@ def create(vm_entry: VMEntry): sp.check_output(_command_to_create) except sp.CalledProcessError as e: if e.returncode == errno.EEXIST: - logging.debug("Image for vm %s exists", vm_entry.uuid) + logger.debug("Image for vm %s exists", vm_entry.uuid) # File Already exists. No Problem Continue return # This exception catches all other exceptions # i.e FileNotFound (BaseImage), pool Does Not Exists etc. - logging.exception(e) + logger.exception(e) vm_entry.status = "ERROR" else: try: sp.check_output(_command_to_extend) except Exception as e: - logging.exception(e) + logger.exception(e) else: - logging.info("New VM Created") + logger.info("New VM Created") def start(vm_entry: VMEntry): @@ -263,7 +257,7 @@ def start(vm_entry: VMEntry): # VM already running. No need to proceed further. 
if _vm: - logging.info("VM %s already running", vm_entry.uuid) + logger.info("VM %s already running", vm_entry.uuid) return else: create(vm_entry) @@ -282,19 +276,19 @@ def stop(vm_entry): def delete(vm_entry): - logging.info("Deleting VM | %s", vm_entry) + logger.info("Deleting VM | %s", vm_entry) stop(vm_entry) - path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1 :] + path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1:] - if WITHOUT_CEPH: - vm_deletion_command = ["rm", os.path.join(VM_DIR, vm_entry.uuid)] + if env_vars.get('WITHOUT_CEPH'): + vm_deletion_command = ["rm", os.path.join(env_vars.get('VM_DIR'), vm_entry.uuid)] else: vm_deletion_command = ["rbd", "rm", path_without_protocol] try: sp.check_output(vm_deletion_command) except Exception as e: - logging.exception(e) + logger.exception(e) else: etcd_client.client.delete(vm_entry.key) @@ -307,20 +301,20 @@ def transfer(request_event): _host, _port = request_event.parameters["host"], request_event.parameters["port"] _uuid = request_event.uuid _destination = request_event.destination_host_key - vm = get_vm(running_vms, join(VM_PREFIX, _uuid)) + vm = get_vm(running_vms, join(env_vars.get('VM_PREFIX'), _uuid)) if vm: tunnel = sshtunnel.SSHTunnelForwarder( (_host, 22), - ssh_username=config("ssh_username"), - ssh_pkey=config("ssh_pkey"), - ssh_private_key_password=config("ssh_private_key_password"), + ssh_username=env_vars.get("ssh_username"), + ssh_pkey=env_vars.get("ssh_pkey"), + ssh_private_key_password=env_vars.get("ssh_private_key_password"), remote_bind_address=("127.0.0.1", _port), ) try: tunnel.start() except sshtunnel.BaseSSHTunnelForwarderError: - logging.exception("Couldn't establish connection to (%s, 22)", _host) + logger.exception("Couldn't establish connection to (%s, 22)", _host) else: vm.handle.command( "migrate", uri="tcp:{}:{}".format(_host, tunnel.local_bind_port) @@ -356,7 +350,7 @@ def init_migration(vm_entry, destination_host_key): if _vm: # VM already running. No need to proceed further. 
- logging.info("%s Already running", _vm.key) + logger.info("%s Already running", _vm.key) return launch_vm(vm_entry, migration=True, migration_port=4444, @@ -364,13 +358,13 @@ def init_migration(vm_entry, destination_host_key): def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_key=None): - logging.info("Starting %s", vm_entry.key) + logger.info("Starting %s", vm_entry.key) vm = create_vm_object(vm_entry, migration=migration, migration_port=migration_port) try: vm.handle.launch() except Exception as e: - logging.exception(e) + logger.exception(e) if migration: # We don't care whether MachineError or any other error occurred @@ -392,6 +386,7 @@ def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_k parameters={"host": get_ipv4_address(), "port": 4444}, uuid=vm_entry.uuid, destination_host_key=destination_host_key, + request_prefix=env_vars.get("REQUEST_PREFIX") ) request_pool.put(r) else: @@ -400,4 +395,3 @@ def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_k vm_entry.add_log("Started successfully") vm_pool.put(vm_entry) - \ No newline at end of file diff --git a/imagescanner/__init__.py b/imagescanner/__init__.py new file mode 100644 index 0000000..eea436a --- /dev/null +++ b/imagescanner/__init__.py @@ -0,0 +1,3 @@ +import logging + +logger = logging.getLogger(__name__) diff --git a/imagescanner/config.py b/imagescanner/config.py deleted file mode 100755 index 3ccc06b..0000000 --- a/imagescanner/config.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging - -from etcd3_wrapper import Etcd3Wrapper -from decouple import config - -BASE_PATH = config("BASE_DIR", "/var/www") -WITHOUT_CEPH = config("WITHOUT_CEPH", False, cast=bool) -ETCD_URL = config("ETCD_URL") -IMAGE_PREFIX = config("IMAGE_PREFIX") -IMAGE_STORE_PREFIX = config("IMAGE_STORE_PREFIX") -IMAGE_DIR = config("IMAGE_DIR") - -logging.basicConfig( - level=logging.DEBUG, - filename="log.txt", - filemode="a", - format="%(asctime)s: %(levelname)s - %(message)s", - datefmt="%d-%b-%y %H:%M:%S", -) - - -client = Etcd3Wrapper(host=ETCD_URL) diff --git a/imagescanner/main.py b/imagescanner/main.py index 146e756..97da589 100755 --- a/imagescanner/main.py +++ b/imagescanner/main.py @@ -1,11 +1,10 @@ -import os import json +import os import subprocess import sys -from config import (logging, client, IMAGE_DIR, - BASE_PATH, WITHOUT_CEPH, - IMAGE_PREFIX, IMAGE_STORE_PREFIX) +from config import etcd_client, env_vars +from imagescanner import logger def qemu_img_type(path): @@ -13,7 +12,7 @@ def qemu_img_type(path): try: qemu_img_info = subprocess.check_output(qemu_img_info_command) except Exception as e: - logging.exception(e) + logger.exception(e) return None else: qemu_img_info = json.loads(qemu_img_info.decode("utf-8")) @@ -21,12 +20,12 @@ def qemu_img_type(path): def main(): - # If you are using WITHOUT_CEPH FLAG in .env - # then please make sure that IMAGE_DIR directory + # If you are using env_vars.get('WITHOUT_CEPH') FLAG in .env + # then please make sure that env_vars.get('IMAGE_DIR') directory # exists otherwise this script would fail - if WITHOUT_CEPH and not os.path.isdir(IMAGE_DIR): - print("You have set WITHOUT_CEPH to True. So," - "the {} must exists. But, it don't".format(IMAGE_DIR)) + if env_vars.get('WITHOUT_CEPH') and not os.path.isdir(env_vars.get('IMAGE_DIR')): + print("You have set env_vars.get('WITHOUT_CEPH') to True. So," + "the {} must exists. 
But, it don't".format(env_vars.get('IMAGE_DIR'))) sys.exit(1) try: @@ -36,7 +35,7 @@ def main(): sys.exit(1) # We want to get images entries that requests images to be created - images = client.get_prefix(IMAGE_PREFIX, value_in_json=True) + images = etcd_client.get_prefix(env_vars.get('IMAGE_PREFIX'), value_in_json=True) images_to_be_created = list(filter(lambda im: im.value['status'] == 'TO_BE_CREATED', images)) for image in images_to_be_created: @@ -45,9 +44,9 @@ def main(): image_owner = image.value['owner'] image_filename = image.value['filename'] image_store_name = image.value['store_name'] - image_full_path = os.path.join(BASE_PATH, image_owner, image_filename) + image_full_path = os.path.join(env_vars.get('BASE_DIR'), image_owner, image_filename) - image_stores = client.get_prefix(IMAGE_STORE_PREFIX, value_in_json=True) + image_stores = etcd_client.get_prefix(env_vars.get('IMAGE_STORE_PREFIX'), value_in_json=True) user_image_store = next(filter( lambda s, store_name=image_store_name: s.value["name"] == store_name, image_stores @@ -56,27 +55,25 @@ def main(): image_store_pool = user_image_store.value['attributes']['pool'] except Exception as e: - logging.exception(e) + logger.exception(e) else: # At least our basic data is available qemu_img_convert_command = ["qemu-img", "convert", "-f", "qcow2", "-O", "raw", image_full_path, "image.raw"] - - if WITHOUT_CEPH: - image_import_command = ["mv", "image.raw", os.path.join(IMAGE_DIR, image_uuid)] + if env_vars.get('WITHOUT_CEPH'): + image_import_command = ["mv", "image.raw", os.path.join(env_vars.get('IMAGE_DIR'), image_uuid)] snapshot_creation_command = ["true"] snapshot_protect_command = ["true"] else: image_import_command = ["rbd", "import", "image.raw", "{}/{}".format(image_store_pool, image_uuid)] snapshot_creation_command = ["rbd", "snap", "create", - "{}/{}@protected".format(image_store_pool, image_uuid)] + "{}/{}@protected".format(image_store_pool, image_uuid)] snapshot_protect_command = ["rbd", "snap", "protect", "{}/{}@protected".format(image_store_pool, image_uuid)] - # First check whether the image is qcow2 if qemu_img_type(image_full_path) == "qcow2": @@ -90,19 +87,18 @@ def main(): # Create and Protect Snapshot subprocess.check_output(snapshot_creation_command) subprocess.check_output(snapshot_protect_command) - + except Exception as e: - logging.exception(e) - + logger.exception(e) + else: # Everything is successfully done image.value["status"] = "CREATED" - client.put(image.key, json.dumps(image.value)) + etcd_client.put(image.key, json.dumps(image.value)) else: # The user provided image is either not found or of invalid format image.value["status"] = "INVALID_IMAGE" - client.put(image.key, json.dumps(image.value)) - + etcd_client.put(image.key, json.dumps(image.value)) try: os.remove("image.raw") @@ -111,4 +107,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/metadata/__init__.py b/metadata/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/metadata/config.py b/metadata/config.py deleted file mode 100644 index 05cc113..0000000 --- a/metadata/config.py +++ /dev/null @@ -1,22 +0,0 @@ -import logging - -from etcd3_wrapper import Etcd3Wrapper -from decouple import config - -from ucloud_common.vm import VmPool - -logging.basicConfig( - level=logging.DEBUG, - filename="log.txt", - filemode="a", - format="%(asctime)s: %(levelname)s - %(message)s", - datefmt="%d-%b-%y %H:%M:%S", -) - - -VM_PREFIX = config("VM_PREFIX") -USER_PREFIX = config("USER_PREFIX") - 
-etcd_client = Etcd3Wrapper(host=config("ETCD_URL")) - -VM_POOL = VmPool(etcd_client, VM_PREFIX) diff --git a/metadata/main.py b/metadata/main.py index 22a4e62..7176d41 100644 --- a/metadata/main.py +++ b/metadata/main.py @@ -2,14 +2,15 @@ import os from flask import Flask, request from flask_restful import Resource, Api -from config import etcd_client, VM_POOL, USER_PREFIX + +from config import etcd_client, env_vars, vm_pool app = Flask(__name__) api = Api(app) def get_vm_entry(mac_addr): - return next(filter(lambda vm: mac_addr in list(zip(*vm.network))[1], VM_POOL.vms), None) + return next(filter(lambda vm: mac_addr in list(zip(*vm.network))[1], vm_pool.vms), None) # https://stackoverflow.com/questions/37140846/how-to-convert-ipv6-link-local-address-to-mac-address-in-python @@ -43,8 +44,8 @@ class Root(Resource): return {'message': 'Metadata for such VM does not exists.'}, 404 else: - # {user_prefix}/{realm}/{name}/key - etcd_key = os.path.join(USER_PREFIX, data.value['owner_realm'], + # {env_vars.get('USER_PREFIX')}/{realm}/{name}/key + etcd_key = os.path.join(env_vars.get('USER_PREFIX'), data.value['owner_realm'], data.value['owner'], 'key') etcd_entry = etcd_client.get_prefix(etcd_key, value_in_json=True) user_personal_ssh_keys = [key.value for key in etcd_entry] @@ -81,5 +82,10 @@ class Root(Resource): api.add_resource(Root, '/') -if __name__ == '__main__': + +def main(): app.run(debug=True, host="::", port="80") + + +if __name__ == '__main__': + main() diff --git a/network/__init__.py b/network/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scheduler/__init__.py b/scheduler/__init__.py new file mode 100644 index 0000000..95e1be0 --- /dev/null +++ b/scheduler/__init__.py @@ -0,0 +1,3 @@ +import logging + +logger = logging.getLogger(__name__) \ No newline at end of file diff --git a/scheduler/config.py b/scheduler/config.py deleted file mode 100755 index 81e8503..0000000 --- a/scheduler/config.py +++ /dev/null @@ -1,25 +0,0 @@ -import logging - -from decouple import config -from etcd3_wrapper import Etcd3Wrapper -from ucloud_common.vm import VmPool -from ucloud_common.host import HostPool -from ucloud_common.request import RequestPool - -logging.basicConfig( - level=logging.DEBUG, - filename="log.txt", - filemode="a", - format="%(asctime)s: %(levelname)s - %(message)s", - datefmt="%d-%b-%y %H:%M:%S", -) - -vm_prefix = config("VM_PREFIX") -host_prefix = config("HOST_PREFIX") -request_prefix = config("REQUEST_PREFIX") - -etcd_client = Etcd3Wrapper(host=config("ETCD_URL")) - -vm_pool = VmPool(etcd_client, vm_prefix) -host_pool = HostPool(etcd_client, host_prefix) -request_pool = RequestPool(etcd_client, request_prefix) \ No newline at end of file diff --git a/scheduler/helper.py b/scheduler/helper.py index 577bc91..81b5869 100755 --- a/scheduler/helper.py +++ b/scheduler/helper.py @@ -1,21 +1,15 @@ -import bitmath - from collections import Counter from functools import reduce -from ucloud_common.vm import VmPool, VMStatus -from ucloud_common.host import HostPool, HostStatus -from ucloud_common.request import RequestEntry, RequestPool, RequestType +import bitmath -from decouple import config -from config import etcd_client as client - -vm_pool = VmPool(client, config("VM_PREFIX")) -host_pool = HostPool(client, config("HOST_PREFIX")) -request_pool = RequestPool(client, config("REQUEST_PREFIX")) +from common.host import HostStatus +from common.request import RequestEntry, RequestType +from common.vm import VMStatus +from config import vm_pool, host_pool, request_pool, 
env_vars -def accumulated_specs(vms_specs): +def accumulated_specs(vms_specs): if not vms_specs: return {} return reduce((lambda x, y: Counter(x) + Counter(y)), vms_specs) @@ -23,7 +17,7 @@ def accumulated_specs(vms_specs): def remaining_resources(host_specs, vms_specs): # Return remaining resources host_specs - vms - + _vms_specs = Counter(vms_specs) _remaining = Counter(host_specs) @@ -69,7 +63,7 @@ def get_suitable_host(vm_specs, hosts=None): # Find out remaining resources after # host_specs - already running vm_specs remaining = remaining_resources(host.specs, running_vms_accumulated_specs) - + # Find out remaining - new_vm_specs remaining = remaining_resources(remaining, vm_specs) @@ -111,7 +105,8 @@ def assign_host(vm): r = RequestEntry.from_scratch(type=RequestType.StartVM, uuid=vm.uuid, - hostname=vm.hostname) + hostname=vm.hostname, + request_prefix=env_vars.get("REQUEST_PREFIX")) request_pool.put(r) vm.log.append("VM scheduled for starting") diff --git a/scheduler/main.py b/scheduler/main.py index 3fb0d1b..507ac44 100755 --- a/scheduler/main.py +++ b/scheduler/main.py @@ -4,28 +4,26 @@ # 2. Introduce a status endpoint of the scheduler - # maybe expose a prometheus compatible output -import logging - -from ucloud_common.request import RequestEntry, RequestType - -from config import etcd_client as client -from config import (host_pool, request_pool, vm_pool, request_prefix) -from helper import (get_suitable_host, dead_host_mitigation, dead_host_detection, - assign_host, NoSuitableHostFound) +from common.request import RequestEntry, RequestType +from config import etcd_client +from config import host_pool, request_pool, vm_pool, env_vars +from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection, + assign_host, NoSuitableHostFound) +from scheduler import logger def main(): - logging.info("%s SESSION STARTED %s", '*' * 5, '*' * 5) + logger.info("%s SESSION STARTED %s", '*' * 5, '*' * 5) pending_vms = [] for request_iterator in [ - client.get_prefix(request_prefix, value_in_json=True), - client.watch_prefix(request_prefix, timeout=5, value_in_json=True), + etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True), + etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=5, value_in_json=True), ]: for request_event in request_iterator: request_entry = RequestEntry(request_event) - logging.debug("%s, %s", request_entry.key, request_entry.value) + logger.debug("%s, %s", request_entry.key, request_entry.value) # Never Run time critical mechanism inside timeout # mechanism because timeout mechanism only comes @@ -35,9 +33,9 @@ def main(): # Detect hosts that are dead and set their status # to "DEAD", and their VMs' status to "KILLED" - logging.debug("TIMEOUT event occured") + logger.debug("TIMEOUT event occured") dead_hosts = dead_host_detection() - logging.debug("Dead hosts: %s", dead_hosts) + logger.debug("Dead hosts: %s", dead_hosts) dead_host_mitigation(dead_hosts) # If there are VMs that weren't assigned a host @@ -49,15 +47,16 @@ def main(): pending_vm_entry = pending_vms.pop() r = RequestEntry.from_scratch(type="ScheduleVM", uuid=pending_vm_entry.uuid, - hostname=pending_vm_entry.hostname) + hostname=pending_vm_entry.hostname, + request_prefix=env_vars.get("REQUEST_PREFIX")) request_pool.put(r) elif request_entry.type == RequestType.ScheduleVM: vm_entry = vm_pool.get(request_entry.uuid) if vm_entry is None: - logging.info("Trying to act on {} but it is deleted".format(request_entry.uuid)) + logger.info("Trying to act on {} but it is 
deleted".format(request_entry.uuid)) continue - client.client.delete(request_entry.key) # consume Request + etcd_client.client.delete(request_entry.key) # consume Request # If the Request is about a VM which is labelled as "migration" # and has a destination @@ -67,12 +66,13 @@ def main(): get_suitable_host(vm_specs=vm_entry.specs, hosts=[host_pool.get(request_entry.destination)]) except NoSuitableHostFound: - logging.info("Requested destination host doesn't have enough capacity" + logger.info("Requested destination host doesn't have enough capacity" "to hold %s", vm_entry.uuid) else: r = RequestEntry.from_scratch(type=RequestType.InitVMMigration, uuid=request_entry.uuid, - destination=request_entry.destination) + destination=request_entry.destination, + request_prefix=env_vars.get("REQUEST_PREFIX")) request_pool.put(r) # If the Request is about a VM that just want to get started/created @@ -86,7 +86,7 @@ def main(): vm_pool.put(vm_entry) pending_vms.append(vm_entry) - logging.info("No Resource Left. Emailing admin....") + logger.info("No Resource Left. Emailing admin....") if __name__ == "__main__": diff --git a/scheduler/tests/__init__.py b/scheduler/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scheduler/tests/test_basics.py b/scheduler/tests/test_basics.py index 227e84b..ef82fc0 100755 --- a/scheduler/tests/test_basics.py +++ b/scheduler/tests/test_basics.py @@ -1,13 +1,10 @@ -import unittest -import sys import json import multiprocessing -import time - +import sys +import unittest from datetime import datetime from os.path import dirname - BASE_DIR = dirname(dirname(__file__)) sys.path.insert(0, BASE_DIR) @@ -15,13 +12,12 @@ from main import ( accumulated_specs, remaining_resources, VmPool, - dead_host_detection, - dead_host_mitigation, main, ) from config import etcd_client + class TestFunctions(unittest.TestCase): @classmethod def setUpClass(cls): diff --git a/scheduler/tests/test_dead_host_mechanism.py b/scheduler/tests/test_dead_host_mechanism.py index 33bed23..0b403ef 100755 --- a/scheduler/tests/test_dead_host_mechanism.py +++ b/scheduler/tests/test_dead_host_mechanism.py @@ -1,24 +1,18 @@ -import unittest import sys -import json -import multiprocessing -import time - +import unittest from datetime import datetime from os.path import dirname + BASE_DIR = dirname(dirname(__file__)) sys.path.insert(0, BASE_DIR) from main import ( - accumulated_specs, - remaining_resources, - VmPool, dead_host_detection, dead_host_mitigation, - main, config ) + class TestDeadHostMechanism(unittest.TestCase): def setUp(self): self.client = config.etcd_client diff --git a/ucloud.py b/ucloud.py index 5886502..8774fa3 100644 --- a/ucloud.py +++ b/ucloud.py @@ -1,16 +1,50 @@ import argparse -import subprocess as sp -arg_parser = argparse.ArgumentParser(prog='ucloud', - description='Open Source Cloud Management Software') -arg_parser.add_argument('component', - choices=['api', 'scheduler', 'host', - 'filescanner','imagescanner', - 'metadata']) -arg_parser.add_argument('component_args', nargs='*') -args = arg_parser.parse_args() +import multiprocessing as mp +import logging -try: - command = ['pipenv', 'run', 'python', 'main.py', *args.component_args] - sp.run(command, cwd=args.component) -except Exception as error: - print(error) +from os.path import join as join_path + +if __name__ == "__main__": + arg_parser = argparse.ArgumentParser(prog='ucloud', + description='Open Source Cloud Management Software') + arg_parser.add_argument('component', + choices=['api', 'scheduler', 
'host', + 'filescanner', 'imagescanner', + 'metadata']) + arg_parser.add_argument('component_args', nargs='*') + args = arg_parser.parse_args() + + logging.basicConfig( + level=logging.DEBUG, + filename=join_path("logs.txt"), + filemode="a", + format="%(name)s %(asctime)s: %(levelname)s - %(message)s", + datefmt="%d-%b-%y %H:%M:%S", + ) + + if args.component == 'api': + from api.main import main + + main() + elif args.component == 'host': + from host.main import main + + hostname = args.component_args + mp.set_start_method('spawn') + main(*hostname) + elif args.component == 'scheduler': + from scheduler.main import main + + main() + elif args.component == 'filescanner': + from filescanner.main import main + + main() + elif args.component == 'imagescanner': + from imagescanner.main import main + + main() + elif args.component == 'metadata': + from metadata.main import main + + main()
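The rewritten ucloud.py above becomes the single entrypoint: instead of shelling out to each component with subprocess as the deleted version did, it imports the selected component's main() and runs it in-process, after configuring logging once (DEBUG level, appended to logs.txt) so that the per-package loggers created with logging.getLogger(__name__) in the new __init__.py files all inherit the same handler. Typical invocations would be python ucloud.py scheduler or python ucloud.py host <hostname>, where any extra positional arguments are forwarded to host.main.main(); the exact value expected for <hostname> is not shown in this diff.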
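A change repeated across this diff is the deletion of the per-component config.py modules (api, imagescanner, metadata, scheduler) in favour of one shared config module from which everything now imports etcd_client, env_vars and the vm/host/request pools. That shared module is not part of this section of the diff, so the following is only a sketch of what it plausibly provides, reconstructed from the deleted configs and the new import sites; the EnvVars class, its .get() signature and the pool constructor signatures are assumptions, not confirmed by the diff.

# config.py - hypothetical sketch of the shared configuration module that the
# deleted per-component config.py files are replaced with. The exported names
# (etcd_client, env_vars, vm_pool, host_pool, request_pool) mirror the new
# import sites; everything else is inferred.
from decouple import config
from etcd3_wrapper import Etcd3Wrapper

from common.host import HostPool
from common.request import RequestPool
from common.vm import VmPool


class EnvVars:
    """Minimal .get() facade over python-decouple, matching env_vars.get(...) call sites."""

    @staticmethod
    def get(key, default=None):
        # decouple.config raises UndefinedValueError for a missing key when no
        # default is supplied, so a default is always passed through here.
        return config(key, default=default)


env_vars = EnvVars()

etcd_client = Etcd3Wrapper(host=env_vars.get("ETCD_URL"))

vm_pool = VmPool(etcd_client, env_vars.get("VM_PREFIX"))
host_pool = HostPool(etcd_client, env_vars.get("HOST_PREFIX"))
request_pool = RequestPool(etcd_client, env_vars.get("REQUEST_PREFIX"))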
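metadata/main.py looks the calling VM up by MAC address (get_vm_entry filters vm_pool.vms on the MAC column of vm.network), and the stackoverflow link kept in that hunk points at deriving the MAC from the request's IPv6 link-local source address. For readers unfamiliar with the EUI-64 trick, a minimal sketch of the reversal follows; the function name and the example address are illustrative, not taken from the diff.

import ipaddress


def ipv6_to_mac(ipv6_addr):
    """Recover the MAC embedded in an EUI-64 based link-local address (sketch).

    The helper actually used by metadata/main.py is not visible in this hunk;
    this is the standard reversal: take the low 64 bits of the address, drop
    the 0xff/0xfe filler in the middle and flip the universal/local bit back.
    """
    eui64 = ipaddress.ip_address(ipv6_addr).packed[8:]
    mac = bytearray(eui64[0:3] + eui64[5:8])
    mac[0] ^= 0x02
    return ":".join("%02x" % byte for byte in mac)


# Illustrative value only:
# ipv6_to_mac("fe80::5054:ff:fe12:3456") -> "52:54:00:12:34:56"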