# TODO
# 1. send an email to an email address defined by env['admin-email']
#    if resources are finished
# 2. v3) Introduce a status endpoint of the scheduler -
#    maybe expose a prometheus compatible output

import argparse
import json
import logging
from collections import Counter
from datetime import datetime
from functools import reduce

from decouple import config
from etcd3_wrapper import Etcd3Wrapper, EtcdEntry, PseudoEtcdMeta

logging.basicConfig(
    level=logging.DEBUG,
    filename="log.txt",
    filemode="a",
    format="%(asctime)s: %(levelname)s - %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
)

# VM statuses that mean "this VM is still waiting for a host".
PENDING_VM_STATUSES = ("REQUESTED_NEW", "REQUESTED_START")


class VmPool:
    """Snapshot of every VM entry stored under an etcd key prefix.

    ``vms`` is a list of ``(key, value)`` tuples where ``value`` is the
    JSON-decoded entry. The filter helpers are static and work on any such
    list, so they can be chained.
    """

    def __init__(self, etcd_client, vm_prefix):
        self.client = etcd_client
        # Values are fetched raw and decoded here.
        self.vms = [
            (vm.key, json.loads(vm.value))
            for vm in self.client.get_prefix(vm_prefix)
        ]

    @staticmethod
    def by_host(vms, host):
        """Return the VMs whose "hostname" equals ``host``."""
        return [vm for vm in vms if vm[1]["hostname"] == host]

    @staticmethod
    def by_status(vms, status):
        """Return the VMs whose "status" equals ``status``."""
        return [vm for vm in vms if vm[1]["status"] == status]

    @staticmethod
    def except_status(vms, status):
        """Return the VMs whose "status" differs from ``status``."""
        return [vm for vm in vms if vm[1]["status"] != status]


def accumulated_specs(vms_specs):
    """Sum a list of spec mappings key by key.

    Returns ``{}`` for an empty list. A single-element list is returned
    as that element unchanged (``reduce`` never calls the lambda).
    """
    if not vms_specs:
        return {}
    return reduce((lambda x, y: Counter(x) + Counter(y)), vms_specs)


def remaining_resources(host_specs, vms_specs):
    """Return remaining resources host_specs - vms.

    The result is a ``Counter`` and may hold negative values;
    ``Counter.subtract`` keeps them, which is how callers detect that a
    resource is over-committed.
    """
    remaining = Counter(host_specs)
    remaining.subtract(Counter(vms_specs))
    return remaining


def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs):
    """Return the key of the first ALIVE host that can fit ``vm_specs``.

    For each host whose status is "ALIVE", the specs of the VMs assigned to
    it (every status except "REQUESTED_NEW") are subtracted from the host's
    specs, then ``vm_specs`` is subtracted as well. The host fits when no
    resulting value is negative. Returns ``None`` when no host fits.
    """
    vm_pool = VmPool(etcd_client, vm_prefix)
    hosts = etcd_client.get_prefix(host_prefix, value_in_json=True)

    for host in hosts:
        if host.value["status"] != "ALIVE":
            continue

        # VMs on this host, ignoring those not yet scheduled.
        vms = VmPool.by_host(vm_pool.vms, host.key)
        vms = VmPool.except_status(vms, "REQUESTED_NEW")
        used_specs = accumulated_specs([vm[1]["specs"] for vm in vms])

        # host_specs - already placed vm_specs - new vm_specs
        remaining = remaining_resources(host.value["specs"], used_specs)
        remaining = remaining_resources(remaining, vm_specs)

        if all(amount >= 0 for amount in remaining.values()):
            return host.key

    return None


def dead_host_detection(hosts, heartbeat_timeout=60):
    """Return the keys of hosts that should be considered dead.

    A host is reported when it lacks "status" or "last_heartbeat", or when
    its last heartbeat is more than ``heartbeat_timeout`` seconds old.
    Hosts already marked "DEAD" are skipped.

    :param hosts: iterable of entries with ``.key`` and a dict ``.value``
    :param heartbeat_timeout: maximum heartbeat age in seconds
    """
    dead_hosts_keys = []

    for host in hosts:
        # Bring out your dead! - Monty Python and the Holy Grail
        if "status" not in host.value or "last_heartbeat" not in host.value:
            dead_hosts_keys.append(host.key)
            continue

        # Don't count one that is already buried
        if host.value["status"] == "DEAD":
            continue

        last_heartbeat = datetime.fromisoformat(host.value["last_heartbeat"])
        if last_heartbeat.tzinfo is None:
            # Naive timestamps are compared against naive UTC, matching
            # what dead_host_mitigation writes (utcnow().isoformat()).
            now = datetime.utcnow()
        else:
            # Naive - aware subtraction raises TypeError, so compare
            # aware heartbeats against an aware "now".
            now = datetime.now(last_heartbeat.tzinfo)

        if (now - last_heartbeat).total_seconds() > heartbeat_timeout:
            dead_hosts_keys.append(host.key)

    return dead_hosts_keys


def dead_host_mitigation(client: Etcd3Wrapper, dead_hosts_keys,
                         vm_prefix=None):
    """Mark hosts as DEAD and release the VMs that were placed on them.

    Each host gets status "DEAD" and a fresh "last_heartbeat". Every VM
    whose "hostname" equals the host key has its hostname cleared; unless
    it is "STOPPED" its status becomes "REQUESTED_NEW".

    :param vm_prefix: key prefix of the VM entries; defaults to
        ``config("VM_PREFIX")`` when omitted
    """
    if vm_prefix is None:
        vm_prefix = config("VM_PREFIX")

    for host_key in dead_hosts_keys:
        host = client.get(host_key, value_in_json=True)
        host.value["status"] = "DEAD"
        host.value["last_heartbeat"] = datetime.utcnow().isoformat()
        client.put(host.key, host.value, value_in_json=True)

        # Find all vms that were hosted on this dead host
        all_vms = client.get_prefix(vm_prefix, value_in_json=True)
        for vm in all_vms:
            if vm.value["hostname"] != host_key:
                continue

            vm.value["hostname"] = ""
            if vm.value["status"] != "STOPPED":
                vm.value["status"] = "REQUESTED_NEW"
            client.put(vm.key, vm.value, value_in_json=True)


def assign_host(client, vm_prefix, host_prefix, e):
    """Place the VM entry ``e`` on a suitable host and persist it.

    On success the status becomes "SCHEDULED_DEPLOY" (if it was
    "REQUESTED_NEW") or "REQUESTED_START" (otherwise), "hostname" is set,
    the entry is written back and the host key is returned. Returns
    ``None``, writing nothing, when no host has room.
    """
    host_name = get_suitable_host(
        client, vm_prefix, host_prefix, e.value["specs"]
    )
    if not host_name:
        return None

    if e.value["status"] == "REQUESTED_NEW":
        e.value["status"] = "SCHEDULED_DEPLOY"
    else:
        e.value["status"] = "REQUESTED_START"
    e.value["hostname"] = host_name

    client.put(e.key, json.dumps(e.value))
    return host_name


def main(vm_prefix, host_prefix):
    """Run the scheduler loop.

    Processes the VM entries already stored under ``vm_prefix``, then the
    entries yielded by watching that prefix. "TIMEOUT" entries trigger
    dead-host handling and a retry of VMs that could not be placed earlier;
    "REQUESTED_NEW" / "REQUESTED_START" entries trigger host assignment.
    """
    client = Etcd3Wrapper(
        host=config("ETCD_HOST"), port=int(config("ETCD_PORT"))
    )

    # Set when a VM could not be placed; retried on the next TIMEOUT.
    rescan_vms = False

    for events_iterator in [
        client.get_prefix(vm_prefix),
        client.watch_prefix(vm_prefix, timeout=10),
    ]:
        for e in events_iterator:
            try:
                e.value = json.loads(e.value)
            except json.JSONDecodeError:
                logging.error(f"Invalid JSON {e.value}")
                continue

            # The first argument of logging.debug is the format string;
            # passing the key there would swallow the value.
            logging.debug("%s %s", e.key, e.value)

            if not isinstance(e.value, dict) or "status" not in e.value:
                # Skip malformed entries instead of crashing the daemon.
                logging.error("Entry without status %s", e.key)
                continue

            e_status = e.value["status"]

            if e_status == "TIMEOUT":
                client.client.delete(e.key)
                logging.info("Timeout")

                hosts = client.get_prefix(host_prefix, value_in_json=True)
                dead_hosts = dead_host_detection(hosts)
                dead_host_mitigation(client, dead_hosts, vm_prefix=vm_prefix)

                if rescan_vms:
                    rescan_vms = False  # Assume we won't need it after this
                    for vm in client.get_prefix(vm_prefix):
                        fake_e = EtcdEntry(
                            PseudoEtcdMeta(key=vm.key.encode("utf-8")),
                            value=vm.value.encode("utf-8"),
                            value_in_json=True,
                        )
                        # Only retry VMs still waiting for a host;
                        # re-assigning an already placed VM would move it
                        # and flip its status to REQUESTED_START.
                        if (fake_e.value.get("status")
                                not in PENDING_VM_STATUSES):
                            continue

                        if (assign_host(client, vm_prefix, host_prefix,
                                        fake_e) is None):
                            # We need it because we still have vm left
                            # to schedule
                            rescan_vms = True

            elif e_status in PENDING_VM_STATUSES:
                if assign_host(client, vm_prefix, host_prefix, e) is None:
                    print("No Resource Left. Emailing admin....")
                    rescan_vms = True


if __name__ == "__main__":
    argparser = argparse.ArgumentParser()
    argparser.add_argument(
        "--vm_prefix", required=False, default=config("VM_PREFIX")
    )
    argparser.add_argument(
        "--host_prefix", required=False, default=config("HOST_PREFIX")
    )
    args = argparser.parse_args()

    main(args.vm_prefix, args.host_prefix)