diff --git a/setup.py b/setup.py index 2c1c2cb..51d21b8 100644 --- a/setup.py +++ b/setup.py @@ -40,7 +40,7 @@ setup(name='ucloud', 'colorama', 'sphinx-rtd-theme', 'etcd3 @ https://github.com/kragniz/python-etcd3/tarball/master#egg=etcd3', - 'werkzeug' + 'werkzeug', 'marshmallow' ], scripts=['scripts/ucloud'], data_files=[(os.path.expanduser('~/ucloud/'), ['conf/ucloud.conf'])], diff --git a/ucloud/api/main.py b/ucloud/api/main.py index 92c73f5..91cbead 100644 --- a/ucloud/api/main.py +++ b/ucloud/api/main.py @@ -1,7 +1,6 @@ import json import pynetbox import logging -import urllib3 from uuid import uuid4 from os.path import join as join_path @@ -78,6 +77,7 @@ class CreateVM(Resource): "vnc_socket": "", "network": list(zip(data["network"], macs, tap_ids)), "metadata": {"ssh-keys": []}, + "in_migration": False } shared.etcd_client.put(vm_key, vm_entry, value_in_json=True) @@ -216,16 +216,13 @@ class VMMigration(Resource): if validator.is_valid(): vm = shared.vm_pool.get(data["uuid"]) + r = RequestEntry.from_scratch(type=RequestType.InitVMMigration, + uuid=vm.uuid, + hostname=join_path( + settings['etcd']['host_prefix'], validator.destination.value + ), + request_prefix=settings['etcd']['request_prefix']) - r = RequestEntry.from_scratch( - type=RequestType.ScheduleVM, - uuid=vm.uuid, - destination=join_path( - settings['etcd']['host_prefix'], validator.destination.value - ), - migration=True, - request_prefix=settings['etcd']['request_prefix'] - ) shared.request_pool.put(r) return {"message": "VM Migration Initialization Queued"}, 200 else: diff --git a/ucloud/common/network.py b/ucloud/common/network.py index df7151b..629e92a 100644 --- a/ucloud/common/network.py +++ b/ucloud/common/network.py @@ -30,7 +30,7 @@ def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt=' def create_dev(script, _id, dev, ip=None): - command = [script, _id, dev] + command = [script, str(_id), dev] if ip: command.append(ip) try: diff --git a/ucloud/common/request.py b/ucloud/common/request.py index 1e4594d..2d9be44 100644 --- a/ucloud/common/request.py +++ b/ucloud/common/request.py @@ -19,6 +19,7 @@ class RequestType: class RequestEntry(SpecificEtcdEntryBase): def __init__(self, e): + self.destination_host_key = None self.type = None # type: str self.migration = None # type: bool self.destination = None # type: str diff --git a/ucloud/common/schemas.py b/ucloud/common/schemas.py new file mode 100644 index 0000000..a592ec2 --- /dev/null +++ b/ucloud/common/schemas.py @@ -0,0 +1,39 @@ +import bitmath + +from marshmallow import fields, Schema + + +class StorageUnit(fields.Field): + def _serialize(self, value, attr, obj, **kwargs): + return str(value) + + def _deserialize(self, value, attr, data, **kwargs): + return bitmath.parse_string_unsafe(value) + + +class SpecsSchema(Schema): + cpu = fields.Int() + ram = StorageUnit() + os_ssd = StorageUnit(data_key='os-ssd', attribute='os-ssd') + hdd = fields.List(StorageUnit()) + + +class VMSchema(Schema): + name = fields.Str() + owner = fields.Str() + owner_realm = fields.Str() + specs = fields.Nested(SpecsSchema) + status = fields.Str() + log = fields.List(fields.Str()) + vnc_socket = fields.Str() + image_uuid = fields.Str() + hostname = fields.Str() + metadata = fields.Dict() + network = fields.List(fields.Tuple((fields.Str(), fields.Str(), fields.Int()))) + in_migration = fields.Bool() + + +class NetworkSchema(Schema): + _id = fields.Int(data_key='id', attribute='id') + _type = fields.Str(data_key='type', attribute='type') + ipv6 = fields.Str() diff --git a/ucloud/common/storage_handlers.py b/ucloud/common/storage_handlers.py index 4a17ec7..d2190ba 100644 --- a/ucloud/common/storage_handlers.py +++ b/ucloud/common/storage_handlers.py @@ -19,8 +19,8 @@ class ImageStorageHandler(ABC): def import_image(self, image_src, image_dest, protect=False): """Put an image at the destination - :param src: An Image file - :param dest: A path where :param src: is to be put. + :param image_src: An Image file + :param image_dest: A path where :param src: is to be put. :param protect: If protect is true then the dest is protect (readonly etc) The obj must exist on filesystem. """ @@ -30,8 +30,8 @@ class ImageStorageHandler(ABC): def make_vm_image(self, image_path, path): """Copy image from src to dest - :param src: A path - :param dest: A path + :param image_path: A path + :param path: A path src and destination must be on same storage system i.e both on file system or both on CEPH etc. """ diff --git a/ucloud/common/vm.py b/ucloud/common/vm.py index 0fb5cea..238f19d 100644 --- a/ucloud/common/vm.py +++ b/ucloud/common/vm.py @@ -12,6 +12,12 @@ class VMStatus: error = "ERROR" # An error occurred that cannot be resolved automatically +def declare_stopped(vm): + vm['hostname'] = '' + vm['in_migration'] = False + vm['status'] = VMStatus.stopped + + class VMEntry(SpecificEtcdEntryBase): def __init__(self, e): diff --git a/ucloud/host/main.py b/ucloud/host/main.py index be4f501..8a7dbe7 100755 --- a/ucloud/host/main.py +++ b/ucloud/host/main.py @@ -1,17 +1,16 @@ import argparse import multiprocessing as mp import time -import sys from ucloud.common.request import RequestEntry, RequestType -from ucloud.common.host import HostPool from ucloud.shared import shared from ucloud.settings import settings +from ucloud.common.vm import VMStatus +from ucloud.vmm import VMM +from os.path import join as join_path from . import virtualmachine, logger -vmm = virtualmachine.VMM() - def update_heartbeat(hostname): """Update Last HeartBeat Time for :param hostname: in etcd""" @@ -25,6 +24,16 @@ def update_heartbeat(hostname): time.sleep(10) +def maintenance(): + vmm = VMM() + running_vms = vmm.discover() + for vm_uuid in running_vms: + if vmm.is_running(vm_uuid) and vmm.get_status(vm_uuid) == 'running': + vm = shared.vm_pool.get(join_path(settings['etcd']['vm_prefix'], vm_uuid)) + vm.status = VMStatus.running + shared.vm_pool.put(vm) + + def main(hostname): host_pool = shared.host_pool host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) @@ -34,8 +43,7 @@ def main(hostname): heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,)) heartbeat_updating_process.start() except Exception as e: - logger.exception(e) - sys.exit("No Need To Go Further. ucloud-host heartbeat updating mechanism is not working") + raise e.__class__('ucloud-host heartbeat updating mechanism is not working') from e for events_iterator in [ shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True), @@ -45,36 +53,37 @@ def main(hostname): request_event = RequestEntry(request_event) if request_event.type == "TIMEOUT": - vmm.maintenance(host) - continue + maintenance() - # If the event is directed toward me OR I am destination of a InitVMMigration - if request_event.hostname == host.key or request_event.destination == host.key: + if request_event.hostname == host.key: logger.debug("VM Request: %s", request_event) shared.request_pool.client.client.delete(request_event.key) - vm_entry = shared.vm_pool.get(request_event.uuid) + vm_entry = shared.etcd_client.get(join_path(settings['etcd']['vm_prefix'], request_event.uuid)) if vm_entry: + vm = virtualmachine.VM(vm_entry) if request_event.type == RequestType.StartVM: - vmm.start(vm_entry) + vm.start() elif request_event.type == RequestType.StopVM: - vmm.stop(vm_entry) + vm.stop() elif request_event.type == RequestType.DeleteVM: - vmm.delete(vm_entry) + vm.delete() elif request_event.type == RequestType.InitVMMigration: - vmm.start(vm_entry, host.key) + vm.start(destination_host_key=host.key) elif request_event.type == RequestType.TransferVM: - vmm.transfer(request_event) + host = host_pool.get(request_event.destination_host_key) + if host: + vm.migrate(destination=host.hostname) + else: + logger.error('Host %s not found!', request_event.destination_host_key) else: logger.info("VM Entry missing") - logger.info("Running VMs %s", vmm.running_vms) - if __name__ == "__main__": argparser = argparse.ArgumentParser() diff --git a/ucloud/host/virtualmachine.py b/ucloud/host/virtualmachine.py index dbd13de..6d25205 100755 --- a/ucloud/host/virtualmachine.py +++ b/ucloud/host/virtualmachine.py @@ -6,344 +6,189 @@ import os import subprocess as sp -import tempfile -import time import ipaddress -from functools import wraps from string import Template -from typing import Union from os.path import join as join_path -import bitmath -import sshtunnel - -from ucloud.common.helpers import get_ipv6_address from ucloud.common.request import RequestEntry, RequestType -from ucloud.common.vm import VMEntry, VMStatus -from ucloud.common.network import create_dev, delete_network_interface, find_free_port +from ucloud.common.vm import VMStatus, declare_stopped +from ucloud.common.network import create_dev, delete_network_interface +from ucloud.common.schemas import VMSchema, NetworkSchema from ucloud.host import logger from ucloud.shared import shared from ucloud.settings import settings +from ucloud.vmm import VMM -from . import qmp +from marshmallow import ValidationError + + +def maintenance(): + pass class VM: - def __init__(self, key, handle, vnc_socket_file): - self.key = key # type: str - self.handle = handle # type: qmp.QEMUMachine - self.vnc_socket_file = vnc_socket_file # type: tempfile.NamedTemporaryFile - - def __repr__(self): - return "VM({})".format(self.key) - - -def capture_all_exception(func): - @wraps(func) - def wrapper(*args, **kwargs): + def __init__(self, vm_entry): + self.schema = VMSchema() + self.vmm = VMM() + self.key = vm_entry.key try: - func(*args, **kwargs) - except Exception: - logger.exception('Unhandled exception occur in %s. For more details see Syslog.', __name__) - - return wrapper - - -class VMM: - def __init__(self): - self.etcd_client = shared.etcd_client - self.storage_handler = shared.storage_handler - self.running_vms = [] - - def get_start_command_args(self, vm_entry, vnc_sock_filename: str, migration=False, migration_port=None): - threads_per_core = 1 - vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs['ram']).to_MB()) - vm_cpus = int(vm_entry.specs['cpu']) - vm_uuid = vm_entry.uuid - vm_networks = vm_entry.network - - command = '-name {}_{}'.format(vm_entry.owner, vm_entry.name) - - command += ' -drive file={},format=raw,if=virtio,cache=none'.format( - self.storage_handler.qemu_path_string(vm_uuid) - ) - command += ' -device virtio-rng-pci -vnc unix:{}'.format(vnc_sock_filename) - command += ' -m {} -smp cores={},threads={}'.format( - vm_memory, vm_cpus, threads_per_core - ) - - if migration: - command += ' -incoming tcp:[::]:{}'.format(migration_port) - - for network_mac_and_tap in vm_networks: - network_name, mac, tap = network_mac_and_tap - - _key = os.path.join(settings['etcd']['network_prefix'], vm_entry.owner, network_name) - network = self.etcd_client.get(_key, value_in_json=True) - network_type = network.value["type"] - network_id = str(network.value["id"]) - network_ipv6 = network.value["ipv6"] - - if network_type == "vxlan": - tap = create_vxlan_br_tap(_id=network_id, - _dev=settings['network']['vxlan_phy_dev'], - tap_id=tap, - ip=network_ipv6) - all_networks = self.etcd_client.get_prefix('/v1/network/', value_in_json=True) - - if ipaddress.ip_network(network_ipv6).is_global: - update_radvd_conf(all_networks) - - command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no" \ - " -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}" \ - .format(tap=tap, net_id=network_id, mac=mac) - - return command.split(" ") - - def create_vm_object(self, vm_entry, migration=False, migration_port=None): - vnc_sock_file = tempfile.NamedTemporaryFile() - - qemu_args = self.get_start_command_args( - vm_entry=vm_entry, - vnc_sock_filename=vnc_sock_file.name, - migration=migration, - migration_port=migration_port, - ) - qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args) - return VM(vm_entry.key, qemu_machine, vnc_sock_file) - - @staticmethod - def get_vm(vm_list: list, vm_key) -> Union[VM, None]: - return next((vm for vm in vm_list if vm.key == vm_key), None) - - @capture_all_exception - def create(self, vm_entry: VMEntry): - if self.storage_handler.is_vm_image_exists(vm_entry.uuid): - # File Already exists. No Problem Continue - logger.debug("Image for vm %s exists", vm_entry.uuid) - return None + self.vm = self.schema.loads(vm_entry.value) + except ValidationError: + logger.exception('Couldn\'t validate VM Entry', vm_entry.value) + self.vm = None else: - vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB()) - if self.storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid): - if not self.storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd): - vm_entry.status = VMStatus.error - else: - logger.info("New VM Created") + self.uuid = vm_entry.key.split('/')[-1] + self.host_key = self.vm['hostname'] - @capture_all_exception - def start(self, vm_entry: VMEntry, destination_host_key=None): - _vm = self.get_vm(self.running_vms, vm_entry.key) + def get_qemu_args(self): + command = ( + '-name {owner}_{name}' + ' -drive file={file},format=raw,if=virtio,cache=none' + ' -device virtio-rng-pci' + ' -m {memory} -smp cores={cores},threads={threads}' + ).format(owner=self.vm['owner'], name=self.vm['name'], + memory=int(self.vm['specs']['ram'].to_MB()), cores=self.vm['specs']['cpu'], + threads=1, file=shared.storage_handler.qemu_path_string(self.uuid)) - # VM already running. No need to proceed further. - if _vm: - logger.info("VM %s already running" % vm_entry.uuid) - return - else: - logger.info("Trying to start %s" % vm_entry.uuid) - if destination_host_key: - migration_port = find_free_port() - self.launch_vm(vm_entry, migration=True, migration_port=migration_port, - destination_host_key=destination_host_key) - else: - self.create(vm_entry) - self.launch_vm(vm_entry) + return command.split(' ') - @capture_all_exception - def stop(self, vm_entry): - vm = self.get_vm(self.running_vms, vm_entry.key) - vm.handle.shutdown() - if not vm.handle.is_running(): - vm_entry.add_log("Shutdown successfully") - vm_entry.declare_stopped() - shared.vm_pool.put(vm_entry) - self.running_vms.remove(vm) - delete_vm_network(vm_entry) + def start(self, destination_host_key=None): + migration = False + if destination_host_key: + migration = True - @capture_all_exception - def delete(self, vm_entry): - logger.info("Deleting VM | %s", vm_entry) - self.stop(vm_entry) - - if self.storage_handler.is_vm_image_exists(vm_entry.uuid): - r_status = self.storage_handler.delete_vm_image(vm_entry.uuid) - if r_status: - shared.etcd_client.client.delete(vm_entry.key) - else: - shared.etcd_client.client.delete(vm_entry.key) - - @capture_all_exception - def transfer(self, request_event): - # This function would run on source host i.e host on which the vm - # is running initially. This host would be responsible for transferring - # vm state to destination host. - - _host, _port = request_event.parameters["host"], request_event.parameters["port"] - _uuid = request_event.uuid - _destination = request_event.destination_host_key - vm = self.get_vm(self.running_vms, join_path(settings['etcd']['vm_prefix'], _uuid)) - - if vm: - tunnel = sshtunnel.SSHTunnelForwarder( - _host, - ssh_username=settings['ssh']['username'], - ssh_pkey=settings['ssh']['private_key_path'], - remote_bind_address=("127.0.0.1", _port), - ssh_proxy_enabled=True, - ssh_proxy=(_host, 22) - ) - try: - tunnel.start() - except sshtunnel.BaseSSHTunnelForwarderError: - logger.exception("Couldn't establish connection to (%s, 22)", _host) - else: - vm.handle.command( - "migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port) - ) - - status = vm.handle.command("query-migrate")["status"] - while status not in ["failed", "completed"]: - time.sleep(2) - status = vm.handle.command("query-migrate")["status"] - - with shared.vm_pool.get_put(request_event.uuid) as source_vm: - if status == "failed": - source_vm.add_log("Migration Failed") - elif status == "completed": - # If VM is successfully migrated then shutdown the VM - # on this host and update hostname to destination host key - source_vm.add_log("Successfully migrated") - source_vm.hostname = _destination - self.running_vms.remove(vm) - vm.handle.shutdown() - source_vm.in_migration = False # VM transfer finished - finally: - tunnel.close() - - @capture_all_exception - def launch_vm(self, vm_entry, migration=False, migration_port=None, destination_host_key=None): - logger.info("Starting %s" % vm_entry.key) - - vm = self.create_vm_object(vm_entry, migration=migration, migration_port=migration_port) + self.create() try: - vm.handle.launch() - except Exception: - logger.exception("Error Occured while starting VM") - vm.handle.shutdown() - - if migration: - # We don't care whether MachineError or any other error occurred - pass - else: - # Error during typical launch of a vm - vm.handle.shutdown() - vm_entry.declare_killed() - shared.vm_pool.put(vm_entry) + network_args = self.create_network_dev() + except Exception as err: + declare_stopped(self.vm) + self.vm['log'].append('Cannot Setup Network Properly') + logger.error('Cannot Setup Network Properly for vm %s', self.uuid, exc_info=err) else: - vm_entry.vnc_socket = vm.vnc_socket_file.name - self.running_vms.append(vm) + self.vmm.start(uuid=self.uuid, migration=migration, + *self.get_qemu_args(), *network_args) - if migration: - vm_entry.in_migration = True + status = self.vmm.get_status(self.uuid) + if status == 'running': + self.vm['status'] = VMStatus.running + self.vm['vnc_socket'] = self.vmm.get_vnc(self.uuid) + elif status == 'inmigrate': r = RequestEntry.from_scratch( - type=RequestType.TransferVM, - hostname=vm_entry.hostname, - parameters={"host": get_ipv6_address(), "port": migration_port}, - uuid=vm_entry.uuid, - destination_host_key=destination_host_key, + type=RequestType.TransferVM, # Transfer VM + hostname=self.host_key, # Which VM should get this request. It is source host + uuid=self.uuid, # uuid of VM + destination_host_key=destination_host_key, # Where source host transfer VM request_prefix=settings['etcd']['request_prefix'] ) shared.request_pool.put(r) else: - # Typical launching of a vm - vm_entry.status = VMStatus.running - vm_entry.add_log("Started successfully") + self.stop() + declare_stopped(self.vm) - shared.vm_pool.put(vm_entry) + self.sync() - @capture_all_exception - def maintenance(self, host): - # To capture vm running according to running_vms list + def stop(self): + self.vmm.stop(self.uuid) + self.delete_network_dev() + declare_stopped(self.vm) + self.sync() - # This is to capture successful migration of a VM. - # Suppose, this host is running "vm1" and user initiated - # request to migrate this "vm1" to some other host. On, - # successful migration the destination host would set - # the vm hostname to itself. Thus, we are checking - # whether this host vm is successfully migrated. If yes - # then we shutdown "vm1" on this host. - logger.debug("Starting Maintenance!!") - to_be_removed = [] - for running_vm in self.running_vms: - with shared.vm_pool.get_put(running_vm.key) as vm_entry: - if vm_entry.hostname != host.key and not vm_entry.in_migration: - running_vm.handle.shutdown() - logger.info("VM migration not completed successfully.") - to_be_removed.append(running_vm) + def migrate(self, destination): + self.vmm.transfer(src_uuid=self.uuid, dest_uuid=self.uuid, host=destination) - for r in to_be_removed: - self.running_vms.remove(r) + def create_network_dev(self): + command = '' + for network_mac_and_tap in self.vm['network']: + network_name, mac, tap = network_mac_and_tap - # To check vm running according to etcd entries - alleged_running_vms = shared.vm_pool.by_status("RUNNING", shared.vm_pool.by_host(host.key)) + _key = os.path.join(settings['etcd']['network_prefix'], self.vm['owner'], network_name) + network = shared.etcd_client.get(_key, value_in_json=True) + network_schema = NetworkSchema() + try: + network = network_schema.load(network.value) + except ValidationError: + continue - for vm_entry in alleged_running_vms: - _vm = self.get_vm(self.running_vms, vm_entry.key) - # Whether, the allegedly running vm is in our - # running_vms list or not if it is said to be - # running on this host but it is not then we - # need to shut it down + if network['type'] == "vxlan": + tap = create_vxlan_br_tap(_id=network['id'], + _dev=settings['network']['vxlan_phy_dev'], + tap_id=tap, + ip=network['ipv6']) - # This is to capture poweroff/shutdown of a VM - # initiated by user inside VM. OR crash of VM by some - # user running process - if (_vm and not _vm.handle.is_running()) or not _vm: - logger.debug("_vm = %s, is_running() = %s" % (_vm, _vm.handle.is_running())) - vm_entry.add_log("""{} is not running but is said to be running. - So, shutting it down and declare it killed""".format(vm_entry.key)) - vm_entry.declare_killed() - shared.vm_pool.put(vm_entry) - if _vm: - self.running_vms.remove(_vm) + all_networks = shared.etcd_client.get_prefix(settings['etcd']['network_prefix'], + value_in_json=True) + + if ipaddress.ip_network(network['ipv6']).is_global: + update_radvd_conf(all_networks) + + command += '-netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no' \ + ' -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}' \ + .format(tap=tap, net_id=network['id'], mac=mac) + + return command.split(' ') + + def delete_network_dev(self): + try: + for network in self.vm['network']: + network_name = network[0] + _ = network[1] # tap_mac + tap_id = network[2] + + delete_network_interface('tap{}'.format(tap_id)) + + owners_vms = shared.vm_pool.by_owner(self.vm['owner']) + owners_running_vms = shared.vm_pool.by_status(VMStatus.running, + _vms=owners_vms) + + networks = map( + lambda n: n[0], map(lambda vm: vm.network, owners_running_vms) + ) + networks_in_use_by_user_vms = [vm[0] for vm in networks] + if network_name not in networks_in_use_by_user_vms: + network_entry = resolve_network(network[0], self.vm['owner']) + if network_entry: + network_type = network_entry.value["type"] + network_id = network_entry.value["id"] + if network_type == "vxlan": + delete_network_interface('br{}'.format(network_id)) + delete_network_interface('vxlan{}'.format(network_id)) + except Exception: + logger.exception("Exception in network interface deletion") + + def create(self): + if shared.storage_handler.is_vm_image_exists(self.uuid): + # File Already exists. No Problem Continue + logger.debug("Image for vm %s exists", self.uuid) + else: + if shared.storage_handler.make_vm_image(src=self.vm['image_uuid'], dest=self.uuid): + if not shared.storage_handler.resize_vm_image(path=self.uuid, + size=int(self.vm['specs']['os-ssd'].to_MB())): + self.vm['status'] = VMStatus.error + else: + logger.info("New VM Created") + + def sync(self): + shared.etcd_client.put(self.key, self.schema.dump(self.vm), value_in_json=True) + + def delete(self): + self.stop() + + if shared.storage_handler.is_vm_image_exists(self.uuid): + r_status = shared.storage_handler.delete_vm_image(self.uuid) + if r_status: + shared.etcd_client.client.delete(self.key) + else: + shared.etcd_client.client.delete(self.key) def resolve_network(network_name, network_owner): - network = shared.etcd_client.get(join_path(settings['etcd']['network_prefix'], - network_owner, - network_name), - value_in_json=True) + network = shared.etcd_client.get( + join_path(settings['etcd']['network_prefix'], network_owner, network_name), value_in_json=True + ) return network -def delete_vm_network(vm_entry): - try: - for network in vm_entry.network: - network_name = network[0] - tap_mac = network[1] - tap_id = network[2] - - delete_network_interface('tap{}'.format(tap_id)) - - owners_vms = shared.vm_pool.by_owner(vm_entry.owner) - owners_running_vms = shared.vm_pool.by_status(VMStatus.running, - _vms=owners_vms) - - networks = map( - lambda n: n[0], map(lambda vm: vm.network, owners_running_vms) - ) - networks_in_use_by_user_vms = [vm[0] for vm in networks] - if network_name not in networks_in_use_by_user_vms: - network_entry = resolve_network(network[0], vm_entry.owner) - if network_entry: - network_type = network_entry.value["type"] - network_id = network_entry.value["id"] - if network_type == "vxlan": - delete_network_interface('br{}'.format(network_id)) - delete_network_interface('vxlan{}'.format(network_id)) - except Exception: - logger.exception("Exception in network interface deletion") - - def create_vxlan_br_tap(_id, _dev, tap_id, ip=None): network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network') vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'), @@ -377,10 +222,12 @@ def update_radvd_conf(all_networks): ) for net in networks if networks.get(net) ] - with open('/etc/radvd.conf', 'w') as radvd_conf: radvd_conf.writelines(content) try: sp.check_output(['systemctl', 'restart', 'radvd']) - except Exception: - sp.check_output(['service', 'radvd', 'restart']) + except sp.CalledProcessError: + try: + sp.check_output(['service', 'radvd', 'restart']) + except sp.CalledProcessError as err: + raise err.__class__('Cannot start/restart radvd service', err.cmd) from err diff --git a/ucloud/scheduler/helper.py b/ucloud/scheduler/helper.py index 643e8e9..0e9ef73 100755 --- a/ucloud/scheduler/helper.py +++ b/ucloud/scheduler/helper.py @@ -95,7 +95,7 @@ def dead_host_mitigation(dead_hosts_keys): vms_hosted_on_dead_host = shared.vm_pool.by_host(host_key) for vm in vms_hosted_on_dead_host: - vm.declare_killed() + vm.status = 'UNKNOWN' shared.vm_pool.put(vm) shared.host_pool.put(host) diff --git a/ucloud/scheduler/main.py b/ucloud/scheduler/main.py index 3412545..d91979f 100755 --- a/ucloud/scheduler/main.py +++ b/ucloud/scheduler/main.py @@ -56,35 +56,14 @@ def main(): continue shared.etcd_client.client.delete(request_entry.key) # consume Request - # If the Request is about a VM which is labelled as "migration" - # and has a destination - if hasattr(request_entry, "migration") and request_entry.migration \ - and hasattr(request_entry, "destination") and request_entry.destination: - try: - get_suitable_host(vm_specs=vm_entry.specs, - hosts=[shared.host_pool.get(request_entry.destination)]) - except NoSuitableHostFound: - logger.info("Requested destination host doesn't have enough capacity" - "to hold %s" % vm_entry.uuid) - else: - r = RequestEntry.from_scratch(type=RequestType.InitVMMigration, - uuid=request_entry.uuid, - destination=request_entry.destination, - request_prefix=settings['etcd']['request_prefix']) - shared.request_pool.put(r) + try: + assign_host(vm_entry) + except NoSuitableHostFound: + vm_entry.add_log("Can't schedule VM. No Resource Left.") + shared.vm_pool.put(vm_entry) - # If the Request is about a VM that just want to get started/created - else: - # assign_host only returns None when we couldn't be able to assign - # a host to a VM because of resource constraints - try: - assign_host(vm_entry) - except NoSuitableHostFound: - vm_entry.add_log("Can't schedule VM. No Resource Left.") - shared.vm_pool.put(vm_entry) - - pending_vms.append(vm_entry) - logger.info("No Resource Left. Emailing admin....") + pending_vms.append(vm_entry) + logger.info("No Resource Left. Emailing admin....") if __name__ == "__main__": diff --git a/ucloud/vmm/__init__.py b/ucloud/vmm/__init__.py new file mode 100644 index 0000000..1291da4 --- /dev/null +++ b/ucloud/vmm/__init__.py @@ -0,0 +1,181 @@ +import os +import subprocess as sp +import logging +import socket +import json +import tempfile +import time + +from contextlib import suppress +from multiprocessing import Process +from os.path import join as join_path +from os.path import isdir + +logger = logging.getLogger(__name__) + + +class VMQMPHandles: + def __init__(self, path): + self.path = path + self.sock = socket.socket(socket.AF_UNIX) + self.file = self.sock.makefile() + + def __enter__(self): + self.sock.connect(self.path) + + # eat qmp greetings + self.file.readline() + + # init qmp + self.sock.sendall(b'{ "execute": "qmp_capabilities" }') + self.file.readline() + + return self.sock, self.file + + def __exit__(self, exc_type, exc_val, exc_tb): + self.file.close() + self.sock.close() + + if exc_type: + logger.error('Couldn\'t get handle for VM.', exc_type, exc_val, exc_tb) + raise exc_type("Couldn't get handle for VM.") from exc_type + + +class TransferVM(Process): + def __init__(self, src_uuid, dest_uuid, host, socket_dir): + self.src_uuid = src_uuid + self.dest_uuid = dest_uuid + self.host = host + self.src_sock_path = os.path.join(socket_dir, self.src_uuid) + self.dest_sock_path = os.path.join(socket_dir, self.dest_uuid) + + super().__init__() + + def run(self): + with suppress(FileNotFoundError): + os.remove(self.src_sock_path) + + command = ['ssh', '-nNT', '-L', '{}:{}'.format(self.src_sock_path, self.dest_sock_path), + 'root@{}'.format(self.host)] + + try: + p = sp.Popen(command) + except Exception as e: + logger.error('Couldn\' forward unix socks over ssh.', exc_info=e) + else: + time.sleep(2) + vmm = VMM() + logger.debug('Executing: ssh forwarding command: %s', command) + vmm.execute_command(self.src_uuid, command='migrate', + arguments={'uri': 'unix:{}'.format(self.src_sock_path)}) + + while p.poll() is None: + success, output = vmm.execute_command(self.src_uuid, command='query-migrate') + if success: + status = output['return']['status'] + if status != 'active': + print('Migration Status: ', status) + return + else: + print('Migration Status: ', status) + else: + return + time.sleep(0.2) + + +class VMM: + # Virtual Machine Manager + def __init__(self, qemu_path='/usr/bin/qemu-system-x86_64', + vmm_backend=os.path.expanduser('~/ucloud/vmm/')): + self.qemu_path = qemu_path + self.vmm_backend = vmm_backend + self.socket_dir = os.path.join(self.vmm_backend, 'sock') + + def is_running(self, uuid): + sock_path = os.path.join(self.vmm_backend, uuid) + try: + sock = socket.socket(socket.AF_UNIX) + sock.connect(sock_path) + recv = sock.recv(4096) + except Exception as err: + # unix sock doesn't exists or it is closed + logger.info('VM %s sock either don\' exists or it is closed.', uuid, + 'It mean VM is stopped.', exc_info=err) + else: + # if we receive greetings from qmp it mean VM is running + if len(recv) > 0: + return True + + with suppress(FileNotFoundError): + os.remove(sock_path) + + return False + + def start(self, *args, uuid, migration=False): + # start --> sucess? + migration_args = () + if migration: + migration_args = ('-incoming', 'unix:{}'.format(os.path.join(self.socket_dir, uuid))) + + if self.is_running(uuid): + logger.warning('Cannot start VM. It is already running.') + else: + qmp_arg = ('-qmp', 'unix:{}/{},server,nowait'.format(self.vmm_backend, uuid)) + vnc_arg = ('-vnc', 'unix:{}'.format(tempfile.NamedTemporaryFile().name)) + + command = [self.qemu_path, *args, *qmp_arg, *migration_args, *vnc_arg, '-daemonize'] + try: + sp.check_output(command, stderr=sp.PIPE) + except sp.CalledProcessError as err: + logger.exception('Error occurred while starting VM.\nDetail %s', err.stderr.decode('utf-8')) + else: + time.sleep(2) + + def execute_command(self, uuid, command, **kwargs): + # execute_command -> sucess?, output + try: + with VMQMPHandles(os.path.join(self.vmm_backend, uuid)) as (sock_handle, file_handle): + command_to_execute = { + 'execute': command, + **kwargs + } + sock_handle.sendall(json.dumps(command_to_execute).encode('utf-8')) + output = file_handle.readline() + except Exception as err: + logger.exception('Error occurred while executing command and getting valid output from qmp') + else: + try: + output = json.loads(output) + except: + logger.exception('QMP Output isn\'t valid JSON. %s', output) + else: + return 'return' in output, output + return False, None + + def stop(self, uuid): + success, output = self.execute_command(command='quit', uuid=uuid) + return success + + def get_status(self, uuid): + success, output = self.execute_command(command='query-status', uuid=uuid) + if success: + return output['return']['status'] + else: + return 'STOPPED' + + def discover(self): + vms = [ + uuid for uuid in os.listdir(self.vmm_backend) + if not isdir(join_path(self.vmm_backend, uuid)) + ] + return vms + + def get_vnc(self, uuid): + success, output = self.execute_command(uuid, command='query-vnc') + if success: + return output['return']['service'] + return None + + def transfer(self, src_uuid, dest_uuid, host): + p = TransferVM(src_uuid, dest_uuid, socket_dir=self.socket_dir, host=host) + p.start()