# QEMU Manual # https://qemu.weilnetz.de/doc/qemu-doc.html # For QEMU Monitor Protocol Commands Information, See # https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor import subprocess import traceback import errno import qmp import tempfile import bitmath import time from ucloud_common.vm import VMStatus, VMEntry from config import (vm_pool, request_pool, etcd_client, logging, running_vms) from typing import Union from functools import wraps from dataclasses import dataclass from ucloud_common.request import RequestEntry, RequestType import sshtunnel from decouple import config from os.path import join from ucloud_common.helpers import get_ipv4_address @dataclass class VM: key: str handle: qmp.QEMUMachine vnc_socket_file: tempfile.NamedTemporaryFile def __repr__(self): return f"VM({self.key})" def get_start_command_args(vm_entry, vnc_sock_filename: str, migration=False, migration_port=4444): vm_memory = int(bitmath.Byte(int(vm_entry.specs["ram"])).to_MB()) vm_cpus = int(vm_entry.specs["cpu"]) vm_uuid = vm_entry.uuid threads_per_core = 1 command = (f"-drive file=rbd:uservms/{vm_uuid},format=raw,if=virtio,cache=none" f" -device virtio-rng-pci -vnc unix:{vnc_sock_filename}" f" -m {vm_memory} -smp cores={vm_cpus},threads={threads_per_core}" f" -name {vm_uuid}") if migration: command += f" -incoming tcp:0:{migration_port}" return command.split(" ") def create_vm_object(vm_entry, migration=False, migration_port=4444): # NOTE: If migration suddenly stop working, having different # VNC unix filename on source and destination host can # be a possible cause of it. # REQUIREMENT: Use Unix Socket instead of TCP Port for VNC vnc_sock_file = tempfile.NamedTemporaryFile() qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=get_start_command_args(vm_entry, vnc_sock_file.name, migration=migration, migration_port=migration_port ) ) return VM(vm_entry.key, qemu_machine, vnc_sock_file) def get_vm(vm_list: list, vm_key) -> Union[VM, None]: return next((vm for vm in vm_list if vm.key == vm_key), None) def need_running_vm(func): @wraps(func) def wrapper(e): vm = get_vm(running_vms, e.key) if vm: try: status = vm.handle.command("query-status") logging.debug(f"VM Status Check - {status}") except OSError: logging.info( f"{func.__name__} failed - VM {e.key} - Unknown Error" ) return func(e) else: logging.info( f"{func.__name__} failed because VM {e.key} is not running" ) return return wrapper def create(vm_entry: VMEntry): vm_hdd = int(bitmath.Byte(int(vm_entry.specs["hdd"])).to_MB()) _command_to_create = f"rbd clone images/{vm_entry.image_uuid}@protected uservms/{vm_entry.uuid}" _command_to_extend = f"rbd resize uservms/{vm_entry.uuid} --size {vm_hdd}" try: subprocess.check_call(_command_to_create.split(" ")) except subprocess.CalledProcessError as e: if e.returncode == errno.EEXIST: logging.debug(f"Image for vm {vm_entry.uuid} exists") # File Already exists. No Problem Continue return else: # This exception catches all other exceptions # i.e FileNotFound (BaseImage), pool Does Not Exists etc. logging.exception(f"Can't clone image - {e}") else: # TODO: Check whether the below suprocess.check_call # is executed successfully subprocess.check_call(_command_to_extend.split(" ")) logging.info("New VM Created") def start(vm_entry: VMEntry): _vm = get_vm(running_vms, vm_entry.key) # VM already running. No need to proceed further. if _vm: logging.info(f"VM {vm_entry.uuid} already running") return else: create(vm_entry) logging.info(f"Starting {vm_entry.key}") vm = create_vm_object(vm_entry) try: vm.handle.launch() except (qmp.QEMUMachineError, TypeError, Exception): vm_entry.declare_killed() vm_entry.add_log(f"Machine Error occurred - {traceback.format_exc()}") vm_pool.put(vm_entry) else: running_vms.append(vm) vm_entry.status = VMStatus.running vm_entry.add_log("Started successfully") vm_pool.put(vm_entry) @need_running_vm def stop(vm_entry): vm = get_vm(running_vms, vm_entry.key) vm.handle.shutdown() if not vm.handle.is_running(): vm_entry.add_log("Shutdown successfully") vm_entry.declare_stopped() vm_pool.put(vm_entry) running_vms.remove(vm) def delete(vm_entry): logging.info(f"Deleting VM {vm_entry}") stop(vm_entry) path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1:] try: rc = subprocess.call(f"rbd rm {path_without_protocol}".split(" ")) except FileNotFoundError as e: logging.exception(e) except Exception as e: logging.exception(f"Unknown error occurred - {e}") else: if rc == 0: etcd_client.client.delete(vm_entry.key) else: logging.info("Some unknown problem occur while deleting vm file") def transfer(request_event): # This function would run on source host i.e host on which the vm # is running initially. This host would be responsible for transferring # vm state to destination host. _host, _port = request_event.parameters["host"], request_event.parameters["port"] _uuid = request_event.uuid _destination = request_event.destination_host_key vm = get_vm(running_vms, join("/v1/vm", _uuid)) if vm: tunnel = sshtunnel.SSHTunnelForwarder( (_host, 22), ssh_username=config("ssh_username"), ssh_pkey=config("ssh_pkey"), ssh_private_key_password=config("ssh_private_key_password"), remote_bind_address=('127.0.0.1', _port), ) try: tunnel.start() except sshtunnel.BaseSSHTunnelForwarderError: logging.exception(f"Couldn't establish connection to ({_host}, 22)") else: vm.handle.command("migrate", uri=f"tcp:{_host}:{tunnel.local_bind_port}") status = vm.handle.command("query-migrate")["status"] while status not in ["failed", "completed"]: time.sleep(2) status = vm.handle.command("query-migrate")["status"] with vm_pool.get_put(request_event.uuid) as source_vm: if status == "failed": source_vm.add_log("Migration Failed") elif status == "completed": # If VM is successfully migrated then shutdown the VM # on this host and update hostname to destination host key source_vm.add_log("Successfully migrated") source_vm.hostname = _destination running_vms.remove(vm) vm.handle.shutdown() source_vm.in_migration = False # VM transfer finished finally: tunnel.close() def init_migration(vm_entry, destination_host_key): # This function would run on destination host i.e host on which the vm # would be transferred after migration. # This host would be responsible for starting VM that would receive # state of VM running on source host. _vm = get_vm(running_vms, vm_entry.key) if _vm: # VM already running. No need to proceed further. logging.info(f"{_vm.key} Already running") return logging.info(f"Starting {vm_entry.key}") vm = create_vm_object(vm_entry, migration=True, migration_port=4444) try: vm.handle.launch() except Exception as e: # We don't care whether MachineError or any other error occurred logging.exception(e) vm.handle.shutdown() else: vm_entry.in_migration = True vm_pool.put(vm_entry) running_vms.append(vm) r = RequestEntry.from_scratch(type=RequestType.TransferVM, hostname=vm_entry.hostname, parameters={ "host": get_ipv4_address(), "port": 4444, }, uuid=vm_entry.uuid, destination_host_key=destination_host_key ) request_pool.put(r)