import argparse
import multiprocessing as mp
import sys
import time

from etcd3_wrapper import Etcd3Wrapper

from ucloud.common.request import RequestEntry, RequestType
from ucloud.config import (vm_pool, request_pool,
                           etcd_client, running_vms,
                           etcd_wrapper_args, etcd_wrapper_kwargs,
                           HostPool, env_vars)

from .helper import find_free_port
from . import virtualmachine
from ucloud.host import logger


def update_heartbeat(hostname):
    """Update Last HeartBeat Time for :param hostname: in etcd.

    Builds its own Etcd3Wrapper client, looks the host up by hostname in
    the host pool and then, forever, refreshes the host's heartbeat and
    writes the host entry back every 10 seconds. This function never
    returns normally.

    :param hostname: hostname of the host whose heartbeat is refreshed.
    :raises ValueError: if no host with that hostname is in the pool.
    """
    client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs)
    host_pool = HostPool(client, env_vars.get('HOST_PREFIX'))
    this_host = next(
        filter(lambda h: h.hostname == hostname, host_pool.hosts), None
    )
    if this_host is None:
        # Fail with a clear message instead of an AttributeError on None
        # inside the loop below.
        raise ValueError("No such host with name = {}".format(hostname))

    while True:
        this_host.update_heartbeat()
        host_pool.put(this_host)
        time.sleep(10)


def maintenance(host):
    """Reconcile the local ``running_vms`` list with the VM entries in etcd.

    Two passes are made:

    1. Every VM in ``running_vms`` whose entry no longer names this host
       and is not in migration is shut down and dropped from the list.
    2. Every entry that claims to be RUNNING on this host but has no
       running local VM is declared killed and written back.

    :param host: entry of this host; only ``host.key`` is read.
    """
    # To capture vm running according to running_vms list

    # This is to capture successful migration of a VM.
    # Suppose, this host is running "vm1" and user initiated
    # request to migrate this "vm1" to some other host. On,
    # successful migration the destination host would set
    # the vm hostname to itself. Thus, we are checking
    # whether this host vm is successfully migrated. If yes
    # then we shutdown "vm1" on this host.
    to_be_removed = []
    for running_vm in running_vms:
        with vm_pool.get_put(running_vm.key) as vm_entry:
            if vm_entry.hostname != host.key and not vm_entry.in_migration:
                running_vm.handle.shutdown()
                # NOTE(review): this message seems to contradict the
                # comment above (this branch looks like the *successful*
                # migration case) -- confirm the intended wording.
                logger.info("VM migration not completed successfully.")
                to_be_removed.append(running_vm)

    # Removal is deferred so running_vms is not mutated while iterated.
    for r in to_be_removed:
        running_vms.remove(r)

    # To check vm running according to etcd entries
    alleged_running_vms = vm_pool.by_status(
        "RUNNING", vm_pool.by_host(host.key)
    )

    for vm_entry in alleged_running_vms:
        _vm = virtualmachine.get_vm(running_vms, vm_entry.key)
        # Whether, the allegedly running vm is in our
        # running_vms list or not if it is said to be
        # running on this host but it is not then we
        # need to shut it down

        # This is to capture poweroff/shutdown of a VM
        # initiated by user inside VM. OR crash of VM by some
        # user running process

        # Only ask the handle when we actually have a VM object; a
        # missing VM counts as "not running". Previously the debug line
        # dereferenced _vm unconditionally and crashed when it was falsy.
        vm_is_running = bool(_vm) and _vm.handle.is_running()
        if not vm_is_running:
            logger.debug("_vm = %s, is_running() = %s", _vm, vm_is_running)
            vm_entry.add_log("""{} is not running but is said to be running. So, shutting it down and declare it killed""".format(vm_entry.key))
            vm_entry.declare_killed()
            vm_pool.put(vm_entry)
            if _vm:
                running_vms.remove(_vm)


def main(hostname):
    """Run the host daemon for the host named ``hostname``.

    Starts the heartbeat updater in a separate process, then consumes
    request events: first the ones already stored under REQUEST_PREFIX,
    then the ones delivered by a watch on that prefix. "TIMEOUT" events
    trigger :func:`maintenance`; requests addressed to this host (or
    naming it as destination) are deleted from etcd and dispatched to
    the matching ``virtualmachine`` operation.

    :param hostname: hostname of this host as stored in the host pool.
    :raises AssertionError: if no host with that hostname exists.
    """
    heartbeat_updating_process = mp.Process(
        target=update_heartbeat, args=(hostname,)
    )

    host_pool = HostPool(etcd_client, env_vars.get('HOST_PREFIX'))
    host = next(
        filter(lambda h: h.hostname == hostname, host_pool.hosts), None
    )
    assert host is not None, "No such host with name = {}".format(hostname)

    try:
        heartbeat_updating_process.start()
    except Exception as e:
        logger.info("No Need To Go Further. Our heartbeat updating mechanism is not working")
        logger.exception(e)
        # sys.exit instead of the site-provided exit(), which is meant
        # for the interactive interpreter.
        sys.exit(-1)

    logger.info("%s Session Started %s", '*' * 5, '*' * 5)

    # It is seen that under heavy load, timeout event doesn't come
    # in a predictive manner (which is intentional because we give
    # higher priority to customer's requests) which delays heart
    # beat update which in turn misunderstood by scheduler that the
    # host is dead when it is actually alive. So, to ensure that we
    # update the heart beat in a predictive manner we start Heart
    # beat updating mechanism in a separate process

    for events_iterator in [
        etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'),
                               value_in_json=True),
        etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'),
                                 timeout=10, value_in_json=True),
    ]:
        for request_event in events_iterator:
            request_event = RequestEntry(request_event)

            if request_event.type == "TIMEOUT":
                maintenance(host)
                continue

            # If the event is directed toward me OR I am destination of a InitVMMigration
            if request_event.hostname == host.key or request_event.destination == host.key:
                logger.debug("VM Request: %s", request_event)

                # The request is removed before it is acted upon.
                request_pool.client.client.delete(request_event.key)
                vm_entry = vm_pool.get(request_event.uuid)

                if vm_entry:
                    if request_event.type == RequestType.StartVM:
                        virtualmachine.start(vm_entry)

                    elif request_event.type == RequestType.StopVM:
                        virtualmachine.stop(vm_entry)

                    elif request_event.type == RequestType.DeleteVM:
                        virtualmachine.delete(vm_entry)

                    elif request_event.type == RequestType.InitVMMigration:
                        virtualmachine.start(vm_entry, host.key, find_free_port())

                    elif request_event.type == RequestType.TransferVM:
                        virtualmachine.transfer(request_event)
                else:
                    logger.info("VM Entry missing")

                logger.info("Running VMs %s", running_vms)


if __name__ == "__main__":
    argparser = argparse.ArgumentParser()
    argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
    args = argparser.parse_args()
    mp.set_start_method('spawn')
    main(args.hostname)