Efforts to make ucloud a python package

This commit is contained in:
ahmadbilalkhalid 2019-12-03 15:40:41 +05:00
commit 1e7300b56e
71 changed files with 241 additions and 1043 deletions

View file

@ -0,0 +1,3 @@
import logging
logger = logging.getLogger(__name__)

113
ucloud/scheduler/helper.py Executable file
View file

@ -0,0 +1,113 @@
from collections import Counter
from functools import reduce
import bitmath
from ucloud.common.host import HostStatus
from ucloud.common.request import RequestEntry, RequestType
from ucloud.common.vm import VMStatus
from ucloud.config import vm_pool, host_pool, request_pool, env_vars
def accumulated_specs(vms_specs):
if not vms_specs:
return {}
return reduce((lambda x, y: Counter(x) + Counter(y)), vms_specs)
def remaining_resources(host_specs, vms_specs):
# Return remaining resources host_specs - vms
_vms_specs = Counter(vms_specs)
_remaining = Counter(host_specs)
for component in _vms_specs:
if isinstance(_vms_specs[component], str):
_vms_specs[component] = int(bitmath.parse_string_unsafe(_vms_specs[component]).to_MB())
elif isinstance(_vms_specs[component], list):
_vms_specs[component] = map(lambda x: int(bitmath.parse_string_unsafe(x).to_MB()), _vms_specs[component])
_vms_specs[component] = reduce(lambda x, y: x + y, _vms_specs[component], 0)
for component in _remaining:
if isinstance(_remaining[component], str):
_remaining[component] = int(bitmath.parse_string_unsafe(_remaining[component]).to_MB())
elif isinstance(_remaining[component], list):
_remaining[component] = map(lambda x: int(bitmath.parse_string_unsafe(x).to_MB()), _remaining[component])
_remaining[component] = reduce(lambda x, y: x + y, _remaining[component], 0)
_remaining.subtract(_vms_specs)
return _remaining
class NoSuitableHostFound(Exception):
"""Exception when no host found that can host a VM."""
def get_suitable_host(vm_specs, hosts=None):
if hosts is None:
hosts = host_pool.by_status(HostStatus.alive)
for host in hosts:
# Filter them by host_name
vms = vm_pool.by_host(host.key)
# Filter them by status
vms = vm_pool.by_status(VMStatus.running, vms)
running_vms_specs = [vm.specs for vm in vms]
# Accumulate all of their combined specs
running_vms_accumulated_specs = accumulated_specs(running_vms_specs)
# Find out remaining resources after
# host_specs - already running vm_specs
remaining = remaining_resources(host.specs, running_vms_accumulated_specs)
# Find out remaining - new_vm_specs
remaining = remaining_resources(remaining, vm_specs)
if all(map(lambda x: x >= 0, remaining.values())):
return host.key
raise NoSuitableHostFound
def dead_host_detection():
# Bring out your dead! - Monty Python and the Holy Grail
hosts = host_pool.by_status(HostStatus.alive)
dead_hosts_keys = []
for host in hosts:
# Only check those who claims to be alive
if host.status == HostStatus.alive:
if not host.is_alive():
dead_hosts_keys.append(host.key)
return dead_hosts_keys
def dead_host_mitigation(dead_hosts_keys):
for host_key in dead_hosts_keys:
host = host_pool.get(host_key)
host.declare_dead()
vms_hosted_on_dead_host = vm_pool.by_host(host_key)
for vm in vms_hosted_on_dead_host:
vm.declare_killed()
vm_pool.put(vm)
host_pool.put(host)
def assign_host(vm):
vm.hostname = get_suitable_host(vm.specs)
vm_pool.put(vm)
r = RequestEntry.from_scratch(type=RequestType.StartVM,
uuid=vm.uuid,
hostname=vm.hostname,
request_prefix=env_vars.get("REQUEST_PREFIX"))
request_pool.put(r)
vm.log.append("VM scheduled for starting")
return vm.hostname

93
ucloud/scheduler/main.py Executable file
View file

@ -0,0 +1,93 @@
# TODO
# 1. send an email to an email address defined by env['admin-email']
# if resources are finished
# 2. Introduce a status endpoint of the scheduler -
# maybe expose a prometheus compatible output
from ucloud.common.request import RequestEntry, RequestType
from ucloud.config import etcd_client
from ucloud.config import host_pool, request_pool, vm_pool, env_vars
from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
assign_host, NoSuitableHostFound)
from ucloud.scheduler import logger
def main():
logger.info("%s SESSION STARTED %s", '*' * 5, '*' * 5)
pending_vms = []
for request_iterator in [
etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True),
etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=5, value_in_json=True),
]:
for request_event in request_iterator:
request_entry = RequestEntry(request_event)
# Never Run time critical mechanism inside timeout
# mechanism because timeout mechanism only comes
# when no other event is happening. It means under
# heavy load there would not be a timeout event.
if request_entry.type == "TIMEOUT":
# Detect hosts that are dead and set their status
# to "DEAD", and their VMs' status to "KILLED"
dead_hosts = dead_host_detection()
if dead_hosts:
logger.debug("Dead hosts: %s", dead_hosts)
dead_host_mitigation(dead_hosts)
# If there are VMs that weren't assigned a host
# because there wasn't a host available which
# meets requirement of that VM then we would
# create a new ScheduleVM request for that VM
# on our behalf.
while pending_vms:
pending_vm_entry = pending_vms.pop()
r = RequestEntry.from_scratch(type="ScheduleVM",
uuid=pending_vm_entry.uuid,
hostname=pending_vm_entry.hostname,
request_prefix=env_vars.get("REQUEST_PREFIX"))
request_pool.put(r)
elif request_entry.type == RequestType.ScheduleVM:
logger.debug("%s, %s", request_entry.key, request_entry.value)
vm_entry = vm_pool.get(request_entry.uuid)
if vm_entry is None:
logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid))
continue
etcd_client.client.delete(request_entry.key) # consume Request
# If the Request is about a VM which is labelled as "migration"
# and has a destination
if hasattr(request_entry, "migration") and request_entry.migration \
and hasattr(request_entry, "destination") and request_entry.destination:
try:
get_suitable_host(vm_specs=vm_entry.specs,
hosts=[host_pool.get(request_entry.destination)])
except NoSuitableHostFound:
logger.info("Requested destination host doesn't have enough capacity"
"to hold %s" % vm_entry.uuid)
else:
r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
uuid=request_entry.uuid,
destination=request_entry.destination,
request_prefix=env_vars.get("REQUEST_PREFIX"))
request_pool.put(r)
# If the Request is about a VM that just want to get started/created
else:
# assign_host only returns None when we couldn't be able to assign
# a host to a VM because of resource constraints
try:
assign_host(vm_entry)
except NoSuitableHostFound:
vm_entry.add_log("Can't schedule VM. No Resource Left.")
vm_pool.put(vm_entry)
pending_vms.append(vm_entry)
logger.info("No Resource Left. Emailing admin....")
if __name__ == "__main__":
main()

View file

View file

@ -0,0 +1,210 @@
import json
import multiprocessing
import sys
import unittest
from datetime import datetime
from os.path import dirname
BASE_DIR = dirname(dirname(__file__))
sys.path.insert(0, BASE_DIR)
from main import (
accumulated_specs,
remaining_resources,
VmPool,
main,
)
from ucloud.config import etcd_client
class TestFunctions(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.client = etcd_client
cls.host_prefix = "/test/host"
cls.vm_prefix = "/test/vm"
# These deletion could also be in
# tearDown() but it is more appropriate here
# as it enable us to check the ETCD store
# even after test is run
cls.client.client.delete_prefix(cls.host_prefix)
cls.client.client.delete_prefix(cls.vm_prefix)
cls.create_hosts(cls)
cls.create_vms(cls)
cls.p = multiprocessing.Process(
target=main, args=[cls.vm_prefix, cls.host_prefix]
)
cls.p.start()
@classmethod
def tearDownClass(cls):
cls.p.terminate()
def create_hosts(self):
host1 = {
"cpu": 32,
"ram": 128,
"hdd": 1024,
"sdd": 0,
"status": "ALIVE",
"last_heartbeat": datetime.utcnow().isoformat(),
}
host2 = {
"cpu": 16,
"ram": 64,
"hdd": 512,
"sdd": 0,
"status": "ALIVE",
"last_heartbeat": datetime.utcnow().isoformat(),
}
host3 = {
"cpu": 16,
"ram": 32,
"hdd": 256,
"sdd": 256,
"status": "ALIVE",
"last_heartbeat": datetime.utcnow().isoformat(),
}
with self.client.client.lock("lock"):
self.client.put(f"{self.host_prefix}/1", host1, value_in_json=True)
self.client.put(f"{self.host_prefix}/2", host2, value_in_json=True)
self.client.put(f"{self.host_prefix}/3", host3, value_in_json=True)
def create_vms(self):
vm1 = json.dumps(
{
"owner": "meow",
"specs": {"cpu": 4, "ram": 8, "hdd": 100, "sdd": 256},
"hostname": "",
"status": "REQUESTED_NEW",
}
)
vm2 = json.dumps(
{
"owner": "meow",
"specs": {"cpu": 16, "ram": 64, "hdd": 512, "sdd": 0},
"hostname": "",
"status": "REQUESTED_NEW",
}
)
vm3 = json.dumps(
{
"owner": "meow",
"specs": {"cpu": 16, "ram": 32, "hdd": 128, "sdd": 0},
"hostname": "",
"status": "REQUESTED_NEW",
}
)
vm4 = json.dumps(
{
"owner": "meow",
"specs": {"cpu": 16, "ram": 64, "hdd": 512, "sdd": 0},
"hostname": "",
"status": "REQUESTED_NEW",
}
)
vm5 = json.dumps(
{
"owner": "meow",
"specs": {"cpu": 2, "ram": 2, "hdd": 10, "sdd": 0},
"hostname": "",
"status": "REQUESTED_NEW",
}
)
vm6 = json.dumps(
{
"owner": "meow",
"specs": {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0},
"hostname": "",
"status": "REQUESTED_NEW",
}
)
vm7 = json.dumps(
{
"owner": "meow",
"specs": {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0},
"hostname": "",
"status": "REQUESTED_NEW",
}
)
self.client.put(f"{self.vm_prefix}/1", vm1)
self.client.put(f"{self.vm_prefix}/2", vm2)
self.client.put(f"{self.vm_prefix}/3", vm3)
self.client.put(f"{self.vm_prefix}/4", vm4)
self.client.put(f"{self.vm_prefix}/5", vm5)
self.client.put(f"{self.vm_prefix}/6", vm6)
self.client.put(f"{self.vm_prefix}/7", vm7)
def test_accumulated_specs(self):
vms = [
{"ssd": 10, "cpu": 4, "ram": 8},
{"hdd": 10, "cpu": 4, "ram": 8},
{"cpu": 8, "ram": 32},
]
self.assertEqual(
accumulated_specs(vms), {"ssd": 10, "cpu": 16, "ram": 48, "hdd": 10}
)
def test_remaining_resources(self):
host_specs = {"ssd": 10, "cpu": 16, "ram": 48, "hdd": 10}
vms_specs = {"ssd": 10, "cpu": 32, "ram": 12, "hdd": 0}
resultant_specs = {"ssd": 0, "cpu": -16, "ram": 36, "hdd": 10}
self.assertEqual(remaining_resources(host_specs, vms_specs),
resultant_specs)
def test_vmpool(self):
self.p.join(1)
vm_pool = VmPool(self.client, self.vm_prefix)
# vm_pool by host
actual = vm_pool.by_host(vm_pool.vms, f"{self.host_prefix}/3")
ground_truth = [
(
f"{self.vm_prefix}/1",
{
"owner": "meow",
"specs": {"cpu": 4, "ram": 8, "hdd": 100, "sdd": 256},
"hostname": f"{self.host_prefix}/3",
"status": "SCHEDULED_DEPLOY",
},
)
]
self.assertEqual(actual[0], ground_truth[0])
# vm_pool by status
actual = vm_pool.by_status(vm_pool.vms, "REQUESTED_NEW")
ground_truth = [
(
f"{self.vm_prefix}/7",
{
"owner": "meow",
"specs": {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0},
"hostname": "",
"status": "REQUESTED_NEW",
},
)
]
self.assertEqual(actual[0], ground_truth[0])
# vm_pool by except status
actual = vm_pool.except_status(vm_pool.vms, "SCHEDULED_DEPLOY")
ground_truth = [
(
f"{self.vm_prefix}/7",
{
"owner": "meow",
"specs": {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0},
"hostname": "",
"status": "REQUESTED_NEW",
},
)
]
self.assertEqual(actual[0], ground_truth[0])
if __name__ == "__main__":
unittest.main()

View file

@ -0,0 +1,75 @@
import sys
import unittest
from datetime import datetime
from os.path import dirname
BASE_DIR = dirname(dirname(__file__))
sys.path.insert(0, BASE_DIR)
from main import (
dead_host_detection,
dead_host_mitigation,
config
)
class TestDeadHostMechanism(unittest.TestCase):
def setUp(self):
self.client = config.etcd_client
self.host_prefix = "/test/host"
self.vm_prefix = "/test/vm"
self.client.client.delete_prefix(self.host_prefix)
self.client.client.delete_prefix(self.vm_prefix)
self.create_hosts()
def create_hosts(self):
host1 = {
"cpu": 32,
"ram": 128,
"hdd": 1024,
"sdd": 0,
"status": "ALIVE",
"last_heartbeat": datetime.utcnow().isoformat(),
}
host2 = {
"cpu": 16,
"ram": 64,
"hdd": 512,
"sdd": 0,
"status": "ALIVE",
"last_heartbeat": datetime(2011, 1, 1).isoformat(),
}
host3 = {"cpu": 16, "ram": 32, "hdd": 256, "sdd": 256}
host4 = {
"cpu": 16,
"ram": 32,
"hdd": 256,
"sdd": 256,
"status": "DEAD",
"last_heartbeat": datetime(2011, 1, 1).isoformat(),
}
with self.client.client.lock("lock"):
self.client.put(f"{self.host_prefix}/1", host1, value_in_json=True)
self.client.put(f"{self.host_prefix}/2", host2, value_in_json=True)
self.client.put(f"{self.host_prefix}/3", host3, value_in_json=True)
self.client.put(f"{self.host_prefix}/4", host4, value_in_json=True)
def test_dead_host_detection(self):
hosts = self.client.get_prefix(self.host_prefix, value_in_json=True)
deads = dead_host_detection(hosts)
self.assertEqual(deads, ["/test/host/2", "/test/host/3"])
return deads
def test_dead_host_mitigation(self):
deads = self.test_dead_host_detection()
dead_host_mitigation(self.client, deads)
hosts = self.client.get_prefix(self.host_prefix, value_in_json=True)
deads = dead_host_detection(hosts)
self.assertEqual(deads, [])
if __name__ == "__main__":
unittest.main()