forked from uncloud/uncloud
94 lines
4.5 KiB
Python
94 lines
4.5 KiB
Python
|
# TODO
|
||
|
# 1. send an email to an email address defined by env['admin-email']
|
||
|
# if resources are finished
|
||
|
# 2. Introduce a status endpoint of the scheduler -
|
||
|
# maybe expose a prometheus compatible output
|
||
|
|
||
|
from ucloud.common.request import RequestEntry, RequestType
|
||
|
from ucloud.config import etcd_client
|
||
|
from ucloud.config import host_pool, request_pool, vm_pool, env_vars
|
||
|
from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
|
||
|
assign_host, NoSuitableHostFound)
|
||
|
from . import logger
|
||
|
|
||
|
|
||
|
def main():
|
||
|
logger.info("%s SESSION STARTED %s", '*' * 5, '*' * 5)
|
||
|
|
||
|
pending_vms = []
|
||
|
|
||
|
for request_iterator in [
|
||
|
etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True),
|
||
|
etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=5, value_in_json=True),
|
||
|
]:
|
||
|
for request_event in request_iterator:
|
||
|
request_entry = RequestEntry(request_event)
|
||
|
# Never Run time critical mechanism inside timeout
|
||
|
# mechanism because timeout mechanism only comes
|
||
|
# when no other event is happening. It means under
|
||
|
# heavy load there would not be a timeout event.
|
||
|
if request_entry.type == "TIMEOUT":
|
||
|
|
||
|
# Detect hosts that are dead and set their status
|
||
|
# to "DEAD", and their VMs' status to "KILLED"
|
||
|
dead_hosts = dead_host_detection()
|
||
|
if dead_hosts:
|
||
|
logger.debug("Dead hosts: %s", dead_hosts)
|
||
|
dead_host_mitigation(dead_hosts)
|
||
|
|
||
|
# If there are VMs that weren't assigned a host
|
||
|
# because there wasn't a host available which
|
||
|
# meets requirement of that VM then we would
|
||
|
# create a new ScheduleVM request for that VM
|
||
|
# on our behalf.
|
||
|
while pending_vms:
|
||
|
pending_vm_entry = pending_vms.pop()
|
||
|
r = RequestEntry.from_scratch(type="ScheduleVM",
|
||
|
uuid=pending_vm_entry.uuid,
|
||
|
hostname=pending_vm_entry.hostname,
|
||
|
request_prefix=env_vars.get("REQUEST_PREFIX"))
|
||
|
request_pool.put(r)
|
||
|
|
||
|
elif request_entry.type == RequestType.ScheduleVM:
|
||
|
logger.debug("%s, %s", request_entry.key, request_entry.value)
|
||
|
|
||
|
vm_entry = vm_pool.get(request_entry.uuid)
|
||
|
if vm_entry is None:
|
||
|
logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid))
|
||
|
continue
|
||
|
etcd_client.client.delete(request_entry.key) # consume Request
|
||
|
|
||
|
# If the Request is about a VM which is labelled as "migration"
|
||
|
# and has a destination
|
||
|
if hasattr(request_entry, "migration") and request_entry.migration \
|
||
|
and hasattr(request_entry, "destination") and request_entry.destination:
|
||
|
try:
|
||
|
get_suitable_host(vm_specs=vm_entry.specs,
|
||
|
hosts=[host_pool.get(request_entry.destination)])
|
||
|
except NoSuitableHostFound:
|
||
|
logger.info("Requested destination host doesn't have enough capacity"
|
||
|
"to hold %s" % vm_entry.uuid)
|
||
|
else:
|
||
|
r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
|
||
|
uuid=request_entry.uuid,
|
||
|
destination=request_entry.destination,
|
||
|
request_prefix=env_vars.get("REQUEST_PREFIX"))
|
||
|
request_pool.put(r)
|
||
|
|
||
|
# If the Request is about a VM that just want to get started/created
|
||
|
else:
|
||
|
# assign_host only returns None when we couldn't be able to assign
|
||
|
# a host to a VM because of resource constraints
|
||
|
try:
|
||
|
assign_host(vm_entry)
|
||
|
except NoSuitableHostFound:
|
||
|
vm_entry.add_log("Can't schedule VM. No Resource Left.")
|
||
|
vm_pool.put(vm_entry)
|
||
|
|
||
|
pending_vms.append(vm_entry)
|
||
|
logger.info("No Resource Left. Emailing admin....")
|
||
|
|
||
|
|
||
|
if __name__ == "__main__":
|
||
|
main()
|