Request mechanism rolled out; migration mechanism also incorporated
parent 0c1e9328e8
commit 726839f4c6
2 changed files with 171 additions and 122 deletions

helper.py (new file)
@@ -0,0 +1,117 @@
from collections import Counter
from functools import reduce

from ucloud_common.vm import VmPool, VMStatus
from ucloud_common.host import HostPool, HostStatus
from ucloud_common.request import RequestEntry, RequestPool, RequestType

from etcd3_wrapper import Etcd3Wrapper
from decouple import config

client = Etcd3Wrapper(
    host=config("ETCD_HOST"), port=int(config("ETCD_PORT"))
)
vm_pool = VmPool(client, config("VM_PREFIX"))
host_pool = HostPool(client, config("HOST_PREFIX"))
request_pool = RequestPool(client, config("REQUEST_PREFIX"))


def accumulated_specs(vms_specs):
    if not vms_specs:
        return {}
    return reduce((lambda x, y: Counter(x) + Counter(y)), vms_specs)


def remaining_resources(host_specs, vms_specs):
    """Return the remaining resources: host_specs - vms_specs."""
    vms_specs = Counter(vms_specs)
    remaining = Counter(host_specs)
    remaining.subtract(vms_specs)

    return remaining


def get_suitable_host(vm_specs, hosts=None):
    if hosts is None:
        hosts = host_pool.by_status(HostStatus.alive)

    for host in hosts:
        # Get the VMs currently placed on this host
        vms = vm_pool.by_host(host.key)

        # Filter out VMs that are merely requested, not yet placed
        vms = vm_pool.except_status(VMStatus.requested_new, vms)

        # Accumulate the combined specs of all running VMs on this host
        running_vms_specs = [vm.specs for vm in vms]
        running_vms_accumulated_specs = accumulated_specs(
            running_vms_specs
        )

        # Remaining resources after
        # host_specs - already running vm_specs
        remaining = remaining_resources(
            host.specs, running_vms_accumulated_specs
        )

        # ... minus the specs of the new VM
        remaining = remaining_resources(remaining, vm_specs)

        # If no resource went negative, the new VM fits on this host
        if all(remaining[resource] >= 0 for resource in remaining):
            return host.key

    return None


def dead_host_detection():
    # Bring out your dead! - Monty Python and the Holy Grail
    hosts = host_pool.by_status(HostStatus.alive)
    dead_hosts_keys = []

    for host in hosts:
        # Only check hosts that claim to be alive
        if host.status == HostStatus.alive:
            if not host.is_alive():
                dead_hosts_keys.append(host.key)

    return dead_hosts_keys


def dead_host_mitigation(dead_hosts_keys):
    for host_key in dead_hosts_keys:
        host = host_pool.get(host_key)
        host.declare_dead()

        # Mark every VM on the dead host as killed, then persist both
        vms_hosted_on_dead_host = vm_pool.by_host(host_key)
        for vm in vms_hosted_on_dead_host:
            vm.declare_killed()
            vm_pool.put(vm)
        host_pool.put(host)


def assign_host(vm):
    host_name = get_suitable_host(vm.specs)
    if host_name:
        # if vm.status == VMStatus.requested_new:
        #     vm.status = VMStatus.scheduled_deploy
        #
        # if vm.status == VMStatus.killed:
        #     vm.status = VMStatus.requested_start

        vm.hostname = host_name
        # Log the decision before persisting, so the log entry is
        # stored together with the new hostname
        vm.log.append("VM scheduled for starting")
        vm_pool.put(vm)

        r = RequestEntry.from_scratch(type=RequestType.StartVM,
                                      uuid=vm.uuid,
                                      hostname=vm.hostname)
        request_pool.put(r)

        return host_name
    return None
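
The placement check above is plain collections.Counter arithmetic. As a quick sanity check, with made-up spec values that are not part of this commit:

# Requires the same environment as helper.py (ETCD_HOST, ETCD_PORT, ...),
# since helper.py builds its etcd client and pools at import time.
from helper import accumulated_specs, remaining_resources

host_specs = {"cpu": 8, "ram": 16}                      # hypothetical host
running = [{"cpu": 2, "ram": 4}, {"cpu": 1, "ram": 8}]  # hypothetical VMs

used = accumulated_specs(running)              # Counter({'ram': 12, 'cpu': 3})
free = remaining_resources(host_specs, used)   # Counter({'cpu': 5, 'ram': 4})

# A VM asking for 6 CPUs no longer fits: "cpu" goes negative, so the
# check in get_suitable_host skips this host.
free = remaining_resources(free, {"cpu": 6, "ram": 2})
print(free)                                    # Counter({'ram': 2, 'cpu': -1})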

main.py
@@ -8,12 +8,14 @@ import argparse
 import logging
 
 from decouple import config
-from collections import Counter
-from functools import reduce
 from etcd3_wrapper import Etcd3Wrapper
 
-from ucloud_common.vm import VmPool, VMEntry, VMStatus
-from ucloud_common.host import HostPool, HostStatus
+from ucloud_common.vm import VmPool
+from ucloud_common.host import HostPool
+from ucloud_common.request import RequestEntry, RequestPool, RequestType
+from helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
+                    assign_host)
+
 
 logging.basicConfig(
     level=logging.DEBUG,
@@ -23,143 +25,70 @@ logging.basicConfig(
     datefmt="%d-%b-%y %H:%M:%S",
 )
 
-VM_POOL = None
-HOST_POOL = None
-
-
-def accumulated_specs(vms_specs):
-    if not vms_specs:
-        return {}
-    return reduce((lambda x, y: Counter(x) + Counter(y)), vms_specs)
-
-
-def remaining_resources(host_specs, vms_specs):
-    """Return remaining resources host_specs - vms"""
-    vms_specs = Counter(vms_specs)
-    remaining = Counter(host_specs)
-    remaining.subtract(vms_specs)
-
-    return remaining
-
-
-def get_suitable_host(vm_specs):
-    hosts = HOST_POOL.by_status(HostStatus.alive)
-
-    for host in hosts:
-        # Filter them by host_name
-        vms = VM_POOL.by_host(host.key)
-
-        # Filter them by status
-        vms = VM_POOL.except_status(VMStatus.requested_new, vms)
-
-        running_vms_specs = [vm.specs for vm in vms]
-        # Accumulate all of their combined specs
-        running_vms_accumulated_specs = accumulated_specs(
-            running_vms_specs
-        )
-
-        # Find out remaining resources after
-        # host_specs - already running vm_specs
-        remaining = remaining_resources(
-            host.specs, running_vms_accumulated_specs
-        )
-
-        # Find out remaining - new_vm_specs
-        remaining = remaining_resources(remaining, vm_specs)
-        # if remaining resources >= 0 return this host_name
-        if all(
-            map(
-                lambda x: True if remaining[x] >= 0 else False,
-                remaining,
-            )
-        ):
-            return host.key
-
-    return None
-
-
-def dead_host_detection():
-    # Bring out your dead! - Monty Python and the Holy Grail
-    hosts = HOST_POOL.by_status(HostStatus.alive)
-    dead_hosts_keys = []
-
-    for host in hosts:
-        # Only check those who claims to be alive
-        if host.status == HostStatus.alive:
-            if not host.is_alive():
-                dead_hosts_keys.append(host.key)
-
-    return dead_hosts_keys
-
-
-def dead_host_mitigation(dead_hosts_keys):
-    for host_key in dead_hosts_keys:
-        host = HOST_POOL.get(host_key)
-        host.declare_dead()
-
-        vms_hosted_on_dead_host = VM_POOL.by_host(host_key)
-        for vm in vms_hosted_on_dead_host:
-            vm.declare_killed()
-            VM_POOL.put(vm)
-        HOST_POOL.put(host)
-
-
-def assign_host(vm):
-    host_name = get_suitable_host(vm.specs)
-    if host_name:
-        if vm.status == VMStatus.requested_new:
-            vm.status = VMStatus.scheduled_deploy
-
-        if vm.status == VMStatus.killed:
-            vm.status = VMStatus.requested_start
-
-        vm.hostname = host_name
-        VM_POOL.put(vm)
-
-        return host_name
-    return None
-
-
-def main(vm_prefix, host_prefix):
+
+def main(vm_prefix, host_prefix, request_prefix):
     logging.info(f"{'*' * 5} SESSION STARTED {'*' * 5}")
     client = Etcd3Wrapper(
         host=config("ETCD_HOST"), port=int(config("ETCD_PORT"))
     )
-    global VM_POOL, HOST_POOL
-    VM_POOL = VmPool(client, vm_prefix)
-    HOST_POOL = HostPool(client, host_prefix)
+    vm_pool = VmPool(client, vm_prefix)
+    host_pool = HostPool(client, host_prefix)
+    request_pool = RequestPool(client, request_prefix)
 
     PENDING_VMS = []
     for events_iterator in [
-        client.get_prefix(vm_prefix, value_in_json=True),
-        client.watch_prefix(vm_prefix, timeout=5, value_in_json=True),
+        client.get_prefix(request_prefix, value_in_json=True),
+        client.watch_prefix(request_prefix, timeout=5, value_in_json=True),
     ]:
         for e in events_iterator:
-            e = VMEntry(e)
+            if not e.value:
+                continue
+            e = RequestEntry(e)
             logging.debug(f"{e.key}, {e.value}")
 
             # Never run time critical mechanism inside timeout
             # mechanism because timeout mechanism only comes
             # when no other event is happening. It means under
             # heavy load there would not be a timeout
-            if e.status == "TIMEOUT":
+            if e.type == "TIMEOUT":
+                logging.debug("TIMEOUT event occurred")
                 dead_hosts = dead_host_detection()
                 logging.debug(f"Dead hosts: {dead_hosts}")
                 dead_host_mitigation(dead_hosts)
+                #
+                # vm_scheduled = []
+                # for vm in PENDING_VMS:
+                #     if assign_host(vm) is not None:
+                #         vm_scheduled.append(vm)
+                #
+                # for vm in vm_scheduled:
+                #     PENDING_VMS.remove(vm)
+                # logging.debug(f"Remaining Pending: {PENDING_VMS}")
 
-                vm_scheduled = []
-                for vm in PENDING_VMS:
-                    if assign_host(vm) is not None:
-                        vm_scheduled.append(vm)
-
-                for vm in vm_scheduled:
-                    PENDING_VMS.remove(vm)
-                logging.debug(f"Remaining Pending: {PENDING_VMS}")
-            elif e.status in [VMStatus.requested_new, VMStatus.killed]:
-                if assign_host(e) is None:
-                    PENDING_VMS.append(e)
-                    logging.info("No Resource Left. Emailing admin....")
-                    logging.debug(f"Pending VMS: {PENDING_VMS}")
+            elif e.type == RequestType.ScheduleVM:
+                if hasattr(e, "migration") and e.migration and\
+                        hasattr(e, "destination") and e.destination:
+                    client.client.delete(e.key)
+                    vm = vm_pool.get(e.uuid)
+                    host = get_suitable_host(vm.specs, [host_pool.get(e.destination)])
+                    if host:
+                        r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
+                                                      uuid=e.uuid, destination=e.destination)
+                        request_pool.put(r)
+                        print(host, e)
+                    else:
+                        logging.info("Requested destination host doesn't have enough capacity "
+                                     f"to hold {vm.uuid}")
+                else:
+                    client.client.delete(e.key)
+                    vm = vm_pool.get(e.uuid)
+                    if assign_host(vm) is None:
+                        vm.log.append("Can't schedule VM. No Resource Left.")
+                        vm_pool.put(vm)
+
+                        PENDING_VMS.append(vm)
+                        logging.info("No Resource Left. Emailing admin....")
+                        logging.debug(f"Pending VMS: {PENDING_VMS}")
 
 
 if __name__ == "__main__":
@@ -170,6 +99,9 @@ if __name__ == "__main__":
     argparser.add_argument(
         "--host_prefix", required=False, default=config("HOST_PREFIX")
     )
+    argparser.add_argument(
+        "--request_prefix", required=False, default=config("REQUEST_PREFIX")
+    )
     args = argparser.parse_args()
 
-    main(args.vm_prefix, args.host_prefix)
+    main(args.vm_prefix, args.host_prefix, args.request_prefix)
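
For reference, this is roughly what a client would put into etcd to request a migration, given the handling in main() above. A minimal sketch, assuming RequestEntry.from_scratch accepts arbitrary keyword fields (as the StartVM and InitVMMigration calls suggest); the uuid and destination values are placeholders, not real keys:

from decouple import config
from etcd3_wrapper import Etcd3Wrapper
from ucloud_common.request import RequestEntry, RequestPool, RequestType

client = Etcd3Wrapper(host=config("ETCD_HOST"), port=int(config("ETCD_PORT")))
request_pool = RequestPool(client, config("REQUEST_PREFIX"))

r = RequestEntry.from_scratch(
    type=RequestType.ScheduleVM,
    uuid="<vm-uuid>",          # placeholder: uuid of the VM entry to move
    migration=True,            # routes into the migration branch of main()
    destination="<host-key>",  # placeholder: key of the target host
)
request_pool.put(r)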