From 17b40b13cceea308ab1c4b6519caf07b7e56d0c0 Mon Sep 17 00:00:00 2001 From: Ahmad Bilal Khalid Date: Tue, 25 Jun 2019 16:39:29 +0500 Subject: [PATCH] initial code --- .gitignore | 4 ++ Pipfile | 13 +++++++ main.py | 109 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 126 insertions(+) create mode 100644 .gitignore create mode 100644 Pipfile create mode 100644 main.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..514dde7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.env +.idea/ +__pycache__/ +venv/ \ No newline at end of file diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..4ce6cef --- /dev/null +++ b/Pipfile @@ -0,0 +1,13 @@ +[[source]] +name = "pypi" +url = "https://pypi.org/simple" +verify_ssl = true + +[dev-packages] + +[packages] +etcd3 = "*" +python-decouple = "*" + +[requires] +python_version = "3.7" diff --git a/main.py b/main.py new file mode 100644 index 0000000..c70e65f --- /dev/null +++ b/main.py @@ -0,0 +1,109 @@ +# TODO +# send an email to an email address defined by env['admin-email'] if resources are finished +# v3) Introduce a status endpoint of the scheduler - maybe expose a prometheus compatible output + +import etcd3 +import json + +from decouple import config +from collections import Counter + + +class VmPool(object): + def __init__(self, etcd_client, vm_prefix): + self.client = etcd_client + self.vms = [] + + _vms = self.client.get_prefix(vm_prefix) + self.vms = [(vm[1].key.decode("utf-8"), json.loads(vm[0])) for vm in _vms] + + @staticmethod + def by_host(vms, host): + return list(filter(lambda x: x[1]["hostname"] == host, vms)) + + @staticmethod + def by_status(vms, status): + return list(filter(lambda x: x[1]["status"] == status, vms)) + + @staticmethod + def except_status(vms, status): + return list(filter(lambda x: x[1]["status"] != status, vms)) + + +def accumulated_specs(vms): + """Accumulate specs of all :param vms:""" + specs = Counter() + for vm in vms: + _, _value = vm + vm_specs = _value["specs"] + specs += Counter(vm_specs) + return specs + + +def remaining_resources(host_specs, vms): + """Return remaining resources host_specs - vms""" + vms_specs = Counter(vms) + remaining = Counter(host_specs) + remaining.subtract(vms_specs) + + return remaining + + +def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs): + vm_pool = VmPool(etcd_client, vm_prefix) + hosts = client.get_prefix(host_prefix) + + for host in hosts: + _host_name, host_specs = host[1].key.decode("utf-8"), json.loads(host[0]) + + # Get All Virtual Machines + vms = vm_pool.vms + + # Filter them by host_name + vms = VmPool.by_host(vms, _host_name) + + # Filter them by status + vms = VmPool.except_status(vms, "REQUESTED_NEW") + + # Accumulate all of their combined specs + vms_accumulated_specs = accumulated_specs(vms) + + # Find out remaining resources after host_specs - already running vm_specs + remaining = remaining_resources(host_specs, vms_accumulated_specs) + + # Find out remaining - new_vm_specs + remaining = remaining_resources(remaining, vm_specs) + + # if remaining resources >= 0 return this host_name + if all(map(lambda x: True if remaining[x] >= 0 else False, remaining)): + return _host_name + + return None + + +VM_PREFIX = "/v1/vm/" +HOST_PREFIX = "/v1/host/" + +client = etcd3.client(host=config("ETCD_HOST"), port=int(config("ETCD_PORT"))) + +while True: + events_iterator, cancel = client.watch_prefix(VM_PREFIX) + for event in events_iterator: + key = event.key + value = event.value + if not value: + continue + value = json.loads(event.value) + + print(key, value) + + if value["status"] == "REQUESTED_NEW": + host_name = get_suitable_host(client, VM_PREFIX, HOST_PREFIX, value["specs"]) + print("hostname", host_name) + if host_name: + value["status"] = "SCHEDULED_DEPLOY" + value["hostname"] = host_name + client.put(key, json.dumps(value)) + else: + # email admin + print("No Resource Left. Emailing admin....")