import argparse
import sys
import threading
import time

import virtualmachine
from ucloud_common.host import HostEntry
from ucloud_common.request import RequestEntry, RequestType

from config import (vm_pool, host_pool, request_pool,
                    etcd_client, logging, running_vms)


def update_heartbeat(host: HostEntry):
    """Refresh and persist the heartbeat of *host* forever.

    Every iteration updates the heartbeat on the entry, writes the entry
    back through ``host_pool``, sleeps for 10 seconds and then logs the
    heartbeat value that was stored. The function never returns; it is
    meant to be the target of a background thread.
    """
    while True:
        host.update_heartbeat()
        host_pool.put(host)
        time.sleep(10)
        logging.info(f"Updated last heartbeat time {host.last_heartbeat}")


def maintenance(host):
    """Reconcile the locally tracked VMs with the entries stored in etcd.

    Two passes are made:

    1. For every VM in ``running_vms``: if its entry now names another host
       and is not flagged as being in migration, the local VM is shut down.
    2. For every entry that claims to be RUNNING on this host: if no local
       VM is tracked for it, or the tracked VM is no longer running, the
       entry is declared killed and the stale local record is dropped.
    """
    # Pass 1: detect successfully migrated VMs.
    # Suppose this host runs "vm1" and the user asked to migrate it to some
    # other host. On successful migration the destination host sets the
    # VM's hostname to itself, so a VM that is still tracked here but whose
    # entry points elsewhere (and is not mid-migration) must be shut down
    # on this, the source, host.
    for running_vm in running_vms:
        with vm_pool.get_put(running_vm.key) as vm_entry:
            if vm_entry.hostname != host.key and not vm_entry.in_migration:
                running_vm.handle.shutdown()
                vm_entry.add_log("VM on source host shutdown.")

    # Pass 2: detect VMs that etcd believes are running here but are not.
    # This captures a poweroff/shutdown initiated from inside the VM, or a
    # crash of the VM process.
    alleged_running_vms = vm_pool.by_status("RUNNING",
                                            vm_pool.by_host(host.key))
    for vm_entry in alleged_running_vms:
        _vm = virtualmachine.get_vm(running_vms, vm_entry.key)

        # Either we do not track the VM at all, or we track it but its
        # handle reports that it is no longer running.
        if not _vm or not _vm.handle.is_running():
            vm_entry.add_log(f"{vm_entry.key} is not running but is said to be running."
                             "So, shutting it down and declare it killed")
            vm_entry.declare_killed()
            vm_pool.put(vm_entry)
            if _vm:
                running_vms.remove(_vm)


def main():
    """Run the host agent: publish heartbeats and serve VM requests.

    Parses the host key from the command line, looks the host up in
    ``host_pool`` (exiting with status 1 if it is unknown), starts the
    heartbeat thread and then processes request events from etcd: first
    the requests already stored under ``/v1/request/``, then the live
    watch on the same prefix. ``TIMEOUT`` events trigger ``maintenance``.
    """
    argparser = argparse.ArgumentParser()
    argparser.add_argument("hostname",
                           help="Name of this host. e.g /v1/host/1")
    args = argparser.parse_args()

    host = host_pool.get(args.hostname)
    if not host:
        print("No Such Host")
        # sys.exit instead of the site-provided exit(), which is intended
        # for interactive sessions and is not guaranteed to exist.
        sys.exit(1)

    logging.info(f"{'*' * 5} Session Started {'*' * 5}")

    # Under heavy load the timeout event does not arrive at predictable
    # intervals (intentionally, because customer requests get priority).
    # That would delay the heartbeat update and make the scheduler believe
    # the host is dead while it is actually alive. To keep the heartbeat
    # predictable it is updated from a separate thread.
    #
    # The thread is a daemon so that it cannot outlive the request loop:
    # if the main thread terminates (normally or through an unhandled
    # exception) the process must stop advertising itself as alive.
    heartbeat_updating_thread = threading.Thread(target=update_heartbeat,
                                                 args=(host,),
                                                 daemon=True)
    try:
        heartbeat_updating_thread.start()
    except Exception as e:
        print("No Need To Go Further. Our heartbeat updating mechanism is not working")
        logging.exception(e)
        sys.exit(-1)

    # First drain the requests that already exist, then follow new ones.
    for events_iterator in [
        etcd_client.get_prefix("/v1/request/", value_in_json=True),
        etcd_client.watch_prefix("/v1/request/", timeout=10,
                                 value_in_json=True),
    ]:
        for request_event in events_iterator:
            request_event = RequestEntry(request_event)

            if request_event.type == "TIMEOUT":
                logging.info("Timeout Event")
                maintenance(host)
                continue

            # Handle the event only if it is directed at this host, or if
            # this host is the destination of an InitVMMigration.
            addressed_to_me = (hasattr(request_event, "hostname")
                               and request_event.hostname == host.key)
            destined_for_me = (hasattr(request_event, "destination")
                               and request_event.destination == host.key)
            if addressed_to_me or destined_for_me:
                # Remove the request before acting on it.
                request_pool.client.client.delete(request_event.key)
                vm_entry = vm_pool.get(request_event.uuid)

                logging.debug(f"EVENT: {request_event}")

                if request_event.type == RequestType.StartVM:
                    virtualmachine.start(vm_entry)

                elif request_event.type == RequestType.StopVM:
                    virtualmachine.stop(vm_entry)

                elif request_event.type == RequestType.DeleteVM:
                    virtualmachine.delete(vm_entry)

                elif request_event.type == RequestType.InitVMMigration:
                    virtualmachine.init_migration(vm_entry, host.key)

                elif request_event.type == RequestType.TransferVM:
                    virtualmachine.transfer(request_event)

                logging.info(f"Running VMs {running_vms}")


# Called unconditionally, as in the original script, so that the module's
# behaviour when executed or imported is unchanged.
main()