* Refactoring

* Fix an issue that caused a new image store to be created on every start of ucloud-api.
* The VM migration API call now takes a hostname instead of a host key.
* StorageHandler classes are introduced. They transparently handle image-related work: importing an image, creating a VM image from a base image, resizing a VM image, deleting a VM image, and so on (an interface sketch follows this list).
* Loggers are added to the __init__.py of every ucloud component's subpackage.
* Non-trivial timeout events are no longer logged.
* Fix an issue that prevented removal of stopped VMs (i.e. VMs that have been successfully migrated).
* Improved unit handling: e.g. MB, Mb, mB and mb are all treated as megabytes (a parsing example follows this list).
* VM migration is now possible on IPv6 hosts.
* The destination VM (the receiving side of a migration) now correctly expects incoming data on a free ephemeral port.
* Tracebacks are no longer printed to the screen; they go to the log file instead.
* All sanity checks are consolidated into a single file. These checks are run by ucloud.py before starting any ucloud component (a sketch follows this list).
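The StorageHandler implementation itself is not among the hunks shown below; the sketch here only illustrates the interface that the updated host/virtualmachine.py relies on (is_vm_image_exists, make_vm_image, resize_vm_image, delete_vm_image, qemu_path_string). The class names and the file-backed variant are illustrative assumptions, not the code added by this commit; a Ceph-backed handler would implement the same methods with rbd commands instead.

import logging
import os
import subprocess as sp

logger = logging.getLogger(__name__)


class ImageStorageHandler:
    """Interface sketch; method names follow the calls made in host/virtualmachine.py."""

    def is_vm_image_exists(self, path):
        raise NotImplementedError

    def make_vm_image(self, src, dest):
        """Create a VM image at *dest* from the base image *src*; return True on success."""
        raise NotImplementedError

    def resize_vm_image(self, path, size):
        """Grow the VM image at *path* to *size* megabytes; return True on success."""
        raise NotImplementedError

    def delete_vm_image(self, path):
        raise NotImplementedError

    def qemu_path_string(self, path):
        """String to pass to QEMU's -drive file=... option."""
        raise NotImplementedError


class FileSystemBasedImageStorageHandler(ImageStorageHandler):
    """Hypothetical file-backed variant, mirroring the old WITHOUT_CEPH branch."""

    def __init__(self, image_dir, vm_dir):
        self.image_dir = image_dir
        self.vm_dir = vm_dir

    def _execute(self, command):
        # Run an external command; log the traceback instead of printing it.
        try:
            sp.check_output(command)
        except sp.CalledProcessError:
            logger.exception("Command failed: %s", command)
            return False
        return True

    def is_vm_image_exists(self, path):
        return os.path.isfile(os.path.join(self.vm_dir, path))

    def make_vm_image(self, src, dest):
        return self._execute(["cp", os.path.join(self.image_dir, src),
                              os.path.join(self.vm_dir, dest)])

    def resize_vm_image(self, path, size):
        return self._execute(["qemu-img", "resize", "-f", "raw",
                              os.path.join(self.vm_dir, path), "{}M".format(size)])

    def delete_vm_image(self, path):
        return self._execute(["rm", os.path.join(self.vm_dir, path)])

    def qemu_path_string(self, path):
        return os.path.join(self.vm_dir, path)

With a handler like this exposed from config.py as image_storage_handler, virtualmachine.create() reduces to the make_vm_image/resize_vm_image calls visible in the hunks below.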
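The forgiving unit parsing comes from switching bitmath.parse_string to bitmath.parse_string_unsafe in host/virtualmachine.py (see the hunks below). A quick illustration, assuming a bitmath version that provides parse_string_unsafe:

import bitmath

# parse_string() is strict: units must be spelled exactly, and bit-based
# units ("Mb") are distinct from byte-based ones ("MB").
print(bitmath.parse_string("10MB"))   # 10.0 MB
print(bitmath.parse_string("10Mb"))   # 10.0 Mb (megabits, not megabytes)

# parse_string_unsafe() is case-insensitive and treats every unit as a
# byte-based SI unit, so all of these spellings come out as 10 megabytes.
for spec in ("10MB", "10Mb", "10mB", "10mb"):
    size = bitmath.parse_string_unsafe(spec)
    print(spec, "->", int(size.to_MB()), "MB")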
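The consolidated sanity-check file itself is not part of the hunks shown below. As an illustration, the assertion removed from host/main.py (VM_DIR must exist when WITHOUT_CEPH is set) would turn into a pre-flight check roughly like this; the module and function names are placeholders, not the actual code added by the commit:

# sanity_checks.py (hypothetical name) -- run by ucloud.py before starting any component.
import os
import sys

from config import env_vars


def check_vm_dir():
    # Formerly an assert inside host/main.py's main(); now a single pre-flight check.
    if env_vars.get('WITHOUT_CEPH') and not os.path.isdir(env_vars.get('VM_DIR')):
        sys.exit("WITHOUT_CEPH is set to True, so the VM directory named in the"
                 " .env file must exist -- but it does not.")


def check_all():
    check_vm_dir()
    # ...plus the other environment/dependency checks collected in this file.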
ahmadbilalkhalid 2019-11-25 11:52:36 +05:00
commit cc0ca68498
26 changed files with 1101 additions and 294 deletions

host/helper.py (new file, 13 additions)

@@ -0,0 +1,13 @@
import socket
from contextlib import closing
def find_free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
try:
s.bind(('', 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except Exception:
return None
else:
return s.getsockname()[1]

host/main.py

@@ -1,6 +1,5 @@
import argparse
import multiprocessing as mp
import os
import time
from etcd3_wrapper import Etcd3Wrapper
@@ -10,13 +9,17 @@ from config import (vm_pool, request_pool,
etcd_client, running_vms,
etcd_wrapper_args, etcd_wrapper_kwargs,
HostPool, env_vars)
from .helper import find_free_port
from . import virtualmachine
from host import logger
def update_heartbeat(host):
def update_heartbeat(hostname):
"""Update Last HeartBeat Time for :param hostname: in etcd"""
client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs)
host_pool = HostPool(client, env_vars.get('HOST_PREFIX'))
this_host = next(filter(lambda h: h.hostname == host, host_pool.hosts), None)
this_host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
while True:
this_host.update_heartbeat()
@@ -35,17 +38,22 @@ def maintenance(host):
# whether this host vm is successfully migrated. If yes
# then we shutdown "vm1" on this host.
to_be_removed = []
for running_vm in running_vms:
with vm_pool.get_put(running_vm.key) as vm_entry:
if vm_entry.hostname != host.key and not vm_entry.in_migration:
running_vm.handle.shutdown()
vm_entry.add_log("VM on source host shutdown.")
logger.info("VM migration not completed successfully.")
to_be_removed.append(running_vm)
for r in to_be_removed:
running_vms.remove(r)
# To check vm running according to etcd entries
alleged_running_vms = vm_pool.by_status("RUNNING", vm_pool.by_host(host.key))
for vm_entry in alleged_running_vms:
_vm = virtualmachine.get_vm(running_vms, vm_entry.key)
# Whether, the allegedly running vm is in our
# running_vms list or not if it is said to be
# running on this host but it is not then we
@@ -64,10 +72,6 @@ def maintenance(host):
def main(hostname):
assert env_vars.get('WITHOUT_CEPH') and os.path.isdir(env_vars.get('VM_DIR')), (
"You have set env_vars.get('WITHOUT_CEPH') to True. So, the vm directory mentioned"
" in .env file must exists. But, it don't.")
heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))
host_pool = HostPool(etcd_client, env_vars.get('HOST_PREFIX'))
@@ -99,7 +103,6 @@ def main(hostname):
request_event = RequestEntry(request_event)
if request_event.type == "TIMEOUT":
logger.info("Timeout Event")
maintenance(host)
continue
@@ -121,7 +124,7 @@ def main(hostname):
virtualmachine.delete(vm_entry)
elif request_event.type == RequestType.InitVMMigration:
virtualmachine.init_migration(vm_entry, host.key)
virtualmachine.start(vm_entry, host.key, find_free_port())
elif request_event.type == RequestType.TransferVM:
virtualmachine.transfer(request_event)

host/qmp (QEMU machine wrapper)

@@ -304,6 +304,7 @@ class QEMUMachine(object):
LOG.debug('Command: %r', ' '.join(self._qemu_full_args))
if self._iolog:
LOG.debug('Output: %r', self._iolog)
raise Exception(self._iolog)
raise
def _launch(self):

host/virtualmachine.py

@@ -4,27 +4,28 @@
# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
import errno
import os
import random
import subprocess as sp
import tempfile
import time
from functools import wraps
from os.path import join
from os.path import join as join_path
from string import Template
from typing import Union
import bitmath
import sshtunnel
from common.helpers import get_ipv4_address
from common.helpers import get_ipv6_address
from common.request import RequestEntry, RequestType
from common.vm import VMEntry, VMStatus
from config import etcd_client, request_pool, running_vms, vm_pool, env_vars
from config import etcd_client, request_pool, running_vms, vm_pool, env_vars, image_storage_handler
from . import qmp
from host import logger
class VM:
def __init__(self, key, handle, vnc_socket_file):
self.key = key # type: str
@@ -106,24 +107,16 @@ def update_radvd_conf(etcd_client):
sp.check_output(['systemctl', 'restart', 'radvd'])
def get_start_command_args(
vm_entry, vnc_sock_filename: str, migration=False, migration_port=4444,
):
def get_start_command_args(vm_entry, vnc_sock_filename: str, migration=False, migration_port=None):
threads_per_core = 1
vm_memory = int(bitmath.parse_string(vm_entry.specs["ram"]).to_MB())
vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs["ram"]).to_MB())
vm_cpus = int(vm_entry.specs["cpu"])
vm_uuid = vm_entry.uuid
vm_networks = vm_entry.network
if env_vars.get('WITHOUT_CEPH'):
command = "-drive file={},format=raw,if=virtio,cache=none".format(
os.path.join(env_vars.get('VM_DIR'), vm_uuid)
)
else:
command = "-drive file=rbd:uservms/{},format=raw,if=virtio,cache=none".format(
vm_uuid
)
command = "-drive file={},format=raw,if=virtio,cache=none".format(
image_storage_handler.qemu_path_string(vm_uuid)
)
command += " -device virtio-rng-pci -vnc unix:{}".format(vnc_sock_filename)
command += " -m {} -smp cores={},threads={}".format(
vm_memory, vm_cpus, threads_per_core
@@ -131,7 +124,7 @@ def get_start_command_args(
command += " -name {}".format(vm_uuid)
if migration:
command += " -incoming tcp:0:{}".format(migration_port)
command += " -incoming tcp:[::]:{}".format(migration_port)
tap = None
for network_and_mac in vm_networks:
@@ -154,7 +147,7 @@ def get_start_command_args(
return command.split(" ")
def create_vm_object(vm_entry, migration=False, migration_port=4444):
def create_vm_object(vm_entry, migration=False, migration_port=None):
# NOTE: If migration suddenly stop working, having different
# VNC unix filename on source and destination host can
# be a possible cause of it.
@@ -198,61 +191,19 @@ def need_running_vm(func):
def create(vm_entry: VMEntry):
vm_hdd = int(bitmath.parse_string(vm_entry.specs["os-ssd"]).to_MB())
if env_vars.get('WITHOUT_CEPH'):
_command_to_create = [
"cp",
os.path.join(env_vars.get('IMAGE_DIR'), vm_entry.image_uuid),
os.path.join(env_vars.get('VM_DIR'), vm_entry.uuid),
]
_command_to_extend = [
"qemu-img",
"resize",
"-f", "raw",
os.path.join(env_vars.get('VM_DIR'), vm_entry.uuid),
"{}M".format(vm_hdd),
]
if image_storage_handler.is_vm_image_exists(vm_entry.uuid):
# File Already exists. No Problem Continue
logger.debug("Image for vm %s exists", vm_entry.uuid)
else:
_command_to_create = [
"rbd",
"clone",
"images/{}@protected".format(vm_entry.image_uuid),
"uservms/{}".format(vm_entry.uuid),
]
_command_to_extend = [
"rbd",
"resize",
"uservms/{}".format(vm_entry.uuid),
"--size",
vm_hdd,
]
try:
sp.check_output(_command_to_create)
except sp.CalledProcessError as e:
if e.returncode == errno.EEXIST:
logger.debug("Image for vm %s exists", vm_entry.uuid)
# File Already exists. No Problem Continue
return
# This exception catches all other exceptions
# i.e FileNotFound (BaseImage), pool Does Not Exists etc.
logger.exception(e)
vm_entry.status = "ERROR"
else:
try:
sp.check_output(_command_to_extend)
except Exception as e:
logger.exception(e)
else:
logger.info("New VM Created")
vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB())
if image_storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid):
if not image_storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd):
vm_entry.status = "ERROR"
else:
logger.info("New VM Created")
def start(vm_entry: VMEntry):
def start(vm_entry: VMEntry, destination_host_key=None, migration_port=None):
_vm = get_vm(running_vms, vm_entry.key)
# VM already running. No need to proceed further.
@@ -260,8 +211,12 @@ def start(vm_entry: VMEntry):
logger.info("VM %s already running", vm_entry.uuid)
return
else:
create(vm_entry)
launch_vm(vm_entry)
if destination_host_key:
launch_vm(vm_entry, migration=True, migration_port=migration_port,
destination_host_key=destination_host_key)
else:
create(vm_entry)
launch_vm(vm_entry)
@need_running_vm
@@ -278,18 +233,9 @@ def stop(vm_entry):
def delete(vm_entry):
logger.info("Deleting VM | %s", vm_entry)
stop(vm_entry)
path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1:]
if env_vars.get('WITHOUT_CEPH'):
vm_deletion_command = ["rm", os.path.join(env_vars.get('VM_DIR'), vm_entry.uuid)]
else:
vm_deletion_command = ["rbd", "rm", path_without_protocol]
try:
sp.check_output(vm_deletion_command)
except Exception as e:
logger.exception(e)
else:
r_status = image_storage_handler.delete_vm_image(vm_entry.uuid)
if r_status:
etcd_client.client.delete(vm_entry.key)
@@ -301,15 +247,16 @@ def transfer(request_event):
_host, _port = request_event.parameters["host"], request_event.parameters["port"]
_uuid = request_event.uuid
_destination = request_event.destination_host_key
vm = get_vm(running_vms, join(env_vars.get('VM_PREFIX'), _uuid))
vm = get_vm(running_vms, join_path(env_vars.get('VM_PREFIX'), _uuid))
if vm:
tunnel = sshtunnel.SSHTunnelForwarder(
(_host, 22),
_host,
ssh_username=env_vars.get("ssh_username"),
ssh_pkey=env_vars.get("ssh_pkey"),
ssh_private_key_password=env_vars.get("ssh_private_key_password"),
remote_bind_address=("127.0.0.1", _port),
ssh_proxy_enabled=True,
ssh_proxy=(_host, 22)
)
try:
tunnel.start()
@@ -317,7 +264,7 @@ def transfer(request_event):
logger.exception("Couldn't establish connection to (%s, 22)", _host)
else:
vm.handle.command(
"migrate", uri="tcp:{}:{}".format(_host, tunnel.local_bind_port)
"migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port)
)
status = vm.handle.command("query-migrate")["status"]
@@ -340,38 +287,22 @@ def transfer(request_event):
tunnel.close()
def init_migration(vm_entry, destination_host_key):
# This function would run on destination host i.e host on which the vm
# would be transferred after migration.
# This host would be responsible for starting VM that would receive
# state of VM running on source host.
_vm = get_vm(running_vms, vm_entry.key)
if _vm:
# VM already running. No need to proceed further.
logger.info("%s Already running", _vm.key)
return
launch_vm(vm_entry, migration=True, migration_port=4444,
destination_host_key=destination_host_key)
def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_key=None):
logger.info("Starting %s", vm_entry.key)
vm = create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
try:
vm.handle.launch()
except Exception as e:
logger.exception(e)
except Exception:
logger.exception("Error Occured while starting VM")
vm.handle.shutdown()
if migration:
# We don't care whether MachineError or any other error occurred
vm.handle.shutdown()
pass
else:
# Error during typical launch of a vm
vm_entry.add_log("Error Occurred while starting VM")
vm.handle.shutdown()
vm_entry.declare_killed()
vm_pool.put(vm_entry)
else:
@@ -383,7 +314,7 @@ def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_k
r = RequestEntry.from_scratch(
type=RequestType.TransferVM,
hostname=vm_entry.hostname,
parameters={"host": get_ipv4_address(), "port": 4444},
parameters={"host": get_ipv6_address(), "port": migration_port},
uuid=vm_entry.uuid,
destination_host_key=destination_host_key,
request_prefix=env_vars.get("REQUEST_PREFIX")