import argparse
# import threading
import time
import os
import sys
import virtualmachine
import multiprocessing as mp

from ucloud_common.host import HostEntry
from ucloud_common.request import RequestEntry, RequestType

from config import (vm_pool, host_pool, request_pool,
                    etcd_client, logging, running_vms,
                    etcd_wrapper_args, etcd_wrapper_kwargs,
                    REQUEST_PREFIX, HOST_PREFIX,
                    WITHOUT_CEPH, VM_DIR, HostPool)
from etcd3_wrapper import Etcd3Wrapper
import etcd3


def update_heartbeat(host):
    """Refresh this host's heartbeat in etcd every 10 seconds, forever.

    ``main`` runs this function in a separate process. It builds its own
    ``Etcd3Wrapper`` client and ``HostPool`` instead of using the
    module-level ones imported from ``config``.

    Args:
        host: Key used to look the host entry up in the host pool
            (``main`` passes the ``hostname`` command line argument).

    This function never returns.
    """
    client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs)
    heartbeat_pool = HostPool(client, HOST_PREFIX)
    this_host = heartbeat_pool.get(host)

    while True:
        this_host.update_heartbeat()
        heartbeat_pool.put(this_host)
        time.sleep(10)


def maintenance(host):
    """Reconcile the locally tracked VMs with the VM entries stored in etcd.

    Two passes are performed:

    1. Every VM in ``running_vms`` whose etcd entry names another host and
       is not flagged as in migration is shut down locally.
    2. Every etcd entry that claims to be RUNNING on this host but has no
       running counterpart in ``running_vms`` is declared killed.

    Args:
        host: Host entry of this machine; only ``host.key`` is read.
    """
    # To capture vm running according to running_vms list

    # This is to capture successful migration of a VM.
    # Suppose this host is running "vm1" and the user initiated a
    # request to migrate "vm1" to some other host. On successful
    # migration the destination host sets the vm hostname to itself.
    # Thus, we check whether a vm of this host was successfully
    # migrated. If yes, we shut "vm1" down on this host.
    # NOTE(review): the VM is not removed from running_vms here, so it
    # looks like shutdown() is invoked again on every later maintenance
    # pass -- confirm whether that is intended.
    for running_vm in running_vms:
        with vm_pool.get_put(running_vm.key) as vm_entry:
            if vm_entry.hostname != host.key and not vm_entry.in_migration:
                running_vm.handle.shutdown()
                vm_entry.add_log("VM on source host shutdown.")

    # To check vm running according to etcd entries
    alleged_running_vms = vm_pool.by_status("RUNNING", vm_pool.by_host(host.key))

    for vm_entry in alleged_running_vms:
        _vm = virtualmachine.get_vm(running_vms, vm_entry.key)

        # The allegedly running vm is either missing from our
        # running_vms list or is tracked there but no longer running.
        # This captures a poweroff/shutdown initiated by the user from
        # inside the VM, or a crash of the VM caused by some process.
        if not _vm or not _vm.handle.is_running():
            vm_entry.add_log(
                "{} is not running but is said to be running. \n"
                "So, shutting it down and declare it killed".format(vm_entry.key)
            )
            vm_entry.declare_killed()
            vm_pool.put(vm_entry)
            if _vm:
                running_vms.remove(_vm)


def main():
    """Run the host agent: start the heartbeat process and serve requests.

    Parses the ``hostname`` positional argument, validates the local
    configuration, starts ``update_heartbeat`` in a separate process and
    then handles request events read from etcd under ``REQUEST_PREFIX``:
    first the entries already present, then the watched stream.

    Raises:
        AssertionError: If ``WITHOUT_CEPH`` is set but ``VM_DIR`` is not an
            existing directory, or if no host entry exists for the given
            hostname.
        SystemExit: With status -1 if the heartbeat process cannot start.
    """
    argparser = argparse.ArgumentParser()
    argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
    args = argparser.parse_args()

    # The VM directory is only required when running without Ceph.
    # Explicit raises (instead of assert statements) keep these checks
    # active under ``python -O``.
    if WITHOUT_CEPH and not os.path.isdir(VM_DIR):
        raise AssertionError(
            "You have set WITHOUT_CEPH to True. So, the vm directory mentioned"
            " in .env file must exists. But, it don't."
        )

    # It is seen that under heavy load, the timeout event doesn't come
    # in a predictable manner (which is intentional because we give
    # higher priority to customer's requests). That delays the heartbeat
    # update, which in turn makes the scheduler believe the host is dead
    # when it is actually alive. So, to ensure that the heartbeat is
    # updated in a predictable manner, the heartbeat updating mechanism
    # runs in a separate process.
    # NOTE(review): the process is not marked as daemon, so it presumably
    # keeps running if this main loop terminates -- confirm that is wanted.
    mp.set_start_method('spawn')
    heartbeat_updating_process = mp.Process(target=update_heartbeat,
                                            args=(args.hostname,))

    local_host_pool = HostPool(etcd_client, HOST_PREFIX)
    host = local_host_pool.get(args.hostname)
    if not host:
        raise AssertionError("No such host")

    try:
        heartbeat_updating_process.start()
    except Exception as e:
        logging.info("No Need To Go Further. Our heartbeat updating mechanism is not working")
        logging.exception(e)
        sys.exit(-1)

    logging.info("%s Session Started %s", '*' * 5, '*' * 5)

    for events_iterator in [
        etcd_client.get_prefix(REQUEST_PREFIX, value_in_json=True),
        etcd_client.watch_prefix(REQUEST_PREFIX, timeout=10, value_in_json=True),
    ]:
        for request_event in events_iterator:
            request_event = RequestEntry(request_event)

            if request_event.type == "TIMEOUT":
                logging.info("Timeout Event")
                maintenance(host)
                continue

            # If the event is directed toward me OR I am destination of a InitVMMigration
            if (request_event.hostname == host.key
                    or request_event.destination == host.key):
                logging.debug("EVENT: %s", request_event)

                # The request is deleted from etcd before it is handled.
                request_pool.client.client.delete(request_event.key)
                vm_entry = vm_pool.get(request_event.uuid)

                if vm_entry:
                    if request_event.type == RequestType.StartVM:
                        virtualmachine.start(vm_entry)

                    elif request_event.type == RequestType.StopVM:
                        virtualmachine.stop(vm_entry)

                    elif request_event.type == RequestType.DeleteVM:
                        virtualmachine.delete(vm_entry)

                    elif request_event.type == RequestType.InitVMMigration:
                        virtualmachine.init_migration(vm_entry, host.key)

                    elif request_event.type == RequestType.TransferVM:
                        virtualmachine.transfer(request_event)
                else:
                    logging.info("VM Entry missing")

                logging.info("Running VMs %s", running_vms)


if __name__ == "__main__":
    main()