From 04993e41067edccd1fef3b6b48b4e8062f884c9f Mon Sep 17 00:00:00 2001 From: meow Date: Sun, 22 Dec 2019 12:26:48 +0500 Subject: [PATCH] Refactoring, Removal of most global vars, config default path is ~/ucloud/ --- scripts/ucloud | 31 +- setup.py | 4 +- ucloud/api/common_fields.py | 6 +- ucloud/api/create_image_store.py | 5 +- ucloud/api/helper.py | 18 +- ucloud/api/main.py | 118 +++--- ucloud/api/schemas.py | 23 +- ucloud/common/etcd_wrapper.py | 40 +- ucloud/common/network.py | 60 +++ ucloud/common/storage_handlers.py | 18 + ucloud/config.py | 39 -- ucloud/configure/main.py | 10 +- ucloud/filescanner/main.py | 17 +- ucloud/host/helper.py | 13 - ucloud/host/main.py | 110 +---- ucloud/host/virtualmachine.py | 648 +++++++++++++++--------------- ucloud/imagescanner/main.py | 19 +- ucloud/metadata/main.py | 10 +- ucloud/scheduler/helper.py | 25 +- ucloud/scheduler/main.py | 24 +- ucloud/scheduler/main.py.old | 93 ----- ucloud/settings/__init__.py | 38 +- ucloud/shared/__init__.py | 30 ++ 23 files changed, 673 insertions(+), 726 deletions(-) create mode 100644 ucloud/common/network.py delete mode 100644 ucloud/config.py delete mode 100644 ucloud/host/helper.py delete mode 100755 ucloud/scheduler/main.py.old create mode 100644 ucloud/shared/__init__.py diff --git a/scripts/ucloud b/scripts/ucloud index 8ea6027..7f3ef3a 100755 --- a/scripts/ucloud +++ b/scripts/ucloud @@ -3,29 +3,23 @@ import argparse import logging import importlib -import os import multiprocessing as mp import sys -from ucloud.configure.main import update_config, configure_parser +from ucloud.configure.main import configure_parser def exception_hook(exc_type, exc_value, exc_traceback): - logger.error( - "Uncaught exception", - exc_info=(exc_type, exc_value, exc_traceback) - ) - print(exc_type, exc_value) + logger.error( + 'Uncaught exception', + exc_info=(exc_type, exc_value, exc_traceback) + ) + print('Error: ', end='') + print(exc_type, exc_value, exc_traceback) if __name__ == '__main__': - 
logging.basicConfig(level=logging.DEBUG, - format='%(pathname)s:%(lineno)d -- %(levelname)-8s %(message)s', - filename='/var/log/ucloud.log', filemode='a') - logger = logging.getLogger("ucloud") - sys.excepthook = exception_hook - mp.set_start_method('spawn') arg_parser = argparse.ArgumentParser() subparsers = arg_parser.add_subparsers(dest="command") @@ -50,13 +44,20 @@ if __name__ == '__main__': if not args.command: arg_parser.print_help() else: + logging.basicConfig( + level=logging.DEBUG, + format='%(pathname)s:%(lineno)d -- %(levelname)-8s %(message)s', + handlers=[logging.handlers.SysLogHandler(address = '/dev/log')] + ) + logger = logging.getLogger("ucloud") + mp.set_start_method('spawn') + arguments = vars(args) try: name = arguments.pop('command') mod = importlib.import_module("ucloud.{}.main".format(name)) main = getattr(mod, "main") main(**arguments) - except Exception as e: - logger.exception(e) + logger.exception('Error') print(e) \ No newline at end of file diff --git a/setup.py b/setup.py index a7ddbe6..e273d68 100644 --- a/setup.py +++ b/setup.py @@ -1,3 +1,5 @@ +import os + from setuptools import setup, find_packages with open("README.md", "r") as fh: @@ -39,5 +41,5 @@ setup(name='ucloud', 'etcd3 @ https://github.com/kragniz/python-etcd3/tarball/master#egg=etcd3', ], scripts=['scripts/ucloud'], - data_files=[('/etc/ucloud/', ['conf/ucloud.conf'])], + data_files=[(os.path.expanduser('~/ucloud/'), ['conf/ucloud.conf'])], zip_safe=False) diff --git a/ucloud/api/common_fields.py b/ucloud/api/common_fields.py index cf86283..a793d26 100755 --- a/ucloud/api/common_fields.py +++ b/ucloud/api/common_fields.py @@ -1,6 +1,8 @@ import os -from ucloud.config import etcd_client, config +from ucloud.shared import shared +from ucloud.settings import settings + class Optional: pass @@ -47,6 +49,6 @@ class VmUUIDField(Field): self.validation = self.vm_uuid_validation def vm_uuid_validation(self): - r = etcd_client.get(os.path.join(config['etcd']['vm_prefix'], 
self.uuid)) + r = shared.etcd_client.get(os.path.join(settings['etcd']['vm_prefix'], self.uuid)) if not r: self.add_error("VM with uuid {} does not exists".format(self.uuid)) diff --git a/ucloud/api/create_image_store.py b/ucloud/api/create_image_store.py index 9023bd6..978a182 100755 --- a/ucloud/api/create_image_store.py +++ b/ucloud/api/create_image_store.py @@ -3,7 +3,8 @@ import os from uuid import uuid4 -from ucloud.config import etcd_client, config +from ucloud.shared import shared +from ucloud.settings import settings data = { "is_public": True, @@ -13,4 +14,4 @@ data = { "attributes": {"list": [], "key": [], "pool": "images"}, } -etcd_client.put(os.path.join(config['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data)) +shared.etcd_client.put(os.path.join(settings['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data)) diff --git a/ucloud/api/helper.py b/ucloud/api/helper.py index 1448e02..9cda36e 100755 --- a/ucloud/api/helper.py +++ b/ucloud/api/helper.py @@ -7,29 +7,29 @@ import requests from pyotp import TOTP -from ucloud.config import vm_pool, config - +from ucloud.shared import shared +from ucloud.settings import settings logger = logging.getLogger("ucloud.api.helper") def check_otp(name, realm, token): try: data = { - "auth_name": config['otp']['auth_name'], - "auth_token": TOTP(config['otp']['auth_seed']).now(), - "auth_realm": config['otp']['auth_realm'], + "auth_name": settings['otp']['auth_name'], + "auth_token": TOTP(settings['otp']['auth_seed']).now(), + "auth_realm": settings['otp']['auth_realm'], "name": name, "realm": realm, "token": token, } except binascii.Error as err: logger.error( - "Cannot compute OTP for seed: {}".format(config['otp']['auth_seed']) + "Cannot compute OTP for seed: {}".format(settings['otp']['auth_seed']) ) return 400 response = requests.post( - config['otp']['verification_controller_url'], json=data + settings['otp']['verification_controller_url'], json=data ) return response.status_code @@ -43,7 +43,7 
@@ def resolve_vm_name(name, owner): result = next( filter( lambda vm: vm.value["owner"] == owner and vm.value["name"] == name, - vm_pool.vms, + shared.vm_pool.vms, ), None, ) @@ -81,7 +81,7 @@ def resolve_image_name(name, etcd_client): except Exception: raise ValueError("Image name not in correct format i.e {store_name}:{image_name}") - images = etcd_client.get_prefix(config['etcd']['image_prefix'], value_in_json=True) + images = etcd_client.get_prefix(settings['etcd']['image_prefix'], value_in_json=True) # Try to find image with name == image_name and store_name == store_name try: diff --git a/ucloud/api/main.py b/ucloud/api/main.py index 0e202d8..05972ff 100644 --- a/ucloud/api/main.py +++ b/ucloud/api/main.py @@ -10,10 +10,9 @@ from flask_restful import Resource, Api from ucloud.common import counters from ucloud.common.vm import VMStatus from ucloud.common.request import RequestEntry, RequestType -from ucloud.config import ( - etcd_client, request_pool, vm_pool, - host_pool, config, image_storage_handler -) +from ucloud.settings import settings +from ucloud.shared import shared + from . import schemas from .helper import generate_mac, mac2ipv6 from . 
import logger @@ -31,7 +30,7 @@ class CreateVM(Resource): validator = schemas.CreateVMSchema(data) if validator.is_valid(): vm_uuid = uuid4().hex - vm_key = join_path(config['etcd']['vm_prefix'], vm_uuid) + vm_key = join_path(settings['etcd']['vm_prefix'], vm_uuid) specs = { "cpu": validator.specs["cpu"], "ram": validator.specs["ram"], @@ -39,7 +38,7 @@ class CreateVM(Resource): "hdd": validator.specs["hdd"], } macs = [generate_mac() for _ in range(len(data["network"]))] - tap_ids = [counters.increment_etcd_counter(etcd_client, "/v1/counter/tap") + tap_ids = [counters.increment_etcd_counter(shared.etcd_client, "/v1/counter/tap") for _ in range(len(data["network"]))] vm_entry = { "name": data["vm_name"], @@ -54,14 +53,14 @@ class CreateVM(Resource): "network": list(zip(data["network"], macs, tap_ids)), "metadata": {"ssh-keys": []}, } - etcd_client.put(vm_key, vm_entry, value_in_json=True) + shared.etcd_client.put(vm_key, vm_entry, value_in_json=True) # Create ScheduleVM Request r = RequestEntry.from_scratch( type=RequestType.ScheduleVM, uuid=vm_uuid, - request_prefix=config['etcd']['request_prefix'] + request_prefix=settings['etcd']['request_prefix'] ) - request_pool.put(r) + shared.request_pool.put(r) return {"message": "VM Creation Queued"}, 200 return validator.get_errors(), 400 @@ -73,16 +72,16 @@ class VmStatus(Resource): data = request.json validator = schemas.VMStatusSchema(data) if validator.is_valid(): - vm = vm_pool.get( - join_path(config['etcd']['vm_prefix'], data["uuid"]) + vm = shared.vm_pool.get( + join_path(settings['etcd']['vm_prefix'], data["uuid"]) ) vm_value = vm.value.copy() vm_value["ip"] = [] for network_mac_and_tap in vm.network: network_name, mac, tap = network_mac_and_tap - network = etcd_client.get( + network = shared.etcd_client.get( join_path( - config['etcd']['network_prefix'], + settings['etcd']['network_prefix'], data["name"], network_name, ), @@ -102,8 +101,8 @@ class CreateImage(Resource): data = request.json validator = 
schemas.CreateImageSchema(data) if validator.is_valid(): - file_entry = etcd_client.get( - join_path(config['etcd']['file_prefix'], data["uuid"]) + file_entry = shared.etcd_client.get( + join_path(settings['etcd']['file_prefix'], data["uuid"]) ) file_entry_value = json.loads(file_entry.value) @@ -115,8 +114,8 @@ class CreateImage(Resource): "store_name": data["image_store"], "visibility": "public", } - etcd_client.put( - join_path(config['etcd']['image_prefix'], data["uuid"]), + shared.etcd_client.put( + join_path(settings['etcd']['image_prefix'], data["uuid"]), json.dumps(image_entry_json), ) @@ -127,8 +126,8 @@ class CreateImage(Resource): class ListPublicImages(Resource): @staticmethod def get(): - images = etcd_client.get_prefix( - config['etcd']['image_prefix'], value_in_json=True + images = shared.etcd_client.get_prefix( + settings['etcd']['image_prefix'], value_in_json=True ) r = { "images": [] @@ -150,8 +149,8 @@ class VMAction(Resource): validator = schemas.VmActionSchema(data) if validator.is_valid(): - vm_entry = vm_pool.get( - join_path(config['etcd']['vm_prefix'], data["uuid"]) + vm_entry = shared.vm_pool.get( + join_path(settings['etcd']['vm_prefix'], data["uuid"]) ) action = data["action"] @@ -159,25 +158,25 @@ class VMAction(Resource): action = "schedule" if action == "delete" and vm_entry.hostname == "": - if image_storage_handler.is_vm_image_exists(vm_entry.uuid): - r_status = image_storage_handler.delete_vm_image(vm_entry.uuid) + if shared.storage_handler.is_vm_image_exists(vm_entry.uuid): + r_status = shared.storage_handler.delete_vm_image(vm_entry.uuid) if r_status: - etcd_client.client.delete(vm_entry.key) + shared.etcd_client.client.delete(vm_entry.key) return {"message": "VM successfully deleted"} else: logger.error("Some Error Occurred while deleting VM") return {"message": "VM deletion unsuccessfull"} else: - etcd_client.client.delete(vm_entry.key) + shared.etcd_client.client.delete(vm_entry.key) return {"message": "VM successfully 
deleted"} r = RequestEntry.from_scratch( type="{}VM".format(action.title()), uuid=data["uuid"], hostname=vm_entry.hostname, - request_prefix=config['etcd']['request_prefix'] + request_prefix=settings['etcd']['request_prefix'] ) - request_pool.put(r) + shared.request_pool.put(r) return {"message": "VM {} Queued".format(action.title())}, 200 else: return validator.get_errors(), 400 @@ -190,18 +189,18 @@ class VMMigration(Resource): validator = schemas.VmMigrationSchema(data) if validator.is_valid(): - vm = vm_pool.get(data["uuid"]) + vm = shared.vm_pool.get(data["uuid"]) r = RequestEntry.from_scratch( type=RequestType.ScheduleVM, uuid=vm.uuid, destination=join_path( - config['etcd']['host_prefix'], validator.destination.value + settings['etcd']['host_prefix'], validator.destination.value ), migration=True, - request_prefix=config['etcd']['request_prefix'] + request_prefix=settings['etcd']['request_prefix'] ) - request_pool.put(r) + shared.request_pool.put(r) return {"message": "VM Migration Initialization Queued"}, 200 else: return validator.get_errors(), 400 @@ -214,8 +213,8 @@ class ListUserVM(Resource): validator = schemas.OTPSchema(data) if validator.is_valid(): - vms = etcd_client.get_prefix( - config['etcd']['vm_prefix'], value_in_json=True + vms = shared.etcd_client.get_prefix( + settings['etcd']['vm_prefix'], value_in_json=True ) return_vms = [] user_vms = filter(lambda v: v.value["owner"] == data["name"], vms) @@ -227,7 +226,6 @@ class ListUserVM(Resource): "specs": vm.value["specs"], "status": vm.value["status"], "hostname": vm.value["hostname"], - # "mac": vm.value["mac"], "vnc_socket": None if vm.value.get("vnc_socket", None) is None else vm.value["vnc_socket"], @@ -248,8 +246,8 @@ class ListUserFiles(Resource): validator = schemas.OTPSchema(data) if validator.is_valid(): - files = etcd_client.get_prefix( - config['etcd']['file_prefix'], value_in_json=True + files = shared.etcd_client.get_prefix( + settings['etcd']['file_prefix'], value_in_json=True ) 
return_files = [] user_files = list( @@ -273,14 +271,14 @@ class CreateHost(Resource): data = request.json validator = schemas.CreateHostSchema(data) if validator.is_valid(): - host_key = join_path(config['etcd']['host_prefix'], uuid4().hex) + host_key = join_path(settings['etcd']['host_prefix'], uuid4().hex) host_entry = { "specs": data["specs"], "hostname": data["hostname"], "status": "DEAD", "last_heartbeat": "", } - etcd_client.put(host_key, host_entry, value_in_json=True) + shared.etcd_client.put(host_key, host_entry, value_in_json=True) return {"message": "Host Created"}, 200 @@ -290,7 +288,7 @@ class CreateHost(Resource): class ListHost(Resource): @staticmethod def get(): - hosts = host_pool.hosts + hosts = shared.host_pool.hosts r = { host.key: { "status": host.status, @@ -312,12 +310,12 @@ class GetSSHKeys(Resource): # {user_prefix}/{realm}/{name}/key/ etcd_key = join_path( - config['etcd']['user_prefix'], + settings['etcd']['user_prefix'], data["realm"], data["name"], "key", ) - etcd_entry = etcd_client.get_prefix( + etcd_entry = shared.etcd_client.get_prefix( etcd_key, value_in_json=True ) @@ -329,13 +327,13 @@ class GetSSHKeys(Resource): # {user_prefix}/{realm}/{name}/key/{key_name} etcd_key = join_path( - config['etcd']['user_prefix'], + settings['etcd']['user_prefix'], data["realm"], data["name"], "key", data["key_name"], ) - etcd_entry = etcd_client.get(etcd_key, value_in_json=True) + etcd_entry = shared.etcd_client.get(etcd_key, value_in_json=True) if etcd_entry: return { @@ -358,13 +356,13 @@ class AddSSHKey(Resource): # {user_prefix}/{realm}/{name}/key/{key_name} etcd_key = join_path( - config['etcd']['user_prefix'], + settings['etcd']['user_prefix'], data["realm"], data["name"], "key", data["key_name"], ) - etcd_entry = etcd_client.get(etcd_key, value_in_json=True) + etcd_entry = shared.etcd_client.get(etcd_key, value_in_json=True) if etcd_entry: return { "message": "Key with name '{}' already exists".format( @@ -373,7 +371,7 @@ class 
AddSSHKey(Resource): } else: # Key Not Found. It implies user' haven't added any key yet. - etcd_client.put(etcd_key, data["key"], value_in_json=True) + shared.etcd_client.put(etcd_key, data["key"], value_in_json=True) return {"message": "Key added successfully"} else: return validator.get_errors(), 400 @@ -388,15 +386,15 @@ class RemoveSSHKey(Resource): # {user_prefix}/{realm}/{name}/key/{key_name} etcd_key = join_path( - config['etcd']['user_prefix'], + settings['etcd']['user_prefix'], data["realm"], data["name"], "key", data["key_name"], ) - etcd_entry = etcd_client.get(etcd_key, value_in_json=True) + etcd_entry = shared.etcd_client.get(etcd_key, value_in_json=True) if etcd_entry: - etcd_client.client.delete(etcd_key) + shared.etcd_client.client.delete(etcd_key) return {"message": "Key successfully removed."} else: return { @@ -418,22 +416,22 @@ class CreateNetwork(Resource): network_entry = { "id": counters.increment_etcd_counter( - etcd_client, "/v1/counter/vxlan" + shared.etcd_client, "/v1/counter/vxlan" ), "type": data["type"], } if validator.user.value: try: nb = pynetbox.api( - url=config['netbox']['url'], - token=config['netbox']['token'], + url=settings['netbox']['url'], + token=settings['netbox']['token'], ) nb_prefix = nb.ipam.prefixes.get( - prefix=config['network']['prefix'] + prefix=settings['network']['prefix'] ) prefix = nb_prefix.available_prefixes.create( data={ - "prefix_length": int(config['network']['prefix_length']), + "prefix_length": int(settings['network']['prefix_length']), "description": '{}\'s network "{}"'.format( data["name"], data["network_name"] ), @@ -449,11 +447,11 @@ class CreateNetwork(Resource): network_entry["ipv6"] = "fd00::/64" network_key = join_path( - config['etcd']['network_prefix'], + settings['etcd']['network_prefix'], data['name'], data['network_name'], ) - etcd_client.put(network_key, network_entry, value_in_json=True) + shared.etcd_client.put(network_key, network_entry, value_in_json=True) return {"message": 
"Network successfully added."} else: return validator.get_errors(), 400 @@ -467,9 +465,9 @@ class ListUserNetwork(Resource): if validator.is_valid(): prefix = join_path( - config['etcd']['network_prefix'], data["name"] + settings['etcd']['network_prefix'], data["name"] ) - networks = etcd_client.get_prefix(prefix, value_in_json=True) + networks = shared.etcd_client.get_prefix(prefix, value_in_json=True) user_networks = [] for net in networks: net.value["name"] = net.key.split("/")[-1] @@ -503,7 +501,7 @@ api.add_resource(CreateNetwork, "/network/create") def main(): - image_stores = list(etcd_client.get_prefix(config['etcd']['image_store_prefix'], value_in_json=True)) + image_stores = list(shared.etcd_client.get_prefix(settings['etcd']['image_store_prefix'], value_in_json=True)) if len(image_stores) == 0: data = { "is_public": True, @@ -513,7 +511,7 @@ def main(): "attributes": {"list": [], "key": [], "pool": "images"}, } - etcd_client.put(join_path(config['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data)) + shared.etcd_client.put(join_path(settings['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data)) app.run(host="::", debug=True) diff --git a/ucloud/api/schemas.py b/ucloud/api/schemas.py index a3e0aa8..d639be4 100755 --- a/ucloud/api/schemas.py +++ b/ucloud/api/schemas.py @@ -21,7 +21,8 @@ import bitmath from ucloud.common.host import HostStatus from ucloud.common.vm import VMStatus -from ucloud.config import etcd_client, config, vm_pool, host_pool +from ucloud.shared import shared +from ucloud.settings import settings from . 
import helper, logger from .common_fields import Field, VmUUIDField from .helper import check_otp, resolve_vm_name @@ -102,14 +103,14 @@ class CreateImageSchema(BaseSchema): super().__init__(data, fields) def file_uuid_validation(self): - file_entry = etcd_client.get(os.path.join(config['etcd']['file_prefix'], self.uuid.value)) + file_entry = shared.etcd_client.get(os.path.join(settings['etcd']['file_prefix'], self.uuid.value)) if file_entry is None: self.add_error( "Image File with uuid '{}' Not Found".format(self.uuid.value) ) def image_store_name_validation(self): - image_stores = list(etcd_client.get_prefix(config['etcd']['image_store_prefix'])) + image_stores = list(shared.etcd_client.get_prefix(settings['etcd']['image_store_prefix'])) image_store = next( filter( @@ -218,7 +219,7 @@ class CreateVMSchema(OTPSchema): def image_validation(self): try: - image_uuid = helper.resolve_image_name(self.image.value, etcd_client) + image_uuid = helper.resolve_image_name(self.image.value, shared.etcd_client) except Exception as e: logger.exception("Cannot resolve image name = %s", self.image.value) self.add_error(str(e)) @@ -236,7 +237,7 @@ class CreateVMSchema(OTPSchema): if _network: for net in _network: - network = etcd_client.get(os.path.join(config['etcd']['network_prefix'], + network = shared.etcd_client.get(os.path.join(settings['etcd']['network_prefix'], self.name.value, net), value_in_json=True) if not network: @@ -310,7 +311,7 @@ class VMStatusSchema(OTPSchema): super().__init__(data, fields) def validation(self): - vm = vm_pool.get(self.uuid.value) + vm = shared.vm_pool.get(self.uuid.value) if not ( vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" ): @@ -343,7 +344,7 @@ class VmActionSchema(OTPSchema): ) def validation(self): - vm = vm_pool.get(self.uuid.value) + vm = shared.vm_pool.get(self.uuid.value) if not ( vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" ): @@ -383,7 +384,7 @@ class 
VmMigrationSchema(OTPSchema): def destination_validation(self): hostname = self.destination.value - host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) + host = next(filter(lambda h: h.hostname == hostname, shared.host_pool.hosts), None) if not host: self.add_error("No Such Host ({}) exists".format(self.destination.value)) elif host.status != HostStatus.alive: @@ -392,7 +393,7 @@ class VmMigrationSchema(OTPSchema): self.destination.value = host.key def validation(self): - vm = vm_pool.get(self.uuid.value) + vm = shared.vm_pool.get(self.uuid.value) if not ( vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" ): @@ -401,7 +402,7 @@ class VmMigrationSchema(OTPSchema): if vm.status != VMStatus.running: self.add_error("Can't migrate non-running VM") - if vm.hostname == os.path.join(config['etcd']['host_prefix'], self.destination.value): + if vm.hostname == os.path.join(settings['etcd']['host_prefix'], self.destination.value): self.add_error("Destination host couldn't be same as Source Host") @@ -443,7 +444,7 @@ class CreateNetwork(OTPSchema): super().__init__(data, fields=fields) def network_name_validation(self): - network = etcd_client.get(os.path.join(config['etcd']['network_prefix'], + network = shared.etcd_client.get(os.path.join(settings['etcd']['network_prefix'], self.name.value, self.network_name.value), value_in_json=True) diff --git a/ucloud/common/etcd_wrapper.py b/ucloud/common/etcd_wrapper.py index a3fb83f..e249e6c 100644 --- a/ucloud/common/etcd_wrapper.py +++ b/ucloud/common/etcd_wrapper.py @@ -4,40 +4,63 @@ import queue import copy from collections import namedtuple +from functools import wraps + +from . 
import logger + +PseudoEtcdMeta = namedtuple('PseudoEtcdMeta', ['key']) -PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"]) class EtcdEntry: # key: str # value: str def __init__(self, meta, value, value_in_json=False): - self.key = meta.key.decode("utf-8") - self.value = value.decode("utf-8") + self.key = meta.key.decode('utf-8') + self.value = value.decode('utf-8') if value_in_json: self.value = json.loads(self.value) + +def readable_errors(func): + @wraps(func) + def wrapper(*args, **kwargs): + try: + func(*args, **kwargs) + except etcd3.exceptions.ConnectionFailedError as err: + raise etcd3.exceptions.ConnectionFailedError('etcd connection failed') from err + except etcd3.exceptions.ConnectionTimeoutError as err: + raise etcd3.exceptions.ConnectionTimeoutError('etcd connection timeout') from err + except Exception: + print('Some error occurred, most probably it is etcd that is erroring out.') + logger.exception('Some etcd error occurred') + return wrapper + + class Etcd3Wrapper: def __init__(self, *args, **kwargs): self.client = etcd3.client(*args, **kwargs) + @readable_errors def get(self, *args, value_in_json=False, **kwargs): _value, _key = self.client.get(*args, **kwargs) if _key is None or _value is None: return None return EtcdEntry(_key, _value, value_in_json=value_in_json) + @readable_errors def put(self, *args, value_in_json=False, **kwargs): _key, _value = args if value_in_json: _value = json.dumps(_value) if not isinstance(_key, str): - _key = _key.decode("utf-8") + _key = _key.decode('utf-8') return self.client.put(_key, _value, **kwargs) + @readable_errors def get_prefix(self, *args, value_in_json=False, **kwargs): r = self.client.get_prefix(*args, **kwargs) for entry in r: @@ -45,10 +68,11 @@ class Etcd3Wrapper: if e.value: yield e + @readable_errors def watch_prefix(self, key, timeout=0, value_in_json=False): - timeout_event = EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), - value=str.encode(json.dumps({"status": "TIMEOUT", - "type": "TIMEOUT"})), 
+ timeout_event = EtcdEntry(PseudoEtcdMeta(key=b'TIMEOUT'), + value=str.encode(json.dumps({'status': 'TIMEOUT', + 'type': 'TIMEOUT'})), value_in_json=value_in_json) event_queue = queue.Queue() @@ -71,4 +95,4 @@ class Etcd3Wrapper: class PsuedoEtcdEntry(EtcdEntry): def __init__(self, key, value, value_in_json=False): - super().__init__(PseudoEtcdMeta(key=key.encode("utf-8")), value, value_in_json=value_in_json) + super().__init__(PseudoEtcdMeta(key=key.encode('utf-8')), value, value_in_json=value_in_json) diff --git a/ucloud/common/network.py b/ucloud/common/network.py new file mode 100644 index 0000000..6a6c6e2 --- /dev/null +++ b/ucloud/common/network.py @@ -0,0 +1,60 @@ +import subprocess as sp +import random +import logging +import socket +from contextlib import closing + +logger = logging.getLogger(__name__) + + +def random_bytes(num=6): + return [random.randrange(256) for _ in range(num)] + + +def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'): + mac = random_bytes() + if oui: + if type(oui) == str: + oui = [int(chunk) for chunk in oui.split(separator)] + mac = oui + random_bytes(num=6 - len(oui)) + else: + if multicast: + mac[0] |= 1 # set bit 0 + else: + mac[0] &= ~1 # clear bit 0 + if uaa: + mac[0] &= ~(1 << 1) # clear bit 1 + else: + mac[0] |= 1 << 1 # set bit 1 + return separator.join(byte_fmt % b for b in mac) + + +def create_dev(script, _id, dev, ip=None): + command = [script, _id, dev] + if ip: + command.append(ip) + try: + output = sp.check_output(command, stderr=sp.PIPE) + except Exception as e: + print(e) + return None + else: + return output.decode('utf-8').strip() + + +def delete_network_interface(iface): + try: + sp.check_output(['ip', 'link', 'del', iface]) + except Exception: + logger.exception('Interface Deletion failed') + + +def find_free_port(): + with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + try: + s.bind(('', 0)) + s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + 
except Exception: + return None + else: + return s.getsockname()[1] diff --git a/ucloud/common/storage_handlers.py b/ucloud/common/storage_handlers.py index 8b1097a..d2bd452 100644 --- a/ucloud/common/storage_handlers.py +++ b/ucloud/common/storage_handlers.py @@ -7,6 +7,8 @@ from abc import ABC from . import logger from os.path import join as join_path +from ucloud.settings import settings as config + class ImageStorageHandler(ABC): def __init__(self, image_base, vm_base): @@ -156,3 +158,19 @@ class CEPHBasedImageStorageHandler(ImageStorageHandler): path = join_path(self.vm_base, path) command = ["rbd", "info", path] return self.execute_command(command, report=False) + + +def get_storage_handler(): + __storage_backend = config['storage']['storage_backend'] + if __storage_backend == 'filesystem': + return FileSystemBasedImageStorageHandler( + vm_base=config['storage']['vm_dir'], + image_base=config['storage']['image_dir'] + ) + elif __storage_backend == 'ceph': + return CEPHBasedImageStorageHandler( + vm_base=config['storage']['ceph_vm_pool'], + image_base=config['storage']['ceph_image_pool'] + ) + else: + raise Exception('Unknown Image Storage Handler') diff --git a/ucloud/config.py b/ucloud/config.py deleted file mode 100644 index 97d1561..0000000 --- a/ucloud/config.py +++ /dev/null @@ -1,39 +0,0 @@ -import configparser -import os -import logging - -from ucloud.common.host import HostPool -from ucloud.common.request import RequestPool -from ucloud.common.vm import VmPool -from ucloud.common.storage_handlers import (FileSystemBasedImageStorageHandler, - CEPHBasedImageStorageHandler) -from ucloud.common.etcd_wrapper import Etcd3Wrapper -from ucloud.settings import Settings -from os.path import join as join_path - -logger = logging.getLogger('ucloud.config') - - - -config = Settings() -etcd_client = config.get_etcd_client() - -host_pool = HostPool(etcd_client, config['etcd']['host_prefix']) -vm_pool = VmPool(etcd_client, config['etcd']['vm_prefix']) -request_pool = 
RequestPool(etcd_client, config['etcd']['request_prefix']) - -running_vms = [] - -__storage_backend = config['storage']['storage_backend'] -if __storage_backend == 'filesystem': - image_storage_handler = FileSystemBasedImageStorageHandler( - vm_base=config['storage']['vm_dir'], - image_base=config['storage']['image_dir'] - ) -elif __storage_backend == 'ceph': - image_storage_handler = CEPHBasedImageStorageHandler( - vm_base=config['storage']['ceph_vm_pool'], - image_base=config['storage']['ceph_image_pool'] - ) -else: - raise Exception('Unknown Image Storage Handler') diff --git a/ucloud/configure/main.py b/ucloud/configure/main.py index 0baa8eb..71e07a1 100644 --- a/ucloud/configure/main.py +++ b/ucloud/configure/main.py @@ -2,21 +2,19 @@ import argparse import sys import os -from ucloud.settings import Settings +from ucloud.settings import settings +from ucloud.shared import shared -config = Settings() -etcd_client = config.get_etcd_client() def update_config(section, kwargs): - uncloud_config = etcd_client.get(config.config_key, - value_in_json=True) + uncloud_config = shared.etcd_client.get(settings.config_key, value_in_json=True) if not uncloud_config: uncloud_config = {} else: uncloud_config = uncloud_config.value uncloud_config[section] = kwargs - etcd_client.put(config.config_key, uncloud_config, value_in_json=True) + shared.etcd_client.put(settings.config_key, uncloud_config, value_in_json=True) def configure_parser(parser): diff --git a/ucloud/filescanner/main.py b/ucloud/filescanner/main.py index 14a77cf..ff38748 100755 --- a/ucloud/filescanner/main.py +++ b/ucloud/filescanner/main.py @@ -8,8 +8,8 @@ import sys from uuid import uuid4 from . 
import logger -from ucloud.config import config, etcd_client - +from ucloud.settings import settings +from ucloud.shared import shared def getxattr(file, attr): """Get specified user extended attribute (arg:attr) of a file (arg:file)""" @@ -69,11 +69,10 @@ except Exception as e: def main(): - BASE_DIR = config['storage']['file_dir'] - FILE_PREFIX = config['etcd']['file_prefix'] + base_dir = settings['storage']['file_dir'] # Recursively Get All Files and Folder below BASE_DIR - files = glob.glob("{}/**".format(BASE_DIR), recursive=True) + files = glob.glob("{}/**".format(base_dir), recursive=True) # Retain only Files files = list(filter(os.path.isfile, files)) @@ -89,7 +88,7 @@ def main(): file_id = uuid4() # Get Username - owner = pathlib.Path(file).parts[len(pathlib.Path(BASE_DIR).parts)] + owner = pathlib.Path(file).parts[len(pathlib.Path(base_dir).parts)] # Get Creation Date of File # Here, we are assuming that ctime is creation time @@ -105,7 +104,7 @@ def main(): file_path = pathlib.Path(file).parts[-1] # Create Entry - entry_key = os.path.join(FILE_PREFIX, str(file_id)) + entry_key = os.path.join(settings['etcd']['file_prefix'], str(file_id)) entry_value = { "filename": file_path, "owner": owner, @@ -115,8 +114,8 @@ def main(): } logger.info("Tracking %s", file) - # Insert Entry - etcd_client.put(entry_key, entry_value, value_in_json=True) + + shared.etcd_client.put(entry_key, entry_value, value_in_json=True) setxattr(file, "utracked", True) diff --git a/ucloud/host/helper.py b/ucloud/host/helper.py deleted file mode 100644 index edcb82d..0000000 --- a/ucloud/host/helper.py +++ /dev/null @@ -1,13 +0,0 @@ -import socket -from contextlib import closing - - -def find_free_port(): - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: - try: - s.bind(('', 0)) - s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - except Exception: - return None - else: - return s.getsockname()[1] diff --git a/ucloud/host/main.py b/ucloud/host/main.py index 
ddc52c7..f78f629 100755 --- a/ucloud/host/main.py +++ b/ucloud/host/main.py @@ -3,22 +3,21 @@ import multiprocessing as mp import time import sys -from os.path import isdir -from ucloud.common.etcd_wrapper import Etcd3Wrapper from ucloud.common.request import RequestEntry, RequestType -from ucloud.config import (vm_pool, request_pool, - etcd_client, running_vms, - HostPool, config) +from ucloud.common.host import HostPool +from ucloud.shared import shared +from ucloud.settings import settings -from .helper import find_free_port from . import virtualmachine, logger +vmm = virtualmachine.VMM() + def update_heartbeat(hostname): """Update Last HeartBeat Time for :param hostname: in etcd""" - client = config.get_etcd_client() - host_pool = HostPool(client, config['etcd']['host_prefix']) + client = shared.etcd_client + host_pool = HostPool(client) this_host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) while True: @@ -27,122 +26,55 @@ def update_heartbeat(hostname): time.sleep(10) -def maintenance(host): - # To capture vm running according to running_vms list - - # This is to capture successful migration of a VM. - # Suppose, this host is running "vm1" and user initiated - # request to migrate this "vm1" to some other host. On, - # successful migration the destination host would set - # the vm hostname to itself. Thus, we are checking - # whether this host vm is successfully migrated. If yes - # then we shutdown "vm1" on this host. 
- - to_be_removed = [] - for running_vm in running_vms: - with vm_pool.get_put(running_vm.key) as vm_entry: - if vm_entry.hostname != host.key and not vm_entry.in_migration: - running_vm.handle.shutdown() - logger.info("VM migration not completed successfully.") - to_be_removed.append(running_vm) - - for r in to_be_removed: - running_vms.remove(r) - - # To check vm running according to etcd entries - alleged_running_vms = vm_pool.by_status("RUNNING", vm_pool.by_host(host.key)) - - for vm_entry in alleged_running_vms: - _vm = virtualmachine.get_vm(running_vms, vm_entry.key) - # Whether, the allegedly running vm is in our - # running_vms list or not if it is said to be - # running on this host but it is not then we - # need to shut it down - - # This is to capture poweroff/shutdown of a VM - # initiated by user inside VM. OR crash of VM by some - # user running process - if (_vm and not _vm.handle.is_running()) or not _vm: - logger.debug("_vm = %s, is_running() = %s" % (_vm, _vm.handle.is_running())) - vm_entry.add_log("""{} is not running but is said to be running. - So, shutting it down and declare it killed""".format(vm_entry.key)) - vm_entry.declare_killed() - vm_pool.put(vm_entry) - if _vm: - running_vms.remove(_vm) - -def check(): - if config['storage']['storage_backend'] == 'filesystem' and \ - not isdir(config['storage']['vm_dir']): - - print("You have set STORAGE_BACKEND to filesystem. So, the vm directory mentioned" - " in /etc/ucloud/ucloud.conf file must exists. 
But, it don't.") - sys.exit(1) - - def main(hostname): - check() - - heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,)) - - host_pool = HostPool(etcd_client, config['etcd']['host_prefix']) + host_pool = HostPool(shared.etcd_client) host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) assert host is not None, "No such host with name = {}".format(hostname) try: + heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,)) heartbeat_updating_process.start() except Exception as e: - logger.info("No Need To Go Further. Our heartbeat updating mechanism is not working") logger.exception(e) - exit(-1) - - logger.info("%s Session Started %s", '*' * 5, '*' * 5) - - # It is seen that under heavy load, timeout event doesn't come - # in a predictive manner (which is intentional because we give - # higher priority to customer's requests) which delays heart - # beat update which in turn misunderstood by scheduler that the - # host is dead when it is actually alive. So, to ensure that we - # update the heart beat in a predictive manner we start Heart - # beat updating mechanism in separated thread + sys.exit("No Need To Go Further. 
ucloud-host heartbeat updating mechanism is not working") for events_iterator in [ - etcd_client.get_prefix(config['etcd']['request_prefix'], value_in_json=True), - etcd_client.watch_prefix(config['etcd']['request_prefix'], timeout=10, value_in_json=True), + shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True), + shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], timeout=10, value_in_json=True), ]: for request_event in events_iterator: request_event = RequestEntry(request_event) if request_event.type == "TIMEOUT": - maintenance(host) + vmm.maintenance(host) continue # If the event is directed toward me OR I am destination of a InitVMMigration if request_event.hostname == host.key or request_event.destination == host.key: logger.debug("VM Request: %s", request_event) - request_pool.client.client.delete(request_event.key) - vm_entry = vm_pool.get(request_event.uuid) + shared.request_pool.client.client.delete(request_event.key) + vm_entry = shared.vm_pool.get(request_event.uuid) if vm_entry: if request_event.type == RequestType.StartVM: - virtualmachine.start(vm_entry) + vmm.start(vm_entry) elif request_event.type == RequestType.StopVM: - virtualmachine.stop(vm_entry) + vmm.stop(vm_entry) elif request_event.type == RequestType.DeleteVM: - virtualmachine.delete(vm_entry) + vmm.delete(vm_entry) elif request_event.type == RequestType.InitVMMigration: - virtualmachine.start(vm_entry, host.key, find_free_port()) + vmm.start(vm_entry, host.key) elif request_event.type == RequestType.TransferVM: - virtualmachine.transfer(request_event) + vmm.transfer(request_event) else: logger.info("VM Entry missing") - logger.info("Running VMs %s", running_vms) + logger.info("Running VMs %s", vmm.running_vms) if __name__ == "__main__": diff --git a/ucloud/host/virtualmachine.py b/ucloud/host/virtualmachine.py index 4a7584a..cc06ce3 100755 --- a/ucloud/host/virtualmachine.py +++ b/ucloud/host/virtualmachine.py @@ -5,7 +5,6 @@ # 
https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor import os -import random import subprocess as sp import tempfile import time @@ -21,11 +20,12 @@ import sshtunnel from ucloud.common.helpers import get_ipv6_address from ucloud.common.request import RequestEntry, RequestType from ucloud.common.vm import VMEntry, VMStatus -from ucloud.config import (etcd_client, request_pool, - running_vms, vm_pool, config, - image_storage_handler) -from . import qmp +from ucloud.common.network import create_dev, delete_network_interface, find_free_port from ucloud.host import logger +from ucloud.shared import shared +from ucloud.settings import settings + +from . import qmp class VM: @@ -38,193 +38,22 @@ class VM: return "VM({})".format(self.key) -def delete_network_interface(iface): - try: - sp.check_output(['ip', 'link', 'del', iface]) - except Exception: - pass +def capture_all_exception(func): + @wraps(func) + def wrapper(*args, **kwargs): + try: + func(*args, **kwargs) + except Exception: + logger.info("Exception absorbed by captual_all_exception()") + logger.exception(func.__name__) - -def resolve_network(network_name, network_owner): - network = etcd_client.get(join_path(config['etcd']['network_prefix'], - network_owner, - network_name), - value_in_json=True) - return network - - -def delete_vm_network(vm_entry): - try: - for network in vm_entry.network: - network_name = network[0] - tap_mac = network[1] - tap_id = network[2] - - delete_network_interface('tap{}'.format(tap_id)) - - owners_vms = vm_pool.by_owner(vm_entry.owner) - owners_running_vms = vm_pool.by_status(VMStatus.running, - _vms=owners_vms) - - networks = map(lambda n: n[0], - map(lambda vm: vm.network, owners_running_vms) - ) - networks_in_use_by_user_vms = [vm[0] for vm in networks] - if network_name not in networks_in_use_by_user_vms: - network_entry = resolve_network(network[0], vm_entry.owner) - if network_entry: - network_type = network_entry.value["type"] - network_id = network_entry.value["id"] 
- if network_type == "vxlan": - delete_network_interface('br{}'.format(network_id)) - delete_network_interface('vxlan{}'.format(network_id)) - except Exception: - logger.exception("Exception in network interface deletion") - - -def create_dev(script, _id, dev, ip=None): - command = [script, _id, dev] - if ip: - command.append(ip) - try: - output = sp.check_output(command, stderr=sp.PIPE) - except Exception as e: - print(e.stderr) - return None - else: - return output.decode("utf-8").strip() - - -def create_vxlan_br_tap(_id, _dev, tap_id, ip=None): - network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network') - vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'), - _id=_id, dev=_dev) - if vxlan: - bridge = create_dev(script=os.path.join(network_script_base, 'create-bridge.sh'), - _id=_id, dev=vxlan, ip=ip) - if bridge: - tap = create_dev(script=os.path.join(network_script_base, 'create-tap.sh'), - _id=str(tap_id), dev=bridge) - if tap: - return tap - - -def random_bytes(num=6): - return [random.randrange(256) for _ in range(num)] - - -def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'): - mac = random_bytes() - if oui: - if type(oui) == str: - oui = [int(chunk) for chunk in oui.split(separator)] - mac = oui + random_bytes(num=6 - len(oui)) - else: - if multicast: - mac[0] |= 1 # set bit 0 - else: - mac[0] &= ~1 # clear bit 0 - if uaa: - mac[0] &= ~(1 << 1) # clear bit 1 - else: - mac[0] |= 1 << 1 # set bit 1 - return separator.join(byte_fmt % b for b in mac) - - -def update_radvd_conf(etcd_client): - network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network') - - networks = { - net.value['ipv6']: net.value['id'] - for net in etcd_client.get_prefix('/v1/network/', value_in_json=True) - if net.value.get('ipv6') - } - radvd_template = open(os.path.join(network_script_base, - 'radvd-template.conf'), 'r').read() - radvd_template = 
Template(radvd_template) - - content = [radvd_template.safe_substitute(bridge='br{}'.format(networks[net]), - prefix=net) - for net in networks if networks.get(net)] - - with open('/etc/radvd.conf', 'w') as radvd_conf: - radvd_conf.writelines(content) - try: - sp.check_output(['systemctl', 'restart', 'radvd']) - except Exception: - sp.check_output(['service', 'radvd', 'restart']) - - -def get_start_command_args(vm_entry, vnc_sock_filename: str, migration=False, migration_port=None): - threads_per_core = 1 - vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs["ram"]).to_MB()) - vm_cpus = int(vm_entry.specs["cpu"]) - vm_uuid = vm_entry.uuid - vm_networks = vm_entry.network - - command = "-name {}_{}".format(vm_entry.owner, vm_entry.name) - - command += " -drive file={},format=raw,if=virtio,cache=none".format( - image_storage_handler.qemu_path_string(vm_uuid) - ) - command += " -device virtio-rng-pci -vnc unix:{}".format(vnc_sock_filename) - command += " -m {} -smp cores={},threads={}".format( - vm_memory, vm_cpus, threads_per_core - ) - - if migration: - command += " -incoming tcp:[::]:{}".format(migration_port) - - tap = None - for network_mac_and_tap in vm_networks: - network_name, mac, tap = network_mac_and_tap - - _key = os.path.join(config['etcd']['network_prefix'], vm_entry.owner, network_name) - network = etcd_client.get(_key, value_in_json=True) - network_type = network.value["type"] - network_id = str(network.value["id"]) - network_ipv6 = network.value["ipv6"] - - if network_type == "vxlan": - tap = create_vxlan_br_tap(_id=network_id, - _dev=config['network']['vxlan_phy_dev'], - tap_id=tap, - ip=network_ipv6) - update_radvd_conf(etcd_client) - - command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no" \ - " -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}" \ - .format(tap=tap, net_id=network_id, mac=mac) - - return command.split(" ") - - -def create_vm_object(vm_entry, migration=False, migration_port=None): - # NOTE: If 
migration suddenly stop working, having different - # VNC unix filename on source and destination host can - # be a possible cause of it. - - # REQUIREMENT: Use Unix Socket instead of TCP Port for VNC - vnc_sock_file = tempfile.NamedTemporaryFile() - - qemu_args = get_start_command_args( - vm_entry=vm_entry, - vnc_sock_filename=vnc_sock_file.name, - migration=migration, - migration_port=migration_port, - ) - qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args) - return VM(vm_entry.key, qemu_machine, vnc_sock_file) - - -def get_vm(vm_list: list, vm_key) -> Union[VM, None]: - return next((vm for vm in vm_list if vm.key == vm_key), None) + return wrapper def need_running_vm(func): @wraps(func) - def wrapper(e): - vm = get_vm(running_vms, e.key) + def wrapper(self, e): + vm = self.get_vm(self.running_vms, e.key) if vm: try: status = vm.handle.command("query-status") @@ -242,143 +71,336 @@ def need_running_vm(func): return wrapper -def create(vm_entry: VMEntry): - if image_storage_handler.is_vm_image_exists(vm_entry.uuid): - # File Already exists. 
No Problem Continue - logger.debug("Image for vm %s exists", vm_entry.uuid) - else: - vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB()) - if image_storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid): - if not image_storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd): - vm_entry.status = VMStatus.error - else: - logger.info("New VM Created") +class VMM: + def __init__(self): + self.etcd_client = shared.etcd_client + self.storage_handler = shared.storage_handler + self.running_vms = [] + def get_start_command_args(self, vm_entry, vnc_sock_filename: str, migration=False, migration_port=None): + threads_per_core = 1 + vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs['ram']).to_MB()) + vm_cpus = int(vm_entry.specs['cpu']) + vm_uuid = vm_entry.uuid + vm_networks = vm_entry.network -def start(vm_entry: VMEntry, destination_host_key=None, migration_port=None): - _vm = get_vm(running_vms, vm_entry.key) + command = '-name {}_{}'.format(vm_entry.owner, vm_entry.name) - # VM already running. No need to proceed further. 
- if _vm: - logger.info("VM %s already running" % vm_entry.uuid) - return - else: - logger.info("Trying to start %s" % vm_entry.uuid) - if destination_host_key: - launch_vm(vm_entry, migration=True, migration_port=migration_port, - destination_host_key=destination_host_key) - else: - create(vm_entry) - launch_vm(vm_entry) - - -@need_running_vm -def stop(vm_entry): - vm = get_vm(running_vms, vm_entry.key) - vm.handle.shutdown() - if not vm.handle.is_running(): - vm_entry.add_log("Shutdown successfully") - vm_entry.declare_stopped() - vm_pool.put(vm_entry) - running_vms.remove(vm) - delete_vm_network(vm_entry) - - -def delete(vm_entry): - logger.info("Deleting VM | %s", vm_entry) - stop(vm_entry) - - if image_storage_handler.is_vm_image_exists(vm_entry.uuid): - r_status = image_storage_handler.delete_vm_image(vm_entry.uuid) - if r_status: - etcd_client.client.delete(vm_entry.key) - else: - etcd_client.client.delete(vm_entry.key) - -def transfer(request_event): - # This function would run on source host i.e host on which the vm - # is running initially. This host would be responsible for transferring - # vm state to destination host. 
- - _host, _port = request_event.parameters["host"], request_event.parameters["port"] - _uuid = request_event.uuid - _destination = request_event.destination_host_key - vm = get_vm(running_vms, join_path(config['etcd']['vm_prefix'], _uuid)) - - if vm: - tunnel = sshtunnel.SSHTunnelForwarder( - _host, - ssh_username=config['ssh']['username'], - ssh_pkey=config['ssh']['private_key_path'], - remote_bind_address=("127.0.0.1", _port), - ssh_proxy_enabled=True, - ssh_proxy=(_host, 22) + command += ' -drive file={},format=raw,if=virtio,cache=none'.format( + self.storage_handler.qemu_path_string(vm_uuid) ) - try: - tunnel.start() - except sshtunnel.BaseSSHTunnelForwarderError: - logger.exception("Couldn't establish connection to (%s, 22)", _host) + command += ' -device virtio-rng-pci -vnc unix:{}'.format(vnc_sock_filename) + command += ' -m {} -smp cores={},threads={}'.format( + vm_memory, vm_cpus, threads_per_core + ) + + if migration: + command += ' -incoming tcp:[::]:{}'.format(migration_port) + + for network_mac_and_tap in vm_networks: + network_name, mac, tap = network_mac_and_tap + + _key = os.path.join(settings['etcd']['network_prefix'], vm_entry.owner, network_name) + network = self.etcd_client.get(_key, value_in_json=True) + network_type = network.value["type"] + network_id = str(network.value["id"]) + network_ipv6 = network.value["ipv6"] + + if network_type == "vxlan": + tap = create_vxlan_br_tap(_id=network_id, + _dev=settings['network']['vxlan_phy_dev'], + tap_id=tap, + ip=network_ipv6) + all_networks = self.etcd_client.get_prefix('/v1/network/', value_in_json=True) + update_radvd_conf(all_networks) + + command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no" \ + " -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}" \ + .format(tap=tap, net_id=network_id, mac=mac) + + return command.split(" ") + + def create_vm_object(self, vm_entry, migration=False, migration_port=None): + vnc_sock_file = tempfile.NamedTemporaryFile() + + qemu_args = 
self.get_start_command_args( + vm_entry=vm_entry, + vnc_sock_filename=vnc_sock_file.name, + migration=migration, + migration_port=migration_port, + ) + qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args) + return VM(vm_entry.key, qemu_machine, vnc_sock_file) + + @staticmethod + def get_vm(vm_list: list, vm_key) -> Union[VM, None]: + return next((vm for vm in vm_list if vm.key == vm_key), None) + + @capture_all_exception + def create(self, vm_entry: VMEntry): + if self.storage_handler.is_vm_image_exists(vm_entry.uuid): + # File Already exists. No Problem Continue + logger.debug("Image for vm %s exists", vm_entry.uuid) + return None else: - vm.handle.command( - "migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port) - ) + vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB()) + if self.storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid): + if not self.storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd): + vm_entry.status = VMStatus.error + else: + logger.info("New VM Created") - status = vm.handle.command("query-migrate")["status"] - while status not in ["failed", "completed"]: - time.sleep(2) - status = vm.handle.command("query-migrate")["status"] + @capture_all_exception + def start(self, vm_entry: VMEntry, destination_host_key=None): + _vm = self.get_vm(self.running_vms, vm_entry.key) - with vm_pool.get_put(request_event.uuid) as source_vm: - if status == "failed": - source_vm.add_log("Migration Failed") - elif status == "completed": - # If VM is successfully migrated then shutdown the VM - # on this host and update hostname to destination host key - source_vm.add_log("Successfully migrated") - source_vm.hostname = _destination - running_vms.remove(vm) - vm.handle.shutdown() - source_vm.in_migration = False # VM transfer finished - finally: - tunnel.close() + # VM already running. No need to proceed further. 
+ if _vm: + logger.info("VM %s already running" % vm_entry.uuid) + return + else: + logger.info("Trying to start %s" % vm_entry.uuid) + if destination_host_key: + migration_port = find_free_port() + self.launch_vm(vm_entry, migration=True, migration_port=migration_port, + destination_host_key=destination_host_key) + else: + self.create(vm_entry) + self.launch_vm(vm_entry) - -def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_key=None): - logger.info("Starting %s" % vm_entry.key) - - vm = create_vm_object(vm_entry, migration=migration, migration_port=migration_port) - try: - vm.handle.launch() - except Exception: - logger.exception("Error Occured while starting VM") + @need_running_vm + @capture_all_exception + def stop(self, vm_entry): + vm = self.get_vm(self.running_vms, vm_entry.key) vm.handle.shutdown() + if not vm.handle.is_running(): + vm_entry.add_log("Shutdown successfully") + vm_entry.declare_stopped() + shared.vm_pool.put(vm_entry) + self.running_vms.remove(vm) + delete_vm_network(vm_entry) - if migration: - # We don't care whether MachineError or any other error occurred - pass + @capture_all_exception + def delete(self, vm_entry): + logger.info("Deleting VM | %s", vm_entry) + self.stop(vm_entry) + + if self.storage_handler.is_vm_image_exists(vm_entry.uuid): + r_status = self.storage_handler.delete_vm_image(vm_entry.uuid) + if r_status: + shared.etcd_client.client.delete(vm_entry.key) else: - # Error during typical launch of a vm - vm.handle.shutdown() - vm_entry.declare_killed() - vm_pool.put(vm_entry) - else: - vm_entry.vnc_socket = vm.vnc_socket_file.name - running_vms.append(vm) + shared.etcd_client.client.delete(vm_entry.key) - if migration: - vm_entry.in_migration = True - r = RequestEntry.from_scratch( - type=RequestType.TransferVM, - hostname=vm_entry.hostname, - parameters={"host": get_ipv6_address(), "port": migration_port}, - uuid=vm_entry.uuid, - destination_host_key=destination_host_key, - 
request_prefix=config['etcd']['request_prefix'] + @capture_all_exception + def transfer(self, request_event): + # This function would run on source host i.e host on which the vm + # is running initially. This host would be responsible for transferring + # vm state to destination host. + + _host, _port = request_event.parameters["host"], request_event.parameters["port"] + _uuid = request_event.uuid + _destination = request_event.destination_host_key + vm = self.get_vm(self.running_vms, join_path(settings['etcd']['vm_prefix'], _uuid)) + + if vm: + tunnel = sshtunnel.SSHTunnelForwarder( + _host, + ssh_username=settings['ssh']['username'], + ssh_pkey=settings['ssh']['private_key_path'], + remote_bind_address=("127.0.0.1", _port), + ssh_proxy_enabled=True, + ssh_proxy=(_host, 22) ) - request_pool.put(r) - else: - # Typical launching of a vm - vm_entry.status = VMStatus.running - vm_entry.add_log("Started successfully") + try: + tunnel.start() + except sshtunnel.BaseSSHTunnelForwarderError: + logger.exception("Couldn't establish connection to (%s, 22)", _host) + else: + vm.handle.command( + "migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port) + ) - vm_pool.put(vm_entry) + status = vm.handle.command("query-migrate")["status"] + while status not in ["failed", "completed"]: + time.sleep(2) + status = vm.handle.command("query-migrate")["status"] + + with shared.vm_pool.get_put(request_event.uuid) as source_vm: + if status == "failed": + source_vm.add_log("Migration Failed") + elif status == "completed": + # If VM is successfully migrated then shutdown the VM + # on this host and update hostname to destination host key + source_vm.add_log("Successfully migrated") + source_vm.hostname = _destination + self.running_vms.remove(vm) + vm.handle.shutdown() + source_vm.in_migration = False # VM transfer finished + finally: + tunnel.close() + + @capture_all_exception + def launch_vm(self, vm_entry, migration=False, migration_port=None, destination_host_key=None): + 
logger.info("Starting %s" % vm_entry.key)
+
+        vm = self.create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
+        try:
+            vm.handle.launch()
+        except Exception:
+            logger.exception("Error Occurred while starting VM")
+            vm.handle.shutdown()
+
+            if migration:
+                # We don't care whether MachineError or any other error occurred
+                pass
+            else:
+                # Error during typical launch of a vm
+                vm.handle.shutdown()
+                vm_entry.declare_killed()
+                shared.vm_pool.put(vm_entry)
+        else:
+            vm_entry.vnc_socket = vm.vnc_socket_file.name
+            self.running_vms.append(vm)
+
+            if migration:
+                vm_entry.in_migration = True
+                r = RequestEntry.from_scratch(
+                    type=RequestType.TransferVM,
+                    hostname=vm_entry.hostname,
+                    parameters={"host": get_ipv6_address(), "port": migration_port},
+                    uuid=vm_entry.uuid,
+                    destination_host_key=destination_host_key,
+                    request_prefix=settings['etcd']['request_prefix']
+                )
+                shared.request_pool.put(r)
+            else:
+                # Typical launching of a vm
+                vm_entry.status = VMStatus.running
+                vm_entry.add_log("Started successfully")
+
+            shared.vm_pool.put(vm_entry)
+
+    @capture_all_exception
+    def maintenance(self, host):
+        # To capture vm running according to running_vms list
+
+        # This is to capture successful migration of a VM.
+        # Suppose, this host is running "vm1" and user initiated
+        # request to migrate this "vm1" to some other host. On,
+        # successful migration the destination host would set
+        # the vm hostname to itself. Thus, we are checking
+        # whether this host vm is successfully migrated. If yes
+        # then we shutdown "vm1" on this host.
+ logger.debug("Starting Maintenance!!") + to_be_removed = [] + for running_vm in self.running_vms: + with shared.vm_pool.get_put(running_vm.key) as vm_entry: + if vm_entry.hostname != host.key and not vm_entry.in_migration: + running_vm.handle.shutdown() + logger.info("VM migration not completed successfully.") + to_be_removed.append(running_vm) + + for r in to_be_removed: + self.running_vms.remove(r) + + # To check vm running according to etcd entries + alleged_running_vms = shared.vm_pool.by_status("RUNNING", shared.vm_pool.by_host(host.key)) + + for vm_entry in alleged_running_vms: + _vm = self.get_vm(self.running_vms, vm_entry.key) + # Whether, the allegedly running vm is in our + # running_vms list or not if it is said to be + # running on this host but it is not then we + # need to shut it down + + # This is to capture poweroff/shutdown of a VM + # initiated by user inside VM. OR crash of VM by some + # user running process + if (_vm and not _vm.handle.is_running()) or not _vm: + logger.debug("_vm = %s, is_running() = %s" % (_vm, _vm.handle.is_running())) + vm_entry.add_log("""{} is not running but is said to be running. 
+ So, shutting it down and declare it killed""".format(vm_entry.key)) + vm_entry.declare_killed() + shared.vm_pool.put(vm_entry) + if _vm: + self.running_vms.remove(_vm) + + +def resolve_network(network_name, network_owner): + network = shared.etcd_client.get(join_path(settings['etcd']['network_prefix'], + network_owner, + network_name), + value_in_json=True) + return network + + +def delete_vm_network(vm_entry): + try: + for network in vm_entry.network: + network_name = network[0] + tap_mac = network[1] + tap_id = network[2] + + delete_network_interface('tap{}'.format(tap_id)) + + owners_vms = shared.vm_pool.by_owner(vm_entry.owner) + owners_running_vms = shared.vm_pool.by_status(VMStatus.running, + _vms=owners_vms) + + networks = map( + lambda n: n[0], map(lambda vm: vm.network, owners_running_vms) + ) + networks_in_use_by_user_vms = [vm[0] for vm in networks] + if network_name not in networks_in_use_by_user_vms: + network_entry = resolve_network(network[0], vm_entry.owner) + if network_entry: + network_type = network_entry.value["type"] + network_id = network_entry.value["id"] + if network_type == "vxlan": + delete_network_interface('br{}'.format(network_id)) + delete_network_interface('vxlan{}'.format(network_id)) + except Exception: + logger.exception("Exception in network interface deletion") + + +def create_vxlan_br_tap(_id, _dev, tap_id, ip=None): + network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network') + vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'), + _id=_id, dev=_dev) + if vxlan: + bridge = create_dev(script=os.path.join(network_script_base, 'create-bridge.sh'), + _id=_id, dev=vxlan, ip=ip) + if bridge: + tap = create_dev(script=os.path.join(network_script_base, 'create-tap.sh'), + _id=str(tap_id), dev=bridge) + if tap: + return tap + + +def update_radvd_conf(all_networks): + network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network') + + networks = { + 
net.value['ipv6']: net.value['id'] + for net in all_networks + if net.value.get('ipv6') + } + radvd_template = open(os.path.join(network_script_base, + 'radvd-template.conf'), 'r').read() + radvd_template = Template(radvd_template) + + content = [ + radvd_template.safe_substitute( + bridge='br{}'.format(networks[net]), + prefix=net + ) + for net in networks if networks.get(net) + ] + + with open('/etc/radvd.conf', 'w') as radvd_conf: + radvd_conf.writelines(content) + try: + sp.check_output(['systemctl', 'restart', 'radvd']) + except Exception: + sp.check_output(['service', 'radvd', 'restart']) diff --git a/ucloud/imagescanner/main.py b/ucloud/imagescanner/main.py index 135f8cb..d164ea3 100755 --- a/ucloud/imagescanner/main.py +++ b/ucloud/imagescanner/main.py @@ -5,7 +5,8 @@ import sys from os.path import isdir from os.path import join as join_path -from ucloud.config import etcd_client, config, image_storage_handler +from ucloud.settings import settings +from ucloud.shared import shared from ucloud.imagescanner import logger @@ -22,9 +23,9 @@ def qemu_img_type(path): def check(): """ check whether settings are sane, refuse to start if they aren't """ - if config['storage']['storage_backend'] == 'filesystem' and not isdir(config['storage']['image_dir']): + if settings['storage']['storage_backend'] == 'filesystem' and not isdir(settings['storage']['image_dir']): sys.exit("You have set STORAGE_BACKEND to filesystem, but " - "{} does not exist. Refusing to start".format(config['storage']['image_dir']) + "{} does not exist. 
Refusing to start".format(settings['storage']['image_dir']) ) try: @@ -36,7 +37,7 @@ def check(): def main(): # We want to get images entries that requests images to be created - images = etcd_client.get_prefix(config['etcd']['image_prefix'], value_in_json=True) + images = shared.etcd_client.get_prefix(settings['etcd']['image_prefix'], value_in_json=True) images_to_be_created = list(filter(lambda im: im.value['status'] == 'TO_BE_CREATED', images)) for image in images_to_be_created: @@ -45,9 +46,9 @@ def main(): image_owner = image.value['owner'] image_filename = image.value['filename'] image_store_name = image.value['store_name'] - image_full_path = join_path(config['storage']['file_dir'], image_owner, image_filename) + image_full_path = join_path(settings['storage']['file_dir'], image_owner, image_filename) - image_stores = etcd_client.get_prefix(config['etcd']['image_store_prefix'], + image_stores = shared.etcd_client.get_prefix(settings['etcd']['image_store_prefix'], value_in_json=True) user_image_store = next(filter( lambda s, store_name=image_store_name: s.value["name"] == store_name, @@ -71,18 +72,18 @@ def main(): logger.exception(e) else: # Import and Protect - r_status = image_storage_handler.import_image(src="image.raw", + r_status = shared.storage_handler.import_image(src="image.raw", dest=image_uuid, protect=True) if r_status: # Everything is successfully done image.value["status"] = "CREATED" - etcd_client.put(image.key, json.dumps(image.value)) + shared.etcd_client.put(image.key, json.dumps(image.value)) else: # The user provided image is either not found or of invalid format image.value["status"] = "INVALID_IMAGE" - etcd_client.put(image.key, json.dumps(image.value)) + shared.etcd_client.put(image.key, json.dumps(image.value)) try: os.remove("image.raw") diff --git a/ucloud/metadata/main.py b/ucloud/metadata/main.py index 16b7c6d..5526084 100644 --- a/ucloud/metadata/main.py +++ b/ucloud/metadata/main.py @@ -2,15 +2,15 @@ import os from flask import 
Flask, request from flask_restful import Resource, Api - -from ucloud.config import etcd_client, config, vm_pool +from ucloud.settings import settings +from ucloud.shared import shared app = Flask(__name__) api = Api(app) def get_vm_entry(mac_addr): - return next(filter(lambda vm: mac_addr in list(zip(*vm.network))[1], vm_pool.vms), None) + return next(filter(lambda vm: mac_addr in list(zip(*vm.network))[1], shared.vm_pool.vms), None) # https://stackoverflow.com/questions/37140846/how-to-convert-ipv6-link-local-address-to-mac-address-in-python @@ -43,10 +43,10 @@ class Root(Resource): if not data: return {'message': 'Metadata for such VM does not exists.'}, 404 else: - etcd_key = os.path.join(config['etcd']['user_prefix'], + etcd_key = os.path.join(settings['etcd']['user_prefix'], data.value['owner_realm'], data.value['owner'], 'key') - etcd_entry = etcd_client.get_prefix(etcd_key, value_in_json=True) + etcd_entry = shared.etcd_client.get_prefix(etcd_key, value_in_json=True) user_personal_ssh_keys = [key.value for key in etcd_entry] data.value['metadata']['ssh-keys'] += user_personal_ssh_keys return data.value['metadata'], 200 diff --git a/ucloud/scheduler/helper.py b/ucloud/scheduler/helper.py index 560bdbc..643e8e9 100755 --- a/ucloud/scheduler/helper.py +++ b/ucloud/scheduler/helper.py @@ -6,7 +6,8 @@ import bitmath from ucloud.common.host import HostStatus from ucloud.common.request import RequestEntry, RequestType from ucloud.common.vm import VMStatus -from ucloud.config import vm_pool, host_pool, request_pool, config +from ucloud.shared import shared +from ucloud.settings import settings def accumulated_specs(vms_specs): @@ -46,14 +47,14 @@ class NoSuitableHostFound(Exception): def get_suitable_host(vm_specs, hosts=None): if hosts is None: - hosts = host_pool.by_status(HostStatus.alive) + hosts = shared.host_pool.by_status(HostStatus.alive) for host in hosts: # Filter them by host_name - vms = vm_pool.by_host(host.key) + vms = shared.vm_pool.by_host(host.key) 
# Filter them by status - vms = vm_pool.by_status(VMStatus.running, vms) + vms = shared.vm_pool.by_status(VMStatus.running, vms) running_vms_specs = [vm.specs for vm in vms] @@ -75,7 +76,7 @@ def get_suitable_host(vm_specs, hosts=None): def dead_host_detection(): # Bring out your dead! - Monty Python and the Holy Grail - hosts = host_pool.by_status(HostStatus.alive) + hosts = shared.host_pool.by_status(HostStatus.alive) dead_hosts_keys = [] for host in hosts: @@ -89,25 +90,25 @@ def dead_host_detection(): def dead_host_mitigation(dead_hosts_keys): for host_key in dead_hosts_keys: - host = host_pool.get(host_key) + host = shared.host_pool.get(host_key) host.declare_dead() - vms_hosted_on_dead_host = vm_pool.by_host(host_key) + vms_hosted_on_dead_host = shared.vm_pool.by_host(host_key) for vm in vms_hosted_on_dead_host: vm.declare_killed() - vm_pool.put(vm) - host_pool.put(host) + shared.vm_pool.put(vm) + shared.host_pool.put(host) def assign_host(vm): vm.hostname = get_suitable_host(vm.specs) - vm_pool.put(vm) + shared.vm_pool.put(vm) r = RequestEntry.from_scratch(type=RequestType.StartVM, uuid=vm.uuid, hostname=vm.hostname, - request_prefix=config['etcd']['request_prefix']) - request_pool.put(r) + request_prefix=settings['etcd']['request_prefix']) + shared.request_pool.put(r) vm.log.append("VM scheduled for starting") return vm.hostname diff --git a/ucloud/scheduler/main.py b/ucloud/scheduler/main.py index 49d6291..3412545 100755 --- a/ucloud/scheduler/main.py +++ b/ucloud/scheduler/main.py @@ -5,8 +5,8 @@ # maybe expose a prometheus compatible output from ucloud.common.request import RequestEntry, RequestType -from ucloud.config import etcd_client -from ucloud.config import host_pool, request_pool, vm_pool, config +from ucloud.shared import shared +from ucloud.settings import settings from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection, assign_host, NoSuitableHostFound) from . 
import logger @@ -16,8 +16,8 @@ def main(): pending_vms = [] for request_iterator in [ - etcd_client.get_prefix(config['etcd']['request_prefix'], value_in_json=True), - etcd_client.watch_prefix(config['etcd']['request_prefix'], timeout=5, value_in_json=True), + shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True), + shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], timeout=5, value_in_json=True), ]: for request_event in request_iterator: request_entry = RequestEntry(request_event) @@ -44,17 +44,17 @@ def main(): r = RequestEntry.from_scratch(type="ScheduleVM", uuid=pending_vm_entry.uuid, hostname=pending_vm_entry.hostname, - request_prefix=config['etcd']['request_prefix']) - request_pool.put(r) + request_prefix=settings['etcd']['request_prefix']) + shared.request_pool.put(r) elif request_entry.type == RequestType.ScheduleVM: logger.debug("%s, %s", request_entry.key, request_entry.value) - vm_entry = vm_pool.get(request_entry.uuid) + vm_entry = shared.vm_pool.get(request_entry.uuid) if vm_entry is None: logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid)) continue - etcd_client.client.delete(request_entry.key) # consume Request + shared.etcd_client.client.delete(request_entry.key) # consume Request # If the Request is about a VM which is labelled as "migration" # and has a destination @@ -62,7 +62,7 @@ def main(): and hasattr(request_entry, "destination") and request_entry.destination: try: get_suitable_host(vm_specs=vm_entry.specs, - hosts=[host_pool.get(request_entry.destination)]) + hosts=[shared.host_pool.get(request_entry.destination)]) except NoSuitableHostFound: logger.info("Requested destination host doesn't have enough capacity" "to hold %s" % vm_entry.uuid) @@ -70,8 +70,8 @@ def main(): r = RequestEntry.from_scratch(type=RequestType.InitVMMigration, uuid=request_entry.uuid, destination=request_entry.destination, - request_prefix=config['etcd']['request_prefix']) - 
request_pool.put(r) + request_prefix=settings['etcd']['request_prefix']) + shared.request_pool.put(r) # If the Request is about a VM that just want to get started/created else: @@ -81,7 +81,7 @@ def main(): assign_host(vm_entry) except NoSuitableHostFound: vm_entry.add_log("Can't schedule VM. No Resource Left.") - vm_pool.put(vm_entry) + shared.vm_pool.put(vm_entry) pending_vms.append(vm_entry) logger.info("No Resource Left. Emailing admin....") diff --git a/ucloud/scheduler/main.py.old b/ucloud/scheduler/main.py.old deleted file mode 100755 index e2c975a..0000000 --- a/ucloud/scheduler/main.py.old +++ /dev/null @@ -1,93 +0,0 @@ -# TODO -# 1. send an email to an email address defined by env['admin-email'] -# if resources are finished -# 2. Introduce a status endpoint of the scheduler - -# maybe expose a prometheus compatible output - -from ucloud.common.request import RequestEntry, RequestType -from ucloud.config import etcd_client -from ucloud.config import host_pool, request_pool, vm_pool, env_vars -from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection, - assign_host, NoSuitableHostFound) -from . import logger - - -def main(): - logger.info("%s SESSION STARTED %s", '*' * 5, '*' * 5) - - pending_vms = [] - - for request_iterator in [ - etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True), - etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=5, value_in_json=True), - ]: - for request_event in request_iterator: - request_entry = RequestEntry(request_event) - # Never Run time critical mechanism inside timeout - # mechanism because timeout mechanism only comes - # when no other event is happening. It means under - # heavy load there would not be a timeout event. 
- if request_entry.type == "TIMEOUT": - - # Detect hosts that are dead and set their status - # to "DEAD", and their VMs' status to "KILLED" - dead_hosts = dead_host_detection() - if dead_hosts: - logger.debug("Dead hosts: %s", dead_hosts) - dead_host_mitigation(dead_hosts) - - # If there are VMs that weren't assigned a host - # because there wasn't a host available which - # meets requirement of that VM then we would - # create a new ScheduleVM request for that VM - # on our behalf. - while pending_vms: - pending_vm_entry = pending_vms.pop() - r = RequestEntry.from_scratch(type="ScheduleVM", - uuid=pending_vm_entry.uuid, - hostname=pending_vm_entry.hostname, - request_prefix=env_vars.get("REQUEST_PREFIX")) - request_pool.put(r) - - elif request_entry.type == RequestType.ScheduleVM: - logger.debug("%s, %s", request_entry.key, request_entry.value) - - vm_entry = vm_pool.get(request_entry.uuid) - if vm_entry is None: - logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid)) - continue - etcd_client.client.delete(request_entry.key) # consume Request - - # If the Request is about a VM which is labelled as "migration" - # and has a destination - if hasattr(request_entry, "migration") and request_entry.migration \ - and hasattr(request_entry, "destination") and request_entry.destination: - try: - get_suitable_host(vm_specs=vm_entry.specs, - hosts=[host_pool.get(request_entry.destination)]) - except NoSuitableHostFound: - logger.info("Requested destination host doesn't have enough capacity" - "to hold %s" % vm_entry.uuid) - else: - r = RequestEntry.from_scratch(type=RequestType.InitVMMigration, - uuid=request_entry.uuid, - destination=request_entry.destination, - request_prefix=env_vars.get("REQUEST_PREFIX")) - request_pool.put(r) - - # If the Request is about a VM that just want to get started/created - else: - # assign_host only returns None when we couldn't be able to assign - # a host to a VM because of resource constraints - try: - 
assign_host(vm_entry) - except NoSuitableHostFound: - vm_entry.add_log("Can't schedule VM. No Resource Left.") - vm_pool.put(vm_entry) - - pending_vms.append(vm_entry) - logger.info("No Resource Left. Emailing admin....") - - -if __name__ == "__main__": - main() diff --git a/ucloud/settings/__init__.py b/ucloud/settings/__init__.py index 5f29c41..2c77300 100644 --- a/ucloud/settings/__init__.py +++ b/ucloud/settings/__init__.py @@ -23,28 +23,28 @@ class CustomConfigParser(configparser.RawConfigParser): class Settings(object): def __init__(self, config_key='/uncloud/config/'): conf_name = 'ucloud.conf' - conf_dir = os.environ.get('UCLOUD_CONF_DIR', '/etc/ucloud') - config_file = os.path.join(conf_dir, conf_name) + conf_dir = os.environ.get('UCLOUD_CONF_DIR', os.path.expanduser('~/ucloud/')) + self.config_file = os.path.join(conf_dir, conf_name) self.config_parser = CustomConfigParser(allow_no_value=True) self.config_key = config_key self.read_internal_values() - self.read_config_file_values(config_file) + self.config_parser.read(self.config_file) - self.etcd_wrapper_args = tuple() - self.etcd_wrapper_kwargs = { - 'host': self.config_parser['etcd']['url'], - 'port': self.config_parser['etcd']['port'], - 'ca_cert': self.config_parser['etcd']['ca_cert'], - 'cert_cert': self.config_parser['etcd']['cert_cert'], - 'cert_key': self.config_parser['etcd']['cert_key'] - } - - def get_etcd_client(self): - args = self.etcd_wrapper_args - kwargs = self.etcd_wrapper_kwargs + args = tuple() + try: + kwargs = { + 'host': self.config_parser.get('etcd', 'url'), + 'port': self.config_parser.get('etcd', 'port'), + 'ca_cert': self.config_parser.get('etcd', 'ca_cert'), + 'cert_cert': self.config_parser.get('etcd','cert_cert'), + 'cert_key': self.config_parser.get('etcd','cert_key') + } + except configparser.Error as err: + raise configparser.Error('{} in config file {}'.format(err.message, self.config_file)) from err + return Etcd3Wrapper(*args, **kwargs) def read_internal_values(self): 
@@ -78,9 +78,11 @@ class Settings(object): if config_from_etcd: self.config_parser.read_dict(config_from_etcd.value) else: - return - sys.exit("No settings found in etcd at key {}".format(self.config_key)) - + raise KeyError("Key '{}' not found in etcd".format(self.config_key)) + def __getitem__(self, key): self.read_values_from_etcd() return self.config_parser[key] + + +settings = Settings() diff --git a/ucloud/shared/__init__.py b/ucloud/shared/__init__.py new file mode 100644 index 0000000..7a296e9 --- /dev/null +++ b/ucloud/shared/__init__.py @@ -0,0 +1,30 @@ +from ucloud.settings import settings +from ucloud.common.vm import VmPool +from ucloud.common.host import HostPool +from ucloud.common.request import RequestPool +from ucloud.common.storage_handlers import get_storage_handler + + +class Shared: + @property + def etcd_client(self): + return settings.get_etcd_client() + + @property + def host_pool(self): + return HostPool(self.etcd_client, settings['etcd']['host_prefix']) + + @property + def vm_pool(self): + return VmPool(self.etcd_client, settings['etcd']['vm_prefix']) + + @property + def request_pool(self): + return RequestPool(self.etcd_client, settings['etcd']['request_prefix']) + + @property + def storage_handler(self): + return get_storage_handler() + + +shared = Shared()