diff --git a/scripts/uncloud b/scripts/uncloud index 968ace6..1ca9c68 100755 --- a/scripts/uncloud +++ b/scripts/uncloud @@ -45,7 +45,7 @@ if __name__ == '__main__': # i.e inheriting few things from parent process etcd3 module # errors out, so the following command configure multiprocessing # module to not inherit anything from parent. - mp.set_start_method('spawn') + # mp.set_start_method('spawn') arguments = vars(args) try: name = arguments.pop('command') diff --git a/uncloud/api/common_fields.py b/uncloud/api/common_fields.py index adf7cdc..d1fcb64 100755 --- a/uncloud/api/common_fields.py +++ b/uncloud/api/common_fields.py @@ -1,6 +1,6 @@ import os -from uncloud.shared import shared +from uncloud.common.shared import shared from uncloud.common.settings import settings diff --git a/uncloud/api/create_image_store.py b/uncloud/api/create_image_store.py index 075f26f..1040e97 100755 --- a/uncloud/api/create_image_store.py +++ b/uncloud/api/create_image_store.py @@ -3,7 +3,7 @@ import os from uuid import uuid4 -from uncloud.shared import shared +from uncloud.common.shared import shared from uncloud.common.settings import settings data = { diff --git a/uncloud/api/helper.py b/uncloud/api/helper.py index 0e5fa19..0805280 100755 --- a/uncloud/api/helper.py +++ b/uncloud/api/helper.py @@ -1,13 +1,12 @@ import binascii import ipaddress import random -import subprocess as sp import logging import requests from pyotp import TOTP -from uncloud.shared import shared +from uncloud.common.shared import shared from uncloud.common.settings import settings logger = logging.getLogger(__name__) diff --git a/uncloud/api/main.py b/uncloud/api/main.py index e8e85fb..2d8d035 100644 --- a/uncloud/api/main.py +++ b/uncloud/api/main.py @@ -10,11 +10,12 @@ from flask import Flask, request from flask_restful import Resource, Api from werkzeug.exceptions import HTTPException +from uncloud.common.shared import shared + from uncloud.common import counters from uncloud.common.vm import VMStatus from uncloud.common.request import RequestEntry, RequestType from uncloud.common.settings import settings -from uncloud.shared import shared from . import schemas from .helper import generate_mac, mac2ipv6 from uncloud import UncloudException diff --git a/uncloud/api/schemas.py b/uncloud/api/schemas.py index f606803..e4de9a8 100755 --- a/uncloud/api/schemas.py +++ b/uncloud/api/schemas.py @@ -21,7 +21,7 @@ import bitmath from uncloud.common.host import HostStatus from uncloud.common.vm import VMStatus -from uncloud.shared import shared +from uncloud.common.shared import shared from uncloud.common.settings import settings from . import helper, logger from .common_fields import Field, VmUUIDField diff --git a/uncloud/common/etcd_wrapper.py b/uncloud/common/etcd_wrapper.py index 6a979ba..fe768ac 100644 --- a/uncloud/common/etcd_wrapper.py +++ b/uncloud/common/etcd_wrapper.py @@ -1,24 +1,21 @@ import etcd3 import json -import queue -import copy -from uncloud import UncloudException -from collections import namedtuple from functools import wraps -from . import logger - -PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"]) +from uncloud import UncloudException +from uncloud.common import logger class EtcdEntry: - # key: str - # value: str - - def __init__(self, meta, value, value_in_json=False): - self.key = meta.key.decode("utf-8") - self.value = value.decode("utf-8") + def __init__(self, meta_or_key, value, value_in_json=False): + if hasattr(meta_or_key, 'key'): + # if meta has attr 'key' then get it + self.key = meta_or_key.key.decode('utf-8') + else: + # otherwise meta is the 'key' + self.key = meta_or_key + self.value = value.decode('utf-8') if value_in_json: self.value = json.loads(self.value) @@ -29,18 +26,12 @@ def readable_errors(func): def wrapper(*args, **kwargs): try: return func(*args, **kwargs) - except etcd3.exceptions.ConnectionFailedError as err: - raise UncloudException( - "Cannot connect to etcd: is etcd running as configured in uncloud.conf?" - ) + except etcd3.exceptions.ConnectionFailedError: + raise UncloudException('Cannot connect to etcd: is etcd running as configured in uncloud.conf?') except etcd3.exceptions.ConnectionTimeoutError as err: - raise etcd3.exceptions.ConnectionTimeoutError( - "etcd connection timeout." - ) from err + raise etcd3.exceptions.ConnectionTimeoutError('etcd connection timeout.') from err except Exception: - logger.exception( - "Some etcd error occured. See syslog for details." - ) + logger.exception('Some etcd error occured. See syslog for details.') return wrapper @@ -64,55 +55,39 @@ class Etcd3Wrapper: _value = json.dumps(_value) if not isinstance(_key, str): - _key = _key.decode("utf-8") + _key = _key.decode('utf-8') return self.client.put(_key, _value, **kwargs) @readable_errors - def get_prefix(self, *args, value_in_json=False, **kwargs): - r = self.client.get_prefix(*args, **kwargs) - for entry in r: - e = EtcdEntry(*entry[::-1], value_in_json=value_in_json) - if e.value: - yield e + def get_prefix(self, *args, value_in_json=False, raise_exception=True, **kwargs): + try: + event_iterator = self.client.get_prefix(*args, **kwargs) + for e in event_iterator: + yield EtcdEntry(*e[::-1], value_in_json=value_in_json) + except Exception as err: + if raise_exception: + raise Exception('Exception in etcd_wrapper.get_prefix') from err + else: + logger.exception('Error in etcd_wrapper') + return iter([]) @readable_errors - def watch_prefix(self, key, timeout=0, value_in_json=False): - timeout_event = EtcdEntry( - PseudoEtcdMeta(key=b"TIMEOUT"), - value=str.encode( - json.dumps({"status": "TIMEOUT", "type": "TIMEOUT"}) - ), - value_in_json=value_in_json, - ) - - event_queue = queue.Queue() - - def add_event_to_queue(event): - if hasattr(event, "events"): - for e in event.events: - if e.value: - event_queue.put( - EtcdEntry( - e, e.value, value_in_json=value_in_json - ) - ) - - self.client.add_watch_prefix_callback(key, add_event_to_queue) - - while True: - try: - while True: - v = event_queue.get(timeout=timeout) - yield v - except queue.Empty: - event_queue.put(copy.deepcopy(timeout_event)) - - -class PsuedoEtcdEntry(EtcdEntry): - def __init__(self, key, value, value_in_json=False): - super().__init__( - PseudoEtcdMeta(key=key.encode("utf-8")), - value, - value_in_json=value_in_json, - ) + def watch_prefix(self, key, raise_exception=True, value_in_json=False): + try: + event_iterator, cancel = self.client.watch_prefix(key) + for e in event_iterator: + if hasattr(e, '_event'): + e = e._event + if e.type == e.PUT: + yield EtcdEntry(e.kv.key, e.kv.value, value_in_json=value_in_json) + except Exception as err: + if raise_exception: + raise Exception('Exception in etcd_wrapper.get_prefix') from err + else: + logger.exception('Error in etcd_wrapper.watch_prefix') + try: + cancel() + except Exception: + pass + return iter([]) diff --git a/uncloud/common/request.py b/uncloud/common/request.py index a8c2d0a..cb0add5 100644 --- a/uncloud/common/request.py +++ b/uncloud/common/request.py @@ -2,8 +2,8 @@ import json from os.path import join from uuid import uuid4 -from .etcd_wrapper import PsuedoEtcdEntry -from .classes import SpecificEtcdEntryBase +from uncloud.common.etcd_wrapper import EtcdEntry +from uncloud.common.classes import SpecificEtcdEntryBase class RequestType: @@ -29,11 +29,8 @@ class RequestEntry(SpecificEtcdEntryBase): @classmethod def from_scratch(cls, request_prefix, **kwargs): - e = PsuedoEtcdEntry( - join(request_prefix, uuid4().hex), - value=json.dumps(kwargs).encode("utf-8"), - value_in_json=True, - ) + e = EtcdEntry(meta_or_key=join(request_prefix, uuid4().hex), + value=json.dumps(kwargs).encode('utf-8'), value_in_json=True) return cls(e) diff --git a/uncloud/shared/__init__.py b/uncloud/common/shared.py similarity index 100% rename from uncloud/shared/__init__.py rename to uncloud/common/shared.py diff --git a/uncloud/configure/main.py b/uncloud/configure/main.py index f3e9717..e190460 100644 --- a/uncloud/configure/main.py +++ b/uncloud/configure/main.py @@ -2,7 +2,7 @@ import os import argparse from uncloud.common.settings import settings -from uncloud.shared import shared +from uncloud.common.shared import shared arg_parser = argparse.ArgumentParser('configure', add_help=False) configure_subparsers = arg_parser.add_subparsers(dest='subcommand') diff --git a/uncloud/filescanner/main.py b/uncloud/filescanner/main.py index cb5f2b7..314481f 100755 --- a/uncloud/filescanner/main.py +++ b/uncloud/filescanner/main.py @@ -10,8 +10,7 @@ from uuid import uuid4 from . import logger from uncloud.common.settings import settings -from uncloud.shared import shared - +from uncloud.common.shared import shared arg_parser = argparse.ArgumentParser('filescanner', add_help=False) arg_parser.add_argument('--hostname', required=True) diff --git a/uncloud/host/main.py b/uncloud/host/main.py index bed068b..695e3d1 100755 --- a/uncloud/host/main.py +++ b/uncloud/host/main.py @@ -5,7 +5,7 @@ import time from uuid import uuid4 from uncloud.common.request import RequestEntry, RequestType -from uncloud.shared import shared +from uncloud.common.shared import shared from uncloud.common.settings import settings from uncloud.common.vm import VMStatus from uncloud.vmm import VMM @@ -72,52 +72,52 @@ def main(hostname, debug=False): except Exception as e: raise Exception('uncloud-host heartbeat updating mechanism is not working') from e - for events_iterator in [ - shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True), - shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], timeout=10, value_in_json=True) - ]: - for request_event in events_iterator: - request_event = RequestEntry(request_event) + # The below while True is neccessary for gracefully handling leadership transfer and temporary + # unavailability in etcd. Why does it work? It works because the get_prefix,watch_prefix return + # iter([]) that is iterator of empty list on exception (that occur due to above mentioned reasons) + # which ends the loop immediately. So, having it inside infinite loop we try again and again to + # get prefix until either success or deamon death comes. + while True: + for events_iterator in [ + shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True, + raise_exception=False), + shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], value_in_json=True, + raise_exception=False) + ]: + for request_event in events_iterator: + request_event = RequestEntry(request_event) - if request_event.type == 'TIMEOUT': maintenance(host.key) - elif request_event.hostname == host.key: - logger.debug('VM Request: %s on Host %s', request_event, host.hostname) - shared.request_pool.client.client.delete(request_event.key) - vm_entry = shared.etcd_client.get( - join_path(settings['etcd']['vm_prefix'], request_event.uuid) - ) - logger.debug('VM hostname: {}'.format(vm_entry.value)) - vm = virtualmachine.VM(vm_entry) - if request_event.type == RequestType.StartVM: - vm.start() + if request_event.hostname == host.key: + logger.debug('VM Request: %s on Host %s', request_event, host.hostname) - elif request_event.type == RequestType.StopVM: - vm.stop() + shared.request_pool.client.client.delete(request_event.key) + vm_entry = shared.etcd_client.get( + join_path(settings['etcd']['vm_prefix'], request_event.uuid) + ) - elif request_event.type == RequestType.DeleteVM: - vm.delete() + logger.debug('VM hostname: {}'.format(vm_entry.value)) - elif request_event.type == RequestType.InitVMMigration: - vm.start(destination_host_key=host.key) + vm = virtualmachine.VM(vm_entry) + if request_event.type == RequestType.StartVM: + vm.start() - elif request_event.type == RequestType.TransferVM: - destination_host = host_pool.get(request_event.destination_host_key) - if destination_host: - vm.migrate( - destination_host=destination_host.hostname, - destination_sock_path=request_event.destination_sock_path, - ) - else: - logger.error('Host %s not found!', request_event.destination_host_key) + elif request_event.type == RequestType.StopVM: + vm.stop() + elif request_event.type == RequestType.DeleteVM: + vm.delete() -if __name__ == '__main__': - argparser = argparse.ArgumentParser() - argparser.add_argument( - 'hostname', help='Name of this host. e.g uncloud1.ungleich.ch' - ) - args = argparser.parse_args() - mp.set_start_method('spawn') - main(args.hostname) + elif request_event.type == RequestType.InitVMMigration: + vm.start(destination_host_key=host.key) + + elif request_event.type == RequestType.TransferVM: + destination_host = host_pool.get(request_event.destination_host_key) + if destination_host: + vm.migrate( + destination_host=destination_host.hostname, + destination_sock_path=request_event.destination_sock_path, + ) + else: + logger.error('Host %s not found!', request_event.destination_host_key) diff --git a/uncloud/host/virtualmachine.py b/uncloud/host/virtualmachine.py index a37dee4..2f6a5e3 100755 --- a/uncloud/host/virtualmachine.py +++ b/uncloud/host/virtualmachine.py @@ -16,7 +16,7 @@ from uncloud.common.vm import VMStatus, declare_stopped from uncloud.common.network import create_dev, delete_network_interface from uncloud.common.schemas import VMSchema, NetworkSchema from uncloud.host import logger -from uncloud.shared import shared +from uncloud.common.shared import shared from uncloud.common.settings import settings from uncloud.vmm import VMM diff --git a/uncloud/imagescanner/main.py b/uncloud/imagescanner/main.py index cb13ac7..91f100e 100755 --- a/uncloud/imagescanner/main.py +++ b/uncloud/imagescanner/main.py @@ -5,7 +5,7 @@ import subprocess as sp from os.path import join as join_path from uncloud.common.settings import settings -from uncloud.shared import shared +from uncloud.common.shared import shared from uncloud.imagescanner import logger diff --git a/uncloud/metadata/main.py b/uncloud/metadata/main.py index 03469a5..73d59cd 100644 --- a/uncloud/metadata/main.py +++ b/uncloud/metadata/main.py @@ -6,7 +6,7 @@ from flask_restful import Resource, Api from werkzeug.exceptions import HTTPException from uncloud.common.settings import settings -from uncloud.shared import shared +from uncloud.common.shared import shared app = Flask(__name__) api = Api(app) diff --git a/uncloud/scheduler/helper.py b/uncloud/scheduler/helper.py index a7fec15..108d126 100755 --- a/uncloud/scheduler/helper.py +++ b/uncloud/scheduler/helper.py @@ -6,7 +6,7 @@ import bitmath from uncloud.common.host import HostStatus from uncloud.common.request import RequestEntry, RequestType from uncloud.common.vm import VMStatus -from uncloud.shared import shared +from uncloud.common.shared import shared from uncloud.common.settings import settings diff --git a/uncloud/scheduler/main.py b/uncloud/scheduler/main.py index 5143537..20a52cb 100755 --- a/uncloud/scheduler/main.py +++ b/uncloud/scheduler/main.py @@ -6,59 +6,51 @@ import argparse -from uncloud.common.request import RequestEntry, RequestType -from uncloud.shared import shared from uncloud.common.settings import settings -from .helper import (dead_host_mitigation, dead_host_detection, assign_host, NoSuitableHostFound) -from . import logger +from uncloud.common.request import RequestEntry, RequestType +from uncloud.common.shared import shared +from uncloud.scheduler import logger +from uncloud.scheduler.helper import (dead_host_mitigation, dead_host_detection, + assign_host, NoSuitableHostFound) arg_parser = argparse.ArgumentParser('scheduler', add_help=False) def main(debug=False): - for request_iterator in [ - shared.etcd_client.get_prefix( - settings["etcd"]["request_prefix"], value_in_json=True - ), - shared.etcd_client.watch_prefix( - settings["etcd"]["request_prefix"], - timeout=5, - value_in_json=True, - ), - ]: - for request_event in request_iterator: - request_entry = RequestEntry(request_event) - # Never Run time critical mechanism inside timeout - # mechanism because timeout mechanism only comes - # when no other event is happening. It means under - # heavy load there would not be a timeout event. - if request_entry.type == "TIMEOUT": + # The below while True is neccessary for gracefully handling leadership transfer and temporary + # unavailability in etcd. Why does it work? It works because the get_prefix,watch_prefix return + # iter([]) that is iterator of empty list on exception (that occur due to above mentioned reasons) + # which ends the loop immediately. So, having it inside infinite loop we try again and again to + # get prefix until either success or deamon death comes. + while True: + for request_iterator in [ + shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True, + raise_exception=False), + shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], value_in_json=True, + raise_exception=False), + ]: + for request_event in request_iterator: + dead_host_mitigation(dead_host_detection()) + request_entry = RequestEntry(request_event) - # Detect hosts that are dead and set their status - # to "DEAD", and their VMs' status to "KILLED" - dead_hosts = dead_host_detection() - if dead_hosts: - logger.debug("Dead hosts: %s", dead_hosts) - dead_host_mitigation(dead_hosts) + if request_entry.type == RequestType.ScheduleVM: + logger.debug('%s, %s', request_entry.key, request_entry.value) - elif request_entry.type == RequestType.ScheduleVM: - logger.debug("%s, %s", request_entry.key, request_entry.value) + vm_entry = shared.vm_pool.get(request_entry.uuid) + if vm_entry is None: + logger.info('Trying to act on {} but it is deleted'.format(request_entry.uuid)) + continue - vm_entry = shared.vm_pool.get(request_entry.uuid) - if vm_entry is None: - logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid)) - continue + shared.etcd_client.client.delete(request_entry.key) # consume Request - shared.etcd_client.client.delete(request_entry.key) # consume Request + try: + assign_host(vm_entry) + except NoSuitableHostFound: + vm_entry.add_log('Can\'t schedule VM. No Resource Left.') + shared.vm_pool.put(vm_entry) - try: - assign_host(vm_entry) - except NoSuitableHostFound: - vm_entry.add_log("Can't schedule VM. No Resource Left.") - shared.vm_pool.put(vm_entry) - - logger.info("No Resource Left. Emailing admin....") + logger.info('No Resource Left. Emailing admin....') -if __name__ == "__main__": +if __name__ == '__main__': main()