ucloud-vm/main.py

385 lines
13 KiB
Python
Raw Normal View History

# TODO: Use Unix File Socket for VNC instead of TCP
# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html
2019-07-04 05:19:40 +00:00
# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
2019-07-03 13:02:21 +00:00
2019-07-04 05:19:40 +00:00
import argparse
2019-07-03 13:02:21 +00:00
import qmp
2019-07-11 08:31:46 +00:00
import logging
import subprocess
2019-07-27 09:05:35 +00:00
import threading
import time
import traceback
import sshtunnel
import errno
2019-07-03 13:02:21 +00:00
2019-07-11 08:31:46 +00:00
from dataclasses import dataclass
from typing import Union
from functools import wraps
from string import Template
from os.path import join
2019-07-27 09:05:35 +00:00
from etcd3_wrapper import Etcd3Wrapper
from ucloud_common.vm import VmPool, VMStatus, VMEntry
2019-07-27 09:05:35 +00:00
from ucloud_common.host import HostPool, HostEntry
from ucloud_common.request import RequestEntry, RequestPool, RequestType
from ucloud_common.helpers import get_ipv4_address
2019-07-03 13:02:21 +00:00
2019-07-11 08:31:46 +00:00
# VM objects for the QEMU machines this process has launched and still tracks.
running_vms = []

# Free VNC display numbers; start_vm() takes them from the front of the list.
vnc_port_pool = list(range(100))

# etcd client shared by the pools created in main().
client = Etcd3Wrapper()

# Filled in by main() once the command line has been parsed.
vm_pool = None
host_pool = None
request_pool = None

# Everything from DEBUG upwards is appended to log.txt.
logging.basicConfig(
    level=logging.DEBUG,
    filename="log.txt",
    filemode="a",
    format="%(asctime)s: %(levelname)s - %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
)
def get_start_command_args(image_path, vnc_port, migration=False, migration_port=4444):
    """Build the QEMU command-line arguments used to start a VM.

    Every argument is emitted as its own list element.  The argument list
    is built directly (instead of rendering one string and splitting it
    on spaces) so that an image path containing a space stays inside the
    single ``-drive`` option.

    Args:
        image_path: Disk image location placed after ``-drive file=``.
        vnc_port: VNC display number, passed to QEMU as ``-vnc :<vnc_port>``.
        migration: When true, append ``-incoming`` so the new QEMU process
            waits for an incoming migration stream.
        migration_port: TCP port used in the ``-incoming`` URI; ignored
            unless ``migration`` is true.

    Returns:
        list[str]: Arguments suitable for ``qmp.QEMUMachine(args=...)``.
    """
    args = [
        "-drive", f"file={image_path},format=raw,if=virtio,cache=none",
        "-m", "1024",
        "-device", "virtio-rng-pci",
        "-enable-kvm",
        "-vnc", f":{vnc_port}",
    ]
    if migration:
        args += ["-incoming", f"tcp:0:{migration_port}"]
    return args
2019-07-27 09:05:35 +00:00
@dataclass
class VM:
    """Pairs a VM's key with the QEMU machine object that controls it."""

    # Key identifying the VM; get_vm() matches entries on this field.
    key: str
    # QEMU machine object; the rest of this file drives it through
    # .command(), .shutdown() and .is_running().
    handle: qmp.QEMUMachine

    def __repr__(self):
        # Only the key is shown; the handle is deliberately left out.
        return f"VM({self.key})"
2019-07-11 08:31:46 +00:00
2019-07-27 09:05:35 +00:00
def update_heartbeat(host: HostEntry):
    """Keep refreshing ``host``'s heartbeat; this function never returns.

    Each round updates the heartbeat on the entry, writes the entry back
    through ``host_pool``, sleeps for ten seconds and then logs the
    heartbeat time that was stored.

    Args:
        host: The entry describing this host.
    """
    pause_seconds = 10
    while True:
        host.update_heartbeat()
        host_pool.put(host)

        time.sleep(pause_seconds)

        logging.info(f"Updated last heartbeat time {host.last_heartbeat}")
2019-07-11 08:31:46 +00:00
def need_running_vm(func):
    """Decorator that runs ``func`` only for VMs tracked in ``running_vms``.

    The wrapped function receives a single entry ``e``.  If no VM with
    ``e.key`` is tracked, a message is logged and ``None`` is returned
    without calling ``func``.  Otherwise the VM is probed with the
    ``query-status`` command; an ``OSError`` from that probe is only
    logged, and ``func(e)`` is called and its result returned either way.
    """

    @wraps(func)
    def wrapper(e):
        tracked = get_vm(running_vms, e.key)

        # Guard: nothing to operate on when the VM is not tracked here.
        if not tracked:
            logging.info(
                f"{func.__name__} failed because VM {e.key} is not running"
            )
            return None

        try:
            status = tracked.handle.command("query-status")
            logging.debug(f"VM Status Check - {status}")
        except OSError:
            # The probe failure is reported but does not stop the call below.
            logging.info(
                f"{func.__name__} failed - VM {e.key} - Unknown Error"
            )

        return func(e)

    return wrapper
def create_vm(vm_entry: VMEntry):
    """Clone the VM's base image with ``rbd clone``.

    Runs ``rbd clone images/<image_uuid>@protected uservms/<uuid>``.
    An exit status equal to ``errno.EEXIST`` is treated as "image already
    there" and only logged at debug level; any other non-zero exit status
    is logged with its traceback.  Nothing is raised or returned to the
    caller in either case.

    Args:
        vm_entry: Entry providing ``image_uuid`` and ``uuid``.
    """
    clone_argv = (
        f"rbd clone images/{vm_entry.image_uuid}@protected uservms/{vm_entry.uuid}"
    ).split(" ")

    try:
        subprocess.check_call(clone_argv)
    except subprocess.CalledProcessError as e:
        if e.returncode != errno.EEXIST:
            # Every other failure ends up here,
            # e.g. missing base image, pool does not exist, etc.
            logging.exception(f"Can't clone image - {e}")
            return
        # Image already exists. No problem, carry on.
        logging.debug(f"Image for vm {vm_entry.uuid} exists")
        return

    logging.info("New VM Created")
2019-07-11 08:31:46 +00:00
def start_vm(vm_entry: VMEntry):
    """Create (if needed) and launch the QEMU machine for ``vm_entry``.

    Does nothing but log when a VM with the same key is already tracked in
    ``running_vms``.  Otherwise the image is cloned via ``create_vm``, a
    VNC display number is taken from ``vnc_port_pool`` and QEMU is
    launched.

    On success the VM is appended to ``running_vms`` and the entry is
    stored with status ``VMStatus.running``.  On failure the entry is
    declared killed, the traceback is added to its log, and the VNC
    display number is handed back to the pool so it is not lost.  If the
    pool is exhausted the entry is declared killed instead of letting an
    ``IndexError`` escape to the event loop.

    Args:
        vm_entry: Entry of the VM to start.
    """
    # VM already running. No need to proceed further.
    if get_vm(running_vms, vm_entry.key):
        logging.info(f"VM {vm_entry.uuid} already running")
        return

    create_vm(vm_entry)

    logging.info(f"Starting {vm_entry.key}")

    # FIXME: There should be better vnc port allocation scheme
    if not vnc_port_pool:
        logging.error(f"No free VNC port left to start {vm_entry.key}")
        vm_entry.declare_killed()
        vm_entry.add_log("Machine Error occurred - no free VNC port available")
        vm_pool.put(vm_entry)
        return
    vnc_port = vnc_port_pool.pop(0)

    vm = qmp.QEMUMachine(
        "/usr/bin/qemu-system-x86_64",
        args=get_start_command_args(vm_entry.path, vnc_port),
    )
    try:
        vm.launch()
    except Exception:
        # Any launch failure (machine error or otherwise) is handled the
        # same way.  The display number was never used, so return it.
        vnc_port_pool.insert(0, vnc_port)
        vm_entry.declare_killed()
        vm_entry.add_log(f"Machine Error occurred - {traceback.format_exc()}")
        vm_pool.put(vm_entry)
    else:
        running_vms.append(VM(vm_entry.key, vm))
        vm_entry.status = VMStatus.running
        vm_entry.add_log("Started successfully")
        vm_pool.put(vm_entry)
2019-07-11 08:31:46 +00:00
@need_running_vm
def stop_vm(vm_entry):
    """Shut down the tracked VM that belongs to ``vm_entry``.

    The decorator ensures this body only runs when the VM is tracked in
    ``running_vms``.  After asking the handle to shut down, the entry is
    logged, declared stopped, stored, and the VM is dropped from
    ``running_vms`` - but only if the handle reports it is no longer
    running.

    Args:
        vm_entry: Entry of the VM to stop.
    """
    tracked = get_vm(running_vms, vm_entry.key)
    tracked.handle.shutdown()

    if tracked.handle.is_running():
        # Still running after the shutdown call: leave all state untouched.
        return

    vm_entry.add_log("Shutdown successfully")
    vm_entry.declare_stopped()
    vm_pool.put(vm_entry)

    running_vms.remove(tracked)
def delete_vm(vm_entry):
    """Stop the VM, remove its image with ``rbd rm`` and delete its entry.

    The image path is ``vm_entry.path`` with everything up to and
    including the first ``:`` removed (the whole path when there is no
    ``:``).  The etcd key is deleted only when ``rbd rm`` exits with
    status 0; every failure is logged and otherwise ignored.

    Args:
        vm_entry: Entry of the VM to delete.
    """
    logging.info(f"Deleting VM {vm_entry}")
    stop_vm(vm_entry)

    # Drop the "<protocol>:" prefix, if any.
    path_without_protocol = vm_entry.path.split(":", 1)[-1]
    removal_argv = f"rbd rm {path_without_protocol}".split(" ")

    try:
        rc = subprocess.call(removal_argv)
    except FileNotFoundError as e:
        logging.exception(e)
        return
    except Exception as e:
        logging.exception(f"Unknown error occurred - {e}")
        return

    if rc != 0:
        logging.info("Some unknown problem occur while deleting vm file")
        return

    client.client.delete(vm_entry.key)
2019-07-03 13:02:21 +00:00
2019-07-11 08:31:46 +00:00
def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
    """Return the first item of ``vm_list`` whose ``key`` equals ``vm_key``.

    Args:
        vm_list: Objects exposing a ``key`` attribute.
        vm_key: Key to look for.

    Returns:
        The first matching item, or ``None`` when nothing matches.
    """
    for candidate in vm_list:
        if candidate.key == vm_key:
            return candidate
    return None
def maintenance(host):
    """Reconcile locally tracked VMs with the entries stored in etcd.

    Two passes are made:

    1. Every VM in ``running_vms`` whose entry names another host and is
       not in migration is shut down here.  It is also removed from
       ``running_vms``; otherwise it would be shut down and logged again
       on every later pass and would still be reported as running by
       ``get_vm``.
    2. Every entry that etcd lists as RUNNING on this host but that is not
       tracked here, or whose handle reports it is not running, is
       declared killed and dropped from ``running_vms``.

    Args:
        host: Entry of this host; only ``host.key`` is used.
    """
    # To capture vm running according to running_vms list

    # This is to capture successful migration of a VM.
    # Suppose, this host is running "vm1" and user initiated
    # request to migrate this "vm1" to some other host. On,
    # successful migration the destination host would set
    # the vm hostname to itself. Thus, we are checking
    # whether this host vm is successfully migrated. If yes
    # then we shutdown "vm1" on this host.
    # Iterate over a snapshot because the list is modified inside the loop.
    for vm in list(running_vms):
        with vm_pool.get_put(vm.key) as vm_entry:
            if vm_entry.hostname != host.key and not vm_entry.in_migration:
                vm.handle.shutdown()
                vm_entry.add_log("VM on source host shutdown.")
                running_vms.remove(vm)

    # To check vm running according to etcd entries
    alleged_running_vms = vm_pool.by_status("RUNNING", vm_pool.by_host(host.key))

    for vm_entry in alleged_running_vms:
        _vm = get_vm(running_vms, vm_entry.key)

        # Whether, the allegedly running vm is in our
        # running_vms list or not if it is said to be
        # running on this host but it is not then we
        # need to shut it down

        # This is to capture poweroff/shutdown of a VM
        # initiated by user inside VM. OR crash of VM by some
        # user running process
        if not _vm or not _vm.handle.is_running():
            vm_entry.add_log(f"{vm_entry.key} is not running but is said to be running."
                             "So, shutting it down and declare it killed")
            vm_entry.declare_killed()
            vm_pool.put(vm_entry)
            if _vm:
                running_vms.remove(_vm)
def transfer_vm(request_event):
    """Migrate a locally running VM to the host named in ``request_event``.

    This function would run on source host i.e host on which the vm
    is running initially. This host would be responsible for transferring
    vm state to destination host.

    An SSH tunnel to the destination is opened, QEMU is told to migrate,
    and ``query-migrate`` is polled every two seconds until the status is
    ``failed`` or ``completed``.  The VM entry is then updated and the
    tunnel is always closed.  Nothing happens when the VM is not tracked
    in ``running_vms``.

    Args:
        request_event: Request carrying ``parameters["host"]``,
            ``parameters["port"]``, ``uuid`` and ``destination_host_key``.
    """
    parameters = request_event.parameters
    _host, _port = parameters["host"], parameters["port"]
    _uuid = request_event.uuid
    _destination = request_event.destination_host_key

    vm = get_vm(running_vms, join("/v1/vm", _uuid))
    if not vm:
        return

    tunnel = sshtunnel.SSHTunnelForwarder(
        (_host, 22),
        ssh_username="meow",
        ssh_pkey="~/.ssh/id_rsa",
        ssh_private_key_password="***REMOVED***",
        remote_bind_address=('127.0.0.1', _port),
    )
    try:
        tunnel.start()
    except sshtunnel.BaseSSHTunnelForwarderError:
        logging.exception(f"Couldn't establish connection to ({_host}, 22)")
    else:
        # NOTE(review): the URI targets _host together with the tunnel's
        # *local* port; presumably the local end of the tunnel was meant -
        # confirm against the sshtunnel documentation.
        vm.handle.command("migrate", uri=f"tcp:{_host}:{tunnel.local_bind_port}")

        # Poll until QEMU reports a terminal migration state.
        terminal_states = ("failed", "completed")
        while True:
            status = vm.handle.command("query-migrate")["status"]
            if status in terminal_states:
                break
            time.sleep(2)

        with vm_pool.get_put(request_event.uuid) as source_vm:
            if status == "failed":
                source_vm.add_log("Migration Failed")
            elif status == "completed":
                # If VM is successfully migrated then shutdown the VM
                # on this host and update hostname to destination host key
                source_vm.add_log("Successfully migrated")
                source_vm.hostname = _destination
                running_vms.remove(vm)
                vm.handle.shutdown()
            source_vm.in_migration = False  # VM transfer finished
    finally:
        tunnel.close()
def init_vm_migration(vm_entry, destination_host_key):
    """Start a QEMU machine that waits for an incoming migration.

    This function would run on destination host i.e host on which the vm
    would be transferred after migration. This host would be responsible
    for starting VM that would receive state of VM running on source host.

    When the launch succeeds the entry is flagged ``in_migration``, the VM
    is tracked in ``running_vms`` and a ``TransferVM`` request addressed
    to the entry's current ``hostname`` is stored in ``request_pool``.
    When the launch fails the error is logged and the machine is shut
    down.

    Args:
        vm_entry: Entry of the VM to receive.
        destination_host_key: Key of the host that will own the VM once
            migration completes; forwarded in the ``TransferVM`` request.
    """
    _vm = get_vm(running_vms, vm_entry.key)

    if _vm:
        # VM already running. No need to proceed further.
        # logging.log() needs an explicit level as its first argument;
        # the previous single-argument call raised TypeError.
        logging.info(f"{_vm.key} Already running")
        return

    logging.info(f"Starting {vm_entry.key}")

    # FIXME: There should be better vnc port allocation scheme
    actual_vm = qmp.QEMUMachine(
        "/usr/bin/qemu-system-x86_64",
        args=get_start_command_args(vm_entry.path, 100, migration=True, migration_port=4444),
    )

    try:
        actual_vm.launch()
    except Exception as e:
        # We don't care whether MachineError or any other error occurred
        logging.exception(e)
        actual_vm.shutdown()
    else:
        vm_entry.in_migration = True
        vm_pool.put(vm_entry)

        running_vms.append(VM(vm_entry.key, actual_vm))
        r = RequestEntry.from_scratch(
            type=RequestType.TransferVM,
            hostname=vm_entry.hostname,
            parameters={
                "host": get_ipv4_address(),
                "port": 4444,
            },
            uuid=vm_entry.uuid,
            destination_host_key=destination_host_key,
        )
        request_pool.put(r)
def main():
    """Entry point: serve the requests addressed to this host.

    Parses the host key from the command line, creates the host, VM and
    request pools, starts the heartbeat thread and then processes first
    the requests already stored under ``/v1/request/`` and afterwards the
    ones delivered by the watch.  ``TIMEOUT`` events trigger
    ``maintenance``; any other request addressed to this host is deleted
    from etcd and dispatched on its type.
    """
    global host_pool, vm_pool, request_pool

    parser = argparse.ArgumentParser()
    parser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
    cli_args = parser.parse_args()

    host_pool = HostPool(client, "/v1/host")
    vm_pool = VmPool(client, "/v1/vm")
    request_pool = RequestPool(client, "/v1/request")

    host = host_pool.get(cli_args.hostname)
    if not host:
        print("No Such Host")
        exit(1)

    logging.info(f"{'*' * 5} Session Started {'*' * 5}")

    # It is seen that under heavy load, timeout event doesn't come
    # in a predictive manner which delays heart beat update which
    # in turn misunderstood by scheduler that the host is dead
    # when it is actually alive. So, to ensure that we update the
    # heart beat in a predictive manner we start Heart beat updating
    # mechanism in separated thread
    heartbeat_thread = threading.Thread(target=update_heartbeat, args=(host,))
    try:
        heartbeat_thread.start()
    except Exception as e:
        print("No Need To Go Further. Our heartbeat updating mechanism is not working")
        logging.exception(e)
        exit(-1)

    # Stored requests first, then the live watch.
    event_sources = [
        client.get_prefix("/v1/request/", value_in_json=True),
        client.watch_prefix("/v1/request/", timeout=10, value_in_json=True),
    ]

    for events_iterator in event_sources:
        for raw_event in events_iterator:
            request_event = RequestEntry(raw_event)

            if request_event.type == "TIMEOUT":
                logging.info("Timeout Event")
                maintenance(host)
                continue

            # If the event is directed toward me OR I am destination of a InitVMMigration
            addressed_to_me = (
                hasattr(request_event, "hostname")
                and request_event.hostname == host.key
            ) or (
                hasattr(request_event, "destination")
                and request_event.destination == host.key
            )
            if not addressed_to_me:
                continue

            # Claim the request by deleting it before acting on it.
            request_pool.client.client.delete(request_event.key)
            vm_entry = vm_pool.get(request_event.uuid)

            logging.debug(f"EVENT: {request_event}")

            request_type = request_event.type
            if request_type == RequestType.StartVM:
                start_vm(vm_entry)
            elif request_type == RequestType.StopVM:
                stop_vm(vm_entry)
            elif request_type == RequestType.DeleteVM:
                delete_vm(vm_entry)
            elif request_type == RequestType.InitVMMigration:
                init_vm_migration(vm_entry, host.key)
            elif request_type == RequestType.TransferVM:
                transfer_vm(request_event)

            logging.info(f"Running VMs {running_vms}")
2019-07-03 13:02:21 +00:00
# Start the host daemon as soon as this module is executed.
main()