ucloud-scheduler/main.py

106 lines
4.0 KiB
Python
Executable File

# TODO
# 1. Send an email to the address defined by env['admin-email']
#    when resources run out.
# 2. (v3) Introduce a status endpoint for the scheduler,
#    possibly exposing Prometheus-compatible output.
import argparse
import logging
from decouple import config
from config import etcd_client as client
from ucloud_common.vm import VmPool
from ucloud_common.host import HostPool
from ucloud_common.request import RequestEntry, RequestPool, RequestType
from helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
assign_host)
# Configure the root logger once at import time: every record at DEBUG level
# or above is appended (not overwritten) to "log.txt", resolved relative to
# the process's current working directory.
logging.basicConfig(
    filename="log.txt",
    filemode="a",
    level=logging.DEBUG,
    datefmt="%d-%b-%y %H:%M:%S",
    format="%(asctime)s: %(levelname)s - %(message)s",
)
def _handle_timeout():
    """Detect dead hosts and pass them to the mitigation routine.

    Runs on the watch stream's TIMEOUT events. Never put time-critical work
    here: a timeout is only produced when no other event arrives, so under
    heavy load this may not run at all.
    """
    dead_hosts = dead_host_detection()
    logging.debug(f"Dead hosts: {dead_hosts}")
    dead_host_mitigation(dead_hosts)


def _handle_migration_request(request, vm_pool, host_pool, request_pool):
    """Convert a ScheduleVM migration request into an InitVMMigration request.

    Deletes the incoming request key first. If ``get_suitable_host`` accepts
    the requested destination host for the VM's specs, an InitVMMigration
    request is put into ``request_pool``; otherwise the refusal is logged.
    """
    client.client.delete(request.key)

    vm = vm_pool.get(request.uuid)
    if vm is None:
        # NOTE(review): assumes VmPool.get returns None for a missing key.
        logging.warning(f"Cannot migrate {request.uuid}: VM entry not found")
        return

    destination = host_pool.get(request.destination)
    if destination is None:
        # NOTE(review): assumes HostPool.get returns None for a missing key.
        logging.warning(
            f"Cannot migrate {request.uuid}: destination host "
            f"{request.destination} not found"
        )
        return

    host = get_suitable_host(vm.specs, [destination])
    if host:
        migration = RequestEntry.from_scratch(
            type=RequestType.InitVMMigration,
            uuid=request.uuid,
            destination=request.destination,
        )
        request_pool.put(migration)
        # Previously a stray print() to stdout; route it to the log instead.
        logging.debug(f"Queued migration of {request.uuid} to host {host}")
    else:
        logging.info(
            "Requested destination host doesn't have enough capacity "
            f"to hold {vm.uuid}"
        )


def _handle_schedule_request(request, vm_pool, pending_vms):
    """Try to place a newly requested VM on some host.

    Deletes the incoming request key first. When ``assign_host`` returns None
    (no host could take the VM), a note is appended to the VM's log, the VM is
    written back through ``vm_pool``, and the VM is recorded in ``pending_vms``.
    """
    client.client.delete(request.key)

    vm = vm_pool.get(request.uuid)
    if vm is None:
        # NOTE(review): assumes VmPool.get returns None for a missing key.
        logging.warning(f"Cannot schedule {request.uuid}: VM entry not found")
        return

    if assign_host(vm) is None:
        vm.log.append("Can't schedule VM. No Resource Left.")
        vm_pool.put(vm)
        pending_vms.append(vm)
        # No email is actually sent yet; see TODO item 1 at the top of the file.
        logging.info("No Resource Left. Emailing admin....")
        logging.debug(f"Pending VMS: {pending_vms}")


def main(vm_prefix, host_prefix, request_prefix):
    """Run the scheduler's request-processing loop.

    Processes the entries currently stored under ``request_prefix``
    (``client.get_prefix``), then the live stream from ``client.watch_prefix``
    (5 second timeout). Entries with an empty value are skipped. Every other
    entry is wrapped in a ``RequestEntry`` and dispatched on its ``type``:

    * ``"TIMEOUT"``: run dead-host detection and mitigation.
    * ``RequestType.ScheduleVM`` with truthy ``migration`` and ``destination``
      attributes: validate the destination host and queue an InitVMMigration.
    * ``RequestType.ScheduleVM`` otherwise: assign the VM to a host.

    Args:
        vm_prefix: key prefix under which VM entries live (used by VmPool).
        host_prefix: key prefix under which host entries live (used by HostPool).
        request_prefix: key prefix that is read and watched for requests.
    """
    logging.info(f"{'*' * 5} SESSION STARTED {'*' * 5}")
    vm_pool = VmPool(client, vm_prefix)
    host_pool = HostPool(client, host_prefix)
    request_pool = RequestPool(client, request_prefix)

    # VMs that could not be placed for lack of resources. Nothing retries them
    # yet (see the TODO in the TIMEOUT branch), so this list only grows.
    pending_vms = []

    for events_iterator in [
        client.get_prefix(request_prefix, value_in_json=True),
        client.watch_prefix(request_prefix, timeout=5, value_in_json=True),
    ]:
        for event in events_iterator:
            if not event.value:
                continue
            request = RequestEntry(event)
            logging.debug(f"{request.key}, {request.value}")

            if request.type == "TIMEOUT":
                logging.debug("TIMEOUT event occured")
                _handle_timeout()
                # TODO: retry scheduling of pending_vms here: call assign_host
                # on each and drop the ones that were placed.
            elif request.type == RequestType.ScheduleVM:
                is_migration = getattr(request, "migration", None) and getattr(
                    request, "destination", None
                )
                if is_migration:
                    _handle_migration_request(
                        request, vm_pool, host_pool, request_pool
                    )
                else:
                    _handle_schedule_request(request, vm_pool, pending_vms)
if __name__ == "__main__":
    # Each CLI option defaults to the environment/.env setting of the same
    # name (read via python-decouple's ``config``), so every prefix can be
    # overridden on the command line.
    argparser = argparse.ArgumentParser()
    for option, env_name in (
        ("--vm_prefix", "VM_PREFIX"),
        ("--host_prefix", "HOST_PREFIX"),
        ("--request_prefix", "REQUEST_PREFIX"),
    ):
        argparser.add_argument(option, required=False, default=config(env_name))

    args = argparser.parse_args()
    main(args.vm_prefix, args.host_prefix, args.request_prefix)