diff --git a/.gitignore b/.gitignore index 817f5fa..188d100 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,5 @@ __pycache__/ venv/ log.txt +etcd3_wrapper +ucloud_common diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 316b9ea..0000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "etcd3_wrapper"] - path = etcd3_wrapper - url = https://code.ungleich.ch/ahmedbilal/etcd3_wrapper diff --git a/Pipfile b/Pipfile index 291da6d..b89aadf 100644 --- a/Pipfile +++ b/Pipfile @@ -9,10 +9,10 @@ flake8 = "*" black = "==19.3b0" [packages] -etcd3 = "*" python-decouple = "*" pytest = "*" coverage = "*" +python-etcd3 = {editable = true,git = "https://github.com/kragniz/python-etcd3"} [requires] python_version = "3.7" diff --git a/Pipfile.lock b/Pipfile.lock index 627e5b1..e57798a 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "8a70e21524bd2bfc041ea7520f90d4a0e981a6c888623f62b76c9df05e9028e1" + "sha256": "f0d725d20f218d8ac861af45435d0473721c4a511bb9dfbdce8b68b2653fccaa" }, "pipfile-spec": 6, "requires": { @@ -67,13 +67,6 @@ "index": "pypi", "version": "==4.5.3" }, - "etcd3": { - "hashes": [ - "sha256:25a524b9f032c6631ff0097532907dea81243eaa63c3744510fd1598cc4e0e87" - ], - "index": "pypi", - "version": "==0.10.0" - }, "grpcio": { "hashes": [ "sha256:03b78b4e7dcdfe3e257bb528cc93923f9cbbab6d5babf15a60d21e9a4a70b1a2", @@ -120,10 +113,10 @@ }, "more-itertools": { "hashes": [ - "sha256:3ad685ff8512bf6dc5a8b82ebf73543999b657eded8c11803d9ba6b648986f4d", - "sha256:8bb43d1f51ecef60d81854af61a3a880555a14643691cc4b64a6ee269c78f09a" + "sha256:409cd48d4db7052af495b09dec721011634af3753ae1ef92d2b32f73a745f832", + "sha256:92b8c4b06dac4f0611c0729b2f2ede52b2e1bac1ab48f089c7ddc12e26bb60c4" ], - "version": "==7.1.0" + "version": "==7.2.0" }, "packaging": { "hashes": [ @@ -171,10 +164,10 @@ }, "pyparsing": { "hashes": [ - "sha256:1873c03321fc118f4e9746baf201ff990ceb915f433f23b395f5580d1840cb2a", - "sha256:9b6323ef4ab914af344ba97510e966d64ba91055d6b9afa6b30799340e89cc03" + "sha256:43c5486cefefa536c9aab528881c992328f020eefe4f6d06332449c365218580", + "sha256:d6c5ffe9d0305b9b977f7a642d36b9370954d1da7ada4c62393382cbadad4265" ], - "version": "==2.4.0" + "version": "==2.4.1.1" }, "pytest": { "hashes": [ @@ -191,6 +184,11 @@ "index": "pypi", "version": "==3.1" }, + "python-etcd3": { + "editable": true, + "git": "https://github.com/kragniz/python-etcd3", + "ref": "cdc4c48bde88a795230a02aa574df84ed9ccfa52" + }, "six": { "hashes": [ "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c", @@ -258,6 +256,13 @@ ], "version": "==7.0" }, + "ddt": { + "hashes": [ + "sha256:474546b4020ce8a2f9550ba8899c28aa2c284c7bbf175bddede98be949d1ca7c", + "sha256:d13e6af8f36238e89d00f4ebccf2bda4f6d1878be560a6600689e42077e164e3" + ], + "version": "==1.2.1" + }, "entrypoints": { "hashes": [ "sha256:589f874b313739ad35be6e0cd7efde2a4e9b6fea91edcc34e58ecbb8dbe56d19", @@ -273,6 +278,12 @@ "index": "pypi", "version": "==3.7.8" }, + "gitdb": { + "hashes": [ + "sha256:a3ebbc27be035a2e874ed904df516e35f4a29a778a764385de09de9e0f139658" + ], + "version": "==0.6.4" + }, "gitdb2": { "hashes": [ "sha256:83361131a1836661a155172932a13c08bda2db3674e4caa32368aa6eb02f38c2", @@ -282,10 +293,10 @@ }, "gitpython": { "hashes": [ - "sha256:563221e5a44369c6b79172f455584c9ebbb122a13368cc82cb4b5addff788f82", - "sha256:8237dc5bfd6f1366abeee5624111b9d6879393d84745a507de0fda86043b65a8" + "sha256:7428f1cc5e72d53e65c3259d5cebc22fb2b07f973c49d95b3c3d26c64890a3c3", + "sha256:a0f744a4941eac5d99033eb0adcbec83bf443ee173fda4292d92a906aedce952" ], - "version": "==2.1.11" + "version": "==2.1.12" }, "mccabe": { "hashes": [ @@ -296,10 +307,10 @@ }, "pbr": { "hashes": [ - "sha256:36ebd78196e8c9588c972f5571230a059ff83783fabbbbedecc07be263ccd7e6", - "sha256:5a03f59455ad54f01a94c15829b8b70065462b7bd8d5d7e983306b59127fc841" + "sha256:0ca44dc9fd3b04a22297c2a91082d8df2894862e8f4c86a49dac69eae9e85ca0", + "sha256:4aed6c1b1fa5020def0f22aed663d87b81bb3235f112490b07d2643d7a98c5b5" ], - "version": "==5.4.0" + "version": "==5.4.1" }, "pycodestyle": { "hashes": [ @@ -338,6 +349,12 @@ ], "version": "==1.12.0" }, + "smmap": { + "hashes": [ + "sha256:0e2b62b497bd5f0afebc002eda4d90df9d209c30ef257e8673c90a6b5c119d62" + ], + "version": "==0.9.0" + }, "smmap2": { "hashes": [ "sha256:0555a7bf4df71d1ef4218e4807bbf9b201f910174e6e08af2e138d4e517b4dde", diff --git a/__init__ b/__init__ deleted file mode 100644 index e69de29..0000000 diff --git a/etcd3_wrapper b/etcd3_wrapper deleted file mode 160000 index 9abc74e..0000000 --- a/etcd3_wrapper +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 9abc74e387bf94832a0ad95644d667abb87e5529 diff --git a/main.py b/main.py index 74be3e4..1c78d4f 100644 --- a/main.py +++ b/main.py @@ -11,9 +11,11 @@ import logging from decouple import config from collections import Counter from functools import reduce -from etcd3_wrapper import Etcd3Wrapper, EtcdEntry, PseudoEtcdMeta -from datetime import datetime -from ucloud_common.enums import VMStatus, RUNNING_VM_STATUSES +from etcd3_wrapper import Etcd3Wrapper + +from ucloud_common.enums import HostStatus +from ucloud_common.vm import VmPool, VMEntry +from ucloud_common.host import HostPool logging.basicConfig( level=logging.DEBUG, @@ -23,27 +25,8 @@ logging.basicConfig( datefmt="%d-%b-%y %H:%M:%S", ) - -class VmPool(object): - def __init__(self, etcd_client, vm_prefix): - self.client = etcd_client - self.vms = [] - - _vms = self.client.get_prefix(vm_prefix) - self.vms = [(vm.key, json.loads(vm.value)) for vm in _vms] - - @staticmethod - def by_host(vms, host): - print(vms) - return list(filter(lambda x: x[1]["hostname"] == host, vms)) - - @staticmethod - def by_status(vms, status): - return list(filter(lambda x: x[1]["status"] == status, vms)) - - @staticmethod - def except_status(vms, status): - return list(filter(lambda x: x[1]["status"] != status, vms)) +VM_POOL = None +HOST_POOL = None def accumulated_specs(vms_specs): @@ -61,149 +44,124 @@ def remaining_resources(host_specs, vms_specs): return remaining -def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs): - vm_pool = VmPool(etcd_client, vm_prefix) - hosts = etcd_client.get_prefix(host_prefix, value_in_json=True) - hosts = filter(lambda h: h.value["status"] == "ALIVE", hosts) +def get_suitable_host(vm_specs): + hosts = HOST_POOL.by_status("ALIVE") for host in hosts: - _host_name, host_value = (host.key, host.value) - - # Get All Virtual Machines - vms = vm_pool.vms - # Filter them by host_name - vms = VmPool.by_host(vms, _host_name) + vms = VM_POOL.by_host(host.key) # Filter them by status - vms = VmPool.except_status(vms, "REQUESTED_NEW") + vms = VM_POOL.except_status("REQUESTED_NEW", vms=vms) - running_vms_specs = [vm[1]["specs"] for vm in vms] + running_vms_specs = [vm.specs for vm in vms] # Accumulate all of their combined specs - running_vms_accumulated_specs = accumulated_specs(running_vms_specs) - # print(running_vms_accumulated_specs) + running_vms_accumulated_specs = accumulated_specs( + running_vms_specs + ) # Find out remaining resources after # host_specs - already running vm_specs - # print(host_value) remaining = remaining_resources( - host_value["specs"], running_vms_accumulated_specs + host.specs, running_vms_accumulated_specs ) - # print(remaining) + # Find out remaining - new_vm_specs remaining = remaining_resources(remaining, vm_specs) # if remaining resources >= 0 return this host_name if all( - map(lambda x: True if remaining[x] >= 0 else False, remaining) + map( + lambda x: True if remaining[x] >= 0 else False, + remaining, + ) ): - return _host_name + return host.key return None -def dead_host_detection(hosts): +def dead_host_detection(): + # Bring out your dead! - Monty Python and the Holy Grail + hosts = HOST_POOL.by_status(HostStatus.alive) dead_hosts_keys = [] + for host in hosts: - # Bring out your dead! - Monty Python and the Holy Grail - - if "status" in host.value and "last_heartbeat" in host.value: - # Don't count that is already buried - if host.value["status"] == "DEAD": - continue - - last_heartbeat = datetime.fromisoformat( - host.value["last_heartbeat"] - ) - delta = datetime.utcnow() - last_heartbeat - if delta.total_seconds() > 60: + # Only check those who claims to be alive + if host.status == HostStatus.alive: + if not host.is_alive(): dead_hosts_keys.append(host.key) - else: - dead_hosts_keys.append(host.key) return dead_hosts_keys -def dead_host_mitigation(client: Etcd3Wrapper, dead_hosts_keys): +def dead_host_mitigation(dead_hosts_keys): for host_key in dead_hosts_keys: - host = client.get(host_key, value_in_json=True) - host.value["status"] = "DEAD" - host.value["last_heartbeat"] = datetime.utcnow().isoformat() + host = HOST_POOL.get(host_key) + host.declare_dead() - # Find all vms that were hosted on this dead host - all_vms = client.get_prefix(config("VM_PREFIX"), value_in_json=True) - vms_hosted_on_dead_host = filter( - lambda _vm: _vm.value["hostname"] == host_key, all_vms - ) + vms_hosted_on_dead_host = VM_POOL.by_host(host_key) for vm in vms_hosted_on_dead_host: - vm.value["hostname"] = "" - if vm.value["status"] in RUNNING_VM_STATUSES: - vm.value["status"] = VMStatus.requested_start - client.put(vm.key, vm.value, value_in_json=True) - - client.put(host.key, host.value, value_in_json=True) + vm.declare_killed() + VM_POOL.put(vm) + HOST_POOL.put(host) -def assign_host(client, vm_prefix, host_prefix, e): - host_name = get_suitable_host( - client, vm_prefix, host_prefix, e.value["specs"] - ) +def assign_host(vm): + host_name = get_suitable_host(vm.specs) if host_name: - if e.value["status"] == "REQUESTED_NEW": - e.value["status"] = "SCHEDULED_DEPLOY" - else: - e.value["status"] = "REQUESTED_START" - - e.value["hostname"] = host_name - client.put(e.key, json.dumps(e.value)) + if vm.status == "REQUESTED_NEW": + vm.status = "SCHEDULED_DEPLOY" + + if vm.status == "KILLED": + vm.status = "REQUESTED_START" + + vm.hostname = host_name + VM_POOL.put(vm) + return host_name return None def main(vm_prefix, host_prefix): + logging.info(f"{'*' * 5} SESSION STARTED {'*' * 5}") client = Etcd3Wrapper( host=config("ETCD_HOST"), port=int(config("ETCD_PORT")) ) - RESCAN_VMS = False + global VM_POOL, HOST_POOL + VM_POOL = VmPool(client, vm_prefix) + HOST_POOL = HostPool(client, host_prefix) + + PENDING_VMS = [] for events_iterator in [ - client.get_prefix(vm_prefix), - client.watch_prefix(vm_prefix, timeout=10), + client.get_prefix(vm_prefix, value_in_json=True), + client.watch_prefix(vm_prefix, timeout=10, value_in_json=True), ]: for e in events_iterator: - try: - e.value = json.loads(e.value) - except json.JSONDecodeError: - logging.error(f"Invalid JSON {e.value}") - continue + e = VMEntry(e) + logging.debug(f"{e.key}, {e.value}") - logging.debug(e.key, e.value) + # Never Run time critical mechanism inside timeout + # mechanism because timeout mechanism only comes + # when no other event is happening. It means under + # heavy load there would not be a timeout + if e.status == "TIMEOUT": + dead_hosts = dead_host_detection() + logging.debug(f"Dead hosts: {dead_hosts}") + dead_host_mitigation(dead_hosts) - e_status = e.value["status"] + vm_scheduled = [] + for vm in PENDING_VMS: + if assign_host(vm) is not None: + vm_scheduled.append(vm) - if e_status == "TIMEOUT": - logging.info("Timeout") - hosts = client.get_prefix(host_prefix, value_in_json=True) - dead_hosts = dead_host_detection(hosts) - dead_host_mitigation(client, dead_hosts) - - if RESCAN_VMS: - RESCAN_VMS = False # Assume we won't need it after this - vms = client.get_prefix(vm_prefix) - - for vm in vms: - fake_e = EtcdEntry( - PseudoEtcdMeta(key=vm.key.encode("utf-8")), - value=vm.value.encode("utf-8"), value_in_json=True - ) - if (assign_host(client, vm_prefix, host_prefix, - fake_e) is None): - # We need it because we still have vm left - # to schedule - RESCAN_VMS = True - - elif e_status in ["REQUESTED_NEW", "REQUESTED_START"]: - if assign_host(client, vm_prefix, host_prefix, e) is None: - print("No Resource Left. Emailing admin....") - RESCAN_VMS = True + for vm in vm_scheduled: + PENDING_VMS.remove(vm) + logging.debug(f"Remaining Pending: {PENDING_VMS}") + elif e.status in ["REQUESTED_NEW", "KILLED"]: + if assign_host(e) is None: + PENDING_VMS.append(e) + logging.info("No Resource Left. Emailing admin....") + logging.debug(f"Pending VMS: {PENDING_VMS}") if __name__ == "__main__": diff --git a/ucloud_common/__init__.py b/ucloud_common/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/ucloud_common/enums.py b/ucloud_common/enums.py deleted file mode 100644 index 6d5f403..0000000 --- a/ucloud_common/enums.py +++ /dev/null @@ -1,34 +0,0 @@ -from enum import Enum - -class VMStatus(Enum): - # Must be only assigned to brand new VM - requested_new = "REQUESTED_NEW" - - # Only Assigned to already created vm - requested_start = "REQUESTED_START" - - # These all are for running vms - requested_shutdown = "REQUESTED_SHUTDOWN" - requested_suspend = "REQUESTED_SUSPEND" - requested_resume = "REQUESTED_RESUME" - requested_migrate = "REQUESTED_MIGRATE" - - # either its image is not found or user requested - # to delete it - deleted = "DELETED" - - stopped = "STOPPED" # After requested_shutdown - killed = "KILLED" # either host died or vm died itself - - running = "RUNNING" - suspended = "SUSPENDED" - - -class HostStatus(Enum): - alive = "ALIVE" - dead = "DEAD" - - -RUNNING_VM_STATUSES = [VMStatus.requested_shutdown, VMStatus.requested_suspend, - VMStatus.requested_resume, VMStatus.requested_migrate, - VMStatus.running, VMStatus.suspended] \ No newline at end of file