simplifies logic + add unit tests
This commit is contained in:
parent
17b40b13cc
commit
7354af7e07
5 changed files with 463 additions and 27 deletions
82
main.py
82
main.py
|
|
@ -1,12 +1,17 @@
|
|||
# TODO
|
||||
# send an email to an email address defined by env['admin-email'] if resources are finished
|
||||
# v3) Introduce a status endpoint of the scheduler - maybe expose a prometheus compatible output
|
||||
# 1. on startup check if there is any VM with status REQUESTED_NEW already
|
||||
# 2. send an email to an email address defined by env['admin-email']
|
||||
# if resources are finished
|
||||
# 3. v3) Introduce a status endpoint of the scheduler -
|
||||
# maybe expose a prometheus compatible output
|
||||
|
||||
import etcd3
|
||||
import json
|
||||
import argparse
|
||||
|
||||
from decouple import config
|
||||
from collections import Counter
|
||||
from functools import reduce
|
||||
|
||||
|
||||
class VmPool(object):
|
||||
|
|
@ -15,7 +20,9 @@ class VmPool(object):
|
|||
self.vms = []
|
||||
|
||||
_vms = self.client.get_prefix(vm_prefix)
|
||||
self.vms = [(vm[1].key.decode("utf-8"), json.loads(vm[0])) for vm in _vms]
|
||||
self.vms = [
|
||||
(vm[1].key.decode("utf-8"), json.loads(vm[0])) for vm in _vms
|
||||
]
|
||||
|
||||
@staticmethod
|
||||
def by_host(vms, host):
|
||||
|
|
@ -30,19 +37,15 @@ class VmPool(object):
|
|||
return list(filter(lambda x: x[1]["status"] != status, vms))
|
||||
|
||||
|
||||
def accumulated_specs(vms):
|
||||
"""Accumulate specs of all :param vms:"""
|
||||
specs = Counter()
|
||||
for vm in vms:
|
||||
_, _value = vm
|
||||
vm_specs = _value["specs"]
|
||||
specs += Counter(vm_specs)
|
||||
return specs
|
||||
def accumulated_specs(vms_specs):
|
||||
if vms_specs == []:
|
||||
return {}
|
||||
return reduce((lambda x, y: Counter(x) + Counter(y)), vms_specs)
|
||||
|
||||
|
||||
def remaining_resources(host_specs, vms):
|
||||
def remaining_resources(host_specs, vms_specs):
|
||||
"""Return remaining resources host_specs - vms"""
|
||||
vms_specs = Counter(vms)
|
||||
vms_specs = Counter(vms_specs)
|
||||
remaining = Counter(host_specs)
|
||||
remaining.subtract(vms_specs)
|
||||
|
||||
|
|
@ -51,10 +54,13 @@ def remaining_resources(host_specs, vms):
|
|||
|
||||
def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs):
|
||||
vm_pool = VmPool(etcd_client, vm_prefix)
|
||||
hosts = client.get_prefix(host_prefix)
|
||||
hosts = etcd_client.get_prefix(host_prefix)
|
||||
|
||||
for host in hosts:
|
||||
_host_name, host_specs = host[1].key.decode("utf-8"), json.loads(host[0])
|
||||
_host_name, host_specs = (
|
||||
host[1].key.decode("utf-8"),
|
||||
json.loads(host[0]),
|
||||
)
|
||||
|
||||
# Get All Virtual Machines
|
||||
vms = vm_pool.vms
|
||||
|
|
@ -65,29 +71,37 @@ def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs):
|
|||
# Filter them by status
|
||||
vms = VmPool.except_status(vms, "REQUESTED_NEW")
|
||||
|
||||
# Accumulate all of their combined specs
|
||||
vms_accumulated_specs = accumulated_specs(vms)
|
||||
running_vms_specs = [vm[1]["specs"] for vm in vms]
|
||||
|
||||
# Find out remaining resources after host_specs - already running vm_specs
|
||||
remaining = remaining_resources(host_specs, vms_accumulated_specs)
|
||||
# Accumulate all of their combined specs
|
||||
running_vms_accumulated_specs = accumulated_specs(running_vms_specs)
|
||||
|
||||
# Find out remaining resources after
|
||||
# host_specs - already running vm_specs
|
||||
remaining = remaining_resources(
|
||||
host_specs, running_vms_accumulated_specs
|
||||
)
|
||||
|
||||
# Find out remaining - new_vm_specs
|
||||
remaining = remaining_resources(remaining, vm_specs)
|
||||
|
||||
# if remaining resources >= 0 return this host_name
|
||||
if all(map(lambda x: True if remaining[x] >= 0 else False, remaining)):
|
||||
if all(
|
||||
map(lambda x: True if remaining[x] >= 0 else False, remaining)
|
||||
):
|
||||
return _host_name
|
||||
|
||||
return None
|
||||
|
||||
|
||||
VM_PREFIX = "/v1/vm/"
|
||||
HOST_PREFIX = "/v1/host/"
|
||||
def main(vm_prefix, host_prefix):
|
||||
|
||||
client = etcd3.client(host=config("ETCD_HOST"), port=int(config("ETCD_PORT")))
|
||||
client = etcd3.client(
|
||||
host=config("ETCD_HOST"), port=int(config("ETCD_PORT"))
|
||||
)
|
||||
|
||||
events_iterator, _ = client.watch_prefix(vm_prefix)
|
||||
|
||||
while True:
|
||||
events_iterator, cancel = client.watch_prefix(VM_PREFIX)
|
||||
for event in events_iterator:
|
||||
key = event.key
|
||||
value = event.value
|
||||
|
|
@ -98,8 +112,9 @@ while True:
|
|||
print(key, value)
|
||||
|
||||
if value["status"] == "REQUESTED_NEW":
|
||||
host_name = get_suitable_host(client, VM_PREFIX, HOST_PREFIX, value["specs"])
|
||||
print("hostname", host_name)
|
||||
host_name = get_suitable_host(
|
||||
client, vm_prefix, host_prefix, value["specs"]
|
||||
)
|
||||
if host_name:
|
||||
value["status"] = "SCHEDULED_DEPLOY"
|
||||
value["hostname"] = host_name
|
||||
|
|
@ -107,3 +122,16 @@ while True:
|
|||
else:
|
||||
# email admin
|
||||
print("No Resource Left. Emailing admin....")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
argparser = argparse.ArgumentParser()
|
||||
argparser.add_argument(
|
||||
"--vm_prefix", required=False, default=config("VM_PREFIX")
|
||||
)
|
||||
argparser.add_argument(
|
||||
"--host_prefix", required=False, default=config("HOST_PREFIX")
|
||||
)
|
||||
args = argparser.parse_args()
|
||||
|
||||
main(args.vm_prefix, args.host_prefix)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue