diff --git a/conf/ucloud.conf b/conf/ucloud.conf
index 574874e..2c3178a 100644
--- a/conf/ucloud.conf
+++ b/conf/ucloud.conf
@@ -59,3 +59,8 @@ FILE_PREFIX = noclue-ahmed
 [ssh]
 SSH_USERNAME =
 SSH_PRIVATEKEY =
+
+# unknown vars:
+IMAGE_DIR =
+BASE_DIR =
+IMAGE_STORE_PREFIX =
diff --git a/ucloud/host/main.py.old b/ucloud/host/main.py.old
new file mode 100755
index 0000000..ae4c069
--- /dev/null
+++ b/ucloud/host/main.py.old
@@ -0,0 +1,164 @@
+import argparse
+import multiprocessing as mp
+import sys
+import time
+from os.path import isdir
+
+from etcd3_wrapper import Etcd3Wrapper
+
+from ucloud.common.request import RequestEntry, RequestType
+from ucloud.config import (vm_pool, request_pool,
+                           etcd_client, running_vms,
+                           etcd_wrapper_args, etcd_wrapper_kwargs,
+                           HostPool, config, env_vars)
+
+from .helper import find_free_port
+from . import virtualmachine
+from ucloud.host import logger
+
+
+def update_heartbeat(hostname):
+    """Update Last HeartBeat Time for :param hostname: in etcd"""
+    client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs)
+    host_pool = HostPool(client, env_vars.get('HOST_PREFIX'))
+    this_host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
+
+    while True:
+        this_host.update_heartbeat()
+        host_pool.put(this_host)
+        time.sleep(10)
+
+
+def maintenance(host):
+    """Reconcile running_vms with the VM entries stored in etcd for :param host:"""
+    # To capture vm running according to running_vms list
+
+    # This is to capture successful migration of a VM.
+    # Suppose, this host is running "vm1" and user initiated
+    # request to migrate this "vm1" to some other host. On,
+    # successful migration the destination host would set
+    # the vm hostname to itself. Thus, we are checking
+    # whether this host vm is successfully migrated. If yes
+    # then we shutdown "vm1" on this host.
+
+    to_be_removed = []
+    for running_vm in running_vms:
+        with vm_pool.get_put(running_vm.key) as vm_entry:
+            if vm_entry.hostname != host.key and not vm_entry.in_migration:
+                running_vm.handle.shutdown()
+                logger.info("VM migration not completed successfully.")
+                to_be_removed.append(running_vm)
+
+    for r in to_be_removed:
+        running_vms.remove(r)
+
+    # To check vm running according to etcd entries
+    alleged_running_vms = vm_pool.by_status("RUNNING", vm_pool.by_host(host.key))
+
+    for vm_entry in alleged_running_vms:
+        _vm = virtualmachine.get_vm(running_vms, vm_entry.key)
+        # Whether, the allegedly running vm is in our
+        # running_vms list or not if it is said to be
+        # running on this host but it is not then we
+        # need to shut it down
+
+        # This is to capture poweroff/shutdown of a VM
+        # initiated by user inside VM. OR crash of VM by some
+        # user running process
+        if (_vm and not _vm.handle.is_running()) or not _vm:
+            # _vm is None when the VM is not tracked by this host at all,
+            # so only query the handle when there is one.
+            logger.debug("_vm = %s, is_running() = %s",
+                         _vm, _vm.handle.is_running() if _vm else None)
+            vm_entry.add_log("""{} is not running but is said to be running.
+                                So, shutting it down and declare it killed""".format(vm_entry.key))
+            vm_entry.declare_killed()
+            vm_pool.put(vm_entry)
+            if _vm:
+                running_vms.remove(_vm)
+
+
+def check():
+    """Exit with status 1 if the filesystem backend's VM directory is missing."""
+    if env_vars.get('STORAGE_BACKEND') == 'filesystem' and not isdir(env_vars.get('VM_DIR')):
+        print("You have set STORAGE_BACKEND to filesystem. So, the vm directory mentioned"
+              " in .env file must exists. But, it don't.")
+        sys.exit(1)
+
+
+def main(hostname):
+    """Run the host daemon for :param hostname:
+
+    Starts the heartbeat process, then consumes the requests stored under
+    REQUEST_PREFIX that are addressed to this host.
+    """
+    check()
+
+    heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))
+
+    host_pool = HostPool(etcd_client, env_vars.get('HOST_PREFIX'))
+    host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
+    assert host is not None, "No such host with name = {}".format(hostname)
+
+    try:
+        heartbeat_updating_process.start()
+    except Exception as e:
+        logger.info("No Need To Go Further. Our heartbeat updating mechanism is not working")
+        logger.exception(e)
+        sys.exit(-1)
+
+    logger.info("%s Session Started %s", '*' * 5, '*' * 5)
+
+    # It is seen that under heavy load, timeout event doesn't come
+    # in a predictable manner (which is intentional because we give
+    # higher priority to customer's requests) which delays heart
+    # beat update which in turn is misunderstood by scheduler that the
+    # host is dead when it is actually alive. So, to ensure that we
+    # update the heart beat in a predictable manner we start Heart
+    # beat updating mechanism in a separate process
+
+    for events_iterator in [
+        etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True),
+        etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=10, value_in_json=True),
+    ]:
+        for request_event in events_iterator:
+            request_event = RequestEntry(request_event)
+
+            if request_event.type == "TIMEOUT":
+                maintenance(host)
+                continue
+
+            # If the event is directed toward me OR I am destination of a InitVMMigration
+            if request_event.hostname == host.key or request_event.destination == host.key:
+                logger.debug("VM Request: %s", request_event)
+
+                request_pool.client.client.delete(request_event.key)
+                vm_entry = vm_pool.get(request_event.uuid)
+
+                if vm_entry:
+                    if request_event.type == RequestType.StartVM:
+                        virtualmachine.start(vm_entry)
+
+                    elif request_event.type == RequestType.StopVM:
+                        virtualmachine.stop(vm_entry)
+
+                    elif request_event.type == RequestType.DeleteVM:
+                        virtualmachine.delete(vm_entry)
+
+                    elif request_event.type == RequestType.InitVMMigration:
+                        virtualmachine.start(vm_entry, host.key, find_free_port())
+
+                    elif request_event.type == RequestType.TransferVM:
+                        virtualmachine.transfer(request_event)
+                else:
+                    logger.info("VM Entry missing")
+
+                logger.info("Running VMs %s", running_vms)
+
+
+if __name__ == "__main__":
+    argparser = argparse.ArgumentParser()
+    argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
+    args = argparser.parse_args()
+    mp.set_start_method('spawn')
+    main(args.hostname)
diff --git a/ucloud/scheduler/main.py.old b/ucloud/scheduler/main.py.old
new file mode 100755
index 0000000..e2c975a
--- /dev/null
+++ b/ucloud/scheduler/main.py.old
@@ -0,0 +1,94 @@
+# TODO
+# 1. send an email to an email address defined by env['admin-email']
+#    if resources are finished
+# 2. Introduce a status endpoint of the scheduler -
+#    maybe expose a prometheus compatible output
+
+from ucloud.common.request import RequestEntry, RequestType
+from ucloud.config import etcd_client
+from ucloud.config import host_pool, request_pool, vm_pool, env_vars
+from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
+                     assign_host, NoSuitableHostFound)
+from . import logger
+
+
+def main():
+    """Process ScheduleVM requests and run dead-host handling on watch timeouts."""
+    logger.info("%s SESSION STARTED %s", '*' * 5, '*' * 5)
+
+    pending_vms = []
+
+    for request_iterator in [
+        etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True),
+        etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=5, value_in_json=True),
+    ]:
+        for request_event in request_iterator:
+            request_entry = RequestEntry(request_event)
+            # Never Run time critical mechanism inside timeout
+            # mechanism because timeout mechanism only comes
+            # when no other event is happening. It means under
+            # heavy load there would not be a timeout event.
+            if request_entry.type == "TIMEOUT":
+
+                # Detect hosts that are dead and set their status
+                # to "DEAD", and their VMs' status to "KILLED"
+                dead_hosts = dead_host_detection()
+                if dead_hosts:
+                    logger.debug("Dead hosts: %s", dead_hosts)
+                    dead_host_mitigation(dead_hosts)
+
+                # If there are VMs that weren't assigned a host
+                # because there wasn't a host available which
+                # meets requirement of that VM then we would
+                # create a new ScheduleVM request for that VM
+                # on our behalf.
+                while pending_vms:
+                    pending_vm_entry = pending_vms.pop()
+                    r = RequestEntry.from_scratch(type="ScheduleVM",
+                                                  uuid=pending_vm_entry.uuid,
+                                                  hostname=pending_vm_entry.hostname,
+                                                  request_prefix=env_vars.get("REQUEST_PREFIX"))
+                    request_pool.put(r)
+
+            elif request_entry.type == RequestType.ScheduleVM:
+                logger.debug("%s, %s", request_entry.key, request_entry.value)
+
+                vm_entry = vm_pool.get(request_entry.uuid)
+                if vm_entry is None:
+                    logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid))
+                    continue
+                etcd_client.client.delete(request_entry.key)  # consume Request
+
+                # If the Request is about a VM which is labelled as "migration"
+                # and has a destination
+                if hasattr(request_entry, "migration") and request_entry.migration \
+                        and hasattr(request_entry, "destination") and request_entry.destination:
+                    try:
+                        get_suitable_host(vm_specs=vm_entry.specs,
+                                          hosts=[host_pool.get(request_entry.destination)])
+                    except NoSuitableHostFound:
+                        logger.info("Requested destination host doesn't have enough capacity"
+                                    " to hold %s" % vm_entry.uuid)
+                    else:
+                        r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
+                                                      uuid=request_entry.uuid,
+                                                      destination=request_entry.destination,
+                                                      request_prefix=env_vars.get("REQUEST_PREFIX"))
+                        request_pool.put(r)
+
+                # If the Request is about a VM that just want to get started/created
+                else:
+                    # assign_host raises NoSuitableHostFound when no host could be
+                    # assigned to the VM because of resource constraints
+                    try:
+                        assign_host(vm_entry)
+                    except NoSuitableHostFound:
+                        vm_entry.add_log("Can't schedule VM. No Resource Left.")
+                        vm_pool.put(vm_entry)
+
+                        pending_vms.append(vm_entry)
+                        logger.info("No Resource Left. Emailing admin....")
+
+
+if __name__ == "__main__":
+    main()