# TODO: Use Unix File Socket for VNC instead of TCP

# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html

# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor

import argparse
import atexit
import json
import logging
import os
import signal
import subprocess
from dataclasses import dataclass
from datetime import datetime
from functools import wraps
from typing import Union

import qmp
from decouple import config
from etcd3_wrapper import Etcd3Wrapper
from ucloud_common.enums import VMStatus, RUNNING_VM_STATUES

# VMs launched by this process (list of ``VM`` records).
running_vms = []

# VNC display numbers still available to hand out to new VMs.
vnc_port_pool = list(range(0, 100))

client = Etcd3Wrapper()


@dataclass
class VM:
    """Pairs an etcd VM key with the QEMU machine object started for it."""

    key: str
    vm: qmp.QEMUMachine


class RBD(object):
    """Thin wrapper around the ``rbd`` command line tool."""

    @staticmethod
    def ls(pool):
        """Return the names printed by ``rbd ls <pool>`` as a list of strings.

        An empty listing yields ``[]`` (previously ``[""]``, which made the
        empty string look like an existing image).

        Raises:
            Exception: carrying the command's stderr when ``rbd`` exits with a
                non-zero status; the original ``CalledProcessError`` is
                attached as the cause.
        """
        try:
            output = subprocess.check_output(
                ["rbd", "ls", pool], stderr=subprocess.PIPE
            ).decode("utf-8").strip()
        except subprocess.CalledProcessError as e:
            raise Exception(e.stderr) from e
        return output.split("\n") if output else []


logging.basicConfig(
    level=logging.DEBUG,
    filename="log.txt",
    filemode="a",
    format="%(asctime)s: %(levelname)s - %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
)


def goodbye(host):
    """Mark ``host`` as DEAD and release its VMs, then kill this process.

    Registered with ``atexit`` by ``main``. Every VM whose ``hostname`` equals
    ``host.key`` gets its hostname cleared; those whose status is in
    ``RUNNING_VM_STATUES`` are set back to ``VMStatus.requested_start``.
    """
    host.value["status"] = "DEAD"
    host.value["last_heartbeat"] = datetime.utcnow().isoformat()
    client.put(host.key, json.dumps(host.value))

    for vm in client.get_prefix("/v1/vm", value_in_json=True):
        if vm.value["hostname"] != host.key:
            continue
        vm.value["hostname"] = ""
        if vm.value["status"] in RUNNING_VM_STATUES:
            vm.value["status"] = VMStatus.requested_start
        client.put(vm.key, vm.value, value_in_json=True)

    logging.info(f"Host {host.key} dead! at {host.value['last_heartbeat']}")
    print("Goodbye")
    # The process terminates itself with SIGKILL once the bookkeeping is done.
    os.kill(os.getpid(), signal.SIGKILL)


def need_running_vm(func):
    """Decorator: only call ``func(e)`` when ``e.key`` is in ``running_vms``.

    When the VM is not tracked, a message is logged and ``None`` is returned.
    When it is tracked, a ``query-status`` QMP command is issued first and its
    result logged.
    """

    @wraps(func)
    def wrapper(e):
        vm = get_vm(running_vms, e.key)
        if not vm:
            logging.info(f"{func.__name__} failed because VM {e.key} is not running")
            return None

        try:
            status = vm.vm.command("query-status")
            logging.debug(f"VM Status Check - {status}")
        except OSError:
            # NOTE(review): the failure is only logged; ``func`` is still
            # invoked below. Confirm whether that is intended.
            logging.info(f"{func.__name__} failed - VM {e.key} - Unknown Error")

        return func(e)

    return wrapper


def create_vm(vm_uuid, e):
    """Clone the VM's image into the ``uservms`` pool and request its start.

    Looks up ``/v1/image/<image_uuid>``; when the image entry exists, runs
    ``rbd clone images/<image_uuid>@protected uservms/<vm_uuid>`` and sets the
    VM status to ``REQUESTED_START``. When the image entry is missing, nothing
    is changed and the fact is logged.
    """
    image_uuid = e.value["image_uuid"]
    image = client.get(f"/v1/image/{image_uuid}", value_in_json=True)
    if not image:
        logging.info(f"Image {image_uuid} not found for VM {e.key}")
        return

    logging.debug(image)
    logging.info("Creating New VM...")

    clone_command = [
        "rbd",
        "clone",
        f"images/{image_uuid}@protected",
        f"uservms/{vm_uuid}",
    ]
    return_code = subprocess.call(clone_command)
    if return_code != 0:
        # The status is still advanced, as before; start_vm marks the VM
        # DELETED when its image is absent from the uservms pool.
        logging.warning(f"rbd clone for VM {e.key} exited with status {return_code}")

    e.value["status"] = "REQUESTED_START"
    client.put(e.key, json.dumps(e.value))


def start_vm(vm_path, e):
    """Launch a QEMU machine for the VM described by event ``e``.

    Status written back to etcd:
      * ``DELETED`` - the last path component of ``vm_path`` is not listed in
        the ``uservms`` pool;
      * ``RUNNING`` - the VM is already tracked, or was launched and is running;
      * ``KILLED``  - launch raised ``QEMUMachineError``/``TypeError`` or the
        machine is not running after launch.
    """
    if vm_path.split("/")[-1] not in RBD.ls("uservms"):
        logging.info(f"Image file of vm {e.key} does not exists")
        logging.info(f"Setting vm {e.key} status to DELETED")
        e.value["status"] = "DELETED"
        client.put(e.key, json.dumps(e.value))
        return

    if get_vm(running_vms, e.key):
        logging.info(f"{e.key} already running")
        e.value["status"] = "RUNNING"
        client.put(e.key, json.dumps(e.value))
        return

    # FIXME: There should be better vnc port allocation scheme.
    # Display numbers are popped from the pool and never put back, so an
    # exhausted pool raises IndexError here.
    vnc_display = vnc_port_pool.pop(0)
    vm = qmp.QEMUMachine(
        "/usr/bin/qemu-system-x86_64",
        test_dir="vm_socklog",
        args=[
            vm_path,
            "-boot", "c",  # First Boot Hard drive
            "-m", "1024",  # RAM limit
            # Ever growing port number
            "-vnc", f":{vnc_display}",  # Allow VNC
        ],
    )

    try:
        logging.info(f"Starting {e.key}")
        vm.launch()
        if vm.is_running():
            running_vms.append(VM(e.key, vm))
            e.value["status"] = "RUNNING"
            client.put(e.key, e.value, value_in_json=True)
        else:
            e.value["status"] = "KILLED"
            client.put(e.key, e.value, value_in_json=True)
            return
    except (qmp.QEMUMachineError, TypeError):
        logging.exception(f"Machine Error Occurred on {e.key}")
        e.value["status"] = "KILLED"
        client.put(e.key, e.value, value_in_json=True)
    else:
        logging.info(f"Started Successfully {e.key}")


@need_running_vm
def suspend_vm(e):
    """Send QMP ``stop``; record ``SUSPENDED`` if the VM then reports ``paused``."""
    vm = get_vm(running_vms, e.key)
    vm.vm.command("stop")
    if vm.vm.command("query-status")["status"] == "paused":
        e.value["status"] = "SUSPENDED"
        client.put(e.key, json.dumps(e.value))
        logging.info(f"Successfully suspended VM {e.key}")
    else:
        logging.info(f"Suspending VM {e.key} failed")


@need_running_vm
def resume_vm(e):
    """Send QMP ``cont``; record ``RUNNING`` if the VM then reports ``running``."""
    vm = get_vm(running_vms, e.key)
    vm.vm.command("cont")
    if vm.vm.command("query-status")["status"] == "running":
        e.value["status"] = "RUNNING"
        client.put(e.key, json.dumps(e.value))
        logging.info(f"Successfully resumed VM {e.key}")
    else:
        logging.info(f"Resuming VM {e.key} failed")


@need_running_vm
def shutdown_vm(e):
    """Shut the VM down; on success record ``STOPPED`` and stop tracking it."""
    vm = get_vm(running_vms, e.key)
    vm.vm.shutdown()
    if not vm.vm.is_running():
        logging.info(f"VM {e.key} shutdown successfully")
        e.value["status"] = "STOPPED"
        client.put(e.key, json.dumps(e.value))
        running_vms.remove(vm)


def delete_vm(e):
    """Shut down the VM (if tracked), remove its file and delete its etcd key.

    The file removed is ``<BASE_DIR>/<owner>/.vm/<vm id>`` when it exists.
    """
    # FIXME: Implementation obsoleted after Ceph integration
    logging.info(f"Deleting VM {e.key}")
    shutdown_vm(e)

    vm = client.get(e.key, value_in_json=True)
    if not vm:
        logging.info(f"Cannot delete key {e.key} because it doesn't exists")
        return

    vm_id = e.key.split("/")[-1]
    vm_owner = e.value["owner"]
    vm_path = f"{config('BASE_DIR')}/{vm_owner}/.vm/{vm_id}"
    if os.path.exists(vm_path):
        os.remove(vm_path)
    client.client.delete(e.key)
    logging.info(f"VM {vm.key} deleted")


def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
    """Return the first entry of ``vm_list`` whose ``key`` equals ``vm_key``, else None."""
    for candidate in vm_list:
        if candidate.key == vm_key:
            return candidate
    return None


def maintenence(e, host):
    """Reconcile etcd with locally tracked VMs and refresh the host heartbeat.

    Every VM assigned to ``host`` whose etcd status is ``RUNNING`` but which is
    untracked, or tracked and no longer running, is marked ``KILLED``. The host
    entry is then rewritten as ``ALIVE`` with a fresh ``last_heartbeat``.
    ``e`` is accepted but not used.
    """
    # VMs on this host that etcd claims are running
    alleged_running_vms = [
        v
        for v in client.get_prefix("/v1/vm", value_in_json=True)
        if v.value["hostname"] == host.key and v.value["status"] == "RUNNING"
    ]

    # TODO: Delete this. This was intended to start VMs that
    #       requested to be started when ucloud-vm is not running.
    #       This is no longer needed as we check for pending requests
    #       at the start and handle them.
    # should_be_running = filter(lambda v: v.value["status"] == "REQUESTED_START", _vms)

    for vm in alleged_running_vms:
        _vm = get_vm(running_vms, vm.key)
        if _vm is None or not _vm.vm.is_running():
            logging.debug(f"{_vm} {vm.key}")
            logging.info(f"{vm.key} is not running but is said to be running")
            logging.info(f"Updating {vm.key} status to KILLED")
            vm.value["status"] = "KILLED"
            client.put(vm.key, json.dumps(vm.value))

    # TODO: Delete this. This was intended to start VMs that
    #       requested to be started when ucloud-vm is not running.
    #       This is no longer needed as we check for pending requests
    #       at the start and handle them.
    # for vm in should_be_running:
    #     vm_path = f"rbd:uservms/{vm.key.split('/')[-1]}"
    #     start_vm(vm_path, e)

    # TODO: Check whether a vm is running on this host that
    #       is not supposed to be running on this host

    host.value["status"] = "ALIVE"
    host.value["last_heartbeat"] = datetime.utcnow().isoformat()
    client.put(host.key, json.dumps(host.value))
    logging.info(f"Updated last heartbeat time {host.value['last_heartbeat']}")


def main():
    """Run the VM host daemon for the host key given on the command line.

    Marks the host ALIVE, registers ``goodbye`` to run at exit, then handles
    the entries currently under ``/v1/vm/`` followed by events from a watch on
    that prefix. An event with status ``TIMEOUT`` triggers ``maintenence``
    (presumably emitted by the watch after its 10 second timeout - verify
    against ``Etcd3Wrapper``). Other events are dispatched on their status
    when they are addressed to this host.
    """
    argparser = argparse.ArgumentParser()
    argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
    args = argparser.parse_args()

    host = client.get(args.hostname, value_in_json=True)
    if not host:
        print("No Such Host")
        raise SystemExit(1)

    host.value["status"] = "ALIVE"
    host.value["last_heartbeat"] = datetime.utcnow().isoformat()
    client.put(host.key, host.value, value_in_json=True)

    atexit.register(goodbye, host=host)

    for events_iterator in [
        client.get_prefix("/v1/vm/"),
        client.watch_prefix("/v1/vm/", timeout=10),
    ]:
        for e in events_iterator:
            # TODO: Should we disable timeout alarm inside
            #       event handling code and enable it while going outside

            try:
                e.value = json.loads(e.value)
            except json.JSONDecodeError:
                logging.error(f"Invalid JSON {e.value}")
                continue

            e_status = e.value["status"]

            if e_status == "TIMEOUT":
                logging.info("Timeout")
                maintenence(e, host)
                continue

            e_hostname = e.value["hostname"]
            # e.value is a dict, so the key has to be looked up as an item;
            # hasattr() on a dict never sees its keys.
            e_migration_destination = e.value.get("migration_destination", "")

            vm_uuid = e.key.split("/")[-1]

            # If the event is directed toward me or
            # I am destination of a REQUESTED_MIGRATION
            if e_hostname == host.key or e_migration_destination == host.key:
                logging.debug(f"EVENT: {e}")

                if e_status == "SCHEDULED_DEPLOY":
                    create_vm(vm_uuid, e)

                elif e_status == "REQUESTED_SUSPEND":
                    suspend_vm(e)

                elif e_status == "REQUESTED_RESUME":
                    resume_vm(e)

                elif e_status == "REQUESTED_START":
                    vm_path = f"rbd:uservms/{vm_uuid}"
                    start_vm(vm_path, e)

                elif e_status == "REQUESTED_SHUTDOWN":
                    shutdown_vm(e)

                elif e_status == "DELETED":
                    delete_vm(e)

                # elif e_status == "REQUESTED_MIGRATION":
                #     if e.value["migration_destination"]

                logging.info(f"Running VMs {running_vms}")


main()