forked from uncloud/uncloud

Merge branch 'master' of code.ungleich.ch:ucloud/ucloud

commit 9206d8ed1d

4 changed files with 256 additions and 6 deletions

ucloud/host/main.py.old (new executable file, 152 lines)
@@ -0,0 +1,152 @@
+import argparse
+import multiprocessing as mp
+import time
+
+from etcd3_wrapper import Etcd3Wrapper
+
+from ucloud.common.request import RequestEntry, RequestType
+from ucloud.config import (vm_pool, request_pool,
+                    etcd_client, running_vms,
+                    etcd_wrapper_args, etcd_wrapper_kwargs,
+                    HostPool, config)
+
+from .helper import find_free_port
+from . import virtualmachine
+from ucloud.host import logger
+
+
+def update_heartbeat(hostname):
+    """Update Last HeartBeat Time for :param hostname: in etcd"""
+    client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs)
+    host_pool = HostPool(client, env_vars.get('HOST_PREFIX'))
+    this_host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
+
+    while True:
+        this_host.update_heartbeat()
+        host_pool.put(this_host)
+        time.sleep(10)
+
+
+def maintenance(host):
+    # To capture vm running according to running_vms list
+
+    # This is to capture successful migration of a VM.
+    # Suppose, this host is running "vm1" and user initiated
+    # request to migrate this "vm1" to some other host. On,
+    # successful migration the destination host would set
+    # the vm hostname to itself. Thus, we are checking
+    # whether this host vm is successfully migrated. If yes
+    # then we shutdown "vm1" on this host.
+
+    to_be_removed = []
+    for running_vm in running_vms:
+        with vm_pool.get_put(running_vm.key) as vm_entry:
+            if vm_entry.hostname != host.key and not vm_entry.in_migration:
+                running_vm.handle.shutdown()
+                logger.info("VM migration not completed successfully.")
+                to_be_removed.append(running_vm)
+
+    for r in to_be_removed:
+        running_vms.remove(r)
+
+    # To check vm running according to etcd entries
+    alleged_running_vms = vm_pool.by_status("RUNNING", vm_pool.by_host(host.key))
+
+    for vm_entry in alleged_running_vms:
+        _vm = virtualmachine.get_vm(running_vms, vm_entry.key)
+        # Whether, the allegedly running vm is in our
+        # running_vms list or not if it is said to be
+        # running on this host but it is not then we
+        # need to shut it down
+
+        # This is to capture poweroff/shutdown of a VM
+        # initiated by user inside VM. OR crash of VM by some
+        # user running process
+        if (_vm and not _vm.handle.is_running()) or not _vm:
+            logger.debug("_vm = %s, is_running() = %s" % (_vm, _vm.handle.is_running()))
+            vm_entry.add_log("""{} is not running but is said to be running.
+                                So, shutting it down and declare it killed""".format(vm_entry.key))
+            vm_entry.declare_killed()
+            vm_pool.put(vm_entry)
+            if _vm:
+                running_vms.remove(_vm)
+
+
+def check():
+    if env_vars.get('STORAGE_BACKEND') == 'filesystem' and not isdir(env_vars.get('VM_DIR')):
+        print("You have set STORAGE_BACKEND to filesystem. So, the vm directory mentioned"
+              " in .env file must exists. But, it don't.")
+        sys.exit(1)
+
+
+
+def main(hostname):
+    check()
+
+    heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))
+
+    host_pool = HostPool(etcd_client, env_vars.get('HOST_PREFIX'))
+    host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
+    assert host is not None, "No such host with name = {}".format(hostname)
+
+    try:
+        heartbeat_updating_process.start()
+    except Exception as e:
+        logger.info("No Need To Go Further. Our heartbeat updating mechanism is not working")
+        logger.exception(e)
+        exit(-1)
+
+    logger.info("%s Session Started %s", '*' * 5, '*' * 5)
+
+    # It is seen that under heavy load, timeout event doesn't come
+    # in a predictive manner (which is intentional because we give
+    # higher priority to customer's requests) which delays heart
+    # beat update which in turn misunderstood by scheduler that the
+    # host is dead when it is actually alive. So, to ensure that we
+    # update the heart beat in a predictive manner we start Heart
+    # beat updating mechanism in separated thread
+
+    for events_iterator in [
+        etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True),
+        etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=10, value_in_json=True),
+    ]:
+        for request_event in events_iterator:
+            request_event = RequestEntry(request_event)
+
+            if request_event.type == "TIMEOUT":
+                maintenance(host)
+                continue
+
+            # If the event is directed toward me OR I am destination of a InitVMMigration
+            if request_event.hostname == host.key or request_event.destination == host.key:
+                logger.debug("VM Request: %s", request_event)
+
+                request_pool.client.client.delete(request_event.key)
+                vm_entry = vm_pool.get(request_event.uuid)
+
+                if vm_entry:
+                    if request_event.type == RequestType.StartVM:
+                        virtualmachine.start(vm_entry)
+
+                    elif request_event.type == RequestType.StopVM:
+                        virtualmachine.stop(vm_entry)
+
+                    elif request_event.type == RequestType.DeleteVM:
+                        virtualmachine.delete(vm_entry)
+
+                    elif request_event.type == RequestType.InitVMMigration:
+                        virtualmachine.start(vm_entry, host.key, find_free_port())
+
+                    elif request_event.type == RequestType.TransferVM:
+                        virtualmachine.transfer(request_event)
+                else:
+                    logger.info("VM Entry missing")
+
+                logger.info("Running VMs %s", running_vms)
+
+
+if __name__ == "__main__":
+    argparser = argparse.ArgumentParser()
+    argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
+    args = argparser.parse_args()
+    mp.set_start_method('spawn')
+    main(args.hostname)
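Both main loops added in this commit consume requests the same way: a one-shot get_prefix() first drains everything already stored under REQUEST_PREFIX, then watch_prefix() follows new writes, and its timeout appears to surface as a synthetic "TIMEOUT" event (hence the TIMEOUT branch in both loops) that drives the periodic maintenance work. A minimal sketch of that drain-then-watch shape, with a stub standing in for etcd3_wrapper.Etcd3Wrapper (StubClient, run and the sample event are illustrative, not ucloud code):

# Sketch of the drain-then-watch event loop used by the host and scheduler
# daemons. StubClient is a stand-in for etcd3_wrapper.Etcd3Wrapper: only the
# two methods the loop needs are modelled, and watch_prefix here yields a
# single TIMEOUT marker instead of blocking on a real etcd watch.

class StubClient:
    def __init__(self, existing):
        self.existing = existing          # events already stored under the prefix

    def get_prefix(self, prefix, value_in_json=True):
        # One-shot: everything currently under the prefix.
        return iter(self.existing)

    def watch_prefix(self, prefix, timeout=10, value_in_json=True):
        # The real wrapper blocks on etcd; we emit one idle marker and stop.
        yield {"type": "TIMEOUT"}


def run(client, prefix, handle, maintenance):
    # Phase 1 drains existing requests, phase 2 follows new ones.
    for events in [client.get_prefix(prefix), client.watch_prefix(prefix)]:
        for event in events:
            if event["type"] == "TIMEOUT":
                maintenance()             # periodic work rides the idle marker
                continue
            handle(event)


if __name__ == "__main__":
    client = StubClient([{"type": "StartVM", "uuid": "example"}])
    run(client, "/v1/request/",
        handle=lambda e: print("request:", e),
        maintenance=lambda: print("maintenance tick"))

The scheduler below reuses the same shape, with dead-host detection and the pending-VM requeue taking the place of maintenance().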
ucloud/imagescanner/main.py

@@ -3,7 +3,7 @@ import os
 import subprocess
 
 from os.path import join as join_path
-from ucloud.config import etcd_client, env_vars, image_storage_handler
+from ucloud.config import etcd_client, config, image_storage_handler
 from ucloud.imagescanner import logger
 
 
@@ -20,9 +20,9 @@ def qemu_img_type(path):
 
 def check():
     """ check whether settings are sane, refuse to start if they aren't """
-    if env_vars.get('STORAGE_BACKEND') == 'filesystem' and not isdir(env_vars.get('IMAGE_DIR')):
+    if config['etcd']['STORAGE_BACKEND'] == 'filesystem' and not isdir(config['etcd']['IMAGE_DIR']):
         print("You have set STORAGE_BACKEND to filesystem, but "
-              "{} does not exist. Refusing to start".format(env_vars.get('IMAGE_DIR')))
+              "{} does not exist. Refusing to start".format(config['etcd']['IMAGE_DIR']))
         sys.exit(1)
 
     try:
@@ -34,7 +34,7 @@ def check():
 
 def main():
     # We want to get images entries that requests images to be created
-    images = etcd_client.get_prefix(env_vars.get('IMAGE_PREFIX'), value_in_json=True)
+    images = etcd_client.get_prefix(config['etcd']['IMAGE_PREFIX'], value_in_json=True)
     images_to_be_created = list(filter(lambda im: im.value['status'] == 'TO_BE_CREATED', images))
 
     for image in images_to_be_created:
@@ -43,9 +43,9 @@ def main():
             image_owner = image.value['owner']
             image_filename = image.value['filename']
             image_store_name = image.value['store_name']
-            image_full_path = join_path(env_vars.get('BASE_DIR'), image_owner, image_filename)
+            image_full_path = join_path(config['etcd']['BASE_DIR'], image_owner, image_filename)
 
-            image_stores = etcd_client.get_prefix(env_vars.get('IMAGE_STORE_PREFIX'), value_in_json=True)
+            image_stores = etcd_client.get_prefix(config['etcd']['IMAGE_STORE_PREFIX'], value_in_json=True)
             user_image_store = next(filter(
                 lambda s, store_name=image_store_name: s.value["name"] == store_name,
                 image_stores
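The imagescanner hunks above belong to a wider migration from flat env_vars.get('KEY') lookups to a nested config['etcd']['KEY'] mapping. A hypothetical shim of the kind that keeps both call styles working while such a migration is in flight (EnvVarsShim and the sample values are made up for illustration, not part of ucloud):

# Hypothetical compatibility shim for an env_vars -> config migration like
# the one in these hunks; EnvVarsShim and the sample config are illustrative.

class EnvVarsShim:
    """Answer old-style env_vars.get(...) calls from the new nested config."""

    def __init__(self, config, section='etcd'):
        self.section = config[section]

    def get(self, key, default=None):
        return self.section.get(key, default)


config = {'etcd': {'STORAGE_BACKEND': 'filesystem', 'IMAGE_DIR': '/var/image'}}
env_vars = EnvVarsShim(config)

# Old and new call sites now resolve to the same value.
assert env_vars.get('IMAGE_DIR') == config['etcd']['IMAGE_DIR']
print(env_vars.get('STORAGE_BACKEND'))   # -> filesystem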
ucloud/scheduler/main.py.old (new executable file, 93 lines)
@@ -0,0 +1,93 @@
+# TODO
+#  1. send an email to an email address defined by env['admin-email']
+#     if resources are finished
+#  2. Introduce a status endpoint of the scheduler -
+#     maybe expose a prometheus compatible output
+
+from ucloud.common.request import RequestEntry, RequestType
+from ucloud.config import etcd_client
+from ucloud.config import host_pool, request_pool, vm_pool, env_vars
+from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
+                     assign_host, NoSuitableHostFound)
+from . import logger
+
+
+def main():
+    logger.info("%s SESSION STARTED %s", '*' * 5, '*' * 5)
+
+    pending_vms = []
+
+    for request_iterator in [
+        etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True),
+        etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=5, value_in_json=True),
+    ]:
+        for request_event in request_iterator:
+            request_entry = RequestEntry(request_event)
+            # Never Run time critical mechanism inside timeout
+            # mechanism because timeout mechanism only comes
+            # when no other event is happening. It means under
+            # heavy load there would not be a timeout event.
+            if request_entry.type == "TIMEOUT":
+
+                # Detect hosts that are dead and set their status
+                # to "DEAD", and their VMs' status to "KILLED"
+                dead_hosts = dead_host_detection()
+                if dead_hosts:
+                    logger.debug("Dead hosts: %s", dead_hosts)
+                    dead_host_mitigation(dead_hosts)
+
+                # If there are VMs that weren't assigned a host
+                # because there wasn't a host available which
+                # meets requirement of that VM then we would
+                # create a new ScheduleVM request for that VM
+                # on our behalf.
+                while pending_vms:
+                    pending_vm_entry = pending_vms.pop()
+                    r = RequestEntry.from_scratch(type="ScheduleVM",
+                                                  uuid=pending_vm_entry.uuid,
+                                                  hostname=pending_vm_entry.hostname,
+                                                  request_prefix=env_vars.get("REQUEST_PREFIX"))
+                    request_pool.put(r)
+
+            elif request_entry.type == RequestType.ScheduleVM:
+                logger.debug("%s, %s", request_entry.key, request_entry.value)
+
+                vm_entry = vm_pool.get(request_entry.uuid)
+                if vm_entry is None:
+                    logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid))
+                    continue
+                etcd_client.client.delete(request_entry.key)  # consume Request
+
+                # If the Request is about a VM which is labelled as "migration"
+                # and has a destination
+                if hasattr(request_entry, "migration") and request_entry.migration \
+                        and hasattr(request_entry, "destination") and request_entry.destination:
+                    try:
+                        get_suitable_host(vm_specs=vm_entry.specs,
+                                          hosts=[host_pool.get(request_entry.destination)])
+                    except NoSuitableHostFound:
+                        logger.info("Requested destination host doesn't have enough capacity"
+                                    "to hold %s" % vm_entry.uuid)
+                    else:
+                        r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
+                                                      uuid=request_entry.uuid,
+                                                      destination=request_entry.destination,
+                                                      request_prefix=env_vars.get("REQUEST_PREFIX"))
+                        request_pool.put(r)
+
+                # If the Request is about a VM that just want to get started/created
+                else:
+                    # assign_host only returns None when we couldn't be able to assign
+                    # a host to a VM because of resource constraints
+                    try:
+                        assign_host(vm_entry)
+                    except NoSuitableHostFound:
+                        vm_entry.add_log("Can't schedule VM. No Resource Left.")
+                        vm_pool.put(vm_entry)
+
+                        pending_vms.append(vm_entry)
+                        logger.info("No Resource Left. Emailing admin....")
+
+
+if __name__ == "__main__":
+    main()
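Worth noting in the scheduler above: when assign_host() raises NoSuitableHostFound, the VM is parked in pending_vms, and the next "TIMEOUT" tick turns each parked entry back into a ScheduleVM request, so capacity freed later gets picked up without extra bookkeeping. A reduced sketch of that park-and-requeue retry (the dict-based hosts, events and queue are stand-ins, not the ucloud implementation):

# Sketch of the scheduler's park-and-requeue retry; hosts, events and
# NoSuitableHostFound here are simplified stand-ins for the real types.

class NoSuitableHostFound(Exception):
    pass


def assign_host(hosts, vm):
    # Pick the first host with enough free capacity, else raise.
    for host in hosts:
        if host["free"] >= vm["cpu"]:
            host["free"] -= vm["cpu"]
            return host["name"]
    raise NoSuitableHostFound()


def handle(event, hosts, pending_vms, requests):
    if event["type"] == "TIMEOUT":
        # Idle tick: turn every parked VM back into a ScheduleVM request.
        while pending_vms:
            requests.append({"type": "ScheduleVM", "vm": pending_vms.pop()})
    elif event["type"] == "ScheduleVM":
        try:
            print("scheduled on", assign_host(hosts, event["vm"]))
        except NoSuitableHostFound:
            pending_vms.append(event["vm"])   # park until the next idle tick


hosts = [{"name": "host1", "free": 2}]
pending, queue = [], []
handle({"type": "ScheduleVM", "vm": {"cpu": 4}}, hosts, pending, queue)  # parked
hosts[0]["free"] = 8                          # capacity freed later
handle({"type": "TIMEOUT"}, hosts, pending, queue)
for event in queue:
    handle(event, hosts, pending, queue)      # retried request now succeeds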