working till migration

This commit is contained in:
ahmadbilalkhalid 2019-07-27 13:57:51 +05:00
commit 0ecb2d9cb8
9 changed files with 117 additions and 178 deletions

196
main.py
View file

@ -11,9 +11,11 @@ import logging
from decouple import config
from collections import Counter
from functools import reduce
from etcd3_wrapper import Etcd3Wrapper, EtcdEntry, PseudoEtcdMeta
from datetime import datetime
from ucloud_common.enums import VMStatus, RUNNING_VM_STATUSES
from etcd3_wrapper import Etcd3Wrapper
from ucloud_common.enums import HostStatus
from ucloud_common.vm import VmPool, VMEntry
from ucloud_common.host import HostPool
logging.basicConfig(
level=logging.DEBUG,
@ -23,27 +25,8 @@ logging.basicConfig(
datefmt="%d-%b-%y %H:%M:%S",
)
class VmPool(object):
def __init__(self, etcd_client, vm_prefix):
self.client = etcd_client
self.vms = []
_vms = self.client.get_prefix(vm_prefix)
self.vms = [(vm.key, json.loads(vm.value)) for vm in _vms]
@staticmethod
def by_host(vms, host):
print(vms)
return list(filter(lambda x: x[1]["hostname"] == host, vms))
@staticmethod
def by_status(vms, status):
return list(filter(lambda x: x[1]["status"] == status, vms))
@staticmethod
def except_status(vms, status):
return list(filter(lambda x: x[1]["status"] != status, vms))
VM_POOL = None
HOST_POOL = None
def accumulated_specs(vms_specs):
@ -61,149 +44,124 @@ def remaining_resources(host_specs, vms_specs):
return remaining
def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs):
vm_pool = VmPool(etcd_client, vm_prefix)
hosts = etcd_client.get_prefix(host_prefix, value_in_json=True)
hosts = filter(lambda h: h.value["status"] == "ALIVE", hosts)
def get_suitable_host(vm_specs):
hosts = HOST_POOL.by_status("ALIVE")
for host in hosts:
_host_name, host_value = (host.key, host.value)
# Get All Virtual Machines
vms = vm_pool.vms
# Filter them by host_name
vms = VmPool.by_host(vms, _host_name)
vms = VM_POOL.by_host(host.key)
# Filter them by status
vms = VmPool.except_status(vms, "REQUESTED_NEW")
vms = VM_POOL.except_status("REQUESTED_NEW", vms=vms)
running_vms_specs = [vm[1]["specs"] for vm in vms]
running_vms_specs = [vm.specs for vm in vms]
# Accumulate all of their combined specs
running_vms_accumulated_specs = accumulated_specs(running_vms_specs)
# print(running_vms_accumulated_specs)
running_vms_accumulated_specs = accumulated_specs(
running_vms_specs
)
# Find out remaining resources after
# host_specs - already running vm_specs
# print(host_value)
remaining = remaining_resources(
host_value["specs"], running_vms_accumulated_specs
host.specs, running_vms_accumulated_specs
)
# print(remaining)
# Find out remaining - new_vm_specs
remaining = remaining_resources(remaining, vm_specs)
# if remaining resources >= 0 return this host_name
if all(
map(lambda x: True if remaining[x] >= 0 else False, remaining)
map(
lambda x: True if remaining[x] >= 0 else False,
remaining,
)
):
return _host_name
return host.key
return None
def dead_host_detection(hosts):
def dead_host_detection():
# Bring out your dead! - Monty Python and the Holy Grail
hosts = HOST_POOL.by_status(HostStatus.alive)
dead_hosts_keys = []
for host in hosts:
# Bring out your dead! - Monty Python and the Holy Grail
if "status" in host.value and "last_heartbeat" in host.value:
# Don't count that is already buried
if host.value["status"] == "DEAD":
continue
last_heartbeat = datetime.fromisoformat(
host.value["last_heartbeat"]
)
delta = datetime.utcnow() - last_heartbeat
if delta.total_seconds() > 60:
# Only check those who claims to be alive
if host.status == HostStatus.alive:
if not host.is_alive():
dead_hosts_keys.append(host.key)
else:
dead_hosts_keys.append(host.key)
return dead_hosts_keys
def dead_host_mitigation(client: Etcd3Wrapper, dead_hosts_keys):
def dead_host_mitigation(dead_hosts_keys):
for host_key in dead_hosts_keys:
host = client.get(host_key, value_in_json=True)
host.value["status"] = "DEAD"
host.value["last_heartbeat"] = datetime.utcnow().isoformat()
host = HOST_POOL.get(host_key)
host.declare_dead()
# Find all vms that were hosted on this dead host
all_vms = client.get_prefix(config("VM_PREFIX"), value_in_json=True)
vms_hosted_on_dead_host = filter(
lambda _vm: _vm.value["hostname"] == host_key, all_vms
)
vms_hosted_on_dead_host = VM_POOL.by_host(host_key)
for vm in vms_hosted_on_dead_host:
vm.value["hostname"] = ""
if vm.value["status"] in RUNNING_VM_STATUSES:
vm.value["status"] = VMStatus.requested_start
client.put(vm.key, vm.value, value_in_json=True)
client.put(host.key, host.value, value_in_json=True)
vm.declare_killed()
VM_POOL.put(vm)
HOST_POOL.put(host)
def assign_host(client, vm_prefix, host_prefix, e):
host_name = get_suitable_host(
client, vm_prefix, host_prefix, e.value["specs"]
)
def assign_host(vm):
host_name = get_suitable_host(vm.specs)
if host_name:
if e.value["status"] == "REQUESTED_NEW":
e.value["status"] = "SCHEDULED_DEPLOY"
else:
e.value["status"] = "REQUESTED_START"
e.value["hostname"] = host_name
client.put(e.key, json.dumps(e.value))
if vm.status == "REQUESTED_NEW":
vm.status = "SCHEDULED_DEPLOY"
if vm.status == "KILLED":
vm.status = "REQUESTED_START"
vm.hostname = host_name
VM_POOL.put(vm)
return host_name
return None
def main(vm_prefix, host_prefix):
logging.info(f"{'*' * 5} SESSION STARTED {'*' * 5}")
client = Etcd3Wrapper(
host=config("ETCD_HOST"), port=int(config("ETCD_PORT"))
)
RESCAN_VMS = False
global VM_POOL, HOST_POOL
VM_POOL = VmPool(client, vm_prefix)
HOST_POOL = HostPool(client, host_prefix)
PENDING_VMS = []
for events_iterator in [
client.get_prefix(vm_prefix),
client.watch_prefix(vm_prefix, timeout=10),
client.get_prefix(vm_prefix, value_in_json=True),
client.watch_prefix(vm_prefix, timeout=10, value_in_json=True),
]:
for e in events_iterator:
try:
e.value = json.loads(e.value)
except json.JSONDecodeError:
logging.error(f"Invalid JSON {e.value}")
continue
e = VMEntry(e)
logging.debug(f"{e.key}, {e.value}")
logging.debug(e.key, e.value)
# Never Run time critical mechanism inside timeout
# mechanism because timeout mechanism only comes
# when no other event is happening. It means under
# heavy load there would not be a timeout
if e.status == "TIMEOUT":
dead_hosts = dead_host_detection()
logging.debug(f"Dead hosts: {dead_hosts}")
dead_host_mitigation(dead_hosts)
e_status = e.value["status"]
vm_scheduled = []
for vm in PENDING_VMS:
if assign_host(vm) is not None:
vm_scheduled.append(vm)
if e_status == "TIMEOUT":
logging.info("Timeout")
hosts = client.get_prefix(host_prefix, value_in_json=True)
dead_hosts = dead_host_detection(hosts)
dead_host_mitigation(client, dead_hosts)
if RESCAN_VMS:
RESCAN_VMS = False # Assume we won't need it after this
vms = client.get_prefix(vm_prefix)
for vm in vms:
fake_e = EtcdEntry(
PseudoEtcdMeta(key=vm.key.encode("utf-8")),
value=vm.value.encode("utf-8"), value_in_json=True
)
if (assign_host(client, vm_prefix, host_prefix,
fake_e) is None):
# We need it because we still have vm left
# to schedule
RESCAN_VMS = True
elif e_status in ["REQUESTED_NEW", "REQUESTED_START"]:
if assign_host(client, vm_prefix, host_prefix, e) is None:
print("No Resource Left. Emailing admin....")
RESCAN_VMS = True
for vm in vm_scheduled:
PENDING_VMS.remove(vm)
logging.debug(f"Remaining Pending: {PENDING_VMS}")
elif e.status in ["REQUESTED_NEW", "KILLED"]:
if assign_host(e) is None:
PENDING_VMS.append(e)
logging.info("No Resource Left. Emailing admin....")
logging.debug(f"Pending VMS: {PENDING_VMS}")
if __name__ == "__main__":