From 9bdf4d2180a0748ec4ffd5cf83b104e99152ac08 Mon Sep 17 00:00:00 2001 From: meow Date: Mon, 30 Dec 2019 14:35:07 +0500 Subject: [PATCH] Shutdown Source VM (PAUSED) on successfull migration + blackened all .py files --- setup.py | 71 +++-- ucloud/api/common_fields.py | 16 +- ucloud/api/create_image_store.py | 5 +- ucloud/api/helper.py | 44 ++- ucloud/api/main.py | 178 ++++++++---- ucloud/api/schemas.py | 275 ++++++++++++------ ucloud/common/etcd_wrapper.py | 46 ++- ucloud/common/host.py | 4 +- ucloud/common/logging.py | 12 +- ucloud/common/network.py | 40 ++- ucloud/common/request.py | 8 +- ucloud/common/schemas.py | 10 +- ucloud/common/storage_handlers.py | 53 ++-- ucloud/common/vm.py | 11 +- ucloud/configure/main.py | 66 +++-- ucloud/docs/source/conf.py | 14 +- ucloud/filescanner/main.py | 15 +- ucloud/host/main.py | 82 ++++-- ucloud/host/qmp/__init__.py | 173 ++++++----- ucloud/host/qmp/qmp.py | 22 +- ucloud/host/virtualmachine.py | 220 +++++++++----- ucloud/imagescanner/main.py | 83 ++++-- ucloud/metadata/main.py | 48 +-- ucloud/scheduler/__init__.py | 2 +- ucloud/scheduler/helper.py | 50 +++- ucloud/scheduler/main.py | 49 +++- ucloud/scheduler/tests/test_basics.py | 41 ++- .../tests/test_dead_host_mechanism.py | 30 +- ucloud/settings/__init__.py | 82 ++++-- ucloud/shared/__init__.py | 10 +- ucloud/vmm/__init__.py | 185 ++++++++---- 31 files changed, 1307 insertions(+), 638 deletions(-) diff --git a/setup.py b/setup.py index 51d21b8..956656b 100644 --- a/setup.py +++ b/setup.py @@ -7,41 +7,48 @@ with open("README.md", "r") as fh: try: import ucloud.version + version = ucloud.version.VERSION except: import subprocess - c = subprocess.check_output(['git', 'describe']) + + c = subprocess.check_output(["git", "describe"]) version = c.decode("utf-8").strip() -setup(name='ucloud', - version=version, - description='All ucloud server components.', - url='https://code.ungleich.ch/ucloud/ucloud', - long_description=long_description, - long_description_content_type='text/markdown', - classifiers=[ - 'Development Status :: 3 - Alpha', - 'License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)', - 'Programming Language :: Python :: 3' - ], - author='ungleich', - author_email='technik@ungleich.ch', - packages=find_packages(), - install_requires=[ - 'requests', - 'Flask>=1.1.1', - 'flask-restful', - 'bitmath', - 'pyotp', - 'sshtunnel', - 'sphinx', - 'pynetbox', - 'colorama', - 'sphinx-rtd-theme', - 'etcd3 @ https://github.com/kragniz/python-etcd3/tarball/master#egg=etcd3', - 'werkzeug', 'marshmallow' - ], - scripts=['scripts/ucloud'], - data_files=[(os.path.expanduser('~/ucloud/'), ['conf/ucloud.conf'])], - zip_safe=False) +setup( + name="ucloud", + version=version, + description="All ucloud server components.", + url="https://code.ungleich.ch/ucloud/ucloud", + long_description=long_description, + long_description_content_type="text/markdown", + classifiers=[ + "Development Status :: 3 - Alpha", + "License :: OSI Approved :: GNU General Public License v3 or later (GPLv3+)", + "Programming Language :: Python :: 3", + ], + author="ungleich", + author_email="technik@ungleich.ch", + packages=find_packages(), + install_requires=[ + "requests", + "Flask>=1.1.1", + "flask-restful", + "bitmath", + "pyotp", + "sshtunnel", + "sphinx", + "pynetbox", + "colorama", + "sphinx-rtd-theme", + "etcd3 @ https://github.com/kragniz/python-etcd3/tarball/master#egg=etcd3", + "werkzeug", + "marshmallow", + ], + scripts=["scripts/ucloud"], + data_files=[ + (os.path.expanduser("~/ucloud/"), ["conf/ucloud.conf"]) + ], + zip_safe=False, +) diff --git a/ucloud/api/common_fields.py b/ucloud/api/common_fields.py index a793d26..93f9e06 100755 --- a/ucloud/api/common_fields.py +++ b/ucloud/api/common_fields.py @@ -20,12 +20,16 @@ class Field: def is_valid(self): if self.value == KeyError: - self.add_error("'{}' field is a required field".format(self.name)) + self.add_error( + "'{}' field is a required field".format(self.name) + ) else: if isinstance(self.value, Optional): pass elif not isinstance(self.value, self.type): - self.add_error("Incorrect Type for '{}' field".format(self.name)) + self.add_error( + "Incorrect Type for '{}' field".format(self.name) + ) else: self.validation() @@ -49,6 +53,10 @@ class VmUUIDField(Field): self.validation = self.vm_uuid_validation def vm_uuid_validation(self): - r = shared.etcd_client.get(os.path.join(settings['etcd']['vm_prefix'], self.uuid)) + r = shared.etcd_client.get( + os.path.join(settings["etcd"]["vm_prefix"], self.uuid) + ) if not r: - self.add_error("VM with uuid {} does not exists".format(self.uuid)) + self.add_error( + "VM with uuid {} does not exists".format(self.uuid) + ) diff --git a/ucloud/api/create_image_store.py b/ucloud/api/create_image_store.py index 978a182..a433ce3 100755 --- a/ucloud/api/create_image_store.py +++ b/ucloud/api/create_image_store.py @@ -14,4 +14,7 @@ data = { "attributes": {"list": [], "key": [], "pool": "images"}, } -shared.etcd_client.put(os.path.join(settings['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data)) +shared.etcd_client.put( + os.path.join(settings["etcd"]["image_store_prefix"], uuid4().hex), + json.dumps(data), +) diff --git a/ucloud/api/helper.py b/ucloud/api/helper.py index e275e46..a77a151 100755 --- a/ucloud/api/helper.py +++ b/ucloud/api/helper.py @@ -16,21 +16,23 @@ logger = logging.getLogger(__name__) def check_otp(name, realm, token): try: data = { - "auth_name": settings['otp']['auth_name'], - "auth_token": TOTP(settings['otp']['auth_seed']).now(), - "auth_realm": settings['otp']['auth_realm'], + "auth_name": settings["otp"]["auth_name"], + "auth_token": TOTP(settings["otp"]["auth_seed"]).now(), + "auth_realm": settings["otp"]["auth_realm"], "name": name, "realm": realm, "token": token, } except binascii.Error as err: logger.error( - "Cannot compute OTP for seed: {}".format(settings['otp']['auth_seed']) + "Cannot compute OTP for seed: {}".format( + settings["otp"]["auth_seed"] + ) ) return 400 response = requests.post( - settings['otp']['verification_controller_url'], json=data + settings["otp"]["verification_controller_url"], json=data ) return response.status_code @@ -43,7 +45,8 @@ def resolve_vm_name(name, owner): """ result = next( filter( - lambda vm: vm.value["owner"] == owner and vm.value["name"] == name, + lambda vm: vm.value["owner"] == owner + and vm.value["name"] == name, shared.vm_pool.vms, ), None, @@ -80,18 +83,27 @@ def resolve_image_name(name, etcd_client): """ store_name, image_name = store_name_and_image_name except Exception: - raise ValueError("Image name not in correct format i.e {store_name}:{image_name}") + raise ValueError( + "Image name not in correct format i.e {store_name}:{image_name}" + ) - images = etcd_client.get_prefix(settings['etcd']['image_prefix'], value_in_json=True) + images = etcd_client.get_prefix( + settings["etcd"]["image_prefix"], value_in_json=True + ) # Try to find image with name == image_name and store_name == store_name try: - image = next(filter(lambda im: im.value['name'] == image_name - and im.value['store_name'] == store_name, images)) + image = next( + filter( + lambda im: im.value["name"] == image_name + and im.value["store_name"] == store_name, + images, + ) + ) except StopIteration: raise KeyError("No image with name {} found.".format(name)) else: - image_uuid = image.key.split('/')[-1] + image_uuid = image.key.split("/")[-1] return image_uuid @@ -100,7 +112,9 @@ def random_bytes(num=6): return [random.randrange(256) for _ in range(num)] -def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'): +def generate_mac( + uaa=False, multicast=False, oui=None, separator=":", byte_fmt="%02x" +): mac = random_bytes() if oui: if type(oui) == str: @@ -131,7 +145,9 @@ def get_ip_addr(mac_address, device): and is connected/neighbor of arg:device """ try: - output = sp.check_output(['ip', '-6', 'neigh', 'show', 'dev', device], stderr=sp.PIPE) + output = sp.check_output( + ["ip", "-6", "neigh", "show", "dev", device], stderr=sp.PIPE + ) except sp.CalledProcessError: return None else: @@ -160,7 +176,7 @@ def mac2ipv6(mac, prefix): # format output ipv6_parts = [str(0)] * 4 for i in range(0, len(parts), 2): - ipv6_parts.append("".join(parts[i:i + 2])) + ipv6_parts.append("".join(parts[i : i + 2])) lower_part = ipaddress.IPv6Address(":".join(ipv6_parts)) prefix = ipaddress.IPv6Address(prefix) diff --git a/ucloud/api/main.py b/ucloud/api/main.py index 91cbead..c63babf 100644 --- a/ucloud/api/main.py +++ b/ucloud/api/main.py @@ -43,7 +43,7 @@ def handle_exception(e): return e # now you're handling non-HTTP exceptions only - return {'message': 'Server Error'}, 500 + return {"message": "Server Error"}, 500 class CreateVM(Resource): @@ -55,7 +55,7 @@ class CreateVM(Resource): validator = schemas.CreateVMSchema(data) if validator.is_valid(): vm_uuid = uuid4().hex - vm_key = join_path(settings['etcd']['vm_prefix'], vm_uuid) + vm_key = join_path(settings["etcd"]["vm_prefix"], vm_uuid) specs = { "cpu": validator.specs["cpu"], "ram": validator.specs["ram"], @@ -63,8 +63,12 @@ class CreateVM(Resource): "hdd": validator.specs["hdd"], } macs = [generate_mac() for _ in range(len(data["network"]))] - tap_ids = [counters.increment_etcd_counter(shared.etcd_client, "/v1/counter/tap") - for _ in range(len(data["network"]))] + tap_ids = [ + counters.increment_etcd_counter( + shared.etcd_client, "/v1/counter/tap" + ) + for _ in range(len(data["network"])) + ] vm_entry = { "name": data["vm_name"], "owner": data["name"], @@ -77,14 +81,15 @@ class CreateVM(Resource): "vnc_socket": "", "network": list(zip(data["network"], macs, tap_ids)), "metadata": {"ssh-keys": []}, - "in_migration": False + "in_migration": False, } shared.etcd_client.put(vm_key, vm_entry, value_in_json=True) # Create ScheduleVM Request r = RequestEntry.from_scratch( - type=RequestType.ScheduleVM, uuid=vm_uuid, - request_prefix=settings['etcd']['request_prefix'] + type=RequestType.ScheduleVM, + uuid=vm_uuid, + request_prefix=settings["etcd"]["request_prefix"], ) shared.request_pool.put(r) @@ -99,7 +104,7 @@ class VmStatus(Resource): validator = schemas.VMStatusSchema(data) if validator.is_valid(): vm = shared.vm_pool.get( - join_path(settings['etcd']['vm_prefix'], data["uuid"]) + join_path(settings["etcd"]["vm_prefix"], data["uuid"]) ) vm_value = vm.value.copy() vm_value["ip"] = [] @@ -107,13 +112,15 @@ class VmStatus(Resource): network_name, mac, tap = network_mac_and_tap network = shared.etcd_client.get( join_path( - settings['etcd']['network_prefix'], + settings["etcd"]["network_prefix"], data["name"], network_name, ), value_in_json=True, ) - ipv6_addr = network.value.get("ipv6").split("::")[0] + "::" + ipv6_addr = ( + network.value.get("ipv6").split("::")[0] + "::" + ) vm_value["ip"].append(mac2ipv6(mac, ipv6_addr)) vm.value = vm_value return vm.value @@ -128,7 +135,7 @@ class CreateImage(Resource): validator = schemas.CreateImageSchema(data) if validator.is_valid(): file_entry = shared.etcd_client.get( - join_path(settings['etcd']['file_prefix'], data["uuid"]) + join_path(settings["etcd"]["file_prefix"], data["uuid"]) ) file_entry_value = json.loads(file_entry.value) @@ -141,7 +148,9 @@ class CreateImage(Resource): "visibility": "public", } shared.etcd_client.put( - join_path(settings['etcd']['image_prefix'], data["uuid"]), + join_path( + settings["etcd"]["image_prefix"], data["uuid"] + ), json.dumps(image_entry_json), ) @@ -153,11 +162,9 @@ class ListPublicImages(Resource): @staticmethod def get(): images = shared.etcd_client.get_prefix( - settings['etcd']['image_prefix'], value_in_json=True + settings["etcd"]["image_prefix"], value_in_json=True ) - r = { - "images": [] - } + r = {"images": []} for image in images: image_key = "{}:{}".format( image.value["store_name"], image.value["name"] @@ -176,7 +183,7 @@ class VMAction(Resource): if validator.is_valid(): vm_entry = shared.vm_pool.get( - join_path(settings['etcd']['vm_prefix'], data["uuid"]) + join_path(settings["etcd"]["vm_prefix"], data["uuid"]) ) action = data["action"] @@ -184,13 +191,19 @@ class VMAction(Resource): action = "schedule" if action == "delete" and vm_entry.hostname == "": - if shared.storage_handler.is_vm_image_exists(vm_entry.uuid): - r_status = shared.storage_handler.delete_vm_image(vm_entry.uuid) + if shared.storage_handler.is_vm_image_exists( + vm_entry.uuid + ): + r_status = shared.storage_handler.delete_vm_image( + vm_entry.uuid + ) if r_status: shared.etcd_client.client.delete(vm_entry.key) return {"message": "VM successfully deleted"} else: - logger.error("Some Error Occurred while deleting VM") + logger.error( + "Some Error Occurred while deleting VM" + ) return {"message": "VM deletion unsuccessfull"} else: shared.etcd_client.client.delete(vm_entry.key) @@ -200,10 +213,13 @@ class VMAction(Resource): type="{}VM".format(action.title()), uuid=data["uuid"], hostname=vm_entry.hostname, - request_prefix=settings['etcd']['request_prefix'] + request_prefix=settings["etcd"]["request_prefix"], ) shared.request_pool.put(r) - return {"message": "VM {} Queued".format(action.title())}, 200 + return ( + {"message": "VM {} Queued".format(action.title())}, + 200, + ) else: return validator.get_errors(), 400 @@ -216,15 +232,21 @@ class VMMigration(Resource): if validator.is_valid(): vm = shared.vm_pool.get(data["uuid"]) - r = RequestEntry.from_scratch(type=RequestType.InitVMMigration, - uuid=vm.uuid, - hostname=join_path( - settings['etcd']['host_prefix'], validator.destination.value - ), - request_prefix=settings['etcd']['request_prefix']) + r = RequestEntry.from_scratch( + type=RequestType.InitVMMigration, + uuid=vm.uuid, + hostname=join_path( + settings["etcd"]["host_prefix"], + validator.destination.value, + ), + request_prefix=settings["etcd"]["request_prefix"], + ) shared.request_pool.put(r) - return {"message": "VM Migration Initialization Queued"}, 200 + return ( + {"message": "VM Migration Initialization Queued"}, + 200, + ) else: return validator.get_errors(), 400 @@ -237,10 +259,12 @@ class ListUserVM(Resource): if validator.is_valid(): vms = shared.etcd_client.get_prefix( - settings['etcd']['vm_prefix'], value_in_json=True + settings["etcd"]["vm_prefix"], value_in_json=True ) return_vms = [] - user_vms = filter(lambda v: v.value["owner"] == data["name"], vms) + user_vms = filter( + lambda v: v.value["owner"] == data["name"], vms + ) for vm in user_vms: return_vms.append( { @@ -249,9 +273,7 @@ class ListUserVM(Resource): "specs": vm.value["specs"], "status": vm.value["status"], "hostname": vm.value["hostname"], - "vnc_socket": None - if vm.value.get("vnc_socket", None) is None - else vm.value["vnc_socket"], + "vnc_socket": vm.value.get("vnc_socket", None), } ) if return_vms: @@ -270,11 +292,13 @@ class ListUserFiles(Resource): if validator.is_valid(): files = shared.etcd_client.get_prefix( - settings['etcd']['file_prefix'], value_in_json=True + settings["etcd"]["file_prefix"], value_in_json=True ) return_files = [] user_files = list( - filter(lambda f: f.value["owner"] == data["name"], files) + filter( + lambda f: f.value["owner"] == data["name"], files + ) ) for file in user_files: return_files.append( @@ -294,14 +318,18 @@ class CreateHost(Resource): data = request.json validator = schemas.CreateHostSchema(data) if validator.is_valid(): - host_key = join_path(settings['etcd']['host_prefix'], uuid4().hex) + host_key = join_path( + settings["etcd"]["host_prefix"], uuid4().hex + ) host_entry = { "specs": data["specs"], "hostname": data["hostname"], "status": "DEAD", "last_heartbeat": "", } - shared.etcd_client.put(host_key, host_entry, value_in_json=True) + shared.etcd_client.put( + host_key, host_entry, value_in_json=True + ) return {"message": "Host Created"}, 200 @@ -333,7 +361,7 @@ class GetSSHKeys(Resource): # {user_prefix}/{realm}/{name}/key/ etcd_key = join_path( - settings['etcd']['user_prefix'], + settings["etcd"]["user_prefix"], data["realm"], data["name"], "key", @@ -343,25 +371,30 @@ class GetSSHKeys(Resource): ) keys = { - key.key.split("/")[-1]: key.value for key in etcd_entry + key.key.split("/")[-1]: key.value + for key in etcd_entry } return {"keys": keys} else: # {user_prefix}/{realm}/{name}/key/{key_name} etcd_key = join_path( - settings['etcd']['user_prefix'], + settings["etcd"]["user_prefix"], data["realm"], data["name"], "key", data["key_name"], ) - etcd_entry = shared.etcd_client.get(etcd_key, value_in_json=True) + etcd_entry = shared.etcd_client.get( + etcd_key, value_in_json=True + ) if etcd_entry: return { "keys": { - etcd_entry.key.split("/")[-1]: etcd_entry.value + etcd_entry.key.split("/")[ + -1 + ]: etcd_entry.value } } else: @@ -379,13 +412,15 @@ class AddSSHKey(Resource): # {user_prefix}/{realm}/{name}/key/{key_name} etcd_key = join_path( - settings['etcd']['user_prefix'], + settings["etcd"]["user_prefix"], data["realm"], data["name"], "key", data["key_name"], ) - etcd_entry = shared.etcd_client.get(etcd_key, value_in_json=True) + etcd_entry = shared.etcd_client.get( + etcd_key, value_in_json=True + ) if etcd_entry: return { "message": "Key with name '{}' already exists".format( @@ -394,7 +429,9 @@ class AddSSHKey(Resource): } else: # Key Not Found. It implies user' haven't added any key yet. - shared.etcd_client.put(etcd_key, data["key"], value_in_json=True) + shared.etcd_client.put( + etcd_key, data["key"], value_in_json=True + ) return {"message": "Key added successfully"} else: return validator.get_errors(), 400 @@ -409,13 +446,15 @@ class RemoveSSHKey(Resource): # {user_prefix}/{realm}/{name}/key/{key_name} etcd_key = join_path( - settings['etcd']['user_prefix'], + settings["etcd"]["user_prefix"], data["realm"], data["name"], "key", data["key_name"], ) - etcd_entry = shared.etcd_client.get(etcd_key, value_in_json=True) + etcd_entry = shared.etcd_client.get( + etcd_key, value_in_json=True + ) if etcd_entry: shared.etcd_client.client.delete(etcd_key) return {"message": "Key successfully removed."} @@ -446,15 +485,17 @@ class CreateNetwork(Resource): if validator.user.value: try: nb = pynetbox.api( - url=settings['netbox']['url'], - token=settings['netbox']['token'], + url=settings["netbox"]["url"], + token=settings["netbox"]["token"], ) nb_prefix = nb.ipam.prefixes.get( - prefix=settings['network']['prefix'] + prefix=settings["network"]["prefix"] ) prefix = nb_prefix.available_prefixes.create( data={ - "prefix_length": int(settings['network']['prefix_length']), + "prefix_length": int( + settings["network"]["prefix_length"] + ), "description": '{}\'s network "{}"'.format( data["name"], data["network_name"] ), @@ -463,18 +504,22 @@ class CreateNetwork(Resource): ) except Exception as err: app.logger.error(err) - return {"message": "Error occured while creating network."} + return { + "message": "Error occured while creating network." + } else: network_entry["ipv6"] = prefix["prefix"] else: network_entry["ipv6"] = "fd00::/64" network_key = join_path( - settings['etcd']['network_prefix'], - data['name'], - data['network_name'], + settings["etcd"]["network_prefix"], + data["name"], + data["network_name"], + ) + shared.etcd_client.put( + network_key, network_entry, value_in_json=True ) - shared.etcd_client.put(network_key, network_entry, value_in_json=True) return {"message": "Network successfully added."} else: return validator.get_errors(), 400 @@ -488,9 +533,11 @@ class ListUserNetwork(Resource): if validator.is_valid(): prefix = join_path( - settings['etcd']['network_prefix'], data["name"] + settings["etcd"]["network_prefix"], data["name"] + ) + networks = shared.etcd_client.get_prefix( + prefix, value_in_json=True ) - networks = shared.etcd_client.get_prefix(prefix, value_in_json=True) user_networks = [] for net in networks: net.value["name"] = net.key.split("/")[-1] @@ -524,7 +571,11 @@ api.add_resource(CreateNetwork, "/network/create") def main(): - image_stores = list(shared.etcd_client.get_prefix(settings['etcd']['image_store_prefix'], value_in_json=True)) + image_stores = list( + shared.etcd_client.get_prefix( + settings["etcd"]["image_store_prefix"], value_in_json=True + ) + ) if len(image_stores) == 0: data = { "is_public": True, @@ -534,7 +585,12 @@ def main(): "attributes": {"list": [], "key": [], "pool": "images"}, } - shared.etcd_client.put(join_path(settings['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data)) + shared.etcd_client.put( + join_path( + settings["etcd"]["image_store_prefix"], uuid4().hex + ), + json.dumps(data), + ) app.run(host="::", debug=True) diff --git a/ucloud/api/schemas.py b/ucloud/api/schemas.py index d639be4..a848a7d 100755 --- a/ucloud/api/schemas.py +++ b/ucloud/api/schemas.py @@ -80,7 +80,12 @@ class OTPSchema(BaseSchema): super().__init__(data=data, fields=_fields) def validation(self): - if check_otp(self.name.value, self.realm.value, self.token.value) != 200: + if ( + check_otp( + self.name.value, self.realm.value, self.token.value + ) + != 200 + ): self.add_error("Wrong Credentials") @@ -92,7 +97,9 @@ class CreateImageSchema(BaseSchema): # Fields self.uuid = Field("uuid", str, data.get("uuid", KeyError)) self.name = Field("name", str, data.get("name", KeyError)) - self.image_store = Field("image_store", str, data.get("image_store", KeyError)) + self.image_store = Field( + "image_store", str, data.get("image_store", KeyError) + ) # Validations self.uuid.validation = self.file_uuid_validation @@ -103,34 +110,52 @@ class CreateImageSchema(BaseSchema): super().__init__(data, fields) def file_uuid_validation(self): - file_entry = shared.etcd_client.get(os.path.join(settings['etcd']['file_prefix'], self.uuid.value)) + file_entry = shared.etcd_client.get( + os.path.join( + settings["etcd"]["file_prefix"], self.uuid.value + ) + ) if file_entry is None: self.add_error( - "Image File with uuid '{}' Not Found".format(self.uuid.value) + "Image File with uuid '{}' Not Found".format( + self.uuid.value + ) ) def image_store_name_validation(self): - image_stores = list(shared.etcd_client.get_prefix(settings['etcd']['image_store_prefix'])) + image_stores = list( + shared.etcd_client.get_prefix( + settings["etcd"]["image_store_prefix"] + ) + ) image_store = next( filter( - lambda s: json.loads(s.value)["name"] == self.image_store.value, + lambda s: json.loads(s.value)["name"] + == self.image_store.value, image_stores, ), None, ) if not image_store: - self.add_error("Store '{}' does not exists".format(self.image_store.value)) + self.add_error( + "Store '{}' does not exists".format( + self.image_store.value + ) + ) # Host Operations + class CreateHostSchema(OTPSchema): def __init__(self, data): self.parsed_specs = {} # Fields self.specs = Field("specs", dict, data.get("specs", KeyError)) - self.hostname = Field("hostname", str, data.get("hostname", KeyError)) + self.hostname = Field( + "hostname", str, data.get("hostname", KeyError) + ) # Validation self.specs.validation = self.specs_validation @@ -142,22 +167,28 @@ class CreateHostSchema(OTPSchema): def specs_validation(self): ALLOWED_BASE = 10 - _cpu = self.specs.value.get('cpu', KeyError) - _ram = self.specs.value.get('ram', KeyError) - _os_ssd = self.specs.value.get('os-ssd', KeyError) - _hdd = self.specs.value.get('hdd', KeyError) + _cpu = self.specs.value.get("cpu", KeyError) + _ram = self.specs.value.get("ram", KeyError) + _os_ssd = self.specs.value.get("os-ssd", KeyError) + _hdd = self.specs.value.get("hdd", KeyError) if KeyError in [_cpu, _ram, _os_ssd, _hdd]: - self.add_error("You must specify CPU, RAM and OS-SSD in your specs") + self.add_error( + "You must specify CPU, RAM and OS-SSD in your specs" + ) return None try: parsed_ram = bitmath.parse_string_unsafe(_ram) parsed_os_ssd = bitmath.parse_string_unsafe(_os_ssd) if parsed_ram.base != ALLOWED_BASE: - self.add_error("Your specified RAM is not in correct units") + self.add_error( + "Your specified RAM is not in correct units" + ) if parsed_os_ssd.base != ALLOWED_BASE: - self.add_error("Your specified OS-SSD is not in correct units") + self.add_error( + "Your specified OS-SSD is not in correct units" + ) if _cpu < 1: self.add_error("CPU must be atleast 1") @@ -172,7 +203,9 @@ class CreateHostSchema(OTPSchema): for hdd in _hdd: _parsed_hdd = bitmath.parse_string_unsafe(hdd) if _parsed_hdd.base != ALLOWED_BASE: - self.add_error("Your specified HDD is not in correct units") + self.add_error( + "Your specified HDD is not in correct units" + ) break else: parsed_hdd.append(str(_parsed_hdd)) @@ -183,15 +216,17 @@ class CreateHostSchema(OTPSchema): else: if self.get_errors(): self.specs = { - 'cpu': _cpu, - 'ram': str(parsed_ram), - 'os-ssd': str(parsed_os_ssd), - 'hdd': parsed_hdd + "cpu": _cpu, + "ram": str(parsed_ram), + "os-ssd": str(parsed_os_ssd), + "hdd": parsed_hdd, } def validation(self): if self.realm.value != "ungleich-admin": - self.add_error("Invalid Credentials/Insufficient Permission") + self.add_error( + "Invalid Credentials/Insufficient Permission" + ) # VM Operations @@ -203,9 +238,13 @@ class CreateVMSchema(OTPSchema): # Fields self.specs = Field("specs", dict, data.get("specs", KeyError)) - self.vm_name = Field("vm_name", str, data.get("vm_name", KeyError)) + self.vm_name = Field( + "vm_name", str, data.get("vm_name", KeyError) + ) self.image = Field("image", str, data.get("image", KeyError)) - self.network = Field("network", list, data.get("network", KeyError)) + self.network = Field( + "network", list, data.get("network", KeyError) + ) # Validation self.image.validation = self.image_validation @@ -219,17 +258,25 @@ class CreateVMSchema(OTPSchema): def image_validation(self): try: - image_uuid = helper.resolve_image_name(self.image.value, shared.etcd_client) + image_uuid = helper.resolve_image_name( + self.image.value, shared.etcd_client + ) except Exception as e: - logger.exception("Cannot resolve image name = %s", self.image.value) + logger.exception( + "Cannot resolve image name = %s", self.image.value + ) self.add_error(str(e)) else: self.image_uuid = image_uuid def vm_name_validation(self): - if resolve_vm_name(name=self.vm_name.value, owner=self.name.value): + if resolve_vm_name( + name=self.vm_name.value, owner=self.name.value + ): self.add_error( - 'VM with same name "{}" already exists'.format(self.vm_name.value) + 'VM with same name "{}" already exists'.format( + self.vm_name.value + ) ) def network_validation(self): @@ -237,32 +284,46 @@ class CreateVMSchema(OTPSchema): if _network: for net in _network: - network = shared.etcd_client.get(os.path.join(settings['etcd']['network_prefix'], - self.name.value, - net), value_in_json=True) + network = shared.etcd_client.get( + os.path.join( + settings["etcd"]["network_prefix"], + self.name.value, + net, + ), + value_in_json=True, + ) if not network: - self.add_error("Network with name {} does not exists" \ - .format(net)) + self.add_error( + "Network with name {} does not exists".format( + net + ) + ) def specs_validation(self): ALLOWED_BASE = 10 - _cpu = self.specs.value.get('cpu', KeyError) - _ram = self.specs.value.get('ram', KeyError) - _os_ssd = self.specs.value.get('os-ssd', KeyError) - _hdd = self.specs.value.get('hdd', KeyError) + _cpu = self.specs.value.get("cpu", KeyError) + _ram = self.specs.value.get("ram", KeyError) + _os_ssd = self.specs.value.get("os-ssd", KeyError) + _hdd = self.specs.value.get("hdd", KeyError) if KeyError in [_cpu, _ram, _os_ssd, _hdd]: - self.add_error("You must specify CPU, RAM and OS-SSD in your specs") + self.add_error( + "You must specify CPU, RAM and OS-SSD in your specs" + ) return None try: parsed_ram = bitmath.parse_string_unsafe(_ram) parsed_os_ssd = bitmath.parse_string_unsafe(_os_ssd) if parsed_ram.base != ALLOWED_BASE: - self.add_error("Your specified RAM is not in correct units") + self.add_error( + "Your specified RAM is not in correct units" + ) if parsed_os_ssd.base != ALLOWED_BASE: - self.add_error("Your specified OS-SSD is not in correct units") + self.add_error( + "Your specified OS-SSD is not in correct units" + ) if _cpu < 1: self.add_error("CPU must be atleast 1") @@ -277,7 +338,9 @@ class CreateVMSchema(OTPSchema): for hdd in _hdd: _parsed_hdd = bitmath.parse_string_unsafe(hdd) if _parsed_hdd.base != ALLOWED_BASE: - self.add_error("Your specified HDD is not in correct units") + self.add_error( + "Your specified HDD is not in correct units" + ) break else: parsed_hdd.append(str(_parsed_hdd)) @@ -288,21 +351,24 @@ class CreateVMSchema(OTPSchema): else: if self.get_errors(): self.specs = { - 'cpu': _cpu, - 'ram': str(parsed_ram), - 'os-ssd': str(parsed_os_ssd), - 'hdd': parsed_hdd + "cpu": _cpu, + "ram": str(parsed_ram), + "os-ssd": str(parsed_os_ssd), + "hdd": parsed_hdd, } class VMStatusSchema(OTPSchema): def __init__(self, data): data["uuid"] = ( - resolve_vm_name( - name=data.get("vm_name", None), - owner=(data.get("in_support_of", None) or data.get("name", None)), - ) - or KeyError + resolve_vm_name( + name=data.get("vm_name", None), + owner=( + data.get("in_support_of", None) + or data.get("name", None) + ), + ) + or KeyError ) self.uuid = VmUUIDField(data) @@ -313,7 +379,8 @@ class VMStatusSchema(OTPSchema): def validation(self): vm = shared.vm_pool.get(self.uuid.value) if not ( - vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" + vm.value["owner"] == self.name.value + or self.realm.value == "ungleich-admin" ): self.add_error("Invalid User") @@ -321,11 +388,14 @@ class VMStatusSchema(OTPSchema): class VmActionSchema(OTPSchema): def __init__(self, data): data["uuid"] = ( - resolve_vm_name( - name=data.get("vm_name", None), - owner=(data.get("in_support_of", None) or data.get("name", None)), - ) - or KeyError + resolve_vm_name( + name=data.get("vm_name", None), + owner=( + data.get("in_support_of", None) + or data.get("name", None) + ), + ) + or KeyError ) self.uuid = VmUUIDField(data) self.action = Field("action", str, data.get("action", KeyError)) @@ -340,20 +410,23 @@ class VmActionSchema(OTPSchema): allowed_actions = ["start", "stop", "delete"] if self.action.value not in allowed_actions: self.add_error( - "Invalid Action. Allowed Actions are {}".format(allowed_actions) + "Invalid Action. Allowed Actions are {}".format( + allowed_actions + ) ) def validation(self): vm = shared.vm_pool.get(self.uuid.value) if not ( - vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" + vm.value["owner"] == self.name.value + or self.realm.value == "ungleich-admin" ): self.add_error("Invalid User") if ( - self.action.value == "start" - and vm.status == VMStatus.running - and vm.hostname != "" + self.action.value == "start" + and vm.status == VMStatus.running + and vm.hostname != "" ): self.add_error("VM Already Running") @@ -367,15 +440,20 @@ class VmActionSchema(OTPSchema): class VmMigrationSchema(OTPSchema): def __init__(self, data): data["uuid"] = ( - resolve_vm_name( - name=data.get("vm_name", None), - owner=(data.get("in_support_of", None) or data.get("name", None)), - ) - or KeyError + resolve_vm_name( + name=data.get("vm_name", None), + owner=( + data.get("in_support_of", None) + or data.get("name", None) + ), + ) + or KeyError ) self.uuid = VmUUIDField(data) - self.destination = Field("destination", str, data.get("destination", KeyError)) + self.destination = Field( + "destination", str, data.get("destination", KeyError) + ) self.destination.validation = self.destination_validation @@ -384,9 +462,18 @@ class VmMigrationSchema(OTPSchema): def destination_validation(self): hostname = self.destination.value - host = next(filter(lambda h: h.hostname == hostname, shared.host_pool.hosts), None) + host = next( + filter( + lambda h: h.hostname == hostname, shared.host_pool.hosts + ), + None, + ) if not host: - self.add_error("No Such Host ({}) exists".format(self.destination.value)) + self.add_error( + "No Such Host ({}) exists".format( + self.destination.value + ) + ) elif host.status != HostStatus.alive: self.add_error("Destination Host is dead") else: @@ -395,20 +482,27 @@ class VmMigrationSchema(OTPSchema): def validation(self): vm = shared.vm_pool.get(self.uuid.value) if not ( - vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" + vm.value["owner"] == self.name.value + or self.realm.value == "ungleich-admin" ): self.add_error("Invalid User") if vm.status != VMStatus.running: self.add_error("Can't migrate non-running VM") - if vm.hostname == os.path.join(settings['etcd']['host_prefix'], self.destination.value): - self.add_error("Destination host couldn't be same as Source Host") + if vm.hostname == os.path.join( + settings["etcd"]["host_prefix"], self.destination.value + ): + self.add_error( + "Destination host couldn't be same as Source Host" + ) class AddSSHSchema(OTPSchema): def __init__(self, data): - self.key_name = Field("key_name", str, data.get("key_name", KeyError)) + self.key_name = Field( + "key_name", str, data.get("key_name", KeyError) + ) self.key = Field("key", str, data.get("key_name", KeyError)) fields = [self.key_name, self.key] @@ -417,7 +511,9 @@ class AddSSHSchema(OTPSchema): class RemoveSSHSchema(OTPSchema): def __init__(self, data): - self.key_name = Field("key_name", str, data.get("key_name", KeyError)) + self.key_name = Field( + "key_name", str, data.get("key_name", KeyError) + ) fields = [self.key_name] super().__init__(data=data, fields=fields) @@ -425,7 +521,9 @@ class RemoveSSHSchema(OTPSchema): class GetSSHSchema(OTPSchema): def __init__(self, data): - self.key_name = Field("key_name", str, data.get("key_name", None)) + self.key_name = Field( + "key_name", str, data.get("key_name", None) + ) fields = [self.key_name] super().__init__(data=data, fields=fields) @@ -433,7 +531,9 @@ class GetSSHSchema(OTPSchema): class CreateNetwork(OTPSchema): def __init__(self, data): - self.network_name = Field("network_name", str, data.get("network_name", KeyError)) + self.network_name = Field( + "network_name", str, data.get("network_name", KeyError) + ) self.type = Field("type", str, data.get("type", KeyError)) self.user = Field("user", bool, bool(data.get("user", False))) @@ -444,15 +544,26 @@ class CreateNetwork(OTPSchema): super().__init__(data, fields=fields) def network_name_validation(self): - network = shared.etcd_client.get(os.path.join(settings['etcd']['network_prefix'], - self.name.value, - self.network_name.value), - value_in_json=True) + network = shared.etcd_client.get( + os.path.join( + settings["etcd"]["network_prefix"], + self.name.value, + self.network_name.value, + ), + value_in_json=True, + ) if network: - self.add_error("Network with name {} already exists" \ - .format(self.network_name.value)) + self.add_error( + "Network with name {} already exists".format( + self.network_name.value + ) + ) def network_type_validation(self): supported_network_types = ["vxlan"] if self.type.value not in supported_network_types: - self.add_error("Unsupported Network Type. Supported network types are {}".format(supported_network_types)) + self.add_error( + "Unsupported Network Type. Supported network types are {}".format( + supported_network_types + ) + ) diff --git a/ucloud/common/etcd_wrapper.py b/ucloud/common/etcd_wrapper.py index 5f464e1..7367a6c 100644 --- a/ucloud/common/etcd_wrapper.py +++ b/ucloud/common/etcd_wrapper.py @@ -8,7 +8,7 @@ from functools import wraps from . import logger -PseudoEtcdMeta = namedtuple('PseudoEtcdMeta', ['key']) +PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"]) class EtcdEntry: @@ -16,8 +16,8 @@ class EtcdEntry: # value: str def __init__(self, meta, value, value_in_json=False): - self.key = meta.key.decode('utf-8') - self.value = value.decode('utf-8') + self.key = meta.key.decode("utf-8") + self.value = value.decode("utf-8") if value_in_json: self.value = json.loads(self.value) @@ -29,11 +29,18 @@ def readable_errors(func): try: return func(*args, **kwargs) except etcd3.exceptions.ConnectionFailedError as err: - raise etcd3.exceptions.ConnectionFailedError('etcd connection failed.') from err + raise etcd3.exceptions.ConnectionFailedError( + "etcd connection failed." + ) from err except etcd3.exceptions.ConnectionTimeoutError as err: - raise etcd3.exceptions.ConnectionTimeoutError('etcd connection timeout.') from err + raise etcd3.exceptions.ConnectionTimeoutError( + "etcd connection timeout." + ) from err except Exception: - logger.exception('Some etcd error occured. See syslog for details.') + logger.exception( + "Some etcd error occured. See syslog for details." + ) + return wrapper @@ -56,7 +63,7 @@ class Etcd3Wrapper: _value = json.dumps(_value) if not isinstance(_key, str): - _key = _key.decode('utf-8') + _key = _key.decode("utf-8") return self.client.put(_key, _value, **kwargs) @@ -70,18 +77,25 @@ class Etcd3Wrapper: @readable_errors def watch_prefix(self, key, timeout=0, value_in_json=False): - timeout_event = EtcdEntry(PseudoEtcdMeta(key=b'TIMEOUT'), - value=str.encode(json.dumps({'status': 'TIMEOUT', - 'type': 'TIMEOUT'})), - value_in_json=value_in_json) + timeout_event = EtcdEntry( + PseudoEtcdMeta(key=b"TIMEOUT"), + value=str.encode( + json.dumps({"status": "TIMEOUT", "type": "TIMEOUT"}) + ), + value_in_json=value_in_json, + ) event_queue = queue.Queue() def add_event_to_queue(event): - if hasattr(event, 'events'): + if hasattr(event, "events"): for e in event.events: if e.value: - event_queue.put(EtcdEntry(e, e.value, value_in_json=value_in_json)) + event_queue.put( + EtcdEntry( + e, e.value, value_in_json=value_in_json + ) + ) self.client.add_watch_prefix_callback(key, add_event_to_queue) @@ -96,4 +110,8 @@ class Etcd3Wrapper: class PsuedoEtcdEntry(EtcdEntry): def __init__(self, key, value, value_in_json=False): - super().__init__(PseudoEtcdMeta(key=key.encode('utf-8')), value, value_in_json=value_in_json) + super().__init__( + PseudoEtcdMeta(key=key.encode("utf-8")), + value, + value_in_json=value_in_json, + ) diff --git a/ucloud/common/host.py b/ucloud/common/host.py index ccbf7a8..191a2c0 100644 --- a/ucloud/common/host.py +++ b/ucloud/common/host.py @@ -29,7 +29,9 @@ class HostEntry(SpecificEtcdEntryBase): self.last_heartbeat = time.strftime("%Y-%m-%d %H:%M:%S") def is_alive(self): - last_heartbeat = datetime.strptime(self.last_heartbeat, "%Y-%m-%d %H:%M:%S") + last_heartbeat = datetime.strptime( + self.last_heartbeat, "%Y-%m-%d %H:%M:%S" + ) delta = datetime.now() - last_heartbeat if delta.total_seconds() > 60: return False diff --git a/ucloud/common/logging.py b/ucloud/common/logging.py index ba1e59d..9e0d2be 100644 --- a/ucloud/common/logging.py +++ b/ucloud/common/logging.py @@ -7,21 +7,21 @@ class NoTracebackStreamHandler(logging.StreamHandler): info, cache = record.exc_info, record.exc_text record.exc_info, record.exc_text = None, None - if record.levelname in ['WARNING', 'WARN']: + if record.levelname in ["WARNING", "WARN"]: color = colorama.Fore.LIGHTYELLOW_EX - elif record.levelname == 'ERROR': + elif record.levelname == "ERROR": color = colorama.Fore.LIGHTRED_EX - elif record.levelname == 'INFO': + elif record.levelname == "INFO": color = colorama.Fore.LIGHTGREEN_EX - elif record.levelname == 'CRITICAL': + elif record.levelname == "CRITICAL": color = colorama.Fore.LIGHTCYAN_EX else: color = colorama.Fore.WHITE try: - print(color, end='', flush=True) + print(color, end="", flush=True) super().handle(record) finally: record.exc_info = info record.exc_text = cache - print(colorama.Style.RESET_ALL, end='', flush=True) + print(colorama.Style.RESET_ALL, end="", flush=True) diff --git a/ucloud/common/network.py b/ucloud/common/network.py index 1503446..61dbd64 100644 --- a/ucloud/common/network.py +++ b/ucloud/common/network.py @@ -11,7 +11,9 @@ def random_bytes(num=6): return [random.randrange(256) for _ in range(num)] -def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'): +def generate_mac( + uaa=False, multicast=False, oui=None, separator=":", byte_fmt="%02x" +): mac = random_bytes() if oui: if type(oui) == str: @@ -30,35 +32,51 @@ def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt=' def create_dev(script, _id, dev, ip=None): - command = ['sudo', '-p', 'Enter password to create network devices for vm: ', - script, str(_id), dev] + command = [ + "sudo", + "-p", + "Enter password to create network devices for vm: ", + script, + str(_id), + dev, + ] if ip: command.append(ip) try: output = sp.check_output(command, stderr=sp.PIPE) except Exception: - logger.exception('Creation of interface %s failed.', dev) + logger.exception("Creation of interface %s failed.", dev) return None else: - return output.decode('utf-8').strip() + return output.decode("utf-8").strip() def delete_network_interface(iface): try: sp.check_output( [ - 'sudo', '-p', 'Enter password to remove {} network device: '.format(iface), - 'ip', 'link', 'del', iface - ], stderr=sp.PIPE + "sudo", + "-p", + "Enter password to remove {} network device: ".format( + iface + ), + "ip", + "link", + "del", + iface, + ], + stderr=sp.PIPE, ) except Exception: - logger.exception('Interface %s Deletion failed', iface) + logger.exception("Interface %s Deletion failed", iface) def find_free_port(): - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + with closing( + socket.socket(socket.AF_INET, socket.SOCK_STREAM) + ) as s: try: - s.bind(('', 0)) + s.bind(("", 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) except Exception: return None diff --git a/ucloud/common/request.py b/ucloud/common/request.py index 5705eed..a8c2d0a 100644 --- a/ucloud/common/request.py +++ b/ucloud/common/request.py @@ -17,7 +17,6 @@ class RequestType: class RequestEntry(SpecificEtcdEntryBase): - def __init__(self, e): self.destination_sock_path = None self.destination_host_key = None @@ -30,8 +29,11 @@ class RequestEntry(SpecificEtcdEntryBase): @classmethod def from_scratch(cls, request_prefix, **kwargs): - e = PsuedoEtcdEntry(join(request_prefix, uuid4().hex), - value=json.dumps(kwargs).encode("utf-8"), value_in_json=True) + e = PsuedoEtcdEntry( + join(request_prefix, uuid4().hex), + value=json.dumps(kwargs).encode("utf-8"), + value_in_json=True, + ) return cls(e) diff --git a/ucloud/common/schemas.py b/ucloud/common/schemas.py index a592ec2..04978a5 100644 --- a/ucloud/common/schemas.py +++ b/ucloud/common/schemas.py @@ -14,7 +14,7 @@ class StorageUnit(fields.Field): class SpecsSchema(Schema): cpu = fields.Int() ram = StorageUnit() - os_ssd = StorageUnit(data_key='os-ssd', attribute='os-ssd') + os_ssd = StorageUnit(data_key="os-ssd", attribute="os-ssd") hdd = fields.List(StorageUnit()) @@ -29,11 +29,13 @@ class VMSchema(Schema): image_uuid = fields.Str() hostname = fields.Str() metadata = fields.Dict() - network = fields.List(fields.Tuple((fields.Str(), fields.Str(), fields.Int()))) + network = fields.List( + fields.Tuple((fields.Str(), fields.Str(), fields.Int())) + ) in_migration = fields.Bool() class NetworkSchema(Schema): - _id = fields.Int(data_key='id', attribute='id') - _type = fields.Str(data_key='type', attribute='type') + _id = fields.Int(data_key="id", attribute="id") + _type = fields.Str(data_key="type", attribute="type") ipv6 = fields.Str() diff --git a/ucloud/common/storage_handlers.py b/ucloud/common/storage_handlers.py index d2190ba..b337f23 100644 --- a/ucloud/common/storage_handlers.py +++ b/ucloud/common/storage_handlers.py @@ -11,7 +11,7 @@ from ucloud.settings import settings as config class ImageStorageHandler(ABC): - handler_name = 'base' + handler_name = "base" def __init__(self, image_base, vm_base): self.image_base = image_base @@ -55,9 +55,9 @@ class ImageStorageHandler(ABC): try: sp.check_output(command, stderr=sp.PIPE) except sp.CalledProcessError as e: - _stderr = e.stderr.decode('utf-8').strip() + _stderr = e.stderr.decode("utf-8").strip() if report: - logger.exception('%s:- %s', error_origin, _stderr) + logger.exception("%s:- %s", error_origin, _stderr) return False return True @@ -72,14 +72,16 @@ class ImageStorageHandler(ABC): class FileSystemBasedImageStorageHandler(ImageStorageHandler): - handler_name = 'Filesystem' + handler_name = "Filesystem" def import_image(self, src, dest, protect=False): dest = join_path(self.image_base, dest) try: shutil.copy(src, dest) if protect: - os.chmod(dest, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH) + os.chmod( + dest, stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH + ) except Exception as e: logger.exception(e) return False @@ -97,7 +99,14 @@ class FileSystemBasedImageStorageHandler(ImageStorageHandler): def resize_vm_image(self, path, size): path = join_path(self.vm_base, path) - command = ["qemu-img", "resize", "-f", "raw", path, "{}M".format(size)] + command = [ + "qemu-img", + "resize", + "-f", + "raw", + path, + "{}M".format(size), + ] if self.execute_command(command): return True else: @@ -126,15 +135,25 @@ class FileSystemBasedImageStorageHandler(ImageStorageHandler): class CEPHBasedImageStorageHandler(ImageStorageHandler): - handler_name = 'Ceph' + handler_name = "Ceph" def import_image(self, src, dest, protect=False): dest = join_path(self.image_base, dest) import_command = ["rbd", "import", src, dest] commands = [import_command] if protect: - snap_create_command = ["rbd", "snap", "create", "{}@protected".format(dest)] - snap_protect_command = ["rbd", "snap", "protect", "{}@protected".format(dest)] + snap_create_command = [ + "rbd", + "snap", + "create", + "{}@protected".format(dest), + ] + snap_protect_command = [ + "rbd", + "snap", + "protect", + "{}@protected".format(dest), + ] commands.append(snap_create_command) commands.append(snap_protect_command) @@ -174,16 +193,16 @@ class CEPHBasedImageStorageHandler(ImageStorageHandler): def get_storage_handler(): - __storage_backend = config['storage']['storage_backend'] - if __storage_backend == 'filesystem': + __storage_backend = config["storage"]["storage_backend"] + if __storage_backend == "filesystem": return FileSystemBasedImageStorageHandler( - vm_base=config['storage']['vm_dir'], - image_base=config['storage']['image_dir'] + vm_base=config["storage"]["vm_dir"], + image_base=config["storage"]["image_dir"], ) - elif __storage_backend == 'ceph': + elif __storage_backend == "ceph": return CEPHBasedImageStorageHandler( - vm_base=config['storage']['ceph_vm_pool'], - image_base=config['storage']['ceph_image_pool'] + vm_base=config["storage"]["ceph_vm_pool"], + image_base=config["storage"]["ceph_image_pool"], ) else: - raise Exception('Unknown Image Storage Handler') + raise Exception("Unknown Image Storage Handler") diff --git a/ucloud/common/vm.py b/ucloud/common/vm.py index 238f19d..d11046d 100644 --- a/ucloud/common/vm.py +++ b/ucloud/common/vm.py @@ -13,13 +13,12 @@ class VMStatus: def declare_stopped(vm): - vm['hostname'] = '' - vm['in_migration'] = False - vm['status'] = VMStatus.stopped + vm["hostname"] = "" + vm["in_migration"] = False + vm["status"] = VMStatus.stopped class VMEntry(SpecificEtcdEntryBase): - def __init__(self, e): self.owner = None # type: str self.specs = None # type: dict @@ -48,7 +47,9 @@ class VMEntry(SpecificEtcdEntryBase): def add_log(self, msg): self.log = self.log[:5] - self.log.append("{} - {}".format(datetime.now().isoformat(), msg)) + self.log.append( + "{} - {}".format(datetime.now().isoformat(), msg) + ) class VmPool: diff --git a/ucloud/configure/main.py b/ucloud/configure/main.py index e4770d9..31201f6 100644 --- a/ucloud/configure/main.py +++ b/ucloud/configure/main.py @@ -5,31 +5,41 @@ from ucloud.shared import shared def update_config(section, kwargs): - uncloud_config = shared.etcd_client.get(settings.config_key, value_in_json=True) + uncloud_config = shared.etcd_client.get( + settings.config_key, value_in_json=True + ) if not uncloud_config: uncloud_config = {} else: uncloud_config = uncloud_config.value - + uncloud_config[section] = kwargs - shared.etcd_client.put(settings.config_key, uncloud_config, value_in_json=True) + shared.etcd_client.put( + settings.config_key, uncloud_config, value_in_json=True + ) def configure_parser(parser): configure_subparsers = parser.add_subparsers(dest="subcommand") - + otp_parser = configure_subparsers.add_parser("otp") - otp_parser.add_argument("--verification-controller-url", - required=True, metavar="URL") - otp_parser.add_argument("--auth-name", required=True, - metavar="OTP-NAME") - otp_parser.add_argument("--auth-realm", required=True, - metavar="OTP-REALM") - otp_parser.add_argument("--auth-seed", required=True, - metavar="OTP-SEED") + otp_parser.add_argument( + "--verification-controller-url", required=True, metavar="URL" + ) + otp_parser.add_argument( + "--auth-name", required=True, metavar="OTP-NAME" + ) + otp_parser.add_argument( + "--auth-realm", required=True, metavar="OTP-REALM" + ) + otp_parser.add_argument( + "--auth-seed", required=True, metavar="OTP-SEED" + ) network_parser = configure_subparsers.add_parser("network") - network_parser.add_argument("--prefix-length", required=True, type=int) + network_parser.add_argument( + "--prefix-length", required=True, type=int + ) network_parser.add_argument("--prefix", required=True) network_parser.add_argument("--vxlan-phy-dev", required=True) @@ -38,25 +48,31 @@ def configure_parser(parser): netbox_parser.add_argument("--token", required=True) ssh_parser = configure_subparsers.add_parser("ssh") - ssh_parser.add_argument('--username', default="root") - ssh_parser.add_argument('--private-key-path', - default=os.path.expanduser("~/.ssh/id_rsa")) + ssh_parser.add_argument("--username", default="root") + ssh_parser.add_argument( + "--private-key-path", + default=os.path.expanduser("~/.ssh/id_rsa"), + ) storage_parser = configure_subparsers.add_parser("storage") - storage_parser.add_argument('--file-dir', required=True) - storage_parser_subparsers = storage_parser.add_subparsers(dest="storage_backend") - - filesystem_storage_parser = storage_parser_subparsers.add_parser("filesystem") - filesystem_storage_parser.add_argument('--vm-dir', required=True) - filesystem_storage_parser.add_argument('--image-dir', required=True) + storage_parser.add_argument("--file-dir", required=True) + storage_parser_subparsers = storage_parser.add_subparsers( + dest="storage_backend" + ) + + filesystem_storage_parser = storage_parser_subparsers.add_parser( + "filesystem" + ) + filesystem_storage_parser.add_argument("--vm-dir", required=True) + filesystem_storage_parser.add_argument("--image-dir", required=True) ceph_storage_parser = storage_parser_subparsers.add_parser("ceph") - ceph_storage_parser.add_argument('--ceph-vm-pool', required=True) - ceph_storage_parser.add_argument('--ceph-image-pool', required=True) + ceph_storage_parser.add_argument("--ceph-vm-pool", required=True) + ceph_storage_parser.add_argument("--ceph-image-pool", required=True) def main(**kwargs): - subcommand = kwargs.pop('subcommand') + subcommand = kwargs.pop("subcommand") if not subcommand: pass else: diff --git a/ucloud/docs/source/conf.py b/ucloud/docs/source/conf.py index 9b133f9..70307f8 100644 --- a/ucloud/docs/source/conf.py +++ b/ucloud/docs/source/conf.py @@ -17,9 +17,9 @@ # -- Project information ----------------------------------------------------- -project = 'ucloud' -copyright = '2019, ungleich' -author = 'ungleich' +project = "ucloud" +copyright = "2019, ungleich" +author = "ungleich" # -- General configuration --------------------------------------------------- @@ -27,12 +27,12 @@ author = 'ungleich' # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ - 'sphinx.ext.autodoc', - 'sphinx_rtd_theme', + "sphinx.ext.autodoc", + "sphinx_rtd_theme", ] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. @@ -50,4 +50,4 @@ html_theme = "sphinx_rtd_theme" # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] diff --git a/ucloud/filescanner/main.py b/ucloud/filescanner/main.py index 778e942..b12797b 100755 --- a/ucloud/filescanner/main.py +++ b/ucloud/filescanner/main.py @@ -21,7 +21,8 @@ def sha512sum(file: str): ELSE: return None """ - if not isinstance(file, str): raise TypeError + if not isinstance(file, str): + raise TypeError try: output = sp.check_output(["sha512sum", file], stderr=sp.PIPE) except sp.CalledProcessError as e: @@ -49,23 +50,25 @@ def track_file(file, base_dir): file_path = pathlib.Path(file).parts[-1] # Create Entry - entry_key = os.path.join(settings['etcd']['file_prefix'], str(file_id)) + entry_key = os.path.join( + settings["etcd"]["file_prefix"], str(file_id) + ) entry_value = { "filename": file_path, "owner": owner, "sha512sum": sha512sum(file), "creation_date": creation_date, - "size": os.path.getsize(file) + "size": os.path.getsize(file), } logger.info("Tracking %s", file) shared.etcd_client.put(entry_key, entry_value, value_in_json=True) - os.setxattr(file, 'user.utracked', b'True') + os.setxattr(file, "user.utracked", b"True") def main(): - base_dir = settings['storage']['file_dir'] + base_dir = settings["storage"]["file_dir"] # Recursively Get All Files and Folder below BASE_DIR files = glob.glob("{}/**".format(base_dir), recursive=True) @@ -76,7 +79,7 @@ def main(): untracked_files = [] for file in files: try: - os.getxattr(file, 'user.utracked') + os.getxattr(file, "user.utracked") except OSError: track_file(file, base_dir) untracked_files.append(file) diff --git a/ucloud/host/main.py b/ucloud/host/main.py index 904f26c..88dfb7c 100755 --- a/ucloud/host/main.py +++ b/ucloud/host/main.py @@ -15,49 +15,79 @@ from . import virtualmachine, logger def update_heartbeat(hostname): """Update Last HeartBeat Time for :param hostname: in etcd""" host_pool = shared.host_pool - this_host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) + this_host = next( + filter(lambda h: h.hostname == hostname, host_pool.hosts), None + ) while True: this_host.update_heartbeat() host_pool.put(this_host) time.sleep(10) -def maintenance(): +def maintenance(host): vmm = VMM() running_vms = vmm.discover() for vm_uuid in running_vms: - if vmm.is_running(vm_uuid) and vmm.get_status(vm_uuid) == 'running': - vm = shared.vm_pool.get(join_path(settings['etcd']['vm_prefix'], vm_uuid)) + if ( + vmm.is_running(vm_uuid) + and vmm.get_status(vm_uuid) == "running" + ): + vm = shared.vm_pool.get( + join_path(settings["etcd"]["vm_prefix"], vm_uuid) + ) vm.status = VMStatus.running + vm.vnc_socket = vmm.get_vnc(vm_uuid) + vm.hostname = host shared.vm_pool.put(vm) def main(hostname): host_pool = shared.host_pool - host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) - assert host is not None, "No such host with name = {}".format(hostname) + host = next( + filter(lambda h: h.hostname == hostname, host_pool.hosts), None + ) + assert host is not None, "No such host with name = {}".format( + hostname + ) try: - heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,)) + heartbeat_updating_process = mp.Process( + target=update_heartbeat, args=(hostname,) + ) heartbeat_updating_process.start() except Exception as e: - raise Exception('ucloud-host heartbeat updating mechanism is not working') from e + raise Exception( + "ucloud-host heartbeat updating mechanism is not working" + ) from e for events_iterator in [ - shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True), - shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], timeout=10, value_in_json=True), + shared.etcd_client.get_prefix( + settings["etcd"]["request_prefix"], value_in_json=True + ), + shared.etcd_client.watch_prefix( + settings["etcd"]["request_prefix"], + timeout=10, + value_in_json=True, + ), ]: for request_event in events_iterator: request_event = RequestEntry(request_event) if request_event.type == "TIMEOUT": - maintenance() + maintenance(host.key) if request_event.hostname == host.key: logger.debug("VM Request: %s", request_event) - shared.request_pool.client.client.delete(request_event.key) - vm_entry = shared.etcd_client.get(join_path(settings['etcd']['vm_prefix'], request_event.uuid)) + shared.request_pool.client.client.delete( + request_event.key + ) + vm_entry = shared.etcd_client.get( + join_path( + settings["etcd"]["vm_prefix"], + request_event.uuid, + ) + ) if vm_entry: vm = virtualmachine.VM(vm_entry) @@ -70,23 +100,35 @@ def main(hostname): elif request_event.type == RequestType.DeleteVM: vm.delete() - elif request_event.type == RequestType.InitVMMigration: + elif ( + request_event.type + == RequestType.InitVMMigration + ): vm.start(destination_host_key=host.key) elif request_event.type == RequestType.TransferVM: - host = host_pool.get(request_event.destination_host_key) + host = host_pool.get( + request_event.destination_host_key + ) if host: - vm.migrate(destination_host=host.hostname, - destination_sock_path=request_event.destination_sock_path) + vm.migrate( + destination_host=host.hostname, + destination_sock_path=request_event.destination_sock_path, + ) else: - logger.error('Host %s not found!', request_event.destination_host_key) + logger.error( + "Host %s not found!", + request_event.destination_host_key, + ) else: logger.info("VM Entry missing") if __name__ == "__main__": argparser = argparse.ArgumentParser() - argparser.add_argument("hostname", help="Name of this host. e.g uncloud1.ungleich.ch") + argparser.add_argument( + "hostname", help="Name of this host. e.g uncloud1.ungleich.ch" + ) args = argparser.parse_args() - mp.set_start_method('spawn') + mp.set_start_method("spawn") main(args.hostname) diff --git a/ucloud/host/qmp/__init__.py b/ucloud/host/qmp/__init__.py index 775b397..40ac3a4 100755 --- a/ucloud/host/qmp/__init__.py +++ b/ucloud/host/qmp/__init__.py @@ -26,10 +26,7 @@ LOG = logging.getLogger(__name__) # Mapping host architecture to any additional architectures it can # support which often includes its 32 bit cousin. -ADDITIONAL_ARCHES = { - "x86_64": "i386", - "aarch64": "armhf" -} +ADDITIONAL_ARCHES = {"x86_64": "i386", "aarch64": "armhf"} def kvm_available(target_arch=None): @@ -81,10 +78,17 @@ class QEMUMachine(object): # vm is guaranteed to be shut down here """ - def __init__(self, binary, args=None, wrapper=None, name=None, - test_dir="/var/tmp", monitor_address=None, - socket_scm_helper=None): - ''' + def __init__( + self, + binary, + args=None, + wrapper=None, + name=None, + test_dir="/var/tmp", + monitor_address=None, + socket_scm_helper=None, + ): + """ Initialize a QEMUMachine @param binary: path to the qemu binary @@ -95,7 +99,7 @@ class QEMUMachine(object): @param monitor_address: address for QMP monitor @param socket_scm_helper: helper program, required for send_fd_scm() @note: Qemu process is not started until launch() is used. - ''' + """ if args is None: args = [] if wrapper is None: @@ -109,7 +113,9 @@ class QEMUMachine(object): self._qemu_log_file = None self._popen = None self._binary = binary - self._args = list(args) # Force copy args in case we modify them + self._args = list( + args + ) # Force copy args in case we modify them self._wrapper = wrapper self._events = [] self._iolog = None @@ -137,26 +143,24 @@ class QEMUMachine(object): # This can be used to add an unused monitor instance. def add_monitor_null(self): - self._args.append('-monitor') - self._args.append('null') + self._args.append("-monitor") + self._args.append("null") - def add_fd(self, fd, fdset, opaque, opts=''): + def add_fd(self, fd, fdset, opaque, opts=""): """ Pass a file descriptor to the VM """ - options = ['fd=%d' % fd, - 'set=%d' % fdset, - 'opaque=%s' % opaque] + options = ["fd=%d" % fd, "set=%d" % fdset, "opaque=%s" % opaque] if opts: options.append(opts) # This did not exist before 3.4, but since then it is # mandatory for our purpose - if hasattr(os, 'set_inheritable'): + if hasattr(os, "set_inheritable"): os.set_inheritable(fd, True) - self._args.append('-add-fd') - self._args.append(','.join(options)) + self._args.append("-add-fd") + self._args.append(",".join(options)) return self # Exactly one of fd and file_path must be given. @@ -168,18 +172,21 @@ class QEMUMachine(object): if self._socket_scm_helper is None: raise QEMUMachineError("No path to socket_scm_helper set") if not os.path.exists(self._socket_scm_helper): - raise QEMUMachineError("%s does not exist" % - self._socket_scm_helper) + raise QEMUMachineError( + "%s does not exist" % self._socket_scm_helper + ) # This did not exist before 3.4, but since then it is # mandatory for our purpose - if hasattr(os, 'set_inheritable'): + if hasattr(os, "set_inheritable"): os.set_inheritable(self._qmp.get_sock_fd(), True) if fd is not None: os.set_inheritable(fd, True) - fd_param = ["%s" % self._socket_scm_helper, - "%d" % self._qmp.get_sock_fd()] + fd_param = [ + "%s" % self._socket_scm_helper, + "%d" % self._qmp.get_sock_fd(), + ] if file_path is not None: assert fd is None @@ -188,9 +195,14 @@ class QEMUMachine(object): assert fd is not None fd_param.append(str(fd)) - devnull = open(os.path.devnull, 'rb') - proc = subprocess.Popen(fd_param, stdin=devnull, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, close_fds=False) + devnull = open(os.path.devnull, "rb") + proc = subprocess.Popen( + fd_param, + stdin=devnull, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + close_fds=False, + ) output = proc.communicate()[0] if output: LOG.debug(output) @@ -231,24 +243,29 @@ class QEMUMachine(object): if isinstance(self._monitor_address, tuple): moncdev = "socket,id=mon,host=%s,port=%s" % ( self._monitor_address[0], - self._monitor_address[1]) + self._monitor_address[1], + ) else: - moncdev = 'socket,id=mon,path=%s' % self._vm_monitor - args = ['-chardev', moncdev, - '-mon', 'chardev=mon,mode=control'] + moncdev = "socket,id=mon,path=%s" % self._vm_monitor + args = ["-chardev", moncdev, "-mon", "chardev=mon,mode=control"] if self._machine is not None: - args.extend(['-machine', self._machine]) + args.extend(["-machine", self._machine]) if self._console_set: - self._console_address = os.path.join(self._temp_dir, - self._name + "-console.sock") - chardev = ('socket,id=console,path=%s,server,nowait' % - self._console_address) - args.extend(['-chardev', chardev]) + self._console_address = os.path.join( + self._temp_dir, self._name + "-console.sock" + ) + chardev = ( + "socket,id=console,path=%s,server,nowait" + % self._console_address + ) + args.extend(["-chardev", chardev]) if self._console_device_type is None: - args.extend(['-serial', 'chardev:console']) + args.extend(["-serial", "chardev:console"]) else: - device = '%s,chardev=console' % self._console_device_type - args.extend(['-device', device]) + device = ( + "%s,chardev=console" % self._console_device_type + ) + args.extend(["-device", device]) return args def _pre_launch(self): @@ -256,13 +273,17 @@ class QEMUMachine(object): if self._monitor_address is not None: self._vm_monitor = self._monitor_address else: - self._vm_monitor = os.path.join(self._temp_dir, - self._name + "-monitor.sock") - self._qemu_log_path = os.path.join(self._temp_dir, self._name + ".log") - self._qemu_log_file = open(self._qemu_log_path, 'wb') + self._vm_monitor = os.path.join( + self._temp_dir, self._name + "-monitor.sock" + ) + self._qemu_log_path = os.path.join( + self._temp_dir, self._name + ".log" + ) + self._qemu_log_file = open(self._qemu_log_path, "wb") - self._qmp = qmp.QEMUMonitorProtocol(self._vm_monitor, - server=True) + self._qmp = qmp.QEMUMonitorProtocol( + self._vm_monitor, server=True + ) def _post_launch(self): self._qmp.accept() @@ -289,7 +310,7 @@ class QEMUMachine(object): """ if self._launched: - raise QEMUMachineError('VM already launched') + raise QEMUMachineError("VM already launched") self._iolog = None self._qemu_full_args = None @@ -299,11 +320,11 @@ class QEMUMachine(object): except: self.shutdown() - LOG.debug('Error launching VM') + LOG.debug("Error launching VM") if self._qemu_full_args: - LOG.debug('Command: %r', ' '.join(self._qemu_full_args)) + LOG.debug("Command: %r", " ".join(self._qemu_full_args)) if self._iolog: - LOG.debug('Output: %r', self._iolog) + LOG.debug("Output: %r", self._iolog) raise Exception(self._iolog) raise @@ -311,17 +332,25 @@ class QEMUMachine(object): """ Launch the VM and establish a QMP connection """ - devnull = open(os.path.devnull, 'rb') + devnull = open(os.path.devnull, "rb") self._pre_launch() - self._qemu_full_args = (self._wrapper + [self._binary] + - self._base_args() + self._args) - LOG.debug('VM launch command: %r', ' '.join(self._qemu_full_args)) - self._popen = subprocess.Popen(self._qemu_full_args, - stdin=devnull, - stdout=self._qemu_log_file, - stderr=subprocess.STDOUT, - shell=False, - close_fds=False) + self._qemu_full_args = ( + self._wrapper + + [self._binary] + + self._base_args() + + self._args + ) + LOG.debug( + "VM launch command: %r", " ".join(self._qemu_full_args) + ) + self._popen = subprocess.Popen( + self._qemu_full_args, + stdin=devnull, + stdout=self._qemu_log_file, + stderr=subprocess.STDOUT, + shell=False, + close_fds=False, + ) self._post_launch() def wait(self): @@ -339,7 +368,7 @@ class QEMUMachine(object): """ if self.is_running(): try: - self._qmp.cmd('quit') + self._qmp.cmd("quit") self._qmp.close() except: self._popen.kill() @@ -350,11 +379,11 @@ class QEMUMachine(object): exitcode = self.exitcode() if exitcode is not None and exitcode < 0: - msg = 'qemu received signal %i: %s' + msg = "qemu received signal %i: %s" if self._qemu_full_args: - command = ' '.join(self._qemu_full_args) + command = " ".join(self._qemu_full_args) else: - command = '' + command = "" LOG.warn(msg, -exitcode, command) self._launched = False @@ -366,7 +395,7 @@ class QEMUMachine(object): qmp_args = dict() for key, value in args.items(): if conv_keys: - qmp_args[key.replace('_', '-')] = value + qmp_args[key.replace("_", "-")] = value else: qmp_args[key] = value @@ -427,7 +456,9 @@ class QEMUMachine(object): try: for key in match: if key in event: - if not QEMUMachine.event_match(event[key], match[key]): + if not QEMUMachine.event_match( + event[key], match[key] + ): return False else: return False @@ -458,8 +489,9 @@ class QEMUMachine(object): def _match(event): for name, match in events: - if (event['event'] == name and - self.event_match(event, match)): + if event["event"] == name and self.event_match( + event, match + ): return True return False @@ -531,7 +563,8 @@ class QEMUMachine(object): Returns a socket connected to the console """ if self._console_socket is None: - self._console_socket = socket.socket(socket.AF_UNIX, - socket.SOCK_STREAM) + self._console_socket = socket.socket( + socket.AF_UNIX, socket.SOCK_STREAM + ) self._console_socket.connect(self._console_address) return self._console_socket diff --git a/ucloud/host/qmp/qmp.py b/ucloud/host/qmp/qmp.py index bf35d71..ad187eb 100755 --- a/ucloud/host/qmp/qmp.py +++ b/ucloud/host/qmp/qmp.py @@ -32,7 +32,7 @@ class QMPTimeoutError(QMPError): class QEMUMonitorProtocol(object): #: Logger object for debugging messages - logger = logging.getLogger('QMP') + logger = logging.getLogger("QMP") #: Socket's error class error = socket.error #: Socket's timeout @@ -55,7 +55,9 @@ class QEMUMonitorProtocol(object): self.__sock = self.__get_sock() self.__sockfile = None if server: - self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.__sock.setsockopt( + socket.SOL_SOCKET, socket.SO_REUSEADDR, 1 + ) self.__sock.bind(self.__address) self.__sock.listen(1) @@ -71,7 +73,7 @@ class QEMUMonitorProtocol(object): if greeting is None or "QMP" not in greeting: raise QMPConnectError # Greeting seems ok, negotiate capabilities - resp = self.cmd('qmp_capabilities') + resp = self.cmd("qmp_capabilities") if "return" in resp: return greeting raise QMPCapabilitiesError @@ -82,7 +84,7 @@ class QEMUMonitorProtocol(object): if not data: return resp = json.loads(data) - if 'event' in resp: + if "event" in resp: self.logger.debug("<<< %s", resp) self.__events.append(resp) if not only_event: @@ -165,7 +167,7 @@ class QEMUMonitorProtocol(object): """ self.logger.debug(">>> %s", qmp_cmd) try: - self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8')) + self.__sock.sendall(json.dumps(qmp_cmd).encode("utf-8")) except socket.error as err: if err[0] == errno.EPIPE: return @@ -182,11 +184,11 @@ class QEMUMonitorProtocol(object): @param args: command arguments (dict) @param cmd_id: command id (dict, list, string or int) """ - qmp_cmd = {'execute': name} + qmp_cmd = {"execute": name} if args: - qmp_cmd['arguments'] = args + qmp_cmd["arguments"] = args if cmd_id: - qmp_cmd['id'] = cmd_id + qmp_cmd["id"] = cmd_id return self.cmd_obj(qmp_cmd) def command(self, cmd, **kwds): @@ -195,8 +197,8 @@ class QEMUMonitorProtocol(object): """ ret = self.cmd(cmd, kwds) if "error" in ret: - raise Exception(ret['error']['desc']) - return ret['return'] + raise Exception(ret["error"]["desc"]) + return ret["return"] def pull_event(self, wait=False): """ diff --git a/ucloud/host/virtualmachine.py b/ucloud/host/virtualmachine.py index db0f7b8..d795b3f 100755 --- a/ucloud/host/virtualmachine.py +++ b/ucloud/host/virtualmachine.py @@ -23,10 +23,6 @@ from ucloud.vmm import VMM from marshmallow import ValidationError -def maintenance(): - pass - - class VM: def __init__(self, vm_entry): self.schema = VMSchema() @@ -35,23 +31,30 @@ class VM: try: self.vm = self.schema.loads(vm_entry.value) except ValidationError: - logger.exception('Couldn\'t validate VM Entry', vm_entry.value) + logger.exception( + "Couldn't validate VM Entry", vm_entry.value + ) self.vm = None else: - self.uuid = vm_entry.key.split('/')[-1] - self.host_key = self.vm['hostname'] + self.uuid = vm_entry.key.split("/")[-1] + self.host_key = self.vm["hostname"] def get_qemu_args(self): command = ( - '-name {owner}_{name}' - ' -drive file={file},format=raw,if=virtio,cache=none' - ' -device virtio-rng-pci' - ' -m {memory} -smp cores={cores},threads={threads}' - ).format(owner=self.vm['owner'], name=self.vm['name'], - memory=int(self.vm['specs']['ram'].to_MB()), cores=self.vm['specs']['cpu'], - threads=1, file=shared.storage_handler.qemu_path_string(self.uuid)) + "-name {owner}_{name}" + " -drive file={file},format=raw,if=virtio,cache=none" + " -device virtio-rng-pci" + " -m {memory} -smp cores={cores},threads={threads}" + ).format( + owner=self.vm["owner"], + name=self.vm["name"], + memory=int(self.vm["specs"]["ram"].to_MB()), + cores=self.vm["specs"]["cpu"], + threads=1, + file=shared.storage_handler.qemu_path_string(self.uuid), + ) - return command.split(' ') + return command.split(" ") def start(self, destination_host_key=None): migration = False @@ -63,24 +66,34 @@ class VM: network_args = self.create_network_dev() except Exception as err: declare_stopped(self.vm) - self.vm['log'].append('Cannot Setup Network Properly') - logger.error('Cannot Setup Network Properly for vm %s', self.uuid, exc_info=err) + self.vm["log"].append("Cannot Setup Network Properly") + logger.error( + "Cannot Setup Network Properly for vm %s", + self.uuid, + exc_info=err, + ) else: - self.vmm.start(uuid=self.uuid, migration=migration, - *self.get_qemu_args(), *network_args) + self.vmm.start( + uuid=self.uuid, + migration=migration, + *self.get_qemu_args(), + *network_args + ) status = self.vmm.get_status(self.uuid) - if status == 'running': - self.vm['status'] = VMStatus.running - self.vm['vnc_socket'] = self.vmm.get_vnc(self.uuid) - elif status == 'inmigrate': + if status == "running": + self.vm["status"] = VMStatus.running + self.vm["vnc_socket"] = self.vmm.get_vnc(self.uuid) + elif status == "inmigrate": r = RequestEntry.from_scratch( type=RequestType.TransferVM, # Transfer VM hostname=self.host_key, # Which VM should get this request. It is source host uuid=self.uuid, # uuid of VM - destination_sock_path=join_path(self.vmm.socket_dir, self.uuid), + destination_sock_path=join_path( + self.vmm.socket_dir, self.uuid + ), destination_host_key=destination_host_key, # Where source host transfer VM - request_prefix=settings['etcd']['request_prefix'] + request_prefix=settings["etcd"]["request_prefix"], ) shared.request_pool.put(r) else: @@ -96,15 +109,22 @@ class VM: self.sync() def migrate(self, destination_host, destination_sock_path): - self.vmm.transfer(src_uuid=self.uuid, destination_sock_path=destination_sock_path, - host=destination_host) + self.vmm.transfer( + src_uuid=self.uuid, + destination_sock_path=destination_sock_path, + host=destination_host, + ) def create_network_dev(self): - command = '' - for network_mac_and_tap in self.vm['network']: + command = "" + for network_mac_and_tap in self.vm["network"]: network_name, mac, tap = network_mac_and_tap - _key = os.path.join(settings['etcd']['network_prefix'], self.vm['owner'], network_name) + _key = os.path.join( + settings["etcd"]["network_prefix"], + self.vm["owner"], + network_name, + ) network = shared.etcd_client.get(_key, value_in_json=True) network_schema = NetworkSchema() try: @@ -112,49 +132,64 @@ class VM: except ValidationError: continue - if network['type'] == "vxlan": - tap = create_vxlan_br_tap(_id=network['id'], - _dev=settings['network']['vxlan_phy_dev'], - tap_id=tap, - ip=network['ipv6']) + if network["type"] == "vxlan": + tap = create_vxlan_br_tap( + _id=network["id"], + _dev=settings["network"]["vxlan_phy_dev"], + tap_id=tap, + ip=network["ipv6"], + ) - all_networks = shared.etcd_client.get_prefix(settings['etcd']['network_prefix'], - value_in_json=True) + all_networks = shared.etcd_client.get_prefix( + settings["etcd"]["network_prefix"], + value_in_json=True, + ) - if ipaddress.ip_network(network['ipv6']).is_global: + if ipaddress.ip_network(network["ipv6"]).is_global: update_radvd_conf(all_networks) - command += '-netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no' \ - ' -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}' \ - .format(tap=tap, net_id=network['id'], mac=mac) + command += ( + "-netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no" + " -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}".format( + tap=tap, net_id=network["id"], mac=mac + ) + ) - return command.split(' ') + return command.split(" ") def delete_network_dev(self): try: - for network in self.vm['network']: + for network in self.vm["network"]: network_name = network[0] _ = network[1] # tap_mac tap_id = network[2] - delete_network_interface('tap{}'.format(tap_id)) + delete_network_interface("tap{}".format(tap_id)) - owners_vms = shared.vm_pool.by_owner(self.vm['owner']) - owners_running_vms = shared.vm_pool.by_status(VMStatus.running, - _vms=owners_vms) + owners_vms = shared.vm_pool.by_owner(self.vm["owner"]) + owners_running_vms = shared.vm_pool.by_status( + VMStatus.running, _vms=owners_vms + ) networks = map( - lambda n: n[0], map(lambda vm: vm.network, owners_running_vms) + lambda n: n[0], + map(lambda vm: vm.network, owners_running_vms), ) networks_in_use_by_user_vms = [vm[0] for vm in networks] if network_name not in networks_in_use_by_user_vms: - network_entry = resolve_network(network[0], self.vm['owner']) + network_entry = resolve_network( + network[0], self.vm["owner"] + ) if network_entry: network_type = network_entry.value["type"] network_id = network_entry.value["id"] if network_type == "vxlan": - delete_network_interface('br{}'.format(network_id)) - delete_network_interface('vxlan{}'.format(network_id)) + delete_network_interface( + "br{}".format(network_id) + ) + delete_network_interface( + "vxlan{}".format(network_id) + ) except Exception: logger.exception("Exception in network interface deletion") @@ -163,15 +198,21 @@ class VM: # File Already exists. No Problem Continue logger.debug("Image for vm %s exists", self.uuid) else: - if shared.storage_handler.make_vm_image(src=self.vm['image_uuid'], dest=self.uuid): - if not shared.storage_handler.resize_vm_image(path=self.uuid, - size=int(self.vm['specs']['os-ssd'].to_MB())): - self.vm['status'] = VMStatus.error + if shared.storage_handler.make_vm_image( + src=self.vm["image_uuid"], dest=self.uuid + ): + if not shared.storage_handler.resize_vm_image( + path=self.uuid, + size=int(self.vm["specs"]["os-ssd"].to_MB()), + ): + self.vm["status"] = VMStatus.error else: logger.info("New VM Created") def sync(self): - shared.etcd_client.put(self.key, self.schema.dump(self.vm), value_in_json=True) + shared.etcd_client.put( + self.key, self.schema.dump(self.vm), value_in_json=True + ) def delete(self): self.stop() @@ -186,50 +227,77 @@ class VM: def resolve_network(network_name, network_owner): network = shared.etcd_client.get( - join_path(settings['etcd']['network_prefix'], network_owner, network_name), value_in_json=True + join_path( + settings["etcd"]["network_prefix"], + network_owner, + network_name, + ), + value_in_json=True, ) return network def create_vxlan_br_tap(_id, _dev, tap_id, ip=None): - network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network') - vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'), - _id=_id, dev=_dev) + network_script_base = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "network" + ) + vxlan = create_dev( + script=os.path.join(network_script_base, "create-vxlan.sh"), + _id=_id, + dev=_dev, + ) if vxlan: - bridge = create_dev(script=os.path.join(network_script_base, 'create-bridge.sh'), - _id=_id, dev=vxlan, ip=ip) + bridge = create_dev( + script=os.path.join( + network_script_base, "create-bridge.sh" + ), + _id=_id, + dev=vxlan, + ip=ip, + ) if bridge: - tap = create_dev(script=os.path.join(network_script_base, 'create-tap.sh'), - _id=str(tap_id), dev=bridge) + tap = create_dev( + script=os.path.join( + network_script_base, "create-tap.sh" + ), + _id=str(tap_id), + dev=bridge, + ) if tap: return tap def update_radvd_conf(all_networks): - network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network') + network_script_base = os.path.join( + os.path.dirname(os.path.dirname(__file__)), "network" + ) networks = { - net.value['ipv6']: net.value['id'] + net.value["ipv6"]: net.value["id"] for net in all_networks - if net.value.get('ipv6') and ipaddress.ip_network(net.value.get('ipv6')).is_global + if net.value.get("ipv6") + and ipaddress.ip_network(net.value.get("ipv6")).is_global } - radvd_template = open(os.path.join(network_script_base, - 'radvd-template.conf'), 'r').read() + radvd_template = open( + os.path.join(network_script_base, "radvd-template.conf"), "r" + ).read() radvd_template = Template(radvd_template) content = [ radvd_template.safe_substitute( - bridge='br{}'.format(networks[net]), - prefix=net + bridge="br{}".format(networks[net]), prefix=net ) - for net in networks if networks.get(net) + for net in networks + if networks.get(net) ] - with open('/etc/radvd.conf', 'w') as radvd_conf: + with open("/etc/radvd.conf", "w") as radvd_conf: radvd_conf.writelines(content) try: - sp.check_output(['systemctl', 'restart', 'radvd']) + sp.check_output(["systemctl", "restart", "radvd"]) except sp.CalledProcessError: try: - sp.check_output(['service', 'radvd', 'restart']) + sp.check_output(["service", "radvd", "restart"]) except sp.CalledProcessError as err: - raise err.__class__('Cannot start/restart radvd service', err.cmd) from err + raise err.__class__( + "Cannot start/restart radvd service", err.cmd + ) from err diff --git a/ucloud/imagescanner/main.py b/ucloud/imagescanner/main.py index e215c88..e1960bc 100755 --- a/ucloud/imagescanner/main.py +++ b/ucloud/imagescanner/main.py @@ -9,7 +9,13 @@ from ucloud.imagescanner import logger def qemu_img_type(path): - qemu_img_info_command = ["qemu-img", "info", "--output", "json", path] + qemu_img_info_command = [ + "qemu-img", + "info", + "--output", + "json", + path, + ] try: qemu_img_info = sp.check_output(qemu_img_info_command) except Exception as e: @@ -22,32 +28,57 @@ def qemu_img_type(path): def main(): # We want to get images entries that requests images to be created - images = shared.etcd_client.get_prefix(settings['etcd']['image_prefix'], value_in_json=True) - images_to_be_created = list(filter(lambda im: im.value['status'] == 'TO_BE_CREATED', images)) + images = shared.etcd_client.get_prefix( + settings["etcd"]["image_prefix"], value_in_json=True + ) + images_to_be_created = list( + filter(lambda im: im.value["status"] == "TO_BE_CREATED", images) + ) for image in images_to_be_created: try: - image_uuid = image.key.split('/')[-1] - image_owner = image.value['owner'] - image_filename = image.value['filename'] - image_store_name = image.value['store_name'] - image_full_path = join_path(settings['storage']['file_dir'], image_owner, image_filename) + image_uuid = image.key.split("/")[-1] + image_owner = image.value["owner"] + image_filename = image.value["filename"] + image_store_name = image.value["store_name"] + image_full_path = join_path( + settings["storage"]["file_dir"], + image_owner, + image_filename, + ) - image_stores = shared.etcd_client.get_prefix(settings['etcd']['image_store_prefix'], - value_in_json=True) - user_image_store = next(filter( - lambda s, store_name=image_store_name: s.value["name"] == store_name, - image_stores - )) + image_stores = shared.etcd_client.get_prefix( + settings["etcd"]["image_store_prefix"], + value_in_json=True, + ) + user_image_store = next( + filter( + lambda s, store_name=image_store_name: s.value[ + "name" + ] + == store_name, + image_stores, + ) + ) - image_store_pool = user_image_store.value['attributes']['pool'] + image_store_pool = user_image_store.value["attributes"][ + "pool" + ] except Exception as e: logger.exception(e) else: # At least our basic data is available - qemu_img_convert_command = ["qemu-img", "convert", "-f", "qcow2", - "-O", "raw", image_full_path, "image.raw"] + qemu_img_convert_command = [ + "qemu-img", + "convert", + "-f", + "qcow2", + "-O", + "raw", + image_full_path, + "image.raw", + ] if qemu_img_type(image_full_path) == "qcow2": try: @@ -55,16 +86,20 @@ def main(): sp.check_output(qemu_img_convert_command,) except sp.CalledProcessError: - logger.exception('Image convertion from .qcow2 to .raw failed.') + logger.exception( + "Image convertion from .qcow2 to .raw failed." + ) else: # Import and Protect - r_status = shared.storage_handler.import_image(src="image.raw", - dest=image_uuid, - protect=True) + r_status = shared.storage_handler.import_image( + src="image.raw", dest=image_uuid, protect=True + ) if r_status: # Everything is successfully done image.value["status"] = "CREATED" - shared.etcd_client.put(image.key, json.dumps(image.value)) + shared.etcd_client.put( + image.key, json.dumps(image.value) + ) finally: try: os.remove("image.raw") @@ -74,7 +109,9 @@ def main(): else: # The user provided image is either not found or of invalid format image.value["status"] = "INVALID_IMAGE" - shared.etcd_client.put(image.key, json.dumps(image.value)) + shared.etcd_client.put( + image.key, json.dumps(image.value) + ) if __name__ == "__main__": diff --git a/ucloud/metadata/main.py b/ucloud/metadata/main.py index adec9e7..2974e33 100644 --- a/ucloud/metadata/main.py +++ b/ucloud/metadata/main.py @@ -21,33 +21,39 @@ def handle_exception(e): return e # now you're handling non-HTTP exceptions only - return {'message': 'Server Error'}, 500 + return {"message": "Server Error"}, 500 def get_vm_entry(mac_addr): - return next(filter(lambda vm: mac_addr in list(zip(*vm.network))[1], shared.vm_pool.vms), None) + return next( + filter( + lambda vm: mac_addr in list(zip(*vm.network))[1], + shared.vm_pool.vms, + ), + None, + ) # https://stackoverflow.com/questions/37140846/how-to-convert-ipv6-link-local-address-to-mac-address-in-python def ipv62mac(ipv6): # remove subnet info if given - subnet_index = ipv6.find('/') + subnet_index = ipv6.find("/") if subnet_index != -1: ipv6 = ipv6[:subnet_index] - ipv6_parts = ipv6.split(':') + ipv6_parts = ipv6.split(":") mac_parts = list() for ipv6_part in ipv6_parts[-4:]: while len(ipv6_part) < 4: - ipv6_part = '0' + ipv6_part + ipv6_part = "0" + ipv6_part mac_parts.append(ipv6_part[:2]) mac_parts.append(ipv6_part[-2:]) # modify parts to match MAC value - mac_parts[0] = '%02x' % (int(mac_parts[0], 16) ^ 2) + mac_parts[0] = "%02x" % (int(mac_parts[0], 16) ^ 2) del mac_parts[4] del mac_parts[3] - return ':'.join(mac_parts) + return ":".join(mac_parts) class Root(Resource): @@ -56,19 +62,27 @@ class Root(Resource): data = get_vm_entry(ipv62mac(request.remote_addr)) if not data: - return {'message': 'Metadata for such VM does not exists.'}, 404 + return ( + {"message": "Metadata for such VM does not exists."}, + 404, + ) else: - etcd_key = os.path.join(settings['etcd']['user_prefix'], - data.value['owner_realm'], - data.value['owner'], 'key') - etcd_entry = shared.etcd_client.get_prefix(etcd_key, value_in_json=True) + etcd_key = os.path.join( + settings["etcd"]["user_prefix"], + data.value["owner_realm"], + data.value["owner"], + "key", + ) + etcd_entry = shared.etcd_client.get_prefix( + etcd_key, value_in_json=True + ) user_personal_ssh_keys = [key.value for key in etcd_entry] - data.value['metadata']['ssh-keys'] += user_personal_ssh_keys - return data.value['metadata'], 200 + data.value["metadata"]["ssh-keys"] += user_personal_ssh_keys + return data.value["metadata"], 200 @staticmethod def post(): - return {'message': 'Previous Implementation is deprecated.'} + return {"message": "Previous Implementation is deprecated."} # data = etcd_client.get("/v1/metadata/{}".format(request.remote_addr), value_in_json=True) # print(data) # if data: @@ -94,12 +108,12 @@ class Root(Resource): # data, value_in_json=True) -api.add_resource(Root, '/') +api.add_resource(Root, "/") def main(): app.run(debug=True, host="::", port="80") -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/ucloud/scheduler/__init__.py b/ucloud/scheduler/__init__.py index 95e1be0..eea436a 100644 --- a/ucloud/scheduler/__init__.py +++ b/ucloud/scheduler/__init__.py @@ -1,3 +1,3 @@ import logging -logger = logging.getLogger(__name__) \ No newline at end of file +logger = logging.getLogger(__name__) diff --git a/ucloud/scheduler/helper.py b/ucloud/scheduler/helper.py index 0e9ef73..2fb7a22 100755 --- a/ucloud/scheduler/helper.py +++ b/ucloud/scheduler/helper.py @@ -24,17 +24,35 @@ def remaining_resources(host_specs, vms_specs): for component in _vms_specs: if isinstance(_vms_specs[component], str): - _vms_specs[component] = int(bitmath.parse_string_unsafe(_vms_specs[component]).to_MB()) + _vms_specs[component] = int( + bitmath.parse_string_unsafe( + _vms_specs[component] + ).to_MB() + ) elif isinstance(_vms_specs[component], list): - _vms_specs[component] = map(lambda x: int(bitmath.parse_string_unsafe(x).to_MB()), _vms_specs[component]) - _vms_specs[component] = reduce(lambda x, y: x + y, _vms_specs[component], 0) + _vms_specs[component] = map( + lambda x: int(bitmath.parse_string_unsafe(x).to_MB()), + _vms_specs[component], + ) + _vms_specs[component] = reduce( + lambda x, y: x + y, _vms_specs[component], 0 + ) for component in _remaining: if isinstance(_remaining[component], str): - _remaining[component] = int(bitmath.parse_string_unsafe(_remaining[component]).to_MB()) + _remaining[component] = int( + bitmath.parse_string_unsafe( + _remaining[component] + ).to_MB() + ) elif isinstance(_remaining[component], list): - _remaining[component] = map(lambda x: int(bitmath.parse_string_unsafe(x).to_MB()), _remaining[component]) - _remaining[component] = reduce(lambda x, y: x + y, _remaining[component], 0) + _remaining[component] = map( + lambda x: int(bitmath.parse_string_unsafe(x).to_MB()), + _remaining[component], + ) + _remaining[component] = reduce( + lambda x, y: x + y, _remaining[component], 0 + ) _remaining.subtract(_vms_specs) @@ -59,11 +77,15 @@ def get_suitable_host(vm_specs, hosts=None): running_vms_specs = [vm.specs for vm in vms] # Accumulate all of their combined specs - running_vms_accumulated_specs = accumulated_specs(running_vms_specs) + running_vms_accumulated_specs = accumulated_specs( + running_vms_specs + ) # Find out remaining resources after # host_specs - already running vm_specs - remaining = remaining_resources(host.specs, running_vms_accumulated_specs) + remaining = remaining_resources( + host.specs, running_vms_accumulated_specs + ) # Find out remaining - new_vm_specs remaining = remaining_resources(remaining, vm_specs) @@ -95,7 +117,7 @@ def dead_host_mitigation(dead_hosts_keys): vms_hosted_on_dead_host = shared.vm_pool.by_host(host_key) for vm in vms_hosted_on_dead_host: - vm.status = 'UNKNOWN' + vm.status = "UNKNOWN" shared.vm_pool.put(vm) shared.host_pool.put(host) @@ -104,10 +126,12 @@ def assign_host(vm): vm.hostname = get_suitable_host(vm.specs) shared.vm_pool.put(vm) - r = RequestEntry.from_scratch(type=RequestType.StartVM, - uuid=vm.uuid, - hostname=vm.hostname, - request_prefix=settings['etcd']['request_prefix']) + r = RequestEntry.from_scratch( + type=RequestType.StartVM, + uuid=vm.uuid, + hostname=vm.hostname, + request_prefix=settings["etcd"]["request_prefix"], + ) shared.request_pool.put(r) vm.log.append("VM scheduled for starting") diff --git a/ucloud/scheduler/main.py b/ucloud/scheduler/main.py index d91979f..7ee75e0 100755 --- a/ucloud/scheduler/main.py +++ b/ucloud/scheduler/main.py @@ -7,8 +7,13 @@ from ucloud.common.request import RequestEntry, RequestType from ucloud.shared import shared from ucloud.settings import settings -from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection, - assign_host, NoSuitableHostFound) +from .helper import ( + get_suitable_host, + dead_host_mitigation, + dead_host_detection, + assign_host, + NoSuitableHostFound, +) from . import logger @@ -16,8 +21,14 @@ def main(): pending_vms = [] for request_iterator in [ - shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True), - shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], timeout=5, value_in_json=True), + shared.etcd_client.get_prefix( + settings["etcd"]["request_prefix"], value_in_json=True + ), + shared.etcd_client.watch_prefix( + settings["etcd"]["request_prefix"], + timeout=5, + value_in_json=True, + ), ]: for request_event in request_iterator: request_entry = RequestEntry(request_event) @@ -41,25 +52,39 @@ def main(): # on our behalf. while pending_vms: pending_vm_entry = pending_vms.pop() - r = RequestEntry.from_scratch(type="ScheduleVM", - uuid=pending_vm_entry.uuid, - hostname=pending_vm_entry.hostname, - request_prefix=settings['etcd']['request_prefix']) + r = RequestEntry.from_scratch( + type="ScheduleVM", + uuid=pending_vm_entry.uuid, + hostname=pending_vm_entry.hostname, + request_prefix=settings["etcd"][ + "request_prefix" + ], + ) shared.request_pool.put(r) elif request_entry.type == RequestType.ScheduleVM: - logger.debug("%s, %s", request_entry.key, request_entry.value) + logger.debug( + "%s, %s", request_entry.key, request_entry.value + ) vm_entry = shared.vm_pool.get(request_entry.uuid) if vm_entry is None: - logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid)) + logger.info( + "Trying to act on {} but it is deleted".format( + request_entry.uuid + ) + ) continue - shared.etcd_client.client.delete(request_entry.key) # consume Request + shared.etcd_client.client.delete( + request_entry.key + ) # consume Request try: assign_host(vm_entry) except NoSuitableHostFound: - vm_entry.add_log("Can't schedule VM. No Resource Left.") + vm_entry.add_log( + "Can't schedule VM. No Resource Left." + ) shared.vm_pool.put(vm_entry) pending_vms.append(vm_entry) diff --git a/ucloud/scheduler/tests/test_basics.py b/ucloud/scheduler/tests/test_basics.py index 92b3a83..68bd8ec 100755 --- a/ucloud/scheduler/tests/test_basics.py +++ b/ucloud/scheduler/tests/test_basics.py @@ -70,9 +70,15 @@ class TestFunctions(unittest.TestCase): "last_heartbeat": datetime.utcnow().isoformat(), } with self.client.client.lock("lock"): - self.client.put(f"{self.host_prefix}/1", host1, value_in_json=True) - self.client.put(f"{self.host_prefix}/2", host2, value_in_json=True) - self.client.put(f"{self.host_prefix}/3", host3, value_in_json=True) + self.client.put( + f"{self.host_prefix}/1", host1, value_in_json=True + ) + self.client.put( + f"{self.host_prefix}/2", host2, value_in_json=True + ) + self.client.put( + f"{self.host_prefix}/3", host3, value_in_json=True + ) def create_vms(self): vm1 = json.dumps( @@ -146,15 +152,17 @@ class TestFunctions(unittest.TestCase): {"cpu": 8, "ram": 32}, ] self.assertEqual( - accumulated_specs(vms), {"ssd": 10, "cpu": 16, "ram": 48, "hdd": 10} + accumulated_specs(vms), + {"ssd": 10, "cpu": 16, "ram": 48, "hdd": 10}, ) def test_remaining_resources(self): host_specs = {"ssd": 10, "cpu": 16, "ram": 48, "hdd": 10} vms_specs = {"ssd": 10, "cpu": 32, "ram": 12, "hdd": 0} resultant_specs = {"ssd": 0, "cpu": -16, "ram": 36, "hdd": 10} - self.assertEqual(remaining_resources(host_specs, vms_specs), - resultant_specs) + self.assertEqual( + remaining_resources(host_specs, vms_specs), resultant_specs + ) def test_vmpool(self): self.p.join(1) @@ -167,7 +175,12 @@ class TestFunctions(unittest.TestCase): f"{self.vm_prefix}/1", { "owner": "meow", - "specs": {"cpu": 4, "ram": 8, "hdd": 100, "sdd": 256}, + "specs": { + "cpu": 4, + "ram": 8, + "hdd": 100, + "sdd": 256, + }, "hostname": f"{self.host_prefix}/3", "status": "SCHEDULED_DEPLOY", }, @@ -182,7 +195,12 @@ class TestFunctions(unittest.TestCase): f"{self.vm_prefix}/7", { "owner": "meow", - "specs": {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0}, + "specs": { + "cpu": 10, + "ram": 22, + "hdd": 146, + "sdd": 0, + }, "hostname": "", "status": "REQUESTED_NEW", }, @@ -197,7 +215,12 @@ class TestFunctions(unittest.TestCase): f"{self.vm_prefix}/7", { "owner": "meow", - "specs": {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0}, + "specs": { + "cpu": 10, + "ram": 22, + "hdd": 146, + "sdd": 0, + }, "hostname": "", "status": "REQUESTED_NEW", }, diff --git a/ucloud/scheduler/tests/test_dead_host_mechanism.py b/ucloud/scheduler/tests/test_dead_host_mechanism.py index 0b403ef..466b9ee 100755 --- a/ucloud/scheduler/tests/test_dead_host_mechanism.py +++ b/ucloud/scheduler/tests/test_dead_host_mechanism.py @@ -6,11 +6,7 @@ from os.path import dirname BASE_DIR = dirname(dirname(__file__)) sys.path.insert(0, BASE_DIR) -from main import ( - dead_host_detection, - dead_host_mitigation, - config -) +from main import dead_host_detection, dead_host_mitigation, config class TestDeadHostMechanism(unittest.TestCase): @@ -52,13 +48,23 @@ class TestDeadHostMechanism(unittest.TestCase): "last_heartbeat": datetime(2011, 1, 1).isoformat(), } with self.client.client.lock("lock"): - self.client.put(f"{self.host_prefix}/1", host1, value_in_json=True) - self.client.put(f"{self.host_prefix}/2", host2, value_in_json=True) - self.client.put(f"{self.host_prefix}/3", host3, value_in_json=True) - self.client.put(f"{self.host_prefix}/4", host4, value_in_json=True) + self.client.put( + f"{self.host_prefix}/1", host1, value_in_json=True + ) + self.client.put( + f"{self.host_prefix}/2", host2, value_in_json=True + ) + self.client.put( + f"{self.host_prefix}/3", host3, value_in_json=True + ) + self.client.put( + f"{self.host_prefix}/4", host4, value_in_json=True + ) def test_dead_host_detection(self): - hosts = self.client.get_prefix(self.host_prefix, value_in_json=True) + hosts = self.client.get_prefix( + self.host_prefix, value_in_json=True + ) deads = dead_host_detection(hosts) self.assertEqual(deads, ["/test/host/2", "/test/host/3"]) return deads @@ -66,7 +72,9 @@ class TestDeadHostMechanism(unittest.TestCase): def test_dead_host_mitigation(self): deads = self.test_dead_host_detection() dead_host_mitigation(self.client, deads) - hosts = self.client.get_prefix(self.host_prefix, value_in_json=True) + hosts = self.client.get_prefix( + self.host_prefix, value_in_json=True + ) deads = dead_host_detection(hosts) self.assertEqual(deads, []) diff --git a/ucloud/settings/__init__.py b/ucloud/settings/__init__.py index f9b358e..906e857 100644 --- a/ucloud/settings/__init__.py +++ b/ucloud/settings/__init__.py @@ -14,18 +14,22 @@ class CustomConfigParser(configparser.RawConfigParser): result = super().__getitem__(key) except KeyError as err: raise KeyError( - 'Key \'{}\' not found in configuration. Make sure you configure ucloud.'.format(key) + "Key '{}' not found in configuration. Make sure you configure ucloud.".format( + key + ) ) from err else: return result class Settings(object): - def __init__(self, config_key='/uncloud/config/'): - conf_name = 'ucloud.conf' - conf_dir = os.environ.get('UCLOUD_CONF_DIR', os.path.expanduser('~/ucloud/')) + def __init__(self, config_key="/uncloud/config/"): + conf_name = "ucloud.conf" + conf_dir = os.environ.get( + "UCLOUD_CONF_DIR", os.path.expanduser("~/ucloud/") + ) self.config_file = os.path.join(conf_dir, conf_name) - + self.config_parser = CustomConfigParser(allow_no_value=True) self.config_key = config_key @@ -33,43 +37,55 @@ class Settings(object): try: self.config_parser.read(self.config_file) except Exception as err: - logger.error('%s', err) + logger.error("%s", err) def get_etcd_client(self): args = tuple() try: kwargs = { - 'host': self.config_parser.get('etcd', 'url'), - 'port': self.config_parser.get('etcd', 'port'), - 'ca_cert': self.config_parser.get('etcd', 'ca_cert'), - 'cert_cert': self.config_parser.get('etcd', 'cert_cert'), - 'cert_key': self.config_parser.get('etcd', 'cert_key') + "host": self.config_parser.get("etcd", "url"), + "port": self.config_parser.get("etcd", "port"), + "ca_cert": self.config_parser.get("etcd", "ca_cert"), + "cert_cert": self.config_parser.get( + "etcd", "cert_cert" + ), + "cert_key": self.config_parser.get("etcd", "cert_key"), } except configparser.Error as err: - raise configparser.Error('{} in config file {}'.format(err.message, self.config_file)) from err + raise configparser.Error( + "{} in config file {}".format( + err.message, self.config_file + ) + ) from err else: try: wrapper = Etcd3Wrapper(*args, **kwargs) except Exception as err: - logger.error('etcd connection not successfull. Please check your config file.' - '\nDetails: %s\netcd connection parameters: %s', err, kwargs) + logger.error( + "etcd connection not successfull. Please check your config file." + "\nDetails: %s\netcd connection parameters: %s", + err, + kwargs, + ) sys.exit(1) else: return wrapper - + def read_internal_values(self): - self.config_parser.read_dict({ - 'etcd': { - 'file_prefix': '/files/', - 'host_prefix': '/hosts/', - 'image_prefix': '/images/', - 'image_store_prefix': '/imagestore/', - 'network_prefix': '/networks/', - 'request_prefix': '/requests/', - 'user_prefix': '/users/', - 'vm_prefix': '/vms/', + self.config_parser.read_dict( + { + "etcd": { + "file_prefix": "/files/", + "host_prefix": "/hosts/", + "image_prefix": "/images/", + "image_store_prefix": "/imagestore/", + "network_prefix": "/networks/", + "request_prefix": "/requests/", + "user_prefix": "/users/", + "vm_prefix": "/vms/", + } } - }) + ) def read_config_file_values(self, config_file): try: @@ -77,18 +93,26 @@ class Settings(object): with open(config_file, "r") as config_file_handle: self.config_parser.read_file(config_file_handle) except FileNotFoundError: - sys.exit('Configuration file {} not found!'.format(config_file)) + sys.exit( + "Configuration file {} not found!".format(config_file) + ) except Exception as err: logger.exception(err) sys.exit("Error occurred while reading configuration file") def read_values_from_etcd(self): etcd_client = self.get_etcd_client() - config_from_etcd = etcd_client.get(self.config_key, value_in_json=True) + config_from_etcd = etcd_client.get( + self.config_key, value_in_json=True + ) if config_from_etcd: self.config_parser.read_dict(config_from_etcd.value) else: - raise KeyError("Key '{}' not found in etcd. Please configure ucloud.".format(self.config_key)) + raise KeyError( + "Key '{}' not found in etcd. Please configure ucloud.".format( + self.config_key + ) + ) def __getitem__(self, key): self.read_values_from_etcd() diff --git a/ucloud/shared/__init__.py b/ucloud/shared/__init__.py index 7a296e9..294e34a 100644 --- a/ucloud/shared/__init__.py +++ b/ucloud/shared/__init__.py @@ -12,15 +12,19 @@ class Shared: @property def host_pool(self): - return HostPool(self.etcd_client, settings['etcd']['host_prefix']) + return HostPool( + self.etcd_client, settings["etcd"]["host_prefix"] + ) @property def vm_pool(self): - return VmPool(self.etcd_client, settings['etcd']['vm_prefix']) + return VmPool(self.etcd_client, settings["etcd"]["vm_prefix"]) @property def request_pool(self): - return RequestPool(self.etcd_client, settings['etcd']['request_prefix']) + return RequestPool( + self.etcd_client, settings["etcd"]["request_prefix"] + ) @property def storage_handler(self): diff --git a/ucloud/vmm/__init__.py b/ucloud/vmm/__init__.py index 9f9f5f9..3d3c304 100644 --- a/ucloud/vmm/__init__.py +++ b/ucloud/vmm/__init__.py @@ -37,7 +37,9 @@ class VMQMPHandles: self.sock.close() if exc_type: - logger.error('Couldn\'t get handle for VM.', exc_type, exc_val, exc_tb) + logger.error( + "Couldn't get handle for VM.", exc_type, exc_val, exc_tb + ) raise exc_type("Couldn't get handle for VM.") from exc_type @@ -54,29 +56,46 @@ class TransferVM(Process): with suppress(FileNotFoundError): os.remove(self.src_sock_path) - command = ['ssh', '-nNT', '-L', '{}:{}'.format(self.src_sock_path, self.dest_sock_path), - 'root@{}'.format(self.host)] + command = [ + "ssh", + "-nNT", + "-L", + "{}:{}".format(self.src_sock_path, self.dest_sock_path), + "root@{}".format(self.host), + ] try: p = sp.Popen(command) except Exception as e: - logger.error('Couldn\' forward unix socks over ssh.', exc_info=e) + logger.error( + "Couldn' forward unix socks over ssh.", exc_info=e + ) else: time.sleep(2) vmm = VMM() - logger.debug('Executing: ssh forwarding command: %s', command) - vmm.execute_command(self.src_uuid, command='migrate', - arguments={'uri': 'unix:{}'.format(self.src_sock_path)}) + logger.debug( + "Executing: ssh forwarding command: %s", command + ) + vmm.execute_command( + self.src_uuid, + command="migrate", + arguments={"uri": "unix:{}".format(self.src_sock_path)}, + ) while p.poll() is None: - success, output = vmm.execute_command(self.src_uuid, command='query-migrate') + success, output = vmm.execute_command( + self.src_uuid, command="query-migrate" + ) if success: - status = output['return']['status'] - if status != 'active': - print('Migration Status: ', status) + status = output["return"]["status"] + + if status != "active": + print("Migration Status: ", status) + if status == "completed": + vmm.stop(self.src_uuid) return else: - print('Migration Status: ', status) + print("Migration Status: ", status) else: return time.sleep(0.2) @@ -84,18 +103,29 @@ class TransferVM(Process): class VMM: # Virtual Machine Manager - def __init__(self, qemu_path='/usr/bin/qemu-system-x86_64', - vmm_backend=os.path.expanduser('~/ucloud/vmm/')): + def __init__( + self, + qemu_path="/usr/bin/qemu-system-x86_64", + vmm_backend=os.path.expanduser("~/ucloud/vmm/"), + ): self.qemu_path = qemu_path self.vmm_backend = vmm_backend - self.socket_dir = os.path.join(self.vmm_backend, 'sock') + self.socket_dir = os.path.join(self.vmm_backend, "sock") if not os.path.isdir(self.vmm_backend): - logger.info('{} does not exists. Creating it...'.format(self.vmm_backend)) + logger.info( + "{} does not exists. Creating it...".format( + self.vmm_backend + ) + ) os.makedirs(self.vmm_backend, exist_ok=True) if not os.path.isdir(self.socket_dir): - logger.info('{} does not exists. Creating it...'.format(self.socket_dir)) + logger.info( + "{} does not exists. Creating it...".format( + self.socket_dir + ) + ) os.makedirs(self.socket_dir, exist_ok=True) def is_running(self, uuid): @@ -106,8 +136,12 @@ class VMM: recv = sock.recv(4096) except Exception as err: # unix sock doesn't exists or it is closed - logger.debug('VM {} sock either don\' exists or it is closed. It mean VM is stopped.'.format(uuid), - exc_info=err) + logger.debug( + "VM {} sock either don' exists or it is closed. It mean VM is stopped.".format( + uuid + ), + exc_info=err, + ) else: # if we receive greetings from qmp it mean VM is running if len(recv) > 0: @@ -122,36 +156,67 @@ class VMM: # start --> sucess? migration_args = () if migration: - migration_args = ('-incoming', 'unix:{}'.format(os.path.join(self.socket_dir, uuid))) + migration_args = ( + "-incoming", + "unix:{}".format(os.path.join(self.socket_dir, uuid)), + ) if self.is_running(uuid): - logger.warning('Cannot start VM. It is already running.') + logger.warning("Cannot start VM. It is already running.") else: - qmp_arg = ('-qmp', 'unix:{},server,nowait'.format(join_path(self.vmm_backend, uuid))) - vnc_arg = ('-vnc', 'unix:{}'.format(tempfile.NamedTemporaryFile().name)) + qmp_arg = ( + "-qmp", + "unix:{},server,nowait".format( + join_path(self.vmm_backend, uuid) + ), + ) + vnc_arg = ( + "-vnc", + "unix:{}".format(tempfile.NamedTemporaryFile().name), + ) - command = ['sudo', '-p', 'Enter password to start VM {}: '.format(uuid), - self.qemu_path, *args, *qmp_arg, *migration_args, *vnc_arg, '-daemonize'] + command = [ + "sudo", + "-p", + "Enter password to start VM {}: ".format(uuid), + self.qemu_path, + *args, + *qmp_arg, + *migration_args, + *vnc_arg, + "-daemonize", + ] try: sp.check_output(command, stderr=sp.PIPE) except sp.CalledProcessError as err: - logger.exception('Error occurred while starting VM.\nDetail %s', err.stderr.decode('utf-8')) + logger.exception( + "Error occurred while starting VM.\nDetail %s", + err.stderr.decode("utf-8"), + ) else: with suppress(sp.CalledProcessError): - sp.check_output([ - 'sudo', '-p', - 'Enter password to correct permission for uncloud-vmm\'s directory', - 'chmod', '-R', 'o=rwx,g=rwx', self.vmm_backend - ]) + sp.check_output( + [ + "sudo", + "-p", + "Enter password to correct permission for uncloud-vmm's directory", + "chmod", + "-R", + "o=rwx,g=rwx", + self.vmm_backend, + ] + ) # TODO: Find some good way to check whether the virtual machine is up and # running without relying on non-guarenteed ways. for _ in range(10): time.sleep(2) status = self.get_status(uuid) - if status in ['running', 'inmigrate']: + if status in ["running", "inmigrate"]: return status - logger.warning('Timeout on VM\'s status. Shutting down VM %s', uuid) + logger.warning( + "Timeout on VM's status. Shutting down VM %s", uuid + ) self.stop(uuid) # TODO: What should we do more. VM can still continue to run in background. # If we have pid of vm we can kill it using OS. @@ -159,55 +224,73 @@ class VMM: def execute_command(self, uuid, command, **kwargs): # execute_command -> sucess?, output try: - with VMQMPHandles(os.path.join(self.vmm_backend, uuid)) as (sock_handle, file_handle): - command_to_execute = { - 'execute': command, - **kwargs - } - sock_handle.sendall(json.dumps(command_to_execute).encode('utf-8')) + with VMQMPHandles(os.path.join(self.vmm_backend, uuid)) as ( + sock_handle, + file_handle, + ): + command_to_execute = {"execute": command, **kwargs} + sock_handle.sendall( + json.dumps(command_to_execute).encode("utf-8") + ) output = file_handle.readline() except Exception: - logger.exception('Error occurred while executing command and getting valid output from qmp') + logger.exception( + "Error occurred while executing command and getting valid output from qmp" + ) else: try: output = json.loads(output) except Exception: - logger.exception('QMP Output isn\'t valid JSON. %s', output) + logger.exception( + "QMP Output isn't valid JSON. %s", output + ) else: - return 'return' in output, output + return "return" in output, output return False, None def stop(self, uuid): - success, output = self.execute_command(command='quit', uuid=uuid) + success, output = self.execute_command( + command="quit", uuid=uuid + ) return success def get_status(self, uuid): - success, output = self.execute_command(command='query-status', uuid=uuid) + success, output = self.execute_command( + command="query-status", uuid=uuid + ) if success: - return output['return']['status'] + return output["return"]["status"] else: # TODO: Think about this for a little more - return 'STOPPED' + return "STOPPED" def discover(self): vms = [ - uuid for uuid in os.listdir(self.vmm_backend) + uuid + for uuid in os.listdir(self.vmm_backend) if not isdir(join_path(self.vmm_backend, uuid)) ] return vms def get_vnc(self, uuid): - success, output = self.execute_command(uuid, command='query-vnc') + success, output = self.execute_command( + uuid, command="query-vnc" + ) if success: - return output['return']['service'] + return output["return"]["service"] return None def transfer(self, src_uuid, destination_sock_path, host): - p = TransferVM(src_uuid, destination_sock_path, socket_dir=self.socket_dir, host=host) + p = TransferVM( + src_uuid, + destination_sock_path, + socket_dir=self.socket_dir, + host=host, + ) p.start() # TODO: the following method should clean things that went wrong # e.g If VM migration fails or didn't start for long time # i.e 15 minutes we should stop the waiting VM. def maintenace(self): - pass \ No newline at end of file + pass