import argparse
import multiprocessing as mp
import time

from uuid import uuid4

from uncloud.common.request import RequestEntry, RequestType
from uncloud.common.shared import shared
from uncloud.common.vm import VMStatus
from uncloud.vmm import VMM
from os.path import join as join_path

from . import virtualmachine, logger

arg_parser = argparse.ArgumentParser('host', add_help=False)
arg_parser.add_argument('--hostname', required=True)


def _find_host(hostname):
    """Return the first entry of ``shared.host_pool.hosts`` whose
    ``hostname`` attribute equals :param hostname:, or None if none match."""
    return next(
        (h for h in shared.host_pool.hosts if h.hostname == hostname), None
    )


def update_heartbeat(hostname):
    """Update Last HeartBeat Time for :param hostname: in etcd

    Looks the host up once, then loops forever: refreshes its heartbeat,
    writes it back through the host pool and sleeps 10 seconds.

    :raises RuntimeError: if no host with the given hostname is found.
    """
    host_pool = shared.host_pool
    this_host = _find_host(hostname)
    if this_host is None:
        # Fail with an explicit message instead of an AttributeError on None.
        logger.error('Host %s not found, cannot update heartbeat', hostname)
        raise RuntimeError(
            'Host {} not found, cannot update heartbeat'.format(hostname)
        )

    while True:
        this_host.update_heartbeat()
        host_pool.put(this_host)
        time.sleep(10)


def maintenance(host):
    """Mark every VM the local VMM reports as running as such in etcd.

    For each discovered VM that is running, its entry is fetched from the
    vm pool, updated with status, VNC socket and :param host: (stored as the
    entry's ``hostname``) and written back.

    :param host: value stored in ``vm.hostname``; main() passes the host key.
    """
    vmm = VMM()
    running_vms = vmm.discover()
    for vm_uuid in running_vms:
        if not (vmm.is_running(vm_uuid) and vmm.get_status(vm_uuid) == 'running'):
            continue

        logger.debug('VM %s is running on %s', vm_uuid, host)
        vm = shared.vm_pool.get(
            join_path(shared.settings['etcd']['vm_prefix'], vm_uuid)
        )
        if vm is None:
            # NOTE(review): presumably vm_pool.get yields None for an unknown
            # key -- confirm. Skipping keeps one stray VM from taking down
            # the whole request loop.
            logger.warning('VM %s is running locally but has no etcd entry', vm_uuid)
            continue

        vm.status = VMStatus.running
        vm.vnc_socket = vmm.get_vnc(vm_uuid)
        vm.hostname = host
        shared.vm_pool.put(vm)


def _handle_request(request_event, host):
    """Consume one request addressed to :param host: and act on its VM.

    The request key is deleted first, then the VM entry is loaded and the
    action matching the request type is invoked on it.
    """
    logger.debug('VM Request: %s on Host %s', request_event, host.hostname)

    shared.request_pool.client.client.delete(request_event.key)
    vm_entry = shared.etcd_client.get(
        join_path(shared.settings['etcd']['vm_prefix'], request_event.uuid)
    )
    if vm_entry is None:
        # NOTE(review): assumes the etcd wrapper yields None for a missing
        # key -- confirm. Without this guard the daemon dies on `.value`.
        logger.error('VM %s not found!', request_event.uuid)
        return

    logger.debug('VM hostname: %s', vm_entry.value)
    vm = virtualmachine.VM(vm_entry)

    if request_event.type == RequestType.StartVM:
        vm.start()
    elif request_event.type == RequestType.StopVM:
        vm.stop()
    elif request_event.type == RequestType.DeleteVM:
        vm.delete()
    elif request_event.type == RequestType.InitVMMigration:
        vm.start(destination_host_key=host.key)
    elif request_event.type == RequestType.TransferVM:
        destination_host = shared.host_pool.get(request_event.destination_host_key)
        if destination_host:
            vm.migrate(
                destination_host=destination_host.hostname,
                destination_sock_path=request_event.destination_sock_path,
            )
        else:
            logger.error('Host %s not found!', request_event.destination_host_key)


def main(arguments):
    """Run the host daemon.

    Registers this host in etcd if it has no entry yet, starts the heartbeat
    process and then processes VM requests addressed to this host forever.

    :param arguments: mapping that provides the 'hostname' key.
    :raises Exception: if the heartbeat process cannot be started or the host
        entry cannot be found after creating it.
    """
    hostname = arguments['hostname']
    host = _find_host(hostname)

    # Does not yet exist, create it
    if not host:
        host_key = join_path(
            shared.settings['etcd']['host_prefix'], uuid4().hex
        )
        host_entry = {
            'specs': '',
            'hostname': hostname,
            'status': 'DEAD',
            'last_heartbeat': '',
        }
        shared.etcd_client.put(
            host_key, host_entry, value_in_json=True
        )

        # update, get ourselves now for sure
        host = _find_host(hostname)
        if host is None:
            # Everything below dereferences host.key; fail early and clearly.
            raise Exception(
                'Could not find host {} after creating its entry'.format(hostname)
            )

    try:
        heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))
        heartbeat_updating_process.start()
    except Exception as e:
        raise Exception('uncloud-host heartbeat updating mechanism is not working') from e

    # The below while True is necessary for gracefully handling leadership
    # transfer and temporary unavailability in etcd. Why does it work? It
    # works because get_prefix and watch_prefix return iter([]), that is an
    # iterator over an empty list, on exception (which occurs for the reasons
    # mentioned above), and that ends the for loop immediately. So, having it
    # inside an infinite loop, we try again and again to get the prefix until
    # either success or daemon death comes.
    # NOTE(review): if both calls keep returning empty iterators right away,
    # this retries without any delay -- consider adding a short backoff.
    request_prefix = shared.settings['etcd']['request_prefix']
    while True:
        for events_iterator in [
            shared.etcd_client.get_prefix(request_prefix, value_in_json=True, raise_exception=False),
            shared.etcd_client.watch_prefix(request_prefix, value_in_json=True, raise_exception=False),
        ]:
            for request_event in events_iterator:
                request_event = RequestEntry(request_event)

                # Runs for every event, including ones addressed elsewhere.
                maintenance(host.key)

                if request_event.hostname == host.key:
                    _handle_request(request_event, host)