# TODO: Use Unix File Socket for VNC instead of TCP
# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html
# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor

import argparse
import logging
import subprocess
import threading
import time
from dataclasses import dataclass
from functools import wraps
from typing import Union

import qmp
from etcd3_wrapper import Etcd3Wrapper

from ucloud_common.host import HostEntry, HostPool
from ucloud_common.rbd import RBD
from ucloud_common.vm import VMEntry, VmPool, VMStatus

# VM wrappers for every QEMU machine this host currently runs.
running_vms = []
# FIXME: naive VNC display allocation — popped ports are never returned.
vnc_port_pool = list(range(0, 100))

client = Etcd3Wrapper()
# Populated in main() once the CLI arguments are known.
VM_POOL = None
HOST_POOL = None

logging.basicConfig(
    level=logging.DEBUG,
    filename="log.txt",
    filemode="a",
    format="%(asctime)s: %(levelname)s - %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
)


@dataclass
class VM:
    """Pairs an etcd key with the live QEMU machine it refers to."""

    key: str
    vm: qmp.QEMUMachine


def update_heartbeat(host: HostEntry):
    """Periodically refresh *host*'s heartbeat in etcd (runs forever).

    Executed on a dedicated thread so that a busy event loop cannot delay
    heartbeats and make the scheduler believe this host is dead.
    """
    while True:
        host.update_heartbeat()
        HOST_POOL.put(host)
        time.sleep(10)
        logging.info(f"Updated last heartbeat time {host.last_heartbeat}")


def need_running_vm(func):
    """Decorator: only invoke *func* when the event's VM is locally running.

    If the VM is not in ``running_vms`` the call is skipped with a log line.
    """

    @wraps(func)
    def wrapper(e):
        vm = get_vm(running_vms, e.key)
        if vm:
            try:
                status = vm.vm.command("query-status")
                logging.debug(f"VM Status Check - {status}")
            except OSError:
                # NOTE(review): we log the failure but still call func below,
                # matching the original control flow — confirm this is intended.
                logging.info(
                    f"{func.__name__} failed - VM {e.key} - Unknown Error"
                )
            return func(e)
        else:
            logging.info(
                f"{func.__name__} failed because VM {e.key} is not running"
            )
            return

    return wrapper


def create_vm(vm):
    """Clone the VM's base image into the uservms RBD pool and request start.

    Looks up the image referenced by ``vm.value['image_uuid']`` in etcd; if it
    exists, clones its protected snapshot and flips the VM status to
    REQUESTED_START so a later event actually boots it.
    """
    image = client.get(
        f"/v1/image/{vm.value['image_uuid']}", value_in_json=True
    )
    if image:
        logging.debug(image)
        logging.info("Creating New VM...")
        _command_to_create = (
            f"rbd clone images/{vm.image_uuid}@protected uservms/{vm.uuid}"
        )
        try:
            subprocess.call(_command_to_create.split(" "))
            vm.status = VMStatus.requested_start
            VM_POOL.put(vm)
        except Exception:
            # Narrowed from a bare except: keep best-effort semantics but do
            # not swallow SystemExit/KeyboardInterrupt.
            logging.exception("Can't clone image")
    else:
        logging.info(f"Image not found for {vm.image_uuid}")


def start_vm(e):
    """Launch QEMU for VM entry *e* backed by its RBD image.

    Deletes the etcd entry when the disk image is missing; no-ops (but
    re-asserts RUNNING) when the VM is already up on this host.
    """
    vm_path = f"rbd:uservms/{e.uuid}"
    try:
        user_vms = RBD.ls("uservms")
    except Exception:
        # Narrowed from a bare except; pool access is best-effort here.
        logging.info("Can't access uservms pool")
        return

    if e.uuid not in user_vms:
        logging.info(f"Image file of vm {e.key} does not exists")
        logging.info(f"Deleting vm {e.key}")
        client.client.delete(e.key)
        return

    _vm = get_vm(running_vms, e.key)
    if _vm:
        logging.info(f"{e.key} already running")
        e.status = VMStatus.running
        VM_POOL.put(e)
        return

    # FIXME: There should be better vnc port allocation scheme
    vm = qmp.QEMUMachine(
        "/usr/bin/qemu-system-x86_64",
        test_dir="vm_socklog",
        args=[
            vm_path,
            "-boot", "c",  # First Boot Hard drive
            "-m", "1024",  # RAM limit
            # Ever growing port number
            "-vnc", f":{vnc_port_pool.pop(0)}",  # Allow VNC
        ],
    )
    try:
        logging.info(f"Starting {e.key}")
        vm.launch()
        if vm.is_running():
            running_vms.append(VM(e.key, vm))
            e.status = VMStatus.running
            VM_POOL.put(e)
        else:
            e.declare_killed()
            VM_POOL.put(e)
            return
    except (qmp.QEMUMachineError, TypeError):
        logging.exception(f"Machine Error Occurred on {e.key}")
        e.declare_killed()
        VM_POOL.put(e)
    else:
        logging.info(f"Started Successfully {e.key}")


@need_running_vm
def suspend_vm(e):
    """Pause the running VM via QMP ``stop`` and record SUSPENDED in etcd."""
    vm = get_vm(running_vms, e.key)
    vm.vm.command("stop")
    if vm.vm.command("query-status")["status"] == "paused":
        e.status = VMStatus.suspended
        VM_POOL.put(e)
        logging.info(f"Successfully suspended VM {e.key}")
    else:
        logging.info(f"Suspending VM {e.key} failed")


@need_running_vm
def resume_vm(e):
    """Resume a paused VM via QMP ``cont`` and record RUNNING in etcd."""
    vm = get_vm(running_vms, e.key)
    vm.vm.command("cont")
    if vm.vm.command("query-status")["status"] == "running":
        e.status = VMStatus.running
        VM_POOL.put(e)
        logging.info(f"Successfully resumed VM {e.key}")
    else:
        logging.info(f"Resuming VM {e.key} failed")


@need_running_vm
def shutdown_vm(e):
    """Shut the VM down, mark it STOPPED, and drop it from running_vms."""
    vm = get_vm(running_vms, e.key)
    vm.vm.shutdown()
    if not vm.vm.is_running():
        logging.info(f"VM {e.key} shutdown successfully")
        e.status = VMStatus.stopped
        VM_POOL.put(e)
        running_vms.remove(vm)


def delete_vm(e):
    """Shut down VM *e* (if running) and remove its entry from etcd."""
    # TODO: Delete VM Image From CEPH
    logging.info(f"Deleting VM {e.key}")
    shutdown_vm(e)
    client.client.delete(e.key)


def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
    """Return the VM in *vm_list* whose key equals *vm_key*, else None."""
    return next((vm for vm in vm_list if vm.key == vm_key), None)


def maintenance(host):
    """Reconcile etcd state with reality for VMs assigned to *host*.

    Any VM that etcd claims is RUNNING on this host but is not actually
    running locally is declared KILLED.
    """
    _vms = VM_POOL.by_host(host.key)
    alleged_running_vms = VM_POOL.by_status("RUNNING", _vms)

    for vm in alleged_running_vms:
        _vm = get_vm(running_vms, vm.key)
        if _vm is None or not _vm.vm.is_running():
            logging.debug(f"{_vm} {vm.key}")
            logging.info(
                f"{vm.key} is not running but is said to be running"
            )
            logging.info(f"Updating {vm.key} status to KILLED")
            vm.declare_killed()
            VM_POOL.put(vm)
            # BUGFIX: the original removed the etcd entry object `vm` from
            # running_vms (a list of VM wrappers), which always raises
            # ValueError. Remove the local wrapper instead, when it exists.
            if _vm is not None:
                running_vms.remove(_vm)


def main():
    """Entry point: watch the /v1/vm/ prefix and act on events for this host."""
    argparser = argparse.ArgumentParser()
    argparser.add_argument(
        "hostname", help="Name of this host. e.g /v1/host/1"
    )
    args = argparser.parse_args()

    global HOST_POOL, VM_POOL
    HOST_POOL = HostPool(client, "/v1/host")
    VM_POOL = VmPool(client, "/v1/vm")

    host = HOST_POOL.get(args.hostname)
    if not host:
        print("No Such Host")
        exit(1)

    # It is seen that under heavy load, timeout event doesn't come
    # in a predictive manner which delays heart beat update which
    # in turn misunderstood by scheduler that the host is dead
    # when it is actually alive. So, to ensure that we update the
    # heart beat in a predictive manner we start Heart beat updating
    # mechanism in separated thread
    heartbeat_updating_thread = threading.Thread(
        target=update_heartbeat, args=(host,)
    )
    heartbeat_updating_thread.start()

    # Status → action dispatch for events addressed to this host.
    handlers = {
        "SCHEDULED_DEPLOY": create_vm,
        "REQUESTED_SUSPEND": suspend_vm,
        "REQUESTED_RESUME": resume_vm,
        "REQUESTED_START": start_vm,
        "REQUESTED_SHUTDOWN": shutdown_vm,
        "DELETED": delete_vm,
    }

    # First replay current state, then watch for live changes.
    for events_iterator in [
        client.get_prefix("/v1/vm/", value_in_json=True),
        client.watch_prefix("/v1/vm/", timeout=10, value_in_json=True),
    ]:
        for e in events_iterator:
            e = VMEntry(e)

            # Watch timeouts double as the periodic maintenance tick.
            if e.status == "TIMEOUT":
                logging.info("Timeout")
                maintenance(host)
                continue

            # TODO: Re-evaluate Migration Design
            if hasattr(e, "migration_destination"):
                e_migration_destination = e.value["migration_destination"]
            else:
                e_migration_destination = ""

            # If the event is directed toward me or
            # I am destination of a REQUESTED_MIGRATION
            if e.hostname == host.key or e_migration_destination == host.key:
                logging.debug(f"EVENT: {e}")
                handler = handlers.get(e.status)
                if handler is not None:
                    handler(e)
            logging.info(f"Running VMs {running_vms}")


if __name__ == "__main__":
    main()