uncloud-mravi/uncloud_etcd_based/uncloud/host/main.py

124 lines
4.7 KiB
Python
Raw Normal View History

import argparse
import multiprocessing as mp
import time
2020-01-03 13:38:59 +00:00
2019-12-31 13:06:51 +00:00
from uuid import uuid4
from uncloud.common.request import RequestEntry, RequestType
from uncloud.common.shared import shared
from uncloud.common.vm import VMStatus
from uncloud.vmm import VMM
from os.path import join as join_path
2019-12-21 09:36:55 +00:00
from . import virtualmachine, logger
2020-01-03 13:38:59 +00:00
arg_parser = argparse.ArgumentParser('host', add_help=False)
arg_parser.add_argument('--hostname', required=True)
def update_heartbeat(hostname):
"""Update Last HeartBeat Time for :param hostname: in etcd"""
host_pool = shared.host_pool
this_host = next(
filter(lambda h: h.hostname == hostname, host_pool.hosts), None
)
while True:
this_host.update_heartbeat()
host_pool.put(this_host)
2019-12-30 10:18:25 +00:00
time.sleep(10)
def maintenance(host):
vmm = VMM()
running_vms = vmm.discover()
for vm_uuid in running_vms:
if vmm.is_running(vm_uuid) and vmm.get_status(vm_uuid) == 'running':
logger.debug('VM {} is running on {}'.format(vm_uuid, host))
vm = shared.vm_pool.get(
join_path(shared.settings['etcd']['vm_prefix'], vm_uuid)
)
vm.status = VMStatus.running
vm.vnc_socket = vmm.get_vnc(vm_uuid)
vm.hostname = host
shared.vm_pool.put(vm)
def main(arguments):
hostname = arguments['hostname']
host_pool = shared.host_pool
host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
2019-12-31 13:06:51 +00:00
# Does not yet exist, create it
if not host:
host_key = join_path(
shared.settings['etcd']['host_prefix'], uuid4().hex
2019-12-31 13:06:51 +00:00
)
host_entry = {
'specs': '',
'hostname': hostname,
'status': 'DEAD',
'last_heartbeat': '',
2019-12-31 13:06:51 +00:00
}
shared.etcd_client.put(
host_key, host_entry, value_in_json=True
)
2019-12-31 13:13:08 +00:00
# update, get ourselves now for sure
host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
try:
heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))
heartbeat_updating_process.start()
except Exception as e:
raise Exception('uncloud-host heartbeat updating mechanism is not working') from e
# The below while True is neccessary for gracefully handling leadership transfer and temporary
# unavailability in etcd. Why does it work? It works because the get_prefix,watch_prefix return
# iter([]) that is iterator of empty list on exception (that occur due to above mentioned reasons)
# which ends the loop immediately. So, having it inside infinite loop we try again and again to
# get prefix until either success or deamon death comes.
while True:
for events_iterator in [
shared.etcd_client.get_prefix(shared.settings['etcd']['request_prefix'], value_in_json=True,
raise_exception=False),
shared.etcd_client.watch_prefix(shared.settings['etcd']['request_prefix'], value_in_json=True,
raise_exception=False)
]:
for request_event in events_iterator:
request_event = RequestEntry(request_event)
maintenance(host.key)
if request_event.hostname == host.key:
logger.debug('VM Request: %s on Host %s', request_event, host.hostname)
shared.request_pool.client.client.delete(request_event.key)
vm_entry = shared.etcd_client.get(
join_path(shared.settings['etcd']['vm_prefix'], request_event.uuid)
)
logger.debug('VM hostname: {}'.format(vm_entry.value))
vm = virtualmachine.VM(vm_entry)
if request_event.type == RequestType.StartVM:
vm.start()
elif request_event.type == RequestType.StopVM:
vm.stop()
elif request_event.type == RequestType.DeleteVM:
vm.delete()
elif request_event.type == RequestType.InitVMMigration:
vm.start(destination_host_key=host.key)
elif request_event.type == RequestType.TransferVM:
destination_host = host_pool.get(request_event.destination_host_key)
if destination_host:
vm.migrate(
destination_host=destination_host.hostname,
destination_sock_path=request_event.destination_sock_path,
)
else:
logger.error('Host %s not found!', request_event.destination_host_key)