# uncloud-mravi/uncloud/host/main.py (Python, 120 lines, 4.2 KiB)

import argparse
import multiprocessing as mp
import time
2019-12-31 13:06:51 +00:00
from uuid import uuid4
from uncloud.common.request import RequestEntry, RequestType
from uncloud.shared import shared
from uncloud.settings import settings
from uncloud.common.vm import VMStatus
from uncloud.vmm import VMM
from os.path import join as join_path
2019-12-21 09:36:55 +00:00
from . import virtualmachine, logger
def update_heartbeat(hostname):
    """Keep refreshing the last heartbeat time of *hostname* in etcd.

    Looks up the entry in ``shared.host_pool`` whose ``hostname`` attribute
    equals *hostname* and then loops forever: refresh the heartbeat, write
    the entry back to the pool, sleep 10 seconds.  The function never
    returns normally.

    :param hostname: name of the host whose heartbeat is refreshed
    :raises RuntimeError: if no host with that name exists in the host pool
    """
    host_pool = shared.host_pool
    this_host = next(
        (h for h in host_pool.hosts if h.hostname == hostname), None
    )
    if this_host is None:
        # Without this guard the loop below would fail with an opaque
        # AttributeError on ``None.update_heartbeat``.
        raise RuntimeError(
            "Cannot update heartbeat: host {} not found in host pool".format(
                hostname
            )
        )

    while True:
        this_host.update_heartbeat()
        host_pool.put(this_host)
        time.sleep(10)
def maintenance(host):
    """Write the state of every running VM found by the VMM back to the pool.

    For each VM uuid returned by ``VMM.discover()`` that is running, the
    matching entry is fetched from ``shared.vm_pool``, marked as
    ``VMStatus.running``, given its VNC socket and *host* as hostname, and
    stored again.

    :param host: value assigned to the ``hostname`` attribute of each
        running VM
    """
    vmm = VMM()
    for vm_uuid in vmm.discover():
        alive = vmm.is_running(vm_uuid) and vmm.get_status(vm_uuid) == "running"
        if not alive:
            continue

        logger.debug('VM {} is running on {}'.format(vm_uuid, host))

        vm_key = join_path(settings["etcd"]["vm_prefix"], vm_uuid)
        vm = shared.vm_pool.get(vm_key)
        vm.status = VMStatus.running
        vm.vnc_socket = vmm.get_vnc(vm_uuid)
        vm.hostname = host
        shared.vm_pool.put(vm)
def _find_host(host_pool, hostname):
    """Return the entry of *host_pool* named *hostname*, or None if absent."""
    return next((h for h in host_pool.hosts if h.hostname == hostname), None)


def _handle_vm_request(request, host, host_pool):
    """Carry out one VM request that is addressed to *host*."""
    logger.debug("VM Request: %s on Host %s", request, host.hostname)

    # The request is deleted from etcd before it is acted upon.
    shared.request_pool.client.client.delete(request.key)

    vm_entry = shared.etcd_client.get(
        join_path(settings["etcd"]["vm_prefix"], request.uuid)
    )
    if vm_entry is None:
        # NOTE(review): assumes the etcd client returns None for a missing
        # key -- confirm. Skipping keeps the request loop alive instead of
        # failing on ``vm_entry.value`` below.
        logger.error(
            "VM %s requested on Host %s not found!", request.uuid, host.hostname
        )
        return

    logger.debug("VM hostname: %s", vm_entry.value)
    vm = virtualmachine.VM(vm_entry)

    if request.type == RequestType.StartVM:
        vm.start()
    elif request.type == RequestType.StopVM:
        vm.stop()
    elif request.type == RequestType.DeleteVM:
        vm.delete()
    elif request.type == RequestType.InitVMMigration:
        vm.start(destination_host_key=host.key)
    elif request.type == RequestType.TransferVM:
        destination_host = host_pool.get(request.destination_host_key)
        if destination_host:
            vm.migrate(
                destination_host=destination_host.hostname,
                destination_sock_path=request.destination_sock_path,
            )
        else:
            logger.error("Host %s not found!", request.destination_host_key)


def main(hostname, debug=False):
    """Run the host service: register the host, then serve VM requests.

    Steps:

    1. Look up *hostname* in ``shared.host_pool``; if it is missing, write a
       new host entry (status ``"DEAD"``) under the etcd host prefix.
    2. Start ``update_heartbeat`` in a separate process.
    3. Iterate over the requests already stored under the etcd request
       prefix, then over the watch on that prefix.  A ``"TIMEOUT"`` event
       triggers ``maintenance``; a request whose ``hostname`` equals this
       host's key is executed; everything else is ignored.

    :param hostname: name of this host
    :param debug: accepted for compatibility; not used by this function
    :raises RuntimeError: if the host cannot be found even after it has
        been registered
    :raises Exception: if the heartbeat process cannot be started
    """
    host_pool = shared.host_pool
    host = _find_host(host_pool, hostname)

    # Does not yet exist, create it
    if not host:
        host_key = join_path(settings["etcd"]["host_prefix"], uuid4().hex)
        host_entry = {
            "specs": "",
            "hostname": hostname,
            "status": "DEAD",
            "last_heartbeat": "",
        }
        shared.etcd_client.put(host_key, host_entry, value_in_json=True)

    # Look ourselves up again now that the entry should exist.
    host = _find_host(host_pool, hostname)
    if not host:
        # Fail early: every branch of the request loop below needs host.key.
        raise RuntimeError(
            "uncloud-host could not find host {} in the host pool".format(
                hostname
            )
        )

    try:
        heartbeat_updating_process = mp.Process(
            target=update_heartbeat, args=(hostname,)
        )
        heartbeat_updating_process.start()
    except Exception as e:
        raise Exception(
            "uncloud-host heartbeat updating mechanism is not working"
        ) from e

    request_prefix = settings["etcd"]["request_prefix"]
    # First the requests that already exist, then the watch on the prefix.
    event_sources = [
        shared.etcd_client.get_prefix(request_prefix, value_in_json=True),
        shared.etcd_client.watch_prefix(
            request_prefix, timeout=10, value_in_json=True
        ),
    ]
    for events_iterator in event_sources:
        for raw_event in events_iterator:
            request_event = RequestEntry(raw_event)
            if request_event.type == "TIMEOUT":
                maintenance(host.key)
            elif request_event.hostname == host.key:
                _handle_vm_request(request_event, host, host_pool)
if __name__ == "__main__":
    # Command line: a single positional argument, the name of this host.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "hostname",
        help="Name of this host. e.g uncloud1.ungleich.ch",
    )
    this_hostname = parser.parse_args().hostname

    # Select the "spawn" start method before main() creates its process.
    mp.set_start_method("spawn")
    main(this_hostname)