# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html

# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor

import errno
import os
import subprocess
import tempfile
import time
import traceback
from dataclasses import dataclass
from functools import wraps
from os.path import join
from typing import Union

import bitmath
import qmp
import sshtunnel
from decouple import config

from config import (vm_pool, request_pool, etcd_client, logging,
                    running_vms, WITHOUT_CEPH)
from ucloud_common.helpers import get_ipv4_address
from ucloud_common.request import RequestEntry, RequestType
from ucloud_common.vm import VMStatus, VMEntry


@dataclass
class VM:
    """A QEMU machine running (or being launched) on this host.

    Attributes:
        key: etcd key of the VM entry (e.g. "/v1/vm/<uuid>").
        handle: QMP machine handle used to launch/control the QEMU process.
        vnc_socket_file: temp file whose *name* is reused as the path of the
            VNC unix socket (keeping the object alive keeps the path reserved).
    """
    key: str
    handle: qmp.QEMUMachine
    vnc_socket_file: tempfile.NamedTemporaryFile

    def __repr__(self):
        return f"VM({self.key})"


def get_start_command_args(vm_entry, vnc_sock_filename: str, migration=False,
                           migration_port=4444):
    """Build the QEMU command-line argument list for *vm_entry*.

    Args:
        vm_entry: VM entry providing ``specs`` ("ram" in bytes, "cpu" count)
            and ``uuid``.
        vnc_sock_filename: path of the unix socket QEMU exposes VNC on.
        migration: when True, append ``-incoming`` so this QEMU instance
            waits for an incoming migration stream instead of booting.
        migration_port: TCP port to listen on for the migration stream.

    Returns:
        list[str]: arguments suitable for ``qmp.QEMUMachine``.
    """
    threads_per_core = 1
    # specs["ram"] is stored in bytes; QEMU's -m option takes megabytes.
    vm_memory = int(bitmath.Byte(int(vm_entry.specs["ram"])).to_MB())
    vm_cpus = int(vm_entry.specs["cpu"])
    vm_uuid = vm_entry.uuid

    if WITHOUT_CEPH:
        command = f"-drive file={os.path.join('/var/vm', vm_uuid)},format=raw,if=virtio,cache=none"
    else:
        command = f"-drive file=rbd:uservms/{vm_uuid},format=raw,if=virtio,cache=none"

    command += (f" -device virtio-rng-pci -vnc unix:{vnc_sock_filename}"
                f" -m {vm_memory} -smp cores={vm_cpus},threads={threads_per_core}"
                f" -name {vm_uuid}")

    if migration:
        command += f" -incoming tcp:0:{migration_port}"

    return command.split(" ")


def create_vm_object(vm_entry, migration=False, migration_port=4444):
    """Create (but do not launch) a :class:`VM` object for *vm_entry*.

    Args:
        vm_entry: VM entry to build the QEMU command line from.
        migration: forwarded to :func:`get_start_command_args`; True on the
            destination host of a migration.
        migration_port: forwarded to :func:`get_start_command_args`.

    Returns:
        VM: wrapper holding the entry key, the QMP handle and the VNC
        socket temp file.
    """
    # NOTE: If migration suddenly stop working, having different
    #       VNC unix filename on source and destination host can
    #       be a possible cause of it.

    # REQUIREMENT: Use Unix Socket instead of TCP Port for VNC
    vnc_sock_file = tempfile.NamedTemporaryFile()

    qemu_args = get_start_command_args(
        vm_entry=vm_entry,
        vnc_sock_filename=vnc_sock_file.name,
        migration=migration,
        migration_port=migration_port,
    )
    qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args)
    return VM(vm_entry.key, qemu_machine, vnc_sock_file)


def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
    """Return the VM with key *vm_key* from *vm_list*, or None if absent."""
    return next((vm for vm in vm_list if vm.key == vm_key), None)


def need_running_vm(func):
    """Decorator: call ``func(e)`` only if the VM for entry *e* is running.

    "Running" means the VM is in :data:`running_vms` and its QMP handle
    answers a ``query-status`` command; otherwise the failure is logged and
    the wrapped function is skipped (wrapper returns None).
    """
    @wraps(func)
    def wrapper(e):
        vm = get_vm(running_vms, e.key)
        if vm:
            try:
                status = vm.handle.command("query-status")
                logging.debug(f"VM Status Check - {status}")
            except Exception as exception:
                logging.info(
                    f"{func.__name__} failed - VM {e} {exception} - Unknown Error"
                )
            else:
                return func(e)
        else:
            logging.info(
                f"{func.__name__} failed because VM {e.key} is not running"
            )
            return

    return wrapper


def create(vm_entry: VMEntry):
    """Create the backing disk for *vm_entry* and grow it to the requested size.

    With WITHOUT_CEPH the disk is a plain file copied from the base image
    under ``/var/image`` into ``/var/vm`` and resized with ``qemu-img``;
    otherwise an RBD clone of the protected base image is made and resized
    with ``rbd resize``.
    """
    vm_hdd = int(bitmath.Byte(int(vm_entry.specs["hdd"])).to_MB())
    if WITHOUT_CEPH:
        _command_to_create = ["cp",
                              os.path.join("/var/image", vm_entry.image_uuid),
                              os.path.join("/var/vm", vm_entry.uuid)]
        _command_to_extend = ["qemu-img", "resize",
                              os.path.join("/var/vm", vm_entry.uuid),
                              vm_entry.specs["hdd"]]
    else:
        _command_to_create = ["rbd", "clone",
                              f"images/{vm_entry.image_uuid}@protected",
                              f"uservms/{vm_entry.uuid}"]
        _command_to_extend = ["rbd", "resize",
                              f"uservms/{vm_entry.uuid}",
                              "--size", vm_hdd]
    try:
        subprocess.check_output(_command_to_create)
    except subprocess.CalledProcessError as e:
        # "rbd clone" exits with 17 (EEXIST) when the clone already exists,
        # which is harmless — just reuse the existing image.
        # NOTE(review): "cp" does not use errno-style exit codes, so in the
        # WITHOUT_CEPH case an already-existing file is not detected this
        # way — confirm intended behavior.
        if e.returncode == errno.EEXIST:
            logging.debug(f"Image for vm {vm_entry.uuid} exists")
            # File Already exists. No Problem Continue
            return

        # Any other failure, e.g. missing base image or missing pool.
        # NOTE(review): status is set on the entry but not written back via
        # vm_pool.put here — presumably persisted elsewhere; verify.
        logging.exception(e)
        vm_entry.status = "ERROR"
    else:
        try:
            subprocess.check_output(_command_to_extend)
        except Exception as e:
            logging.exception(e)
        else:
            logging.info("New VM Created")


def start(vm_entry: VMEntry):
    """Start *vm_entry* on this host, creating its disk first if needed.

    On launch failure the entry is declared killed and the traceback is
    recorded in the VM log; on success the VM joins :data:`running_vms`
    and the entry's status becomes ``running``.
    """
    _vm = get_vm(running_vms, vm_entry.key)

    # VM already running. No need to proceed further.
    if _vm:
        logging.info(f"VM {vm_entry.uuid} already running")
        return

    create(vm_entry)
    logging.info(f"Starting {vm_entry.key}")
    vm = create_vm_object(vm_entry)
    try:
        vm.handle.launch()
    except Exception:
        # Covers qmp.QEMUMachineError, TypeError and anything else that
        # prevented the machine from launching.
        vm_entry.declare_killed()
        vm_entry.add_log(f"Machine Error occurred - {traceback.format_exc()}")
        vm_pool.put(vm_entry)
    else:
        running_vms.append(vm)
        vm_entry.status = VMStatus.running
        vm_entry.add_log("Started successfully")
        vm_pool.put(vm_entry)


@need_running_vm
def stop(vm_entry):
    """Gracefully shut down the running VM for *vm_entry*.

    Only updates the entry and forgets the VM if QEMU actually stopped;
    if the guest ignores the shutdown, the VM stays in running_vms.
    """
    vm = get_vm(running_vms, vm_entry.key)
    vm.handle.shutdown()
    if not vm.handle.is_running():
        vm_entry.add_log("Shutdown successfully")
        vm_entry.declare_stopped()
        vm_pool.put(vm_entry)
        running_vms.remove(vm)


def delete(vm_entry):
    """Stop *vm_entry*, delete its disk, and remove its etcd entry.

    The etcd key is deleted only if the disk removal command succeeded.
    """
    logging.info(f"Deleting VM {vm_entry}")
    stop(vm_entry)

    # vm_entry.path looks like "<protocol>:<path>"; strip the protocol part.
    path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1:]

    if WITHOUT_CEPH:
        vm_deletion_command = ["rm", os.path.join("/var/vm", vm_entry.uuid)]
    else:
        vm_deletion_command = ["rbd", "rm", path_without_protocol]

    try:
        subprocess.check_output(vm_deletion_command)
    except Exception as e:
        logging.exception(e)
    else:
        etcd_client.client.delete(vm_entry.key)


def transfer(request_event):
    """Migrate a running VM from this (source) host to the destination host.

    Opens an SSH tunnel to the destination's waiting QEMU instance, issues a
    QMP ``migrate``, polls ``query-migrate`` until it reports "completed" or
    "failed", then records the outcome on the VM entry. On success the local
    QEMU is shut down and the entry's hostname is pointed at the destination.
    """
    # This function would run on source host i.e host on which the vm
    # is running initially. This host would be responsible for transferring
    # vm state to destination host.

    _host, _port = request_event.parameters["host"], request_event.parameters["port"]
    _uuid = request_event.uuid
    _destination = request_event.destination_host_key
    vm = get_vm(running_vms, join("/v1/vm", _uuid))

    if vm:
        tunnel = sshtunnel.SSHTunnelForwarder(
            (_host, 22),
            ssh_username=config("ssh_username"),
            ssh_pkey=config("ssh_pkey"),
            ssh_private_key_password=config("ssh_private_key_password"),
            remote_bind_address=('127.0.0.1', _port),
        )
        try:
            tunnel.start()
        except sshtunnel.BaseSSHTunnelForwarderError:
            logging.exception(f"Couldn't establish connection to ({_host}, 22)")
        else:
            vm.handle.command("migrate", uri=f"tcp:{_host}:{tunnel.local_bind_port}")

            # Poll until QEMU reports a terminal migration state.
            status = vm.handle.command("query-migrate")["status"]
            while status not in ["failed", "completed"]:
                time.sleep(2)
                status = vm.handle.command("query-migrate")["status"]

            with vm_pool.get_put(request_event.uuid) as source_vm:
                if status == "failed":
                    source_vm.add_log("Migration Failed")
                elif status == "completed":
                    # If VM is successfully migrated then shutdown the VM
                    # on this host and update hostname to destination host key
                    source_vm.add_log("Successfully migrated")
                    source_vm.hostname = _destination
                    running_vms.remove(vm)
                    vm.handle.shutdown()
                source_vm.in_migration = False  # VM transfer finished
        finally:
            tunnel.close()


def init_migration(vm_entry, destination_host_key):
    """Prepare this (destination) host to receive a migrating VM.

    Launches a QEMU instance in incoming-migration mode (listening on port
    4444), marks the entry as in-migration, and queues a TransferVM request
    for the source host (identified by ``vm_entry.hostname``).
    """
    # This function would run on destination host i.e host on which the vm
    # would be transferred after migration.
    # This host would be responsible for starting VM that would receive
    # state of VM running on source host.

    _vm = get_vm(running_vms, vm_entry.key)

    if _vm:
        # VM already running. No need to proceed further.
        logging.info(f"{_vm.key} Already running")
        return

    logging.info(f"Starting {vm_entry.key}")
    vm = create_vm_object(vm_entry, migration=True, migration_port=4444)
    try:
        vm.handle.launch()
    except Exception as e:
        # We don't care whether MachineError or any other error occurred
        logging.exception(e)
        vm.handle.shutdown()
    else:
        vm_entry.in_migration = True
        vm_pool.put(vm_entry)

        running_vms.append(vm)
        r = RequestEntry.from_scratch(
            type=RequestType.TransferVM,
            hostname=vm_entry.hostname,
            parameters={
                "host": get_ipv4_address(),
                "port": 4444,
            },
            uuid=vm_entry.uuid,
            destination_host_key=destination_host_key,
        )
        request_pool.put(r)