# TODO: Use a Unix file socket for VNC instead of TCP
#
# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html
#
# For QEMU Monitor Protocol commands, see
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor

import argparse
import atexit
import json
import logging
import os
import signal
import subprocess

from dataclasses import dataclass
from datetime import datetime
from functools import wraps
from typing import Union

import qmp
from decouple import config
from etcd3_wrapper import Etcd3Wrapper

running_vms = []
# Pool of free VNC display numbers. QEMU's -vnc option takes a display
# number, where TCP port = 5900 + display, so displays 0-99 map to
# ports 5900-5999.
vnc_port_pool = list(range(0, 100))
client = Etcd3Wrapper()


@dataclass
class VM:
    key: str
    vm: qmp.QEMUMachine


logging.basicConfig(
    level=logging.DEBUG,
    filename="log.txt",
    filemode="a",
    format="%(asctime)s: %(levelname)s - %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
)


def goodbye(host):
    """Mark this host DEAD in etcd on exit, then kill the process."""
    host.value["status"] = "DEAD"
    host.value["last_heartbeat"] = datetime.utcnow().isoformat()
    client.put(host.key, json.dumps(host.value))
    logging.info(f"Host {host.key} dead! at {host.value['last_heartbeat']}")
    os.kill(os.getpid(), signal.SIGKILL)


def need_running_vm(func):
    """Decorator: only call func(e) if the VM referenced by e.key is running."""
    @wraps(func)
    def wrapper(e):
        vm = get_vm(running_vms, e.key)
        if vm:
            try:
                status = vm.vm.command("query-status")
                logging.debug(f"VM Status Check - {status}")
            except OSError:
                logging.info(f"{func.__name__} failed - VM {e.key} - Unknown Error")
                # The QMP socket is unusable, so do not call func.
                return
            return func(e)
        else:
            logging.info(f"{func.__name__} failed because VM {e.key} is not running")
            return

    return wrapper


def create_vm(vm_uuid, e):
    image = client.get(f"/v1/image/{e.value['image_uuid']}", value_in_json=True)
    if image:
        logging.debug(image)
        image_uuid = e.value["image_uuid"]
        logging.info("Creating New VM...")
        _command_to_create = f"rbd clone images/{image_uuid}@protected uservms/{vm_uuid}"
        subprocess.call(_command_to_create.split(" "))
        # DELETEME: Delete when CEPH integration is complete
        # os.makedirs(f"{owner_dir}/.vm", exist_ok=True)
        # if not os.path.isfile(f"{owner_dir}/.vm/{vm_uuid}.raw"):
        #     shutil.copy(
        #         f"/var/vm/{image_uuid}.raw", f"{owner_dir}/.vm/{vm_uuid}.raw"
        #     )
        e.value["status"] = "REQUESTED_START"
        client.put(e.key, json.dumps(e.value))


def start_vm(vm_path, e):
    # FIXME: Assume for the moment that the image exists.
    # Use librados to list the images in the uservms pool, then check
    # whether e.key.split("/").pop() exists in rbd_ls(uservms_pool).
    # if not os.path.isfile(vm_path):
    #     logging.info(f"Image file of vm {e.key} does not exist")
    #     logging.info(f"Setting vm {e.key} status to DELETED")
    #     e.value["status"] = "DELETED"
    #     client.put(e.key, json.dumps(e.value))
    #     return

    _vm = get_vm(running_vms, e.key)
    if _vm:
        logging.info(f"{e.key} already running")
        e.value["status"] = "RUNNING"
        client.put(e.key, json.dumps(e.value))
        return

    # FIXME: There should be a better VNC port allocation scheme;
    # displays popped here are never returned to the pool.
    vm = qmp.QEMUMachine(
        "/usr/bin/qemu-system-x86_64",
        test_dir="vm_socklog",
        args=[
            vm_path,
            "-boot", "c",  # Boot from the first hard drive
            "-m", "1024",  # RAM limit (MiB)
            "-vnc", f":{vnc_port_pool.pop(0)}",  # Allow VNC on the next free display
        ],
    )
    try:
        logging.info(f"Starting {e.key}")
        vm.launch()
        if vm.is_running():
            running_vms.append(VM(e.key, vm))
            e.value["status"] = "RUNNING"
            client.put(e.key, e.value, value_in_json=True)
        else:
            e.value["status"] = "KILLED"
            client.put(e.key, e.value, value_in_json=True)
            return
    except (qmp.QEMUMachineError, TypeError):
        logging.exception(f"Machine Error Occurred on {e.key}")
        e.value["status"] = "KILLED"
        client.put(e.key, e.value, value_in_json=True)
    else:
        logging.info(f"Started Successfully {e.key}")
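# A minimal sketch for the TODO at the top of this file: QEMU accepts
# "-vnc unix:<path>" to serve VNC over a Unix domain socket instead of a
# TCP display, which would make vnc_port_pool unnecessary. The helper name
# and socket directory below are hypothetical, not part of this codebase.
def vnc_unix_socket_args(vm_uuid, sock_dir="/run/vm-vnc"):
    """Return QEMU args serving VNC on a per-VM Unix socket (sketch)."""
    os.makedirs(sock_dir, exist_ok=True)
    return ["-vnc", f"unix:{os.path.join(sock_dir, vm_uuid)}.sock"]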
@need_running_vm
def suspend_vm(e):
    vm = get_vm(running_vms, e.key)
    vm.vm.command("stop")
    if vm.vm.command("query-status")["status"] == "paused":
        e.value["status"] = "SUSPENDED"
        client.put(e.key, json.dumps(e.value))
        logging.info(f"Successfully suspended VM {e.key}")
    else:
        logging.info(f"Suspending VM {e.key} failed")


@need_running_vm
def resume_vm(e):
    vm = get_vm(running_vms, e.key)
    vm.vm.command("cont")
    if vm.vm.command("query-status")["status"] == "running":
        e.value["status"] = "RUNNING"
        client.put(e.key, json.dumps(e.value))
        logging.info(f"Successfully resumed VM {e.key}")
    else:
        logging.info(f"Resuming VM {e.key} failed")


@need_running_vm
def shutdown_vm(e):
    vm = get_vm(running_vms, e.key)
    vm.vm.shutdown()
    if not vm.vm.is_running():
        logging.info(f"VM {e.key} shutdown successfully")
        e.value["status"] = "STOPPED"
        client.put(e.key, json.dumps(e.value))
        running_vms.remove(vm)


def delete_vm(e):
    logging.info(f"Deleting VM {e.key}")
    shutdown_vm(e)
    vm = client.get(e.key, value_in_json=True)
    if vm:
        vm_id = e.key.split("/")[-1]
        vm_owner = e.value["owner"]
        vm_path = f"{config('BASE_DIR')}/{vm_owner}/.vm/{vm_id}"
        if os.path.exists(vm_path):
            os.remove(vm_path)
        client.client.delete(e.key)
        logging.info(f"VM {vm.key} deleted")
    else:
        logging.info(f"Cannot delete key {e.key} because it doesn't exist")


def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
    """Return the running VM with the given key, or None."""
    return next((vm for vm in vm_list if vm.key == vm_key), None)
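# A minimal sketch of the librados/librbd existence check described in the
# FIXME in start_vm(), using the python-rados and python-rbd bindings. The
# helper name, the ceph.conf path, and the assumption that the bindings are
# installed on the host are ours, not part of this codebase.
def image_exists_in_pool(image_name, pool="uservms"):
    """Return True if image_name is an rbd image in the given pool (sketch)."""
    import rados  # provided by python-rados (assumed installed)
    import rbd    # provided by python-rbd (assumed installed)

    cluster = rados.Rados(conffile="/etc/ceph/ceph.conf")
    cluster.connect()
    try:
        ioctx = cluster.open_ioctx(pool)
        try:
            return image_name in rbd.RBD().list(ioctx)
        finally:
            ioctx.close()
    finally:
        cluster.shutdown()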
def main():
    argparser = argparse.ArgumentParser()
    argparser.add_argument("hostname", help="Name of this host. e.g. /v1/host/1")
    args = argparser.parse_args()

    host = client.get(args.hostname, value_in_json=True)
    if not host:
        logging.error(f"Host {args.hostname} not found in etcd")
        return
    host.value["status"] = "ALIVE"
    host.value["last_heartbeat"] = datetime.utcnow().isoformat()
    atexit.register(goodbye, host=host)

    # First replay all existing VM entries, then watch the prefix for changes.
    for events_iterator in [
        client.get_prefix("/v1/vm/"),
        client.watch_prefix("/v1/vm/", timeout=10),
    ]:
        for e in events_iterator:
            try:
                e.value = json.loads(e.value)
            except json.JSONDecodeError:
                logging.error(f"Invalid JSON {e.value}")
                continue

            e_status = e.value["status"]

            if e_status == "TIMEOUT":
                logging.info("Timeout")
                # Materialize the filter into a list so it can be consumed
                # more than once below.
                _vms = list(
                    filter(
                        lambda v: v.value["hostname"] == args.hostname,
                        client.get_prefix("/v1/vm/", value_in_json=True),
                    )
                )
                alleged_running_vms = filter(lambda v: v.value["status"] == "RUNNING", _vms)
                should_be_running = filter(lambda v: v.value["status"] == "REQUESTED_START", _vms)

                for vm in alleged_running_vms:
                    _vm = get_vm(running_vms, vm.key)
                    if _vm is None or not _vm.vm.is_running():
                        logging.debug(f"{_vm} {vm.key}")
                        logging.info(f"{vm.key} is not running but is said to be running")
                        logging.info(f"Updating {vm.key} status to KILLED")
                        vm.value["status"] = "KILLED"
                        client.put(vm.key, json.dumps(vm.value))

                for vm in should_be_running:
                    vm_path = f"rbd:uservms/{vm.key.split('/')[-1]}"
                    start_vm(vm_path, vm)

                host.value["status"] = "ALIVE"
                host.value["last_heartbeat"] = datetime.utcnow().isoformat()
                client.put(host.key, json.dumps(host.value))
                logging.info(f"Updated last heartbeat time {host.value['last_heartbeat']}")
                continue

            e_hostname = e.value["hostname"]
            vm_uuid = e.key.split("/")[-1]

            # If this event is not for me then skip it
            if e_hostname != args.hostname:
                continue

            logging.debug(f"EVENT: {e}")

            if e_status == "SCHEDULED_DEPLOY":
                create_vm(vm_uuid, e)
            elif e_status == "REQUESTED_SUSPEND":
                suspend_vm(e)
            elif e_status == "REQUESTED_RESUME":
                resume_vm(e)
            elif e_status == "REQUESTED_START":
                # DELETEME: Delete when CEPH integration is complete
                # vm_path = f"{owner_dir}/.vm/{vm_uuid}.raw"
                vm_path = f"rbd:uservms/{vm_uuid}"
                start_vm(vm_path, e)
            elif e_status == "REQUESTED_SHUTDOWN":
                shutdown_vm(e)
            elif e_status == "DELETED":
                delete_vm(e)

            logging.info(f"Running VMs {running_vms}")


if __name__ == "__main__":
    main()