# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html

# For QEMU Monitor Protocol command information, see
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor

import os
import subprocess as sp
import tempfile
import time

from functools import wraps
from string import Template
from typing import Union
from os.path import join as join_path

import bitmath
import sshtunnel

from ucloud.common.helpers import get_ipv6_address
from ucloud.common.request import RequestEntry, RequestType
from ucloud.common.vm import VMEntry, VMStatus
from ucloud.common.network import create_dev, delete_network_interface, find_free_port
from ucloud.host import logger
from ucloud.shared import shared
from ucloud.settings import settings

from . import qmp


class VM:
    def __init__(self, key, handle, vnc_socket_file):
        self.key = key  # type: str
        self.handle = handle  # type: qmp.QEMUMachine
        self.vnc_socket_file = vnc_socket_file  # type: tempfile.NamedTemporaryFile

    def __repr__(self):
        return "VM({})".format(self.key)


def capture_all_exception(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception:
            logger.exception('Unhandled exception occurred in %s. For more details see syslog.', __name__)

    return wrapper


class VMM:
    def __init__(self):
        self.etcd_client = shared.etcd_client
        self.storage_handler = shared.storage_handler
        self.running_vms = []

    def get_start_command_args(self, vm_entry, vnc_sock_filename: str, migration=False, migration_port=None):
        threads_per_core = 1
        vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs['ram']).to_MB())
        vm_cpus = int(vm_entry.specs['cpu'])
        vm_uuid = vm_entry.uuid
        vm_networks = vm_entry.network

        command = '-name {}_{}'.format(vm_entry.owner, vm_entry.name)

        command += ' -drive file={},format=raw,if=virtio,cache=none'.format(
            self.storage_handler.qemu_path_string(vm_uuid)
        )
        command += ' -device virtio-rng-pci -vnc unix:{}'.format(vnc_sock_filename)
        command += ' -m {} -smp cores={},threads={}'.format(
            vm_memory, vm_cpus, threads_per_core
        )

        if migration:
            command += ' -incoming tcp:[::]:{}'.format(migration_port)

        for network_mac_and_tap in vm_networks:
            network_name, mac, tap = network_mac_and_tap

            _key = os.path.join(settings['etcd']['network_prefix'], vm_entry.owner, network_name)
            network = self.etcd_client.get(_key, value_in_json=True)
            network_type = network.value["type"]
            network_id = str(network.value["id"])
            network_ipv6 = network.value["ipv6"]

            if network_type == "vxlan":
                tap = create_vxlan_br_tap(_id=network_id,
                                          _dev=settings['network']['vxlan_phy_dev'],
                                          tap_id=tap,
                                          ip=network_ipv6)

                all_networks = self.etcd_client.get_prefix('/v1/network/', value_in_json=True)
                update_radvd_conf(all_networks)

            command += (
                " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no"
                " -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}"
            ).format(tap=tap, net_id=network_id, mac=mac)

        return command.split(" ")

    def create_vm_object(self, vm_entry, migration=False, migration_port=None):
        vnc_sock_file = tempfile.NamedTemporaryFile()

        qemu_args = self.get_start_command_args(
            vm_entry=vm_entry,
            vnc_sock_filename=vnc_sock_file.name,
            migration=migration,
            migration_port=migration_port,
        )
        qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args)
        return VM(vm_entry.key, qemu_machine, vnc_sock_file)

    @staticmethod
    def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
        return next((vm for vm in vm_list if vm.key == vm_key), None)
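
    # A minimal sketch of the QEMU argument string assembled by
    # get_start_command_args() above and handed to qmp.QEMUMachine in
    # create_vm_object(). All concrete values below (owner/name, paths,
    # sizes, ids, MAC address) are hypothetical; the real ones come from
    # the VM's etcd entry and the storage handler:
    #
    #   -name alice_myvm
    #   -drive file=<qemu_path_string(vm_uuid)>,format=raw,if=virtio,cache=none
    #   -device virtio-rng-pci -vnc unix:/tmp/tmpXXXXXXXX
    #   -m 1024 -smp cores=1,threads=1
    #   -netdev tap,id=vmnet2,ifname=tap2,script=no,downscript=no
    #   -device virtio-net-pci,netdev=vmnet2,mac=52:54:00:xx:xx:xx
    #
    # Note that the whole string is split on single spaces, so none of the
    # substituted values may contain spaces.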

    @capture_all_exception
    def create(self, vm_entry: VMEntry):
        if self.storage_handler.is_vm_image_exists(vm_entry.uuid):
            # Image already exists; nothing to do.
            logger.debug("Image for vm %s exists", vm_entry.uuid)
            return None
        else:
            vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB())
            if self.storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid):
                if not self.storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd):
                    vm_entry.status = VMStatus.error
                else:
                    logger.info("New VM Created")

    @capture_all_exception
    def start(self, vm_entry: VMEntry, destination_host_key=None):
        _vm = self.get_vm(self.running_vms, vm_entry.key)

        # VM is already running; no need to proceed further.
        if _vm:
            logger.info("VM %s already running", vm_entry.uuid)
            return
        else:
            logger.info("Trying to start %s", vm_entry.uuid)
            if destination_host_key:
                migration_port = find_free_port()
                self.launch_vm(vm_entry, migration=True, migration_port=migration_port,
                               destination_host_key=destination_host_key)
            else:
                self.create(vm_entry)
                self.launch_vm(vm_entry)

    @capture_all_exception
    def stop(self, vm_entry):
        vm = self.get_vm(self.running_vms, vm_entry.key)
        vm.handle.shutdown()
        if not vm.handle.is_running():
            vm_entry.add_log("Shutdown successfully")
            vm_entry.declare_stopped()
            shared.vm_pool.put(vm_entry)
            self.running_vms.remove(vm)
            delete_vm_network(vm_entry)

    @capture_all_exception
    def delete(self, vm_entry):
        logger.info("Deleting VM | %s", vm_entry)
        self.stop(vm_entry)

        if self.storage_handler.is_vm_image_exists(vm_entry.uuid):
            r_status = self.storage_handler.delete_vm_image(vm_entry.uuid)
            if r_status:
                shared.etcd_client.client.delete(vm_entry.key)
        else:
            shared.etcd_client.client.delete(vm_entry.key)

    @capture_all_exception
    def transfer(self, request_event):
        # This function runs on the source host, i.e. the host on which the
        # VM is currently running. This host is responsible for transferring
        # the VM state to the destination host.
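        #
        # Sketch of the exchange, assuming the destination host has already
        # launched QEMU with "-incoming tcp:[::]:<port>" (see launch_vm with
        # migration=True): an SSH tunnel is opened to that port on the
        # destination and the source QEMU is told to migrate through the
        # local end of the tunnel, roughly:
        #
        #   vm.handle.command("migrate", uri="tcp:0.0.0.0:45123")  # hypothetical tunnel port
        #   vm.handle.command("query-migrate")["status"]           # polled until "completed" or "failed"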
        _host, _port = request_event.parameters["host"], request_event.parameters["port"]
        _uuid = request_event.uuid
        _destination = request_event.destination_host_key
        vm = self.get_vm(self.running_vms, join_path(settings['etcd']['vm_prefix'], _uuid))

        if vm:
            tunnel = sshtunnel.SSHTunnelForwarder(
                _host,
                ssh_username=settings['ssh']['username'],
                ssh_pkey=settings['ssh']['private_key_path'],
                remote_bind_address=("127.0.0.1", _port),
                ssh_proxy_enabled=True,
                ssh_proxy=(_host, 22)
            )
            try:
                tunnel.start()
            except sshtunnel.BaseSSHTunnelForwarderError:
                logger.exception("Couldn't establish connection to (%s, 22)", _host)
            else:
                vm.handle.command(
                    "migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port)
                )

                status = vm.handle.command("query-migrate")["status"]
                while status not in ["failed", "completed"]:
                    time.sleep(2)
                    status = vm.handle.command("query-migrate")["status"]

                with shared.vm_pool.get_put(request_event.uuid) as source_vm:
                    if status == "failed":
                        source_vm.add_log("Migration Failed")
                    elif status == "completed":
                        # The VM was successfully migrated, so shut it down on
                        # this host and point its hostname at the destination
                        # host key.
                        source_vm.add_log("Successfully migrated")
                        source_vm.hostname = _destination
                        self.running_vms.remove(vm)
                        vm.handle.shutdown()
                        source_vm.in_migration = False  # VM transfer finished
            finally:
                tunnel.close()

    @capture_all_exception
    def launch_vm(self, vm_entry, migration=False, migration_port=None, destination_host_key=None):
        logger.info("Starting %s", vm_entry.key)

        vm = self.create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
        try:
            vm.handle.launch()
        except Exception:
            logger.exception("Error occurred while starting VM")
            vm.handle.shutdown()

            if migration:
                # We don't care whether a MachineError or any other error occurred.
                pass
            else:
                # Error during a typical (non-migration) launch of a VM.
                vm.handle.shutdown()
                vm_entry.declare_killed()
                shared.vm_pool.put(vm_entry)
        else:
            vm_entry.vnc_socket = vm.vnc_socket_file.name
            self.running_vms.append(vm)

            if migration:
                vm_entry.in_migration = True
                r = RequestEntry.from_scratch(
                    type=RequestType.TransferVM,
                    hostname=vm_entry.hostname,
                    parameters={"host": get_ipv6_address(), "port": migration_port},
                    uuid=vm_entry.uuid,
                    destination_host_key=destination_host_key,
                    request_prefix=settings['etcd']['request_prefix']
                )
                shared.request_pool.put(r)
            else:
                # Typical launching of a VM.
                vm_entry.status = VMStatus.running
                vm_entry.add_log("Started successfully")

            shared.vm_pool.put(vm_entry)

    @capture_all_exception
    def maintenance(self, host):
        # Reconcile the VMs in our running_vms list with etcd.
        #
        # This catches a successful migration of a VM: suppose this host is
        # running "vm1" and the user requested that "vm1" be migrated to some
        # other host. On successful migration the destination host sets the
        # VM's hostname to itself. So if a VM we are running no longer lists
        # this host as its hostname (and is not mid-migration), the migration
        # finished elsewhere and we shut down "vm1" on this host.
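        #
        # A second pass further below goes the other way: it walks the VMs
        # that etcd claims are RUNNING on this host and declares killed any
        # that are not actually running here.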
        logger.debug("Starting maintenance")
        to_be_removed = []
        for running_vm in self.running_vms:
            with shared.vm_pool.get_put(running_vm.key) as vm_entry:
                if vm_entry.hostname != host.key and not vm_entry.in_migration:
                    running_vm.handle.shutdown()
                    logger.info("VM migration not completed successfully.")
                    to_be_removed.append(running_vm)

        for r in to_be_removed:
            self.running_vms.remove(r)

        # Check the VMs that etcd says are running on this host.
        alleged_running_vms = shared.vm_pool.by_status("RUNNING", shared.vm_pool.by_host(host.key))

        for vm_entry in alleged_running_vms:
            _vm = self.get_vm(self.running_vms, vm_entry.key)
            # The allegedly running VM may or may not be in our running_vms
            # list. If it is said to be running on this host but is not, we
            # need to shut it down.
            # This catches poweroff/shutdown initiated by the user from
            # inside the VM, or a crash of the VM caused by some process the
            # user is running.
            if (_vm and not _vm.handle.is_running()) or not _vm:
                logger.debug("_vm = %s, is_running() = %s",
                             _vm, _vm.handle.is_running() if _vm else False)
                vm_entry.add_log("{} is not running but is said to be running,"
                                 " so shutting it down and declaring it killed".format(vm_entry.key))
                vm_entry.declare_killed()
                shared.vm_pool.put(vm_entry)
                if _vm:
                    self.running_vms.remove(_vm)


def resolve_network(network_name, network_owner):
    network = shared.etcd_client.get(join_path(settings['etcd']['network_prefix'],
                                               network_owner,
                                               network_name),
                                     value_in_json=True)
    return network


def delete_vm_network(vm_entry):
    try:
        for network in vm_entry.network:
            network_name = network[0]
            tap_mac = network[1]
            tap_id = network[2]

            delete_network_interface('tap{}'.format(tap_id))

            owners_vms = shared.vm_pool.by_owner(vm_entry.owner)
            owners_running_vms = shared.vm_pool.by_status(VMStatus.running, _vms=owners_vms)

            networks = map(
                lambda n: n[0], map(lambda vm: vm.network, owners_running_vms)
            )
            networks_in_use_by_user_vms = [vm[0] for vm in networks]
            if network_name not in networks_in_use_by_user_vms:
                network_entry = resolve_network(network[0], vm_entry.owner)
                if network_entry:
                    network_type = network_entry.value["type"]
                    network_id = network_entry.value["id"]
                    if network_type == "vxlan":
                        delete_network_interface('br{}'.format(network_id))
                        delete_network_interface('vxlan{}'.format(network_id))
    except Exception:
        logger.exception("Exception in network interface deletion")


def create_vxlan_br_tap(_id, _dev, tap_id, ip=None):
    # Create (if needed) the vxlan device on top of the physical device, a
    # bridge on top of the vxlan device, and finally a tap device attached to
    # the bridge. Returns the tap device name on success.
    network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')

    vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'),
                       _id=_id, dev=_dev)
    if vxlan:
        bridge = create_dev(script=os.path.join(network_script_base, 'create-bridge.sh'),
                            _id=_id, dev=vxlan, ip=ip)
        if bridge:
            tap = create_dev(script=os.path.join(network_script_base, 'create-tap.sh'),
                             _id=str(tap_id), dev=bridge)
            if tap:
                return tap


def update_radvd_conf(all_networks):
    # Regenerate /etc/radvd.conf from the radvd template for every network
    # that has an IPv6 prefix, then restart radvd.
    network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')

    networks = {
        net.value['ipv6']: net.value['id']
        for net in all_networks
        if net.value.get('ipv6')
    }
    with open(os.path.join(network_script_base, 'radvd-template.conf'), 'r') as radvd_template_file:
        radvd_template = Template(radvd_template_file.read())

    content = [
        radvd_template.safe_substitute(
            bridge='br{}'.format(networks[net]),
            prefix=net
        )
        for net in networks if networks.get(net)
    ]

    with open('/etc/radvd.conf', 'w') as radvd_conf:
        radvd_conf.writelines(content)

    try:
        sp.check_output(['systemctl', 'restart', 'radvd'])
    except Exception:
        sp.check_output(['service', 'radvd', 'restart'])