152 lines
		
	
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			152 lines
		
	
	
	
		
			5.9 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
import argparse
import multiprocessing as mp
import sys
import time
from os.path import isdir

from etcd3_wrapper import Etcd3Wrapper

from ucloud.common.request import RequestEntry, RequestType
from ucloud.config import (vm_pool, request_pool,
                    etcd_client, running_vms,
                    etcd_wrapper_args, etcd_wrapper_kwargs,
                    HostPool, config)
from ucloud.host import logger

from .helper import find_free_port
from . import virtualmachine
 | |
| 
 | |
| 
 | |
def update_heartbeat(hostname):
    """Update Last HeartBeat Time for :param hostname: in etcd.

    Runs forever (it is the target of a separate process started by
    ``main``): every 10 seconds it refreshes the host entry so the
    scheduler keeps seeing this host as alive.

    :param hostname: name of this host, e.g. "/v1/host/1"
    :raises AssertionError: if no host entry with this hostname exists.
    """
    # Build a dedicated etcd client: this function runs in its own
    # (spawned) process and must not share the parent's connection.
    client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs)
    host_pool = HostPool(client, config['etcd']['HOST_PREFIX'])
    this_host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)

    # BUG FIX: fail fast with a clear message instead of an opaque
    # AttributeError on the first loop iteration when the host is unknown.
    # Same assertion style as the equivalent check in main().
    assert this_host is not None, "No such host with name = {}".format(hostname)

    while True:
        this_host.update_heartbeat()
        host_pool.put(this_host)
        time.sleep(10)
 | |
| 
 | |
| 
 | |
def maintenance(host):
    """Reconcile this host's in-memory ``running_vms`` with etcd state.

    Two passes:

    1. Capture successful migrations away from this host. Suppose this
       host is running "vm1" and the user initiated a request to migrate
       "vm1" to some other host. On successful migration the destination
       host sets the vm's hostname to itself; a vm whose etcd hostname no
       longer matches this host (and which is not mid-migration) is
       therefore shut down locally and dropped from ``running_vms``.
    2. Check vms that etcd claims are RUNNING on this host but that are
       not actually running here (e.g. poweroff/shutdown initiated by the
       user inside the vm, or a crash of the vm process) and declare them
       killed in etcd.

    :param host: this host's entry from the host pool (``host.key`` is
        the identifier vm entries reference via ``vm_entry.hostname``).
    """
    # Pass 1: vms running locally but re-homed in etcd.
    to_be_removed = []
    for running_vm in running_vms:
        with vm_pool.get_put(running_vm.key) as vm_entry:
            if vm_entry.hostname != host.key and not vm_entry.in_migration:
                running_vm.handle.shutdown()
                logger.info("VM migration not completed successfully.")
                to_be_removed.append(running_vm)

    # Remove after the loop: mutating running_vms while iterating it
    # would skip entries.
    for r in to_be_removed:
        running_vms.remove(r)

    # Pass 2: vms etcd alleges are running on this host.
    alleged_running_vms = vm_pool.by_status("RUNNING", vm_pool.by_host(host.key))

    for vm_entry in alleged_running_vms:
        _vm = virtualmachine.get_vm(running_vms, vm_entry.key)

        # Either we do not know this vm at all, or we know it but its
        # local process has died — in both cases the etcd entry is stale
        # and the vm must be declared killed.
        if not _vm or not _vm.handle.is_running():
            # BUG FIX: the original called _vm.handle.is_running() in this
            # log line unconditionally, raising AttributeError when _vm
            # was None. Also switched to lazy %-style logger args.
            logger.debug("_vm = %s, is_running() = %s",
                         _vm, _vm.handle.is_running() if _vm else None)
            vm_entry.add_log("""{} is not running but is said to be running.
                                So, shutting it down and declare it killed""".format(vm_entry.key))
            vm_entry.declare_killed()
            vm_pool.put(vm_entry)
            if _vm:
                running_vms.remove(_vm)
 | |
| 
 | |
def check():
    """Sanity-check host configuration before starting the daemon.

    When STORAGE_BACKEND is "filesystem", the configured VM_DIR must
    exist; otherwise print a diagnostic and exit with status 1.

    Relies on the module-level ``sys`` and ``os.path.isdir`` imports
    (previously missing, which made this branch raise NameError instead
    of exiting cleanly).
    """
    if config['etcd']['STORAGE_BACKEND'] == 'filesystem' and not isdir(config['etcd']['VM_DIR']):
        print("You have set STORAGE_BACKEND to filesystem. So, the vm directory mentioned"
              " in .env file must exists. But, it don't.")
        sys.exit(1)
 | |
| 
 | |
| 
 | |
| 
 | |
def main(hostname):
    """Run the host daemon: dispatch vm requests targeted at *hostname*.

    Validates configuration, starts the heartbeat-updater in a separate
    process, then consumes request entries from etcd — first the ones
    that already exist under REQUEST_PREFIX, then new ones via a watch —
    dispatching vm lifecycle operations (start/stop/delete/migrate/
    transfer) addressed to this host.

    :param hostname: name of this host, e.g. "/v1/host/1"
    :raises AssertionError: if no host entry with this hostname exists.
    """
    check()

    heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))

    host_pool = HostPool(etcd_client, config['etcd']['HOST_PREFIX'])
    host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
    assert host is not None, "No such host with name = {}".format(hostname)

    try:
        heartbeat_updating_process.start()
    except Exception as e:
        logger.info("No Need To Go Further. Our heartbeat updating mechanism is not working")
        logger.exception(e)
        exit(-1)

    logger.info("%s Session Started %s", '*' * 5, '*' * 5)

    # It is seen that under heavy load, timeout event doesn't come
    # in a predictive manner (which is intentional because we give
    # higher priority to customer's requests) which delays heart
    # beat update which in turn misunderstood by scheduler that the
    # host is dead when it is actually alive. So, to ensure that we
    # update the heart beat in a predictive manner we start Heart
    # beat updating mechanism in separated thread

    # Drain pre-existing requests first, then watch for new ones.  The
    # watch's timeout=10 produces periodic "TIMEOUT" events, used below
    # as a maintenance tick.
    for events_iterator in [
        etcd_client.get_prefix(config['etcd']['REQUEST_PREFIX'], value_in_json=True),
        etcd_client.watch_prefix(config['etcd']['REQUEST_PREFIX'], timeout=10, value_in_json=True),
    ]:
        for request_event in events_iterator:
            request_event = RequestEntry(request_event)

            if request_event.type == "TIMEOUT":
                maintenance(host)
                continue

            # If the event is directed toward me OR I am destination of a InitVMMigration
            if request_event.hostname == host.key or request_event.destination == host.key:
                logger.debug("VM Request: %s", request_event)

                # Request is deleted from etcd before dispatching —
                # presumably so it is not picked up twice; TODO confirm
                # against the scheduler's request lifecycle.
                request_pool.client.client.delete(request_event.key)
                vm_entry = vm_pool.get(request_event.uuid)

                if vm_entry:
                    if request_event.type == RequestType.StartVM:
                        virtualmachine.start(vm_entry)

                    elif request_event.type == RequestType.StopVM:
                        virtualmachine.stop(vm_entry)

                    elif request_event.type == RequestType.DeleteVM:
                        virtualmachine.delete(vm_entry)

                    elif request_event.type == RequestType.InitVMMigration:
                        # We are the migration destination: start the vm
                        # locally, listening on a freshly found free port.
                        virtualmachine.start(vm_entry, host.key, find_free_port())

                    elif request_event.type == RequestType.TransferVM:
                        virtualmachine.transfer(request_event)
                else:
                    logger.info("VM Entry missing")

                logger.info("Running VMs %s", running_vms)
 | |
| 
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     argparser = argparse.ArgumentParser()
 | |
|     argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
 | |
|     args = argparser.parse_args()
 | |
|     mp.set_start_method('spawn')
 | |
|     main(args.hostname)
 |