forked from uncloud/uncloud
		
	2. uncloud.shared moved under uncloud.common
3. Refactoring in etcd_wrapper e.g timeout mechanism removed and few other things
4. uncloud-{scheduler,host} now better handle etcd events in their block state (waiting for requests to come)
		
	
			
		
			
				
	
	
		
			123 lines
		
	
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
			
		
		
	
	
			123 lines
		
	
	
	
		
			4.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable file
		
	
	
	
	
| import argparse
 | |
| import multiprocessing as mp
 | |
| import time
 | |
| 
 | |
| from uuid import uuid4
 | |
| 
 | |
| from uncloud.common.request import RequestEntry, RequestType
 | |
| from uncloud.common.shared import shared
 | |
| from uncloud.common.settings import settings
 | |
| from uncloud.common.vm import VMStatus
 | |
| from uncloud.vmm import VMM
 | |
| from os.path import join as join_path
 | |
| 
 | |
| from . import virtualmachine, logger
 | |
| 
 | |
| arg_parser = argparse.ArgumentParser('host', add_help=False)
 | |
| arg_parser.add_argument('--hostname', required=True)
 | |
| 
 | |
| 
 | |
| def update_heartbeat(hostname):
 | |
|     """Update Last HeartBeat Time for :param hostname: in etcd"""
 | |
|     host_pool = shared.host_pool
 | |
|     this_host = next(
 | |
|         filter(lambda h: h.hostname == hostname, host_pool.hosts), None
 | |
|     )
 | |
|     while True:
 | |
|         this_host.update_heartbeat()
 | |
|         host_pool.put(this_host)
 | |
|         time.sleep(10)
 | |
| 
 | |
| 
 | |
| def maintenance(host):
 | |
|     vmm = VMM()
 | |
|     running_vms = vmm.discover()
 | |
|     for vm_uuid in running_vms:
 | |
|         if vmm.is_running(vm_uuid) and vmm.get_status(vm_uuid) == 'running':
 | |
|             logger.debug('VM {} is running on {}'.format(vm_uuid, host))
 | |
|             vm = shared.vm_pool.get(
 | |
|                 join_path(settings['etcd']['vm_prefix'], vm_uuid)
 | |
|             )
 | |
|             vm.status = VMStatus.running
 | |
|             vm.vnc_socket = vmm.get_vnc(vm_uuid)
 | |
|             vm.hostname = host
 | |
|             shared.vm_pool.put(vm)
 | |
| 
 | |
| 
 | |
| def main(hostname, debug=False):
 | |
|     host_pool = shared.host_pool
 | |
|     host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
 | |
| 
 | |
|     # Does not yet exist, create it
 | |
|     if not host:
 | |
|         host_key = join_path(
 | |
|             settings['etcd']['host_prefix'], uuid4().hex
 | |
|         )
 | |
|         host_entry = {
 | |
|             'specs': '',
 | |
|             'hostname': hostname,
 | |
|             'status': 'DEAD',
 | |
|             'last_heartbeat': '',
 | |
|         }
 | |
|         shared.etcd_client.put(
 | |
|             host_key, host_entry, value_in_json=True
 | |
|         )
 | |
| 
 | |
|     # update, get ourselves now for sure
 | |
|     host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
 | |
| 
 | |
|     try:
 | |
|         heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))
 | |
|         heartbeat_updating_process.start()
 | |
|     except Exception as e:
 | |
|         raise Exception('uncloud-host heartbeat updating mechanism is not working') from e
 | |
| 
 | |
|     # The below while True is neccessary for gracefully handling leadership transfer and temporary
 | |
|     # unavailability in etcd. Why does it work? It works because the get_prefix,watch_prefix return
 | |
|     # iter([]) that is iterator of empty list on exception (that occur due to above mentioned reasons)
 | |
|     # which ends the loop immediately. So, having it inside infinite loop we try again and again to
 | |
|     # get prefix until either success or deamon death comes.
 | |
|     while True:
 | |
|         for events_iterator in [
 | |
|             shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True,
 | |
|                                           raise_exception=False),
 | |
|             shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], value_in_json=True,
 | |
|                                             raise_exception=False)
 | |
|         ]:
 | |
|             for request_event in events_iterator:
 | |
|                 request_event = RequestEntry(request_event)
 | |
| 
 | |
|                 maintenance(host.key)
 | |
| 
 | |
|                 if request_event.hostname == host.key:
 | |
|                     logger.debug('VM Request: %s on Host %s', request_event, host.hostname)
 | |
| 
 | |
|                     shared.request_pool.client.client.delete(request_event.key)
 | |
|                     vm_entry = shared.etcd_client.get(
 | |
|                         join_path(settings['etcd']['vm_prefix'], request_event.uuid)
 | |
|                     )
 | |
| 
 | |
|                     logger.debug('VM hostname: {}'.format(vm_entry.value))
 | |
| 
 | |
|                     vm = virtualmachine.VM(vm_entry)
 | |
|                     if request_event.type == RequestType.StartVM:
 | |
|                         vm.start()
 | |
| 
 | |
|                     elif request_event.type == RequestType.StopVM:
 | |
|                         vm.stop()
 | |
| 
 | |
|                     elif request_event.type == RequestType.DeleteVM:
 | |
|                         vm.delete()
 | |
| 
 | |
|                     elif request_event.type == RequestType.InitVMMigration:
 | |
|                         vm.start(destination_host_key=host.key)
 | |
| 
 | |
|                     elif request_event.type == RequestType.TransferVM:
 | |
|                         destination_host = host_pool.get(request_event.destination_host_key)
 | |
|                         if destination_host:
 | |
|                             vm.migrate(
 | |
|                                 destination_host=destination_host.hostname,
 | |
|                                 destination_sock_path=request_event.destination_sock_path,
 | |
|                             )
 | |
|                         else:
 | |
|                             logger.error('Host %s not found!', request_event.destination_host_key)
 |