# QEMU Manual # https://qemu.weilnetz.de/doc/qemu-doc.html # For QEMU Monitor Protocol Commands Information, See # https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor import errno import os import subprocess import tempfile import time from functools import wraps from os.path import join from typing import Union import bitmath import sshtunnel from decouple import config import qmp from config import (WITHOUT_CEPH, etcd_client, logging, request_pool, running_vms, vm_pool) from ucloud_common.helpers import get_ipv4_address from ucloud_common.request import RequestEntry, RequestType from ucloud_common.vm import VMEntry, VMStatus class VM: def __init__(self, key, handle, vnc_socket_file): self.key = key # type: str self.handle = handle # type: qmp.QEMUMachine self.vnc_socket_file = vnc_socket_file # type: tempfile.NamedTemporaryFile def __repr__(self): return "VM({})".format(self.key) def get_start_command_args( vm_entry, vnc_sock_filename: str, migration=False, migration_port=4444 ): threads_per_core = 1 vm_memory = int(bitmath.Byte(int(vm_entry.specs["ram"])).to_MB()) vm_cpus = int(vm_entry.specs["cpu"]) vm_uuid = vm_entry.uuid if WITHOUT_CEPH: command = "-drive file={},format=raw,if=virtio,cache=none".format( os.path.join("/var/vm", vm_uuid) ) else: command = "-drive file=rbd:uservms/{},format=raw,if=virtio,cache=none".format( vm_uuid ) command += " -device virtio-rng-pci -vnc unix:{}".format(vnc_sock_filename) command += " -m {} -smp cores={},threads={}".format( vm_memory, vm_cpus, threads_per_core ) command += " -name {}".format(vm_uuid) if migration: command += " -incoming tcp:0:{}".format(migration_port) return command.split(" ") def create_vm_object(vm_entry, migration=False, migration_port=4444): # NOTE: If migration suddenly stop working, having different # VNC unix filename on source and destination host can # be a possible cause of it. # REQUIREMENT: Use Unix Socket instead of TCP Port for VNC vnc_sock_file = tempfile.NamedTemporaryFile() qemu_args = get_start_command_args( vm_entry=vm_entry, vnc_sock_filename=vnc_sock_file.name, migration=migration, migration_port=migration_port, ) qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args) return VM(vm_entry.key, qemu_machine, vnc_sock_file) def get_vm(vm_list: list, vm_key) -> Union[VM, None]: return next((vm for vm in vm_list if vm.key == vm_key), None) def need_running_vm(func): @wraps(func) def wrapper(e): vm = get_vm(running_vms, e.key) if vm: try: status = vm.handle.command("query-status") logging.debug("VM Status Check - %s", status) except Exception as exception: logging.info("%s failed - VM %s %s", func.__name__, e, exception) else: return func(e) else: logging.info("%s failed because VM %s is not running", func.__name__, e.key) return return wrapper def create(vm_entry: VMEntry): vm_hdd = int(bitmath.Byte(int(vm_entry.specs["hdd"])).to_MB()) if WITHOUT_CEPH: _command_to_create = [ "cp", os.path.join("/var/image", vm_entry.image_uuid), os.path.join("/var/vm", vm_entry.uuid), ] _command_to_extend = [ "qemu-img", "resize", os.path.join("/var/vm", vm_entry.uuid), vm_entry.specs["hdd"], ] else: _command_to_create = [ "rbd", "clone", "images/{}@protected".format(vm_entry.image_uuid), "uservms/{}".format(vm_entry.uuid), ] _command_to_extend = [ "rbd", "resize", "uservms/{}".format(vm_entry.uuid), "--size", vm_hdd, ] try: subprocess.check_output(_command_to_create) except subprocess.CalledProcessError as e: if e.returncode == errno.EEXIST: logging.debug("Image for vm %s exists", vm_entry.uuid) # File Already exists. No Problem Continue return # This exception catches all other exceptions # i.e FileNotFound (BaseImage), pool Does Not Exists etc. logging.exception(e) vm_entry.status = "ERROR" else: try: subprocess.check_output(_command_to_extend) except Exception as e: logging.exception(e) else: logging.info("New VM Created") def start(vm_entry: VMEntry): _vm = get_vm(running_vms, vm_entry.key) # VM already running. No need to proceed further. if _vm: logging.info("VM %s already running", vm_entry.uuid) return else: create(vm_entry) launch_vm(vm_entry) @need_running_vm def stop(vm_entry): vm = get_vm(running_vms, vm_entry.key) vm.handle.shutdown() if not vm.handle.is_running(): vm_entry.add_log("Shutdown successfully") vm_entry.declare_stopped() vm_pool.put(vm_entry) running_vms.remove(vm) def delete(vm_entry): logging.info("Deleting VM | %s", vm_entry) stop(vm_entry) path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1 :] if WITHOUT_CEPH: vm_deletion_command = ["rm", os.path.join("/var/vm", vm_entry.uuid)] else: vm_deletion_command = ["rbd", "rm", path_without_protocol] try: subprocess.check_output(vm_deletion_command) except Exception as e: logging.exception(e) else: etcd_client.client.delete(vm_entry.key) def transfer(request_event): # This function would run on source host i.e host on which the vm # is running initially. This host would be responsible for transferring # vm state to destination host. _host, _port = request_event.parameters["host"], request_event.parameters["port"] _uuid = request_event.uuid _destination = request_event.destination_host_key vm = get_vm(running_vms, join("/v1/vm", _uuid)) if vm: tunnel = sshtunnel.SSHTunnelForwarder( (_host, 22), ssh_username=config("ssh_username"), ssh_pkey=config("ssh_pkey"), ssh_private_key_password=config("ssh_private_key_password"), remote_bind_address=("127.0.0.1", _port), ) try: tunnel.start() except sshtunnel.BaseSSHTunnelForwarderError: logging.exception("Couldn't establish connection to (%s, 22)", _host) else: vm.handle.command( "migrate", uri="tcp:{}:{}".format(_host, tunnel.local_bind_port) ) status = vm.handle.command("query-migrate")["status"] while status not in ["failed", "completed"]: time.sleep(2) status = vm.handle.command("query-migrate")["status"] with vm_pool.get_put(request_event.uuid) as source_vm: if status == "failed": source_vm.add_log("Migration Failed") elif status == "completed": # If VM is successfully migrated then shutdown the VM # on this host and update hostname to destination host key source_vm.add_log("Successfully migrated") source_vm.hostname = _destination running_vms.remove(vm) vm.handle.shutdown() source_vm.in_migration = False # VM transfer finished finally: tunnel.close() def init_migration(vm_entry, destination_host_key): # This function would run on destination host i.e host on which the vm # would be transferred after migration. # This host would be responsible for starting VM that would receive # state of VM running on source host. _vm = get_vm(running_vms, vm_entry.key) if _vm: # VM already running. No need to proceed further. logging.info("%s Already running", _vm.key) return launch_vm(vm_entry, migration=True, migration_port=4444, destination_host_key=destination_host_key) def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_key=None): logging.info("Starting %s", vm_entry.key) vm = create_vm_object(vm_entry, migration=migration, migration_port=migration_port) try: vm.handle.launch() except Exception as e: logging.exception(e) if migration: # We don't care whether MachineError or any other error occurred vm.handle.shutdown() else: # Error during typical launch of a vm vm_entry.add_log("Error Occurred while starting VM") vm_entry.declare_killed() vm_pool.put(vm_entry) else: vm_entry.vnc_socket = vm.vnc_socket_file.name running_vms.append(vm) if migration: vm_entry.in_migration = True r = RequestEntry.from_scratch( type=RequestType.TransferVM, hostname=vm_entry.hostname, parameters={"host": get_ipv4_address(), "port": 4444}, uuid=vm_entry.uuid, destination_host_key=destination_host_key, ) request_pool.put(r) else: # Typical launching of a vm vm_entry.status = VMStatus.running vm_entry.add_log("Started successfully") vm_pool.put(vm_entry)