a
This commit is contained in:
parent
d468a1ba7d
commit
e70270041a
2 changed files with 69 additions and 69 deletions
20
config.py
20
config.py
|
|
@ -1,4 +1,24 @@
|
||||||
|
import logging
|
||||||
|
|
||||||
from decouple import config
|
from decouple import config
|
||||||
from etcd3_wrapper import Etcd3Wrapper
|
from etcd3_wrapper import Etcd3Wrapper
|
||||||
|
from ucloud_common.vm import VmPool
|
||||||
|
from ucloud_common.host import HostPool
|
||||||
|
from ucloud_common.request import RequestPool
|
||||||
|
|
||||||
|
logging.basicConfig(
|
||||||
|
level=logging.DEBUG,
|
||||||
|
filename="log.txt",
|
||||||
|
filemode="a",
|
||||||
|
format="%(asctime)s: %(levelname)s - %(message)s",
|
||||||
|
datefmt="%d-%b-%y %H:%M:%S",
|
||||||
|
)
|
||||||
|
|
||||||
|
vm_prefix = config("VM_PREFIX")
|
||||||
|
host_prefix = config("HOST_PREFIX")
|
||||||
|
request_prefix = config("REQUEST_PREFIX")
|
||||||
|
|
||||||
etcd_client = Etcd3Wrapper(host=config("ETCD_URL"))
|
etcd_client = Etcd3Wrapper(host=config("ETCD_URL"))
|
||||||
|
vm_pool = VmPool(etcd_client, vm_prefix)
|
||||||
|
host_pool = HostPool(etcd_client, host_prefix)
|
||||||
|
request_pool = RequestPool(etcd_client, request_prefix)
|
||||||
118
main.py
118
main.py
|
|
@ -1,105 +1,85 @@
|
||||||
# TODO
|
# TODO
|
||||||
# 1. send an email to an email address defined by env['admin-email']
|
# 1. send an email to an email address defined by env['admin-email']
|
||||||
# if resources are finished
|
# if resources are finished
|
||||||
# 2. v3) Introduce a status endpoint of the scheduler -
|
# 2. Introduce a status endpoint of the scheduler -
|
||||||
# maybe expose a prometheus compatible output
|
# maybe expose a prometheus compatible output
|
||||||
|
|
||||||
import argparse
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from decouple import config
|
|
||||||
|
|
||||||
from config import etcd_client as client
|
from config import etcd_client as client
|
||||||
|
from config import (host_pool, request_pool, vm_pool, request_prefix)
|
||||||
from ucloud_common.vm import VmPool
|
from ucloud_common.request import RequestEntry, RequestType
|
||||||
from ucloud_common.host import HostPool
|
|
||||||
from ucloud_common.request import RequestEntry, RequestPool, RequestType
|
|
||||||
from helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
|
from helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
|
||||||
assign_host)
|
assign_host)
|
||||||
|
|
||||||
logging.basicConfig(
|
pending_vms = []
|
||||||
level=logging.DEBUG,
|
|
||||||
filename="log.txt",
|
|
||||||
filemode="a",
|
|
||||||
format="%(asctime)s: %(levelname)s - %(message)s",
|
|
||||||
datefmt="%d-%b-%y %H:%M:%S",
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def main(vm_prefix, host_prefix, request_prefix):
|
def main():
|
||||||
logging.info(f"{'*' * 5} SESSION STARTED {'*' * 5}")
|
global pending_vms
|
||||||
|
|
||||||
vm_pool = VmPool(client, vm_prefix)
|
for request_iterator in [
|
||||||
host_pool = HostPool(client, host_prefix)
|
|
||||||
request_pool = RequestPool(client, request_prefix)
|
|
||||||
|
|
||||||
PENDING_VMS = []
|
|
||||||
for events_iterator in [
|
|
||||||
client.get_prefix(request_prefix, value_in_json=True),
|
client.get_prefix(request_prefix, value_in_json=True),
|
||||||
client.watch_prefix(request_prefix, timeout=5, value_in_json=True),
|
client.watch_prefix(request_prefix, timeout=5, value_in_json=True),
|
||||||
]:
|
]:
|
||||||
for e in events_iterator:
|
for request_event in request_iterator:
|
||||||
if not e.value:
|
request_entry = RequestEntry(request_event)
|
||||||
continue
|
logging.debug(f"{request_entry.key}, {request_entry.value}")
|
||||||
e = RequestEntry(e)
|
|
||||||
logging.debug(f"{e.key}, {e.value}")
|
|
||||||
|
|
||||||
# Never Run time critical mechanism inside timeout
|
# Never Run time critical mechanism inside timeout
|
||||||
# mechanism because timeout mechanism only comes
|
# mechanism because timeout mechanism only comes
|
||||||
# when no other event is happening. It means under
|
# when no other event is happening. It means under
|
||||||
# heavy load there would not be a timeout<
|
# heavy load there would not be a timeout event.
|
||||||
if e.type == "TIMEOUT":
|
if request_entry.type == "TIMEOUT":
|
||||||
|
|
||||||
|
# Detect hosts that are dead and set their status
|
||||||
|
# to "DEAD", and their VMs' status to "KILLED"
|
||||||
logging.debug("TIMEOUT event occured")
|
logging.debug("TIMEOUT event occured")
|
||||||
dead_hosts = dead_host_detection()
|
dead_hosts = dead_host_detection()
|
||||||
logging.debug(f"Dead hosts: {dead_hosts}")
|
logging.debug(f"Dead hosts: {dead_hosts}")
|
||||||
dead_host_mitigation(dead_hosts)
|
dead_host_mitigation(dead_hosts)
|
||||||
#
|
|
||||||
# vm_scheduled = []
|
|
||||||
# for vm in PENDING_VMS:
|
|
||||||
# if assign_host(vm) is not None:
|
|
||||||
# vm_scheduled.append(vm)
|
|
||||||
#
|
|
||||||
# for vm in vm_scheduled:
|
|
||||||
# PENDING_VMS.remove(vm)
|
|
||||||
# logging.debug(f"Remaining Pending: {PENDING_VMS}")
|
|
||||||
|
|
||||||
elif e.type == RequestType.ScheduleVM:
|
# If there are VMs that weren't assigned a host
|
||||||
if hasattr(e, "migration") and e.migration and\
|
# because there wasn't a host available which
|
||||||
hasattr(e, "destination") and e.destination:
|
# meets requirement of that VM then we would
|
||||||
client.client.delete(e.key)
|
# create a new ScheduleVM request for that VM
|
||||||
vm = vm_pool.get(e.uuid)
|
# on our behalf.
|
||||||
host = get_suitable_host(vm.specs, [host_pool.get(e.destination)])
|
while pending_vms:
|
||||||
|
pending_vm_entry = pending_vms.pop()
|
||||||
|
r = RequestEntry.from_scratch(type=f"ScheduleVM",
|
||||||
|
uuid=pending_vm_entry.uuid,
|
||||||
|
hostname=pending_vm_entry.hostname)
|
||||||
|
request_pool.put(r)
|
||||||
|
|
||||||
|
logging.debug(f"Remaining Pending: {pending_vms}")
|
||||||
|
|
||||||
|
elif request_entry.type == RequestType.ScheduleVM:
|
||||||
|
vm_entry = vm_pool.get(request_entry.uuid)
|
||||||
|
client.client.delete(request_entry.key) # consume Request
|
||||||
|
|
||||||
|
# If Request is about a VM which is labelled as "migration"
|
||||||
|
# and has a destination
|
||||||
|
if hasattr(request_entry, "migration") and request_entry.migration \
|
||||||
|
and hasattr(request_entry, "destination") and request_entry.destination:
|
||||||
|
host = get_suitable_host(vm_entry.specs, [host_pool.get(request_entry.destination)])
|
||||||
if host:
|
if host:
|
||||||
r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
|
r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
|
||||||
uuid=e.uuid, destination=e.destination)
|
uuid=request_entry.uuid,
|
||||||
|
destination=request_entry.destination)
|
||||||
request_pool.put(r)
|
request_pool.put(r)
|
||||||
print(host, e)
|
|
||||||
else:
|
else:
|
||||||
logging.info("Requested destination host doesn't have enough capacity"
|
logging.info("Requested destination host doesn't have enough capacity"
|
||||||
f"to hold {vm.uuid}")
|
f"to hold {vm_entry.uuid}")
|
||||||
else:
|
else:
|
||||||
client.client.delete(e.key)
|
# assign_host only returns None when we couldn't be able to assign
|
||||||
vm = vm_pool.get(e.uuid)
|
# a host to a VM because of resource constraints
|
||||||
if assign_host(vm) is None:
|
if assign_host(vm_entry) is None:
|
||||||
vm.log.append("Can't schedule VM. No Resource Left.")
|
vm_entry.log.append("Can't schedule VM. No Resource Left.")
|
||||||
vm_pool.put(vm)
|
vm_pool.put(vm_entry)
|
||||||
|
|
||||||
PENDING_VMS.append(vm)
|
pending_vms.append(vm_entry)
|
||||||
logging.info("No Resource Left. Emailing admin....")
|
logging.info("No Resource Left. Emailing admin....")
|
||||||
logging.debug(f"Pending VMS: {PENDING_VMS}")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
logging.info(f"{'*' * 5} SESSION STARTED {'*' * 5}")
|
||||||
argparser = argparse.ArgumentParser()
|
main()
|
||||||
argparser.add_argument(
|
|
||||||
"--vm_prefix", required=False, default=config("VM_PREFIX")
|
|
||||||
)
|
|
||||||
argparser.add_argument(
|
|
||||||
"--host_prefix", required=False, default=config("HOST_PREFIX")
|
|
||||||
)
|
|
||||||
argparser.add_argument(
|
|
||||||
"--request_prefix", required=False, default=config("REQUEST_PREFIX")
|
|
||||||
)
|
|
||||||
args = argparser.parse_args()
|
|
||||||
|
|
||||||
main(args.vm_prefix, args.host_prefix, args.request_prefix)
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue