initial code
This commit is contained in:
commit
17b40b13cc
3 changed files with 126 additions and 0 deletions
4
.gitignore
vendored
Normal file
4
.gitignore
vendored
Normal file
|
@ -0,0 +1,4 @@
|
||||||
|
.env
|
||||||
|
.idea/
|
||||||
|
__pycache__/
|
||||||
|
venv/
|
13
Pipfile
Normal file
13
Pipfile
Normal file
|
@ -0,0 +1,13 @@
|
||||||
|
[[source]]
|
||||||
|
name = "pypi"
|
||||||
|
url = "https://pypi.org/simple"
|
||||||
|
verify_ssl = true
|
||||||
|
|
||||||
|
[dev-packages]
|
||||||
|
|
||||||
|
[packages]
|
||||||
|
etcd3 = "*"
|
||||||
|
python-decouple = "*"
|
||||||
|
|
||||||
|
[requires]
|
||||||
|
python_version = "3.7"
|
109
main.py
Normal file
109
main.py
Normal file
|
@ -0,0 +1,109 @@
|
||||||
|
# TODO
|
||||||
|
# send an email to an email address defined by env['admin-email'] if resources are finished
|
||||||
|
# v3) Introduce a status endpoint of the scheduler - maybe expose a prometheus compatible output
|
||||||
|
|
||||||
|
import etcd3
|
||||||
|
import json
|
||||||
|
|
||||||
|
from decouple import config
|
||||||
|
from collections import Counter
|
||||||
|
|
||||||
|
|
||||||
|
class VmPool(object):
|
||||||
|
def __init__(self, etcd_client, vm_prefix):
|
||||||
|
self.client = etcd_client
|
||||||
|
self.vms = []
|
||||||
|
|
||||||
|
_vms = self.client.get_prefix(vm_prefix)
|
||||||
|
self.vms = [(vm[1].key.decode("utf-8"), json.loads(vm[0])) for vm in _vms]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def by_host(vms, host):
|
||||||
|
return list(filter(lambda x: x[1]["hostname"] == host, vms))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def by_status(vms, status):
|
||||||
|
return list(filter(lambda x: x[1]["status"] == status, vms))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def except_status(vms, status):
|
||||||
|
return list(filter(lambda x: x[1]["status"] != status, vms))
|
||||||
|
|
||||||
|
|
||||||
|
def accumulated_specs(vms):
|
||||||
|
"""Accumulate specs of all :param vms:"""
|
||||||
|
specs = Counter()
|
||||||
|
for vm in vms:
|
||||||
|
_, _value = vm
|
||||||
|
vm_specs = _value["specs"]
|
||||||
|
specs += Counter(vm_specs)
|
||||||
|
return specs
|
||||||
|
|
||||||
|
|
||||||
|
def remaining_resources(host_specs, vms):
|
||||||
|
"""Return remaining resources host_specs - vms"""
|
||||||
|
vms_specs = Counter(vms)
|
||||||
|
remaining = Counter(host_specs)
|
||||||
|
remaining.subtract(vms_specs)
|
||||||
|
|
||||||
|
return remaining
|
||||||
|
|
||||||
|
|
||||||
|
def get_suitable_host(etcd_client, vm_prefix, host_prefix, vm_specs):
|
||||||
|
vm_pool = VmPool(etcd_client, vm_prefix)
|
||||||
|
hosts = client.get_prefix(host_prefix)
|
||||||
|
|
||||||
|
for host in hosts:
|
||||||
|
_host_name, host_specs = host[1].key.decode("utf-8"), json.loads(host[0])
|
||||||
|
|
||||||
|
# Get All Virtual Machines
|
||||||
|
vms = vm_pool.vms
|
||||||
|
|
||||||
|
# Filter them by host_name
|
||||||
|
vms = VmPool.by_host(vms, _host_name)
|
||||||
|
|
||||||
|
# Filter them by status
|
||||||
|
vms = VmPool.except_status(vms, "REQUESTED_NEW")
|
||||||
|
|
||||||
|
# Accumulate all of their combined specs
|
||||||
|
vms_accumulated_specs = accumulated_specs(vms)
|
||||||
|
|
||||||
|
# Find out remaining resources after host_specs - already running vm_specs
|
||||||
|
remaining = remaining_resources(host_specs, vms_accumulated_specs)
|
||||||
|
|
||||||
|
# Find out remaining - new_vm_specs
|
||||||
|
remaining = remaining_resources(remaining, vm_specs)
|
||||||
|
|
||||||
|
# if remaining resources >= 0 return this host_name
|
||||||
|
if all(map(lambda x: True if remaining[x] >= 0 else False, remaining)):
|
||||||
|
return _host_name
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
VM_PREFIX = "/v1/vm/"
|
||||||
|
HOST_PREFIX = "/v1/host/"
|
||||||
|
|
||||||
|
client = etcd3.client(host=config("ETCD_HOST"), port=int(config("ETCD_PORT")))
|
||||||
|
|
||||||
|
while True:
|
||||||
|
events_iterator, cancel = client.watch_prefix(VM_PREFIX)
|
||||||
|
for event in events_iterator:
|
||||||
|
key = event.key
|
||||||
|
value = event.value
|
||||||
|
if not value:
|
||||||
|
continue
|
||||||
|
value = json.loads(event.value)
|
||||||
|
|
||||||
|
print(key, value)
|
||||||
|
|
||||||
|
if value["status"] == "REQUESTED_NEW":
|
||||||
|
host_name = get_suitable_host(client, VM_PREFIX, HOST_PREFIX, value["specs"])
|
||||||
|
print("hostname", host_name)
|
||||||
|
if host_name:
|
||||||
|
value["status"] = "SCHEDULED_DEPLOY"
|
||||||
|
value["hostname"] = host_name
|
||||||
|
client.put(key, json.dumps(value))
|
||||||
|
else:
|
||||||
|
# email admin
|
||||||
|
print("No Resource Left. Emailing admin....")
|
Loading…
Add table
Reference in a new issue