1. mp.set_start_method('spawn') commented out in scripts/uncloud

2. uncloud.shared moved under uncloud.common
3. Refactoring in etcd_wrapper, e.g. the timeout mechanism was removed, plus a few other things
4. uncloud-{scheduler,host} now handle etcd events better while in their blocking state (waiting for requests to come in)
This commit is contained in:
ahmadbilalkhalid 2020-01-09 00:40:05 +05:00
parent f8f790e7fc
commit 48efcdf08c
17 changed files with 136 additions and 173 deletions

View file

@ -45,7 +45,7 @@ if __name__ == '__main__':
# i.e inheriting few things from parent process etcd3 module # i.e inheriting few things from parent process etcd3 module
# errors out, so the following command configure multiprocessing # errors out, so the following command configure multiprocessing
# module to not inherit anything from parent. # module to not inherit anything from parent.
mp.set_start_method('spawn') # mp.set_start_method('spawn')
arguments = vars(args) arguments = vars(args)
try: try:
name = arguments.pop('command') name = arguments.pop('command')

View file

@ -1,6 +1,6 @@
import os import os
from uncloud.shared import shared from uncloud.common.shared import shared
from uncloud.common.settings import settings from uncloud.common.settings import settings

View file

@ -3,7 +3,7 @@ import os
from uuid import uuid4 from uuid import uuid4
from uncloud.shared import shared from uncloud.common.shared import shared
from uncloud.common.settings import settings from uncloud.common.settings import settings
data = { data = {

View file

@ -1,13 +1,12 @@
import binascii import binascii
import ipaddress import ipaddress
import random import random
import subprocess as sp
import logging import logging
import requests import requests
from pyotp import TOTP from pyotp import TOTP
from uncloud.shared import shared from uncloud.common.shared import shared
from uncloud.common.settings import settings from uncloud.common.settings import settings
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)

View file

@ -10,11 +10,12 @@ from flask import Flask, request
from flask_restful import Resource, Api from flask_restful import Resource, Api
from werkzeug.exceptions import HTTPException from werkzeug.exceptions import HTTPException
from uncloud.common.shared import shared
from uncloud.common import counters from uncloud.common import counters
from uncloud.common.vm import VMStatus from uncloud.common.vm import VMStatus
from uncloud.common.request import RequestEntry, RequestType from uncloud.common.request import RequestEntry, RequestType
from uncloud.common.settings import settings from uncloud.common.settings import settings
from uncloud.shared import shared
from . import schemas from . import schemas
from .helper import generate_mac, mac2ipv6 from .helper import generate_mac, mac2ipv6
from uncloud import UncloudException from uncloud import UncloudException

View file

@ -21,7 +21,7 @@ import bitmath
from uncloud.common.host import HostStatus from uncloud.common.host import HostStatus
from uncloud.common.vm import VMStatus from uncloud.common.vm import VMStatus
from uncloud.shared import shared from uncloud.common.shared import shared
from uncloud.common.settings import settings from uncloud.common.settings import settings
from . import helper, logger from . import helper, logger
from .common_fields import Field, VmUUIDField from .common_fields import Field, VmUUIDField

View file

@ -1,24 +1,21 @@
import etcd3 import etcd3
import json import json
import queue
import copy
from uncloud import UncloudException
from collections import namedtuple
from functools import wraps from functools import wraps
from . import logger from uncloud import UncloudException
from uncloud.common import logger
PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"])
class EtcdEntry: class EtcdEntry:
# key: str def __init__(self, meta_or_key, value, value_in_json=False):
# value: str if hasattr(meta_or_key, 'key'):
# if meta has attr 'key' then get it
def __init__(self, meta, value, value_in_json=False): self.key = meta_or_key.key.decode('utf-8')
self.key = meta.key.decode("utf-8") else:
self.value = value.decode("utf-8") # otherwise meta is the 'key'
self.key = meta_or_key
self.value = value.decode('utf-8')
if value_in_json: if value_in_json:
self.value = json.loads(self.value) self.value = json.loads(self.value)
@ -29,18 +26,12 @@ def readable_errors(func):
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
try: try:
return func(*args, **kwargs) return func(*args, **kwargs)
except etcd3.exceptions.ConnectionFailedError as err: except etcd3.exceptions.ConnectionFailedError:
raise UncloudException( raise UncloudException('Cannot connect to etcd: is etcd running as configured in uncloud.conf?')
"Cannot connect to etcd: is etcd running as configured in uncloud.conf?"
)
except etcd3.exceptions.ConnectionTimeoutError as err: except etcd3.exceptions.ConnectionTimeoutError as err:
raise etcd3.exceptions.ConnectionTimeoutError( raise etcd3.exceptions.ConnectionTimeoutError('etcd connection timeout.') from err
"etcd connection timeout."
) from err
except Exception: except Exception:
logger.exception( logger.exception('Some etcd error occured. See syslog for details.')
"Some etcd error occured. See syslog for details."
)
return wrapper return wrapper
@ -64,55 +55,39 @@ class Etcd3Wrapper:
_value = json.dumps(_value) _value = json.dumps(_value)
if not isinstance(_key, str): if not isinstance(_key, str):
_key = _key.decode("utf-8") _key = _key.decode('utf-8')
return self.client.put(_key, _value, **kwargs) return self.client.put(_key, _value, **kwargs)
@readable_errors @readable_errors
def get_prefix(self, *args, value_in_json=False, **kwargs): def get_prefix(self, *args, value_in_json=False, raise_exception=True, **kwargs):
r = self.client.get_prefix(*args, **kwargs) try:
for entry in r: event_iterator = self.client.get_prefix(*args, **kwargs)
e = EtcdEntry(*entry[::-1], value_in_json=value_in_json) for e in event_iterator:
if e.value: yield EtcdEntry(*e[::-1], value_in_json=value_in_json)
yield e except Exception as err:
if raise_exception:
raise Exception('Exception in etcd_wrapper.get_prefix') from err
else:
logger.exception('Error in etcd_wrapper')
return iter([])
@readable_errors @readable_errors
def watch_prefix(self, key, timeout=0, value_in_json=False): def watch_prefix(self, key, raise_exception=True, value_in_json=False):
timeout_event = EtcdEntry( try:
PseudoEtcdMeta(key=b"TIMEOUT"), event_iterator, cancel = self.client.watch_prefix(key)
value=str.encode( for e in event_iterator:
json.dumps({"status": "TIMEOUT", "type": "TIMEOUT"}) if hasattr(e, '_event'):
), e = e._event
value_in_json=value_in_json, if e.type == e.PUT:
) yield EtcdEntry(e.kv.key, e.kv.value, value_in_json=value_in_json)
except Exception as err:
event_queue = queue.Queue() if raise_exception:
raise Exception('Exception in etcd_wrapper.get_prefix') from err
def add_event_to_queue(event): else:
if hasattr(event, "events"): logger.exception('Error in etcd_wrapper.watch_prefix')
for e in event.events: try:
if e.value: cancel()
event_queue.put( except Exception:
EtcdEntry( pass
e, e.value, value_in_json=value_in_json return iter([])
)
)
self.client.add_watch_prefix_callback(key, add_event_to_queue)
while True:
try:
while True:
v = event_queue.get(timeout=timeout)
yield v
except queue.Empty:
event_queue.put(copy.deepcopy(timeout_event))
class PsuedoEtcdEntry(EtcdEntry):
def __init__(self, key, value, value_in_json=False):
super().__init__(
PseudoEtcdMeta(key=key.encode("utf-8")),
value,
value_in_json=value_in_json,
)

View file

@ -2,8 +2,8 @@ import json
from os.path import join from os.path import join
from uuid import uuid4 from uuid import uuid4
from .etcd_wrapper import PsuedoEtcdEntry from uncloud.common.etcd_wrapper import EtcdEntry
from .classes import SpecificEtcdEntryBase from uncloud.common.classes import SpecificEtcdEntryBase
class RequestType: class RequestType:
@ -29,11 +29,8 @@ class RequestEntry(SpecificEtcdEntryBase):
@classmethod @classmethod
def from_scratch(cls, request_prefix, **kwargs): def from_scratch(cls, request_prefix, **kwargs):
e = PsuedoEtcdEntry( e = EtcdEntry(meta_or_key=join(request_prefix, uuid4().hex),
join(request_prefix, uuid4().hex), value=json.dumps(kwargs).encode('utf-8'), value_in_json=True)
value=json.dumps(kwargs).encode("utf-8"),
value_in_json=True,
)
return cls(e) return cls(e)

View file

@ -2,7 +2,7 @@ import os
import argparse import argparse
from uncloud.common.settings import settings from uncloud.common.settings import settings
from uncloud.shared import shared from uncloud.common.shared import shared
arg_parser = argparse.ArgumentParser('configure', add_help=False) arg_parser = argparse.ArgumentParser('configure', add_help=False)
configure_subparsers = arg_parser.add_subparsers(dest='subcommand') configure_subparsers = arg_parser.add_subparsers(dest='subcommand')

View file

@ -10,8 +10,7 @@ from uuid import uuid4
from . import logger from . import logger
from uncloud.common.settings import settings from uncloud.common.settings import settings
from uncloud.shared import shared from uncloud.common.shared import shared
arg_parser = argparse.ArgumentParser('filescanner', add_help=False) arg_parser = argparse.ArgumentParser('filescanner', add_help=False)
arg_parser.add_argument('--hostname', required=True) arg_parser.add_argument('--hostname', required=True)

View file

@ -5,7 +5,7 @@ import time
from uuid import uuid4 from uuid import uuid4
from uncloud.common.request import RequestEntry, RequestType from uncloud.common.request import RequestEntry, RequestType
from uncloud.shared import shared from uncloud.common.shared import shared
from uncloud.common.settings import settings from uncloud.common.settings import settings
from uncloud.common.vm import VMStatus from uncloud.common.vm import VMStatus
from uncloud.vmm import VMM from uncloud.vmm import VMM
@ -72,52 +72,52 @@ def main(hostname, debug=False):
except Exception as e: except Exception as e:
raise Exception('uncloud-host heartbeat updating mechanism is not working') from e raise Exception('uncloud-host heartbeat updating mechanism is not working') from e
for events_iterator in [ # The below while True is necessary for gracefully handling leadership transfer and temporary
shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True), # unavailability in etcd. Why does it work? It works because the get_prefix,watch_prefix return
shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], timeout=10, value_in_json=True) # iter([]) that is iterator of empty list on exception (that occur due to above mentioned reasons)
]: # which ends the loop immediately. So, having it inside infinite loop we try again and again to
for request_event in events_iterator: # get prefix until either success or daemon death comes.
request_event = RequestEntry(request_event) while True:
for events_iterator in [
shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True,
raise_exception=False),
shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], value_in_json=True,
raise_exception=False)
]:
for request_event in events_iterator:
request_event = RequestEntry(request_event)
if request_event.type == 'TIMEOUT':
maintenance(host.key) maintenance(host.key)
elif request_event.hostname == host.key: if request_event.hostname == host.key:
logger.debug('VM Request: %s on Host %s', request_event, host.hostname) logger.debug('VM Request: %s on Host %s', request_event, host.hostname)
shared.request_pool.client.client.delete(request_event.key)
vm_entry = shared.etcd_client.get(
join_path(settings['etcd']['vm_prefix'], request_event.uuid)
)
logger.debug('VM hostname: {}'.format(vm_entry.value))
vm = virtualmachine.VM(vm_entry)
if request_event.type == RequestType.StartVM:
vm.start()
elif request_event.type == RequestType.StopVM: shared.request_pool.client.client.delete(request_event.key)
vm.stop() vm_entry = shared.etcd_client.get(
join_path(settings['etcd']['vm_prefix'], request_event.uuid)
)
elif request_event.type == RequestType.DeleteVM: logger.debug('VM hostname: {}'.format(vm_entry.value))
vm.delete()
elif request_event.type == RequestType.InitVMMigration: vm = virtualmachine.VM(vm_entry)
vm.start(destination_host_key=host.key) if request_event.type == RequestType.StartVM:
vm.start()
elif request_event.type == RequestType.TransferVM: elif request_event.type == RequestType.StopVM:
destination_host = host_pool.get(request_event.destination_host_key) vm.stop()
if destination_host:
vm.migrate(
destination_host=destination_host.hostname,
destination_sock_path=request_event.destination_sock_path,
)
else:
logger.error('Host %s not found!', request_event.destination_host_key)
elif request_event.type == RequestType.DeleteVM:
vm.delete()
if __name__ == '__main__': elif request_event.type == RequestType.InitVMMigration:
argparser = argparse.ArgumentParser() vm.start(destination_host_key=host.key)
argparser.add_argument(
'hostname', help='Name of this host. e.g uncloud1.ungleich.ch' elif request_event.type == RequestType.TransferVM:
) destination_host = host_pool.get(request_event.destination_host_key)
args = argparser.parse_args() if destination_host:
mp.set_start_method('spawn') vm.migrate(
main(args.hostname) destination_host=destination_host.hostname,
destination_sock_path=request_event.destination_sock_path,
)
else:
logger.error('Host %s not found!', request_event.destination_host_key)

View file

@ -16,7 +16,7 @@ from uncloud.common.vm import VMStatus, declare_stopped
from uncloud.common.network import create_dev, delete_network_interface from uncloud.common.network import create_dev, delete_network_interface
from uncloud.common.schemas import VMSchema, NetworkSchema from uncloud.common.schemas import VMSchema, NetworkSchema
from uncloud.host import logger from uncloud.host import logger
from uncloud.shared import shared from uncloud.common.shared import shared
from uncloud.common.settings import settings from uncloud.common.settings import settings
from uncloud.vmm import VMM from uncloud.vmm import VMM

View file

@ -5,7 +5,7 @@ import subprocess as sp
from os.path import join as join_path from os.path import join as join_path
from uncloud.common.settings import settings from uncloud.common.settings import settings
from uncloud.shared import shared from uncloud.common.shared import shared
from uncloud.imagescanner import logger from uncloud.imagescanner import logger

View file

@ -6,7 +6,7 @@ from flask_restful import Resource, Api
from werkzeug.exceptions import HTTPException from werkzeug.exceptions import HTTPException
from uncloud.common.settings import settings from uncloud.common.settings import settings
from uncloud.shared import shared from uncloud.common.shared import shared
app = Flask(__name__) app = Flask(__name__)
api = Api(app) api = Api(app)

View file

@ -6,7 +6,7 @@ import bitmath
from uncloud.common.host import HostStatus from uncloud.common.host import HostStatus
from uncloud.common.request import RequestEntry, RequestType from uncloud.common.request import RequestEntry, RequestType
from uncloud.common.vm import VMStatus from uncloud.common.vm import VMStatus
from uncloud.shared import shared from uncloud.common.shared import shared
from uncloud.common.settings import settings from uncloud.common.settings import settings

View file

@ -6,59 +6,51 @@
import argparse import argparse
from uncloud.common.request import RequestEntry, RequestType
from uncloud.shared import shared
from uncloud.common.settings import settings from uncloud.common.settings import settings
from .helper import (dead_host_mitigation, dead_host_detection, assign_host, NoSuitableHostFound) from uncloud.common.request import RequestEntry, RequestType
from . import logger from uncloud.common.shared import shared
from uncloud.scheduler import logger
from uncloud.scheduler.helper import (dead_host_mitigation, dead_host_detection,
assign_host, NoSuitableHostFound)
arg_parser = argparse.ArgumentParser('scheduler', add_help=False) arg_parser = argparse.ArgumentParser('scheduler', add_help=False)
def main(debug=False): def main(debug=False):
for request_iterator in [ # The below while True is necessary for gracefully handling leadership transfer and temporary
shared.etcd_client.get_prefix( # unavailability in etcd. Why does it work? It works because the get_prefix,watch_prefix return
settings["etcd"]["request_prefix"], value_in_json=True # iter([]) that is iterator of empty list on exception (that occur due to above mentioned reasons)
), # which ends the loop immediately. So, having it inside infinite loop we try again and again to
shared.etcd_client.watch_prefix( # get prefix until either success or daemon death comes.
settings["etcd"]["request_prefix"], while True:
timeout=5, for request_iterator in [
value_in_json=True, shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True,
), raise_exception=False),
]: shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], value_in_json=True,
for request_event in request_iterator: raise_exception=False),
request_entry = RequestEntry(request_event) ]:
# Never Run time critical mechanism inside timeout for request_event in request_iterator:
# mechanism because timeout mechanism only comes dead_host_mitigation(dead_host_detection())
# when no other event is happening. It means under request_entry = RequestEntry(request_event)
# heavy load there would not be a timeout event.
if request_entry.type == "TIMEOUT":
# Detect hosts that are dead and set their status if request_entry.type == RequestType.ScheduleVM:
# to "DEAD", and their VMs' status to "KILLED" logger.debug('%s, %s', request_entry.key, request_entry.value)
dead_hosts = dead_host_detection()
if dead_hosts:
logger.debug("Dead hosts: %s", dead_hosts)
dead_host_mitigation(dead_hosts)
elif request_entry.type == RequestType.ScheduleVM: vm_entry = shared.vm_pool.get(request_entry.uuid)
logger.debug("%s, %s", request_entry.key, request_entry.value) if vm_entry is None:
logger.info('Trying to act on {} but it is deleted'.format(request_entry.uuid))
continue
vm_entry = shared.vm_pool.get(request_entry.uuid) shared.etcd_client.client.delete(request_entry.key) # consume Request
if vm_entry is None:
logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid))
continue
shared.etcd_client.client.delete(request_entry.key) # consume Request try:
assign_host(vm_entry)
except NoSuitableHostFound:
vm_entry.add_log('Can\'t schedule VM. No Resource Left.')
shared.vm_pool.put(vm_entry)
try: logger.info('No Resource Left. Emailing admin....')
assign_host(vm_entry)
except NoSuitableHostFound:
vm_entry.add_log("Can't schedule VM. No Resource Left.")
shared.vm_pool.put(vm_entry)
logger.info("No Resource Left. Emailing admin....")
if __name__ == "__main__": if __name__ == '__main__':
main() main()