ucloud-vm/main.py

385 lines
13 KiB
Python

# TODO: Use Unix File Socket for VNC instead of TCP
# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html
# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
import argparse
import qmp
import logging
import subprocess
import threading
import time
import traceback
import sshtunnel
import errno
from dataclasses import dataclass
from decouple import config
from typing import Union
from functools import wraps
from string import Template
from os.path import join
from etcd3_wrapper import Etcd3Wrapper
from ucloud_common.vm import VmPool, VMStatus, VMEntry
from ucloud_common.host import HostPool, HostEntry
from ucloud_common.request import RequestEntry, RequestPool, RequestType
from ucloud_common.helpers import get_ipv4_address
running_vms = []
vnc_port_pool = list(range(0, 100))
client = Etcd3Wrapper()
vm_pool = None
host_pool = None
request_pool = None
logging.basicConfig(
level=logging.DEBUG,
filename="log.txt",
filemode="a",
format="%(asctime)s: %(levelname)s - %(message)s",
datefmt="%d-%b-%y %H:%M:%S",
)
def get_start_command_args(image_path, vnc_port, migration=False, migration_port=4444):
_args = ("-drive file=$image_path,format=raw,if=virtio,cache=none"
" -m 1024 -device virtio-rng-pci -enable-kvm -vnc :$vnc_port")
if migration:
_args = _args + " -incoming tcp:0:$migration_port"
args_template = Template(_args)
if migration:
args = args_template.substitute(image_path=image_path, vnc_port=vnc_port,
migration_port=migration_port)
else:
args = args_template.substitute(image_path=image_path, vnc_port=vnc_port)
return args.split(" ")
@dataclass
class VM:
key: str
handle: qmp.QEMUMachine
def __repr__(self):
return f"VM({self.key})"
def update_heartbeat(host: HostEntry):
while True:
host.update_heartbeat()
host_pool.put(host)
time.sleep(10)
logging.info(f"Updated last heartbeat time {host.last_heartbeat}")
def need_running_vm(func):
@wraps(func)
def wrapper(e):
vm = get_vm(running_vms, e.key)
if vm:
try:
status = vm.handle.command("query-status")
logging.debug(f"VM Status Check - {status}")
except OSError:
logging.info(
f"{func.__name__} failed - VM {e.key} - Unknown Error"
)
return func(e)
else:
logging.info(
f"{func.__name__} failed because VM {e.key} is not running"
)
return
return wrapper
def create_vm(vm_entry: VMEntry):
_command_to_create = f"rbd clone images/{vm_entry.image_uuid}@protected uservms/{vm_entry.uuid}"
try:
subprocess.check_call(_command_to_create.split(" "))
except subprocess.CalledProcessError as e:
if e.returncode == errno.EEXIST:
logging.debug(f"Image for vm {vm_entry.uuid} exists")
# File Already exists. No Problem Continue
return
else:
# This exception catches all other exceptions
# i.e FileNotFound (BaseImage), pool Does Not Exists etc.
logging.exception(f"Can't clone image - {e}")
else:
logging.info("New VM Created")
def start_vm(vm_entry: VMEntry):
_vm = get_vm(running_vms, vm_entry.key)
# VM already running. No need to proceed further.
if _vm:
logging.info(f"VM {vm_entry.uuid} already running")
return
else:
create_vm(vm_entry)
logging.info(f"Starting {vm_entry.key}")
# FIXME: There should be better vnc port allocation scheme
vm = qmp.QEMUMachine(
"/usr/bin/qemu-system-x86_64",
args=get_start_command_args(vm_entry.path, vnc_port_pool.pop(0)),
)
try:
vm.launch()
except (qmp.QEMUMachineError, TypeError, Exception):
vm_entry.declare_killed()
vm_entry.add_log(f"Machine Error occurred - {traceback.format_exc()}")
vm_pool.put(vm_entry)
else:
running_vms.append(VM(vm_entry.key, vm))
vm_entry.status = VMStatus.running
vm_entry.add_log("Started successfully")
vm_pool.put(vm_entry)
@need_running_vm
def stop_vm(vm_entry):
vm = get_vm(running_vms, vm_entry.key)
vm.handle.shutdown()
if not vm.handle.is_running():
vm_entry.add_log("Shutdown successfully")
vm_entry.declare_stopped()
vm_pool.put(vm_entry)
running_vms.remove(vm)
def delete_vm(vm_entry):
logging.info(f"Deleting VM {vm_entry}")
stop_vm(vm_entry)
path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1:]
try:
rc = subprocess.call(f"rbd rm {path_without_protocol}".split(" "))
except FileNotFoundError as e:
logging.exception(e)
except Exception as e:
logging.exception(f"Unknown error occurred - {e}")
else:
if rc == 0:
client.client.delete(vm_entry.key)
else:
logging.info("Some unknown problem occur while deleting vm file")
def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
return next((vm for vm in vm_list if vm.key == vm_key), None)
def maintenance(host):
# To capture vm running according to running_vms list
# This is to capture successful migration of a VM.
# Suppose, this host is running "vm1" and user initiated
# request to migrate this "vm1" to some other host. On,
# successful migration the destination host would set
# the vm hostname to itself. Thus, we are checking
# whether this host vm is successfully migrated. If yes
# then we shutdown "vm1" on this host.
for vm in running_vms:
with vm_pool.get_put(vm.key) as vm_entry:
if vm_entry.hostname != host.key and not vm_entry.in_migration:
vm.handle.shutdown()
vm_entry.add_log("VM on source host shutdown.")
# To check vm running according to etcd entries
alleged_running_vms = vm_pool.by_status("RUNNING", vm_pool.by_host(host.key))
for vm_entry in alleged_running_vms:
_vm = get_vm(running_vms, vm_entry.key)
# Whether, the allegedly running vm is in our
# running_vms list or not if it is said to be
# running on this host but it is not then we
# need to shut it down
# This is to capture poweroff/shutdown of a VM
# initiated by user inside VM. OR crash of VM by some
# user running process
if (_vm and not _vm.handle.is_running())\
or not _vm:
vm_entry.add_log(f"{vm_entry.key} is not running but is said to be running."
"So, shutting it down and declare it killed")
vm_entry.declare_killed()
vm_pool.put(vm_entry)
if _vm:
running_vms.remove(_vm)
def transfer_vm(request_event):
# This function would run on source host i.e host on which the vm
# is running initially. This host would be responsible for transferring
# vm state to destination host.
_host, _port = request_event.parameters["host"], request_event.parameters["port"]
_uuid = request_event.uuid
_destination = request_event.destination_host_key
vm = get_vm(running_vms, join("/v1/vm", _uuid))
if vm:
tunnel = sshtunnel.SSHTunnelForwarder(
(_host, 22),
ssh_username=config("ssh_username"),
ssh_pkey=config("ssh_pkey"),
ssh_private_key_password=config("ssh_private_key_password"),
remote_bind_address=('127.0.0.1', _port),
)
try:
tunnel.start()
except sshtunnel.BaseSSHTunnelForwarderError:
logging.exception(f"Couldn't establish connection to ({_host}, 22)")
else:
vm.handle.command("migrate", uri=f"tcp:{_host}:{tunnel.local_bind_port}")
status = vm.handle.command("query-migrate")["status"]
while status not in ["failed", "completed"]:
time.sleep(2)
status = vm.handle.command("query-migrate")["status"]
with vm_pool.get_put(request_event.uuid) as source_vm:
if status == "failed":
source_vm.add_log("Migration Failed")
elif status == "completed":
# If VM is successfully migrated then shutdown the VM
# on this host and update hostname to destination host key
source_vm.add_log("Successfully migrated")
source_vm.hostname = _destination
running_vms.remove(vm)
vm.handle.shutdown()
source_vm.in_migration = False # VM transfer finished
finally:
tunnel.close()
def init_vm_migration(vm_entry, destination_host_key):
# This function would run on destination host i.e host on which the vm
# would be transferred after migration. This host would be responsible
# for starting VM that would receive state of VM running on source host.
_vm = get_vm(running_vms, vm_entry.key)
if _vm:
# VM already running. No need to proceed further.
logging.log(f"{_vm.key} Already running")
return
logging.info(f"Starting {vm_entry.key}")
# FIXME: There should be better vnc port allocation scheme
actual_vm = qmp.QEMUMachine(
"/usr/bin/qemu-system-x86_64",
args=get_start_command_args(vm_entry.path, 100, migration=True, migration_port=4444),
)
try:
actual_vm.launch()
except Exception as e:
# We don't care whether MachineError or any other error occurred
logging.exception(e)
actual_vm.shutdown()
else:
vm_entry.in_migration = True
vm_pool.put(vm_entry)
running_vms.append(VM(vm_entry.key, actual_vm))
r = RequestEntry.from_scratch(type=RequestType.TransferVM,
hostname=vm_entry.hostname,
parameters={
"host": get_ipv4_address(),
"port": 4444,
},
uuid=vm_entry.uuid,
destination_host_key=destination_host_key
)
request_pool.put(r)
def main():
argparser = argparse.ArgumentParser()
argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
args = argparser.parse_args()
global host_pool, vm_pool, request_pool
host_pool = HostPool(client, "/v1/host")
vm_pool = VmPool(client, "/v1/vm")
request_pool = RequestPool(client, "/v1/request")
host = host_pool.get(args.hostname)
if not host:
print("No Such Host")
exit(1)
logging.info(f"{'*' * 5} Session Started {'*' * 5}")
# It is seen that under heavy load, timeout event doesn't come
# in a predictive manner which delays heart beat update which
# in turn misunderstood by scheduler that the host is dead
# when it is actually alive. So, to ensure that we update the
# heart beat in a predictive manner we start Heart beat updating
# mechanism in separated thread
heartbeat_updating_thread = threading.Thread(target=update_heartbeat, args=(host,))
try:
heartbeat_updating_thread.start()
except Exception as e:
print("No Need To Go Further. Our heartbeat updating mechanism is not working")
logging.exception(e)
exit(-1)
for events_iterator in [
client.get_prefix("/v1/request/", value_in_json=True),
client.watch_prefix("/v1/request/", timeout=10, value_in_json=True),
]:
for request_event in events_iterator:
request_event = RequestEntry(request_event)
if request_event.type == "TIMEOUT":
logging.info("Timeout Event")
maintenance(host)
continue
# If the event is directed toward me OR I am destination of a InitVMMigration
if hasattr(request_event, "hostname") and request_event.hostname == host.key or\
hasattr(request_event, "destination") and request_event.destination == host.key:
request_pool.client.client.delete(request_event.key)
vm_entry = vm_pool.get(request_event.uuid)
logging.debug(f"EVENT: {request_event}")
if request_event.type == RequestType.StartVM:
start_vm(vm_entry)
elif request_event.type == RequestType.StopVM:
stop_vm(vm_entry)
elif request_event.type == RequestType.DeleteVM:
delete_vm(vm_entry)
elif request_event.type == RequestType.InitVMMigration:
init_vm_migration(vm_entry, host.key)
elif request_event.type == RequestType.TransferVM:
transfer_vm(request_event)
logging.info(f"Running VMs {running_vms}")
main()