From 726839f4c6fda461a61e4b32f81c60a3ac647217 Mon Sep 17 00:00:00 2001
From: Ahmed Bilal Khalid
Date: Mon, 12 Aug 2019 17:38:34 +0500
Subject: [PATCH] Roll out request mechanism; incorporate migration handling

---
 helper.py | 117 ++++++++++++++++++++++++++++++++++++
 main.py   | 176 +++++++++++++++++------------------------------------
 2 files changed, 171 insertions(+), 122 deletions(-)
 create mode 100644 helper.py

diff --git a/helper.py b/helper.py
new file mode 100644
index 0000000..2dc2dff
--- /dev/null
+++ b/helper.py
@@ -0,0 +1,117 @@
+from collections import Counter
+from functools import reduce
+
+from ucloud_common.vm import VmPool, VMStatus
+from ucloud_common.host import HostPool, HostStatus
+from ucloud_common.request import RequestEntry, RequestPool, RequestType
+
+from etcd3_wrapper import Etcd3Wrapper
+from decouple import config
+
+client = Etcd3Wrapper(
+    host=config("ETCD_HOST"), port=int(config("ETCD_PORT"))
+)
+vm_pool = VmPool(client, config("VM_PREFIX"))
+host_pool = HostPool(client, config("HOST_PREFIX"))
+request_pool = RequestPool(client, config("REQUEST_PREFIX"))
+
+
+def accumulated_specs(vms_specs):
+    if not vms_specs:
+        return {}
+    return reduce((lambda x, y: Counter(x) + Counter(y)), vms_specs)
+
+
+def remaining_resources(host_specs, vms_specs):
+    """Return the remaining resources: host_specs - vms_specs."""
+    vms_specs = Counter(vms_specs)
+    remaining = Counter(host_specs)
+    remaining.subtract(vms_specs)
+
+    return remaining
+
+
+def get_suitable_host(vm_specs, hosts=None):
+    if hosts is None:
+        hosts = host_pool.by_status(HostStatus.alive)
+
+    for host in hosts:
+        # Get the VMs placed on this host
+        vms = vm_pool.by_host(host.key)
+
+        # Ignore VMs that are merely requested and not yet placed
+        vms = vm_pool.except_status(VMStatus.requested_new, vms)
+
+        running_vms_specs = [vm.specs for vm in vms]
+        # Accumulate all of their combined specs
+        running_vms_accumulated_specs = accumulated_specs(
+            running_vms_specs
+        )
+
+        # Find out remaining resources after
+        # host_specs - already running vm_specs
+        remaining = remaining_resources(
+            host.specs, running_vms_accumulated_specs
+        )
+
+        # Find out remaining - new_vm_specs
+        remaining = remaining_resources(remaining, vm_specs)
+        # If no resource went negative, this host fits
+        if all(
+            map(
+                lambda x: remaining[x] >= 0,
+                remaining,
+            )
+        ):
+            return host.key
+
+    return None
+
+
+def dead_host_detection():
+    # Bring out your dead! - Monty Python and the Holy Grail
+    hosts = host_pool.by_status(HostStatus.alive)
+    dead_hosts_keys = []
+
+    for host in hosts:
+        # Only check hosts that claim to be alive
+        if host.status == HostStatus.alive:
+            if not host.is_alive():
+                dead_hosts_keys.append(host.key)
+
+    return dead_hosts_keys
+
+
+def dead_host_mitigation(dead_hosts_keys):
+    for host_key in dead_hosts_keys:
+        host = host_pool.get(host_key)
+        host.declare_dead()
+
+        vms_hosted_on_dead_host = vm_pool.by_host(host_key)
+        for vm in vms_hosted_on_dead_host:
+            vm.declare_killed()
+            vm_pool.put(vm)
+        host_pool.put(host)
+
+
+def assign_host(vm):
+    host_name = get_suitable_host(vm.specs)
+    if host_name:
+        # if vm.status == VMStatus.requested_new:
+        #     vm.status = VMStatus.scheduled_deploy
+        #
+        # if vm.status == VMStatus.killed:
+        #     vm.status = VMStatus.requested_start
+
+        vm.hostname = host_name
+        vm_pool.put(vm)
+
+        r = RequestEntry.from_scratch(type=RequestType.StartVM,
+                                      uuid=vm.uuid,
+                                      hostname=vm.hostname)
+        request_pool.put(r)
+
+        vm.log.append("VM scheduled for starting")
+
+        return host_name
+    return None
diff --git a/main.py b/main.py
index 6b1b03b..79fbe4b 100644
--- a/main.py
+++ b/main.py
@@ -8,12 +8,14 @@ import argparse
 import logging
 
 from decouple import config
-from collections import Counter
-from functools import reduce
+
 from etcd3_wrapper import Etcd3Wrapper
 
-from ucloud_common.vm import VmPool, VMEntry, VMStatus
-from ucloud_common.host import HostPool, HostStatus
+from ucloud_common.vm import VmPool
+from ucloud_common.host import HostPool
+from ucloud_common.request import RequestEntry, RequestPool, RequestType
 
+from helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
+                    assign_host)
 
 logging.basicConfig(
     level=logging.DEBUG,
@@ -23,143 +25,70 @@ logging.basicConfig(
     datefmt="%d-%b-%y %H:%M:%S",
 )
 
-VM_POOL = None
-HOST_POOL = None
-
-def accumulated_specs(vms_specs):
-    if not vms_specs:
-        return {}
-    return reduce((lambda x, y: Counter(x) + Counter(y)), vms_specs)
-
-
-def remaining_resources(host_specs, vms_specs):
-    """Return remaining resources host_specs - vms"""
-    vms_specs = Counter(vms_specs)
-    remaining = Counter(host_specs)
-    remaining.subtract(vms_specs)
-
-    return remaining
-
-
-def get_suitable_host(vm_specs):
-    hosts = HOST_POOL.by_status(HostStatus.alive)
-
-    for host in hosts:
-        # Filter them by host_name
-        vms = VM_POOL.by_host(host.key)
-
-        # Filter them by status
-        vms = VM_POOL.except_status(VMStatus.requested_new, vms)
-
-        running_vms_specs = [vm.specs for vm in vms]
-        # Accumulate all of their combined specs
-        running_vms_accumulated_specs = accumulated_specs(
-            running_vms_specs
-        )
-
-        # Find out remaining resources after
-        # host_specs - already running vm_specs
-        remaining = remaining_resources(
-            host.specs, running_vms_accumulated_specs
-        )
-
-        # Find out remaining - new_vm_specs
-        remaining = remaining_resources(remaining, vm_specs)
-        # if remaining resources >= 0 return this host_name
-        if all(
-            map(
-                lambda x: True if remaining[x] >= 0 else False,
-                remaining,
-            )
-        ):
-            return host.key
-
-    return None
-
-
-def dead_host_detection():
-    # Bring out your dead! - Monty Python and the Holy Grail
-    hosts = HOST_POOL.by_status(HostStatus.alive)
-    dead_hosts_keys = []
-
-    for host in hosts:
-        # Only check those who claims to be alive
-        if host.status == HostStatus.alive:
-            if not host.is_alive():
-                dead_hosts_keys.append(host.key)
-
-    return dead_hosts_keys
-
-
-def dead_host_mitigation(dead_hosts_keys):
-    for host_key in dead_hosts_keys:
-        host = HOST_POOL.get(host_key)
-        host.declare_dead()
-
-        vms_hosted_on_dead_host = VM_POOL.by_host(host_key)
-        for vm in vms_hosted_on_dead_host:
-            vm.declare_killed()
-            VM_POOL.put(vm)
-        HOST_POOL.put(host)
-
-
-def assign_host(vm):
-    host_name = get_suitable_host(vm.specs)
-    if host_name:
-        if vm.status == VMStatus.requested_new:
-            vm.status = VMStatus.scheduled_deploy
-
-        if vm.status == VMStatus.killed:
-            vm.status = VMStatus.requested_start
-
-        vm.hostname = host_name
-        VM_POOL.put(vm)
-
-        return host_name
-    return None
-
-
-def main(vm_prefix, host_prefix):
+def main(vm_prefix, host_prefix, request_prefix):
     logging.info(f"{'*' * 5} SESSION STARTED {'*' * 5}")
 
     client = Etcd3Wrapper(
         host=config("ETCD_HOST"), port=int(config("ETCD_PORT"))
     )
-    global VM_POOL, HOST_POOL
-    VM_POOL = VmPool(client, vm_prefix)
-    HOST_POOL = HostPool(client, host_prefix)
+    vm_pool = VmPool(client, vm_prefix)
+    host_pool = HostPool(client, host_prefix)
+    request_pool = RequestPool(client, request_prefix)
 
     PENDING_VMS = []
 
     for events_iterator in [
-        client.get_prefix(vm_prefix, value_in_json=True),
-        client.watch_prefix(vm_prefix, timeout=5, value_in_json=True),
+        client.get_prefix(request_prefix, value_in_json=True),
+        client.watch_prefix(request_prefix, timeout=5, value_in_json=True),
    ]:
         for e in events_iterator:
-            e = VMEntry(e)
+            if not e.value:
+                continue
+            e = RequestEntry(e)
             logging.debug(f"{e.key}, {e.value}")
 
             # Never Run time critical mechanism inside timeout
             # mechanism because timeout mechanism only comes
             # when no other event is happening. It means under
             # heavy load there would not be a timeout
-            if e.status == "TIMEOUT":
+            if e.type == "TIMEOUT":
+                logging.debug("TIMEOUT event occurred")
                 dead_hosts = dead_host_detection()
                 logging.debug(f"Dead hosts: {dead_hosts}")
                 dead_host_mitigation(dead_hosts)
+                #
+                # vm_scheduled = []
+                # for vm in PENDING_VMS:
+                #     if assign_host(vm) is not None:
+                #         vm_scheduled.append(vm)
+                #
+                # for vm in vm_scheduled:
+                #     PENDING_VMS.remove(vm)
+                # logging.debug(f"Remaining Pending: {PENDING_VMS}")
 
-            vm_scheduled = []
-            for vm in PENDING_VMS:
-                if assign_host(vm) is not None:
-                    vm_scheduled.append(vm)
+            elif e.type == RequestType.ScheduleVM:
+                if hasattr(e, "migration") and e.migration and \
+                        hasattr(e, "destination") and e.destination:
+                    client.client.delete(e.key)
+                    vm = vm_pool.get(e.uuid)
+                    host = get_suitable_host(vm.specs, [host_pool.get(e.destination)])
+                    if host:
+                        r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
+                                                      uuid=e.uuid, destination=e.destination)
+                        request_pool.put(r)
+                        logging.debug(f"{host}, {e}")
+                    else:
+                        logging.info("Requested destination host doesn't have enough capacity "
+                                     f"to hold {vm.uuid}")
+                else:
+                    client.client.delete(e.key)
+                    vm = vm_pool.get(e.uuid)
+                    if assign_host(vm) is None:
+                        vm.log.append("Can't schedule VM. No Resource Left.")
+                        vm_pool.put(vm)
 
-            for vm in vm_scheduled:
-                PENDING_VMS.remove(vm)
-                logging.debug(f"Remaining Pending: {PENDING_VMS}")
-            elif e.status in [VMStatus.requested_new, VMStatus.killed]:
-                if assign_host(e) is None:
-                    PENDING_VMS.append(e)
-                    logging.info("No Resource Left. Emailing admin....")
Emailing admin....") - logging.debug(f"Pending VMS: {PENDING_VMS}") + PENDING_VMS.append(vm) + logging.info("No Resource Left. Emailing admin....") + logging.debug(f"Pending VMS: {PENDING_VMS}") if __name__ == "__main__": @@ -170,6 +99,9 @@ if __name__ == "__main__": argparser.add_argument( "--host_prefix", required=False, default=config("HOST_PREFIX") ) + argparser.add_argument( + "--request_prefix", required=False, default=config("REQUEST_PREFIX") + ) args = argparser.parse_args() - main(args.vm_prefix, args.host_prefix) + main(args.vm_prefix, args.host_prefix, args.request_prefix)