Add new tests, add dead-host detection/mitigation, and fix a bug in the specs-difference computation code
This commit is contained in:
parent
5f4b7088ff
commit
f078b9b245
8 changed files with 450 additions and 138 deletions
110
main.py
110
main.py
|
|
@ -5,13 +5,23 @@
|
|||
# 3. v3) Introduce a status endpoint of the scheduler -
|
||||
# maybe expose a Prometheus-compatible output
|
||||
|
||||
import etcd3
|
||||
import json
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
from decouple import config
|
||||
from collections import Counter
|
||||
from functools import reduce
|
||||
from etcd3_wrapper import Etcd3Wrapper
|
||||
from datetime import datetime
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.DEBUG,
|
||||
filename="log.txt",
|
||||
filemode="a",
|
||||
format="%(asctime)s: %(levelname)s - %(message)s",
|
||||
datefmt="%d-%b-%y %H:%M:%S",
|
||||
)
|
||||
|
||||
|
||||
class VmPool(object):
|
||||
|
|
@ -20,9 +30,7 @@ class VmPool(object):
|
|||
self.vms = []
|
||||
|
||||
_vms = self.client.get_prefix(vm_prefix)
|
||||
self.vms = [
|
||||
(vm[1].key.decode("utf-8"), json.loads(vm[0])) for vm in _vms
|
||||
]
|
||||
self.vms = [(vm.key, json.loads(vm.value)) for vm in _vms]
|
||||
|
||||
@staticmethod
|
||||
def by_host(vms, host):
|
||||
|
|
@ -38,7 +46,7 @@ class VmPool(object):
|
|||
|
||||
|
||||
def accumulated_specs(vms_specs):
    """Return the per-resource sum of every spec mapping in *vms_specs*.

    Args:
        vms_specs: iterable of mappings from resource name to a numeric
            amount (one mapping per VM).

    Returns:
        A fresh ``Counter`` (a ``dict`` subclass) holding the summed amount
        of each resource; it is empty, and therefore equal to ``{}``, when
        *vms_specs* is empty.
    """
    # Counter.update() adds values key by key and, unlike ``Counter + Counter``,
    # keeps zero-valued resources.  Accumulating into a new Counter also means
    # the result never aliases one of the caller's own dicts, which the
    # reduce()-based form did for a single-element list.
    total = Counter()
    for specs in vms_specs:
        total.update(specs)
    return total
|
||||
|
||||
|
|
@ -51,16 +59,13 @@ def remaining_resources(host_specs, vms_specs):
|
|||
|
||||
return remaining
|
||||
|
||||
|
||||
def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs):
|
||||
vm_pool = VmPool(etcd_client, vm_prefix)
|
||||
hosts = etcd_client.get_prefix(host_prefix)
|
||||
hosts = etcd_client.get_prefix(host_prefix, value_in_json=True)
|
||||
hosts = filter(lambda h: h.value["status"] == "ALIVE", hosts)
|
||||
|
||||
for host in hosts:
|
||||
_host_name, host_specs = (
|
||||
host[1].key.decode("utf-8"),
|
||||
json.loads(host[0]),
|
||||
)
|
||||
_host_name, host_value = (host.key, host.value)
|
||||
|
||||
# Get All Virtual Machines
|
||||
vms = vm_pool.vms
|
||||
|
|
@ -72,19 +77,19 @@ def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs):
|
|||
vms = VmPool.except_status(vms, "REQUESTED_NEW")
|
||||
|
||||
running_vms_specs = [vm[1]["specs"] for vm in vms]
|
||||
|
||||
# Accumulate all of their combined specs
|
||||
running_vms_accumulated_specs = accumulated_specs(running_vms_specs)
|
||||
print(running_vms_accumulated_specs)
|
||||
|
||||
# Find out remaining resources after
|
||||
# host_specs - already running vm_specs
|
||||
print(host_value)
|
||||
remaining = remaining_resources(
|
||||
host_specs, running_vms_accumulated_specs
|
||||
host_value["specs"], running_vms_accumulated_specs
|
||||
)
|
||||
|
||||
print(remaining)
|
||||
# Find out remaining - new_vm_specs
|
||||
remaining = remaining_resources(remaining, vm_specs)
|
||||
|
||||
# if remaining resources >= 0 return this host_name
|
||||
if all(
|
||||
map(lambda x: True if remaining[x] >= 0 else False, remaining)
|
||||
|
|
@ -94,31 +99,78 @@ def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs):
|
|||
return None
|
||||
|
||||
|
||||
def main(vm_prefix, host_prefix):
|
||||
def dead_host_detection(hosts, heartbeat_timeout=60):
    """Return the keys of hosts that should be considered dead.

    A host is reported as dead when any of the following holds:

    * its value lacks a ``"status"`` or ``"last_heartbeat"`` entry,
    * its ``"last_heartbeat"`` cannot be parsed as an ISO-8601 timestamp,
    * its last heartbeat is more than *heartbeat_timeout* seconds old.

    Hosts whose status is already ``"DEAD"`` are skipped so they are not
    reported again.

    Args:
        hosts: iterable of objects exposing ``.key`` and ``.value``, where
            ``.value`` is a dict.
        heartbeat_timeout: maximum allowed heartbeat age in seconds.

    Returns:
        List of ``host.key`` values for the hosts detected as dead.
    """
    # Local import: the module only imports the ``datetime`` class.
    from datetime import timezone

    now = datetime.now(timezone.utc)
    dead_hosts_keys = []

    for host in hosts:
        # Bring out your dead! - Monty Python and the Holy Grail
        if "status" not in host.value or "last_heartbeat" not in host.value:
            dead_hosts_keys.append(host.key)
            continue

        # Don't count hosts that are already buried
        if host.value["status"] == "DEAD":
            continue

        try:
            last_heartbeat = datetime.fromisoformat(
                host.value["last_heartbeat"]
            )
        except (TypeError, ValueError):
            # An unreadable heartbeat gives no evidence the host is alive.
            dead_hosts_keys.append(host.key)
            continue

        # Heartbeats written without an offset are taken to be UTC, matching
        # the naive-UTC strings the mitigation step stores.
        if last_heartbeat.tzinfo is None:
            last_heartbeat = last_heartbeat.replace(tzinfo=timezone.utc)

        if (now - last_heartbeat).total_seconds() > heartbeat_timeout:
            dead_hosts_keys.append(host.key)

    return dead_hosts_keys
|
||||
|
||||
|
||||
def dead_host_mitigation(client: "Etcd3Wrapper", dead_hosts_keys, vm_prefix=None):
    """Mark each host in *dead_hosts_keys* as DEAD and re-queue its VMs.

    For every dead host key the host entry is rewritten with status
    ``"DEAD"`` and a fresh ``"last_heartbeat"``.  Every VM under *vm_prefix*
    whose ``"hostname"`` equals that host key then has its hostname cleared
    and its status set to ``"REQUESTED_NEW"``.

    Args:
        client: store wrapper providing ``get``, ``get_prefix`` and ``put``
            that accept ``value_in_json=True``; the returned entries are used
            through their ``.key`` and ``.value`` attributes.
        dead_hosts_keys: keys of the hosts to mark as dead.
        vm_prefix: key prefix under which VMs are stored.  Defaults to
            ``config("VM_PREFIX")``.
    """
    # Local import: the module only imports the ``datetime`` class.
    from datetime import timezone

    if not dead_hosts_keys:
        return

    if vm_prefix is None:
        vm_prefix = config("VM_PREFIX")

    for host_key in dead_hosts_keys:
        host = client.get(host_key, value_in_json=True)
        host.value["status"] = "DEAD"
        # Naive-UTC ISO string: same format utcnow().isoformat() produced.
        host.value["last_heartbeat"] = (
            datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        )
        client.put(host.key, host.value, value_in_json=True)

        # Find all vms that were hosted on this dead host
        all_vms = client.get_prefix(vm_prefix, value_in_json=True)
        for vm in all_vms:
            # VMs that were never scheduled may have no "hostname" entry.
            if vm.value.get("hostname") != host_key:
                continue

            # Clear the same key the VM was selected by; the scheduler
            # stores the assigned host under "hostname".
            vm.value["hostname"] = ""
            vm.value["status"] = "REQUESTED_NEW"
            client.put(vm.key, vm.value, value_in_json=True)
|
||||
|
||||
|
||||
def main(vm_prefix, host_prefix):
|
||||
client = Etcd3Wrapper(
|
||||
host=config("ETCD_HOST"), port=int(config("ETCD_PORT"))
|
||||
)
|
||||
|
||||
events_iterator, _ = client.watch_prefix(vm_prefix)
|
||||
events_iterator = client.watch_prefix(vm_prefix, timeout=10)
|
||||
|
||||
for event in events_iterator:
|
||||
key = event.key
|
||||
value = event.value
|
||||
if not value:
|
||||
for e in events_iterator:
|
||||
try:
|
||||
e.value = json.loads(e.value)
|
||||
except json.JSONDecodeError:
|
||||
logging.error(f"Invalid JSON {e.value}")
|
||||
continue
|
||||
value = json.loads(event.value)
|
||||
|
||||
print(key, value)
|
||||
logging.debug(e.key, e.value)
|
||||
|
||||
if value["status"] == "REQUESTED_NEW":
|
||||
e_status = e.value["status"]
|
||||
|
||||
if e_status == "TIMEOUT":
|
||||
logging.info("Timeout")
|
||||
hosts = client.get_prefix(host_prefix, value_in_json=True)
|
||||
dead_hosts = dead_host_detection(hosts)
|
||||
dead_host_mitigation(client, dead_hosts)
|
||||
|
||||
elif e_status == "REQUESTED_NEW":
|
||||
host_name = get_suitable_host(
|
||||
client, vm_prefix, host_prefix, value["specs"]
|
||||
client, vm_prefix, host_prefix, e.value["specs"]
|
||||
)
|
||||
if host_name:
|
||||
value["status"] = "SCHEDULED_DEPLOY"
|
||||
value["hostname"] = host_name
|
||||
client.put(key, json.dumps(value))
|
||||
e.value["status"] = "SCHEDULED_DEPLOY"
|
||||
e.value["hostname"] = host_name
|
||||
client.put(e.key, json.dumps(e.value))
|
||||
else:
|
||||
# email admin
|
||||
print("No Resource Left. Emailing admin....")
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue