# ucloud-vm/main.py
# TODO: Use Unix File Socket for VNC instead of TCP
# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html
# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
import json
import argparse
import qmp
import logging
import os
import subprocess
import atexit
import signal
from etcd3_wrapper import Etcd3Wrapper
from dataclasses import dataclass
from typing import Union
from functools import wraps
from decouple import config
from datetime import datetime
# VMs launched by this daemon and still believed alive (VM dataclass instances).
running_vms = []
# Pool of VNC display numbers handed out by start_vm (popped from the front).
# NOTE(review): QEMU "-vnc :N" maps to TCP port 5900+N, so ":5900" means port
# 11800 — presumably raw ports were intended; confirm (see FIXME in start_vm).
vnc_port_pool = list(range(5900, 6000))
# Shared etcd client used for all state reads/writes in this module.
client = Etcd3Wrapper()
@dataclass
class VM:
    """Pairs an etcd key (under /v1/vm/) with its live QEMU machine handle."""

    # etcd key identifying this VM, e.g. /v1/vm/<uuid>
    key: str
    # qmp.QEMUMachine controlling the underlying QEMU process
    vm: qmp.QEMUMachine
# Log everything (DEBUG and up) to ./log.txt, appending across restarts.
logging.basicConfig(
    level=logging.DEBUG,
    filename="log.txt",
    filemode="a",
    format="%(asctime)s: %(levelname)s - %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
)
def goodbye(host):
    """Mark *host* DEAD in etcd, log it, then hard-kill this process.

    Registered via atexit in main(); SIGKILL guarantees no further
    Python-level cleanup runs after the DEAD record is written.
    """
    timestamp = datetime.utcnow().isoformat()
    host.value["status"] = "DEAD"
    host.value["last_heartbeat"] = timestamp
    client.put(host.key, json.dumps(host.value))
    logging.info(f"Host {host.key} dead! at {host.value['last_heartbeat']}")
    os.kill(os.getpid(), signal.SIGKILL)
def need_running_vm(func):
    """Decorator for event handlers that only make sense on a running VM.

    Looks the event's VM up in ``running_vms``; if it is absent, or its QMP
    status query fails, the wrapped handler is skipped (with a log entry)
    instead of being invoked against a dead or unknown machine.
    """
    @wraps(func)
    def wrapper(e):
        vm = get_vm(running_vms, e.key)
        if vm:
            try:
                status = vm.vm.command("query-status")
                logging.debug(f"VM Status Check - {status}")
            except OSError:
                logging.info(f"{func.__name__} failed - VM {e.key} - Unknown Error")
                # BUGFIX: previously fell through and called func(e) anyway
                # even though the status query failed; skip the handler.
                return
            return func(e)
        else:
            logging.info(f"{func.__name__} failed because VM {e.key} is not running")
            return
    return wrapper
def create_vm(vm_uuid, e):
    """Clone the VM's backing image in Ceph and request that it be started.

    Reads the image record from etcd; when found, clones the protected image
    snapshot into the uservms pool and flips the VM's status to
    REQUESTED_START so a later event launches it.
    """
    image = client.get(
        f"/v1/image/{e.value['image_uuid']}", value_in_json=True
    )
    if not image:
        # No such image record in etcd: silently skip (original behavior).
        return
    logging.debug(image)
    image_uuid = e.value["image_uuid"]
    logging.info("Creating New VM...")
    clone_cmd = f"rbd clone images/{image_uuid}@protected uservms/{vm_uuid}"
    subprocess.call(clone_cmd.split(" "))
    e.value["status"] = "REQUESTED_START"
    client.put(e.key, json.dumps(e.value))
def start_vm(vm_path, e):
    """Launch a QEMU process for the VM at *vm_path* and record its state.

    If the VM is already tracked in ``running_vms`` its etcd status is simply
    refreshed to RUNNING. Otherwise a QEMU machine is launched with a VNC
    display taken from ``vnc_port_pool``; on success the VM is appended to
    ``running_vms`` and marked RUNNING, on any failure it is marked KILLED.
    """
    # FIXME: Assume for the moment that the image exists.
    # Use librados to list files in the uservms pool and check whether
    # e.key.split("/").pop() exists in rbd_ls(uservms_pool).
    _vm = get_vm(running_vms, e.key)
    if _vm:
        logging.info(f"{e.key} already running")
        e.value["status"] = "RUNNING"
        client.put(e.key, json.dumps(e.value))
        return
    # FIXME: There should be better vnc port allocation scheme
    vm = qmp.QEMUMachine(
        "/usr/bin/qemu-system-x86_64",
        test_dir="vm_socklog",
        args=[
            vm_path,
            "-boot", "c",  # First Boot Hard drive
            "-m", "1024",  # RAM limit
            # Ever growing port number
            "-vnc", f":{vnc_port_pool.pop(0)}",  # Allow VNC
        ],
    )
    try:
        logging.info(f"Starting {e.key}")
        vm.launch()
        if vm.is_running():
            running_vms.append(VM(e.key, vm))
            e.value["status"] = "RUNNING"
            client.put(e.key, e.value, value_in_json=True)
        else:
            e.value["status"] = "KILLED"
            client.put(e.key, e.value, value_in_json=True)
            return
    except (qmp.QEMUMachineError, TypeError):
        # BUGFIX: a bare logging.info() call here raised TypeError (the msg
        # argument is required) and masked the real failure;
        # logging.exception alone records the message and traceback.
        logging.exception(f"Machine Error Occurred on {e.key}")
        e.value["status"] = "KILLED"
        client.put(e.key, e.value, value_in_json=True)
    else:
        logging.info(f"Started Successfully {e.key}")
@need_running_vm
def suspend_vm(e):
    """Pause the running VM via QMP; persist SUSPENDED on success."""
    vm = get_vm(running_vms, e.key)
    vm.vm.command("stop")
    is_paused = vm.vm.command("query-status")["status"] == "paused"
    if not is_paused:
        logging.info(f"Suspending VM {e.key} failed")
        return
    e.value["status"] = "SUSPENDED"
    client.put(e.key, json.dumps(e.value))
    logging.info(f"Successfully suspended VM {e.key}")
@need_running_vm
def resume_vm(e):
    """Resume a paused VM via QMP; persist RUNNING on success."""
    vm = get_vm(running_vms, e.key)
    vm.vm.command("cont")
    is_running = vm.vm.command("query-status")["status"] == "running"
    if not is_running:
        logging.info(f"Resuming VM {e.key} failed")
        return
    e.value["status"] = "RUNNING"
    client.put(e.key, json.dumps(e.value))
    logging.info(f"Successfully resumed VM {e.key}")
@need_running_vm
def shutdown_vm(e):
    """Shut a VM down via QMP; on success mark it STOPPED and untrack it."""
    vm = get_vm(running_vms, e.key)
    vm.vm.shutdown()
    if vm.vm.is_running():
        # Shutdown did not take effect; leave state untouched (original behavior).
        return
    logging.info(f"VM {e.key} shutdown successfully")
    e.value["status"] = "STOPPED"
    client.put(e.key, json.dumps(e.value))
    running_vms.remove(vm)
def delete_vm(e):
    """Shut the VM down, remove its local disk file (if any) and its etcd key."""
    logging.info(f"Deleting VM {e.key}")
    shutdown_vm(e)
    vm = client.get(e.key, value_in_json=True)
    if not vm:
        logging.info(f"Cannot delete key {e.key} because it doesn't exists")
        return
    vm_id = e.key.split('/')[-1]
    vm_owner = e.value['owner']
    vm_path = f"{config('BASE_DIR')}/{vm_owner}/.vm/{vm_id}"
    if os.path.exists(vm_path):
        os.remove(vm_path)
    client.client.delete(e.key)
    logging.info(f"VM {vm.key} deleted")
def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
    """Return the entry of *vm_list* whose ``.key`` equals *vm_key*, else None."""
    for candidate in vm_list:
        if candidate.key == vm_key:
            return candidate
    return None
def main():
    """Per-host VM daemon: replay existing /v1/vm/ records, then watch for
    changes, dispatching each event to the matching VM lifecycle handler.

    Watch timeouts (every 10s) produce TIMEOUT events which drive the host
    heartbeat and a reconciliation pass between etcd state and local QEMU
    processes.
    """
    argparser = argparse.ArgumentParser()
    argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
    args = argparser.parse_args()

    host = client.get(args.hostname, value_in_json=True)
    host.value["status"] = "ALIVE"
    host.value["last_heartbeat"] = datetime.utcnow().isoformat()
    # Whatever happens, mark this host DEAD on the way out.
    atexit.register(goodbye, host=host)

    for events_iterator in [client.get_prefix("/v1/vm/"),
                            client.watch_prefix("/v1/vm/", timeout=10)]:
        # BUGFIX: this loop previously iterated the undefined name ``events``
        # (NameError); iterate the current source instead.
        for e in events_iterator:
            try:
                e.value = json.loads(e.value)
            except json.JSONDecodeError:
                logging.error(f"Invalid JSON {e.value}")
                continue

            e_status = e.value["status"]

            if e_status == "TIMEOUT":
                logging.info("Timeout")
                # BUGFIX: materialize the filtered VM list — previously a
                # single filter iterator fed both derived filters, so the
                # first loop exhausted it and should_be_running was empty.
                _vms = [v for v in client.get_prefix("/v1/vm", value_in_json=True)
                        if v.value["hostname"] == args.hostname]
                alleged_running_vms = [v for v in _vms
                                       if v.value["status"] == "RUNNING"]
                should_be_running = [v for v in _vms
                                     if v.value["status"] == "REQUESTED_START"]

                # Reconcile: anything etcd claims is RUNNING but we don't
                # actually have alive locally is marked KILLED.
                for vm in alleged_running_vms:
                    _vm = get_vm(running_vms, vm.key)
                    if _vm is None or not _vm.vm.is_running():
                        logging.debug(f"{_vm} {vm.key}")
                        logging.info(f"{vm.key} is not running but is said to be running")
                        logging.info(f"Updating {vm.key} status to KILLED")
                        vm.value["status"] = "KILLED"
                        client.put(vm.key, json.dumps(vm.value))

                for vm in should_be_running:
                    vm_path = f"rbd:uservms/{vm.key.split('/')[-1]}"
                    # BUGFIX: start the VM from its own record, not from the
                    # TIMEOUT event ``e``.
                    start_vm(vm_path, vm)

                # Heartbeat: refresh this host's liveness record.
                host.value["status"] = "ALIVE"
                host.value["last_heartbeat"] = datetime.utcnow().isoformat()
                client.put(host.key, json.dumps(host.value))
                logging.info(f"Updated last heartbeat time {host.value['last_heartbeat']}")
                continue

            e_hostname = e.value["hostname"]
            vm_uuid = e.key.split("/")[-1]

            # If it is not for me then skip it
            if e_hostname != args.hostname:
                continue

            logging.debug(f"EVENT: {e}")

            if e_status == "SCHEDULED_DEPLOY":
                create_vm(vm_uuid, e)
            elif e_status == "REQUESTED_SUSPEND":
                suspend_vm(e)
            elif e_status == "REQUESTED_RESUME":
                resume_vm(e)
            elif e_status == "REQUESTED_START":
                vm_path = f"rbd:uservms/{vm_uuid}"
                start_vm(vm_path, e)
            elif e_status == "REQUESTED_SHUTDOWN":
                shutdown_vm(e)
            elif e_status == "DELETED":
                delete_vm(e)

            logging.info(f"Running VMs {running_vms}")
main()