Refactoring, Removal of most global vars, config default path is ~/ucloud/

This commit is contained in:
ahmadbilalkhalid 2019-12-22 12:26:48 +05:00
parent bc58a6ed9c
commit 04993e4106
23 changed files with 673 additions and 726 deletions

View file

@ -3,29 +3,23 @@
import argparse import argparse
import logging import logging
import importlib import importlib
import os
import multiprocessing as mp import multiprocessing as mp
import sys import sys
from ucloud.configure.main import update_config, configure_parser from ucloud.configure.main import configure_parser
def exception_hook(exc_type, exc_value, exc_traceback): def exception_hook(exc_type, exc_value, exc_traceback):
logger.error( logger.error(
"Uncaught exception", 'Uncaught exception',
exc_info=(exc_type, exc_value, exc_traceback) exc_info=(exc_type, exc_value, exc_traceback)
) )
print(exc_type, exc_value) print('Error: ', end='')
print(exc_type, exc_value, exc_traceback)
if __name__ == '__main__': if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG,
format='%(pathname)s:%(lineno)d -- %(levelname)-8s %(message)s',
filename='/var/log/ucloud.log', filemode='a')
logger = logging.getLogger("ucloud")
sys.excepthook = exception_hook sys.excepthook = exception_hook
mp.set_start_method('spawn')
arg_parser = argparse.ArgumentParser() arg_parser = argparse.ArgumentParser()
subparsers = arg_parser.add_subparsers(dest="command") subparsers = arg_parser.add_subparsers(dest="command")
@ -50,13 +44,20 @@ if __name__ == '__main__':
if not args.command: if not args.command:
arg_parser.print_help() arg_parser.print_help()
else: else:
logging.basicConfig(
level=logging.DEBUG,
format='%(pathname)s:%(lineno)d -- %(levelname)-8s %(message)s',
handlers=[logging.handlers.SysLogHandler(address = '/dev/log')]
)
logger = logging.getLogger("ucloud")
mp.set_start_method('spawn')
arguments = vars(args) arguments = vars(args)
try: try:
name = arguments.pop('command') name = arguments.pop('command')
mod = importlib.import_module("ucloud.{}.main".format(name)) mod = importlib.import_module("ucloud.{}.main".format(name))
main = getattr(mod, "main") main = getattr(mod, "main")
main(**arguments) main(**arguments)
except Exception as e: except Exception as e:
logger.exception(e) logger.exception('Error')
print(e) print(e)

View file

@ -1,3 +1,5 @@
import os
from setuptools import setup, find_packages from setuptools import setup, find_packages
with open("README.md", "r") as fh: with open("README.md", "r") as fh:
@ -39,5 +41,5 @@ setup(name='ucloud',
'etcd3 @ https://github.com/kragniz/python-etcd3/tarball/master#egg=etcd3', 'etcd3 @ https://github.com/kragniz/python-etcd3/tarball/master#egg=etcd3',
], ],
scripts=['scripts/ucloud'], scripts=['scripts/ucloud'],
data_files=[('/etc/ucloud/', ['conf/ucloud.conf'])], data_files=[(os.path.expanduser('~/ucloud/'), ['conf/ucloud.conf'])],
zip_safe=False) zip_safe=False)

View file

@ -1,6 +1,8 @@
import os import os
from ucloud.config import etcd_client, config from ucloud.shared import shared
from ucloud.settings import settings
class Optional: class Optional:
pass pass
@ -47,6 +49,6 @@ class VmUUIDField(Field):
self.validation = self.vm_uuid_validation self.validation = self.vm_uuid_validation
def vm_uuid_validation(self): def vm_uuid_validation(self):
r = etcd_client.get(os.path.join(config['etcd']['vm_prefix'], self.uuid)) r = shared.etcd_client.get(os.path.join(settings['etcd']['vm_prefix'], self.uuid))
if not r: if not r:
self.add_error("VM with uuid {} does not exists".format(self.uuid)) self.add_error("VM with uuid {} does not exists".format(self.uuid))

View file

@ -3,7 +3,8 @@ import os
from uuid import uuid4 from uuid import uuid4
from ucloud.config import etcd_client, config from ucloud.shared import shared
from ucloud.settings import settings
data = { data = {
"is_public": True, "is_public": True,
@ -13,4 +14,4 @@ data = {
"attributes": {"list": [], "key": [], "pool": "images"}, "attributes": {"list": [], "key": [], "pool": "images"},
} }
etcd_client.put(os.path.join(config['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data)) shared.etcd_client.put(os.path.join(settings['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data))

View file

@ -7,29 +7,29 @@ import requests
from pyotp import TOTP from pyotp import TOTP
from ucloud.config import vm_pool, config from ucloud.shared import shared
from ucloud.settings import settings
logger = logging.getLogger("ucloud.api.helper") logger = logging.getLogger("ucloud.api.helper")
def check_otp(name, realm, token): def check_otp(name, realm, token):
try: try:
data = { data = {
"auth_name": config['otp']['auth_name'], "auth_name": settings['otp']['auth_name'],
"auth_token": TOTP(config['otp']['auth_seed']).now(), "auth_token": TOTP(settings['otp']['auth_seed']).now(),
"auth_realm": config['otp']['auth_realm'], "auth_realm": settings['otp']['auth_realm'],
"name": name, "name": name,
"realm": realm, "realm": realm,
"token": token, "token": token,
} }
except binascii.Error as err: except binascii.Error as err:
logger.error( logger.error(
"Cannot compute OTP for seed: {}".format(config['otp']['auth_seed']) "Cannot compute OTP for seed: {}".format(settings['otp']['auth_seed'])
) )
return 400 return 400
response = requests.post( response = requests.post(
config['otp']['verification_controller_url'], json=data settings['otp']['verification_controller_url'], json=data
) )
return response.status_code return response.status_code
@ -43,7 +43,7 @@ def resolve_vm_name(name, owner):
result = next( result = next(
filter( filter(
lambda vm: vm.value["owner"] == owner and vm.value["name"] == name, lambda vm: vm.value["owner"] == owner and vm.value["name"] == name,
vm_pool.vms, shared.vm_pool.vms,
), ),
None, None,
) )
@ -81,7 +81,7 @@ def resolve_image_name(name, etcd_client):
except Exception: except Exception:
raise ValueError("Image name not in correct format i.e {store_name}:{image_name}") raise ValueError("Image name not in correct format i.e {store_name}:{image_name}")
images = etcd_client.get_prefix(config['etcd']['image_prefix'], value_in_json=True) images = etcd_client.get_prefix(settings['etcd']['image_prefix'], value_in_json=True)
# Try to find image with name == image_name and store_name == store_name # Try to find image with name == image_name and store_name == store_name
try: try:

View file

@ -10,10 +10,9 @@ from flask_restful import Resource, Api
from ucloud.common import counters from ucloud.common import counters
from ucloud.common.vm import VMStatus from ucloud.common.vm import VMStatus
from ucloud.common.request import RequestEntry, RequestType from ucloud.common.request import RequestEntry, RequestType
from ucloud.config import ( from ucloud.settings import settings
etcd_client, request_pool, vm_pool, from ucloud.shared import shared
host_pool, config, image_storage_handler
)
from . import schemas from . import schemas
from .helper import generate_mac, mac2ipv6 from .helper import generate_mac, mac2ipv6
from . import logger from . import logger
@ -31,7 +30,7 @@ class CreateVM(Resource):
validator = schemas.CreateVMSchema(data) validator = schemas.CreateVMSchema(data)
if validator.is_valid(): if validator.is_valid():
vm_uuid = uuid4().hex vm_uuid = uuid4().hex
vm_key = join_path(config['etcd']['vm_prefix'], vm_uuid) vm_key = join_path(settings['etcd']['vm_prefix'], vm_uuid)
specs = { specs = {
"cpu": validator.specs["cpu"], "cpu": validator.specs["cpu"],
"ram": validator.specs["ram"], "ram": validator.specs["ram"],
@ -39,7 +38,7 @@ class CreateVM(Resource):
"hdd": validator.specs["hdd"], "hdd": validator.specs["hdd"],
} }
macs = [generate_mac() for _ in range(len(data["network"]))] macs = [generate_mac() for _ in range(len(data["network"]))]
tap_ids = [counters.increment_etcd_counter(etcd_client, "/v1/counter/tap") tap_ids = [counters.increment_etcd_counter(shared.etcd_client, "/v1/counter/tap")
for _ in range(len(data["network"]))] for _ in range(len(data["network"]))]
vm_entry = { vm_entry = {
"name": data["vm_name"], "name": data["vm_name"],
@ -54,14 +53,14 @@ class CreateVM(Resource):
"network": list(zip(data["network"], macs, tap_ids)), "network": list(zip(data["network"], macs, tap_ids)),
"metadata": {"ssh-keys": []}, "metadata": {"ssh-keys": []},
} }
etcd_client.put(vm_key, vm_entry, value_in_json=True) shared.etcd_client.put(vm_key, vm_entry, value_in_json=True)
# Create ScheduleVM Request # Create ScheduleVM Request
r = RequestEntry.from_scratch( r = RequestEntry.from_scratch(
type=RequestType.ScheduleVM, uuid=vm_uuid, type=RequestType.ScheduleVM, uuid=vm_uuid,
request_prefix=config['etcd']['request_prefix'] request_prefix=settings['etcd']['request_prefix']
) )
request_pool.put(r) shared.request_pool.put(r)
return {"message": "VM Creation Queued"}, 200 return {"message": "VM Creation Queued"}, 200
return validator.get_errors(), 400 return validator.get_errors(), 400
@ -73,16 +72,16 @@ class VmStatus(Resource):
data = request.json data = request.json
validator = schemas.VMStatusSchema(data) validator = schemas.VMStatusSchema(data)
if validator.is_valid(): if validator.is_valid():
vm = vm_pool.get( vm = shared.vm_pool.get(
join_path(config['etcd']['vm_prefix'], data["uuid"]) join_path(settings['etcd']['vm_prefix'], data["uuid"])
) )
vm_value = vm.value.copy() vm_value = vm.value.copy()
vm_value["ip"] = [] vm_value["ip"] = []
for network_mac_and_tap in vm.network: for network_mac_and_tap in vm.network:
network_name, mac, tap = network_mac_and_tap network_name, mac, tap = network_mac_and_tap
network = etcd_client.get( network = shared.etcd_client.get(
join_path( join_path(
config['etcd']['network_prefix'], settings['etcd']['network_prefix'],
data["name"], data["name"],
network_name, network_name,
), ),
@ -102,8 +101,8 @@ class CreateImage(Resource):
data = request.json data = request.json
validator = schemas.CreateImageSchema(data) validator = schemas.CreateImageSchema(data)
if validator.is_valid(): if validator.is_valid():
file_entry = etcd_client.get( file_entry = shared.etcd_client.get(
join_path(config['etcd']['file_prefix'], data["uuid"]) join_path(settings['etcd']['file_prefix'], data["uuid"])
) )
file_entry_value = json.loads(file_entry.value) file_entry_value = json.loads(file_entry.value)
@ -115,8 +114,8 @@ class CreateImage(Resource):
"store_name": data["image_store"], "store_name": data["image_store"],
"visibility": "public", "visibility": "public",
} }
etcd_client.put( shared.etcd_client.put(
join_path(config['etcd']['image_prefix'], data["uuid"]), join_path(settings['etcd']['image_prefix'], data["uuid"]),
json.dumps(image_entry_json), json.dumps(image_entry_json),
) )
@ -127,8 +126,8 @@ class CreateImage(Resource):
class ListPublicImages(Resource): class ListPublicImages(Resource):
@staticmethod @staticmethod
def get(): def get():
images = etcd_client.get_prefix( images = shared.etcd_client.get_prefix(
config['etcd']['image_prefix'], value_in_json=True settings['etcd']['image_prefix'], value_in_json=True
) )
r = { r = {
"images": [] "images": []
@ -150,8 +149,8 @@ class VMAction(Resource):
validator = schemas.VmActionSchema(data) validator = schemas.VmActionSchema(data)
if validator.is_valid(): if validator.is_valid():
vm_entry = vm_pool.get( vm_entry = shared.vm_pool.get(
join_path(config['etcd']['vm_prefix'], data["uuid"]) join_path(settings['etcd']['vm_prefix'], data["uuid"])
) )
action = data["action"] action = data["action"]
@ -159,25 +158,25 @@ class VMAction(Resource):
action = "schedule" action = "schedule"
if action == "delete" and vm_entry.hostname == "": if action == "delete" and vm_entry.hostname == "":
if image_storage_handler.is_vm_image_exists(vm_entry.uuid): if shared.storage_handler.is_vm_image_exists(vm_entry.uuid):
r_status = image_storage_handler.delete_vm_image(vm_entry.uuid) r_status = shared.storage_handler.delete_vm_image(vm_entry.uuid)
if r_status: if r_status:
etcd_client.client.delete(vm_entry.key) shared.etcd_client.client.delete(vm_entry.key)
return {"message": "VM successfully deleted"} return {"message": "VM successfully deleted"}
else: else:
logger.error("Some Error Occurred while deleting VM") logger.error("Some Error Occurred while deleting VM")
return {"message": "VM deletion unsuccessfull"} return {"message": "VM deletion unsuccessfull"}
else: else:
etcd_client.client.delete(vm_entry.key) shared.etcd_client.client.delete(vm_entry.key)
return {"message": "VM successfully deleted"} return {"message": "VM successfully deleted"}
r = RequestEntry.from_scratch( r = RequestEntry.from_scratch(
type="{}VM".format(action.title()), type="{}VM".format(action.title()),
uuid=data["uuid"], uuid=data["uuid"],
hostname=vm_entry.hostname, hostname=vm_entry.hostname,
request_prefix=config['etcd']['request_prefix'] request_prefix=settings['etcd']['request_prefix']
) )
request_pool.put(r) shared.request_pool.put(r)
return {"message": "VM {} Queued".format(action.title())}, 200 return {"message": "VM {} Queued".format(action.title())}, 200
else: else:
return validator.get_errors(), 400 return validator.get_errors(), 400
@ -190,18 +189,18 @@ class VMMigration(Resource):
validator = schemas.VmMigrationSchema(data) validator = schemas.VmMigrationSchema(data)
if validator.is_valid(): if validator.is_valid():
vm = vm_pool.get(data["uuid"]) vm = shared.vm_pool.get(data["uuid"])
r = RequestEntry.from_scratch( r = RequestEntry.from_scratch(
type=RequestType.ScheduleVM, type=RequestType.ScheduleVM,
uuid=vm.uuid, uuid=vm.uuid,
destination=join_path( destination=join_path(
config['etcd']['host_prefix'], validator.destination.value settings['etcd']['host_prefix'], validator.destination.value
), ),
migration=True, migration=True,
request_prefix=config['etcd']['request_prefix'] request_prefix=settings['etcd']['request_prefix']
) )
request_pool.put(r) shared.request_pool.put(r)
return {"message": "VM Migration Initialization Queued"}, 200 return {"message": "VM Migration Initialization Queued"}, 200
else: else:
return validator.get_errors(), 400 return validator.get_errors(), 400
@ -214,8 +213,8 @@ class ListUserVM(Resource):
validator = schemas.OTPSchema(data) validator = schemas.OTPSchema(data)
if validator.is_valid(): if validator.is_valid():
vms = etcd_client.get_prefix( vms = shared.etcd_client.get_prefix(
config['etcd']['vm_prefix'], value_in_json=True settings['etcd']['vm_prefix'], value_in_json=True
) )
return_vms = [] return_vms = []
user_vms = filter(lambda v: v.value["owner"] == data["name"], vms) user_vms = filter(lambda v: v.value["owner"] == data["name"], vms)
@ -227,7 +226,6 @@ class ListUserVM(Resource):
"specs": vm.value["specs"], "specs": vm.value["specs"],
"status": vm.value["status"], "status": vm.value["status"],
"hostname": vm.value["hostname"], "hostname": vm.value["hostname"],
# "mac": vm.value["mac"],
"vnc_socket": None "vnc_socket": None
if vm.value.get("vnc_socket", None) is None if vm.value.get("vnc_socket", None) is None
else vm.value["vnc_socket"], else vm.value["vnc_socket"],
@ -248,8 +246,8 @@ class ListUserFiles(Resource):
validator = schemas.OTPSchema(data) validator = schemas.OTPSchema(data)
if validator.is_valid(): if validator.is_valid():
files = etcd_client.get_prefix( files = shared.etcd_client.get_prefix(
config['etcd']['file_prefix'], value_in_json=True settings['etcd']['file_prefix'], value_in_json=True
) )
return_files = [] return_files = []
user_files = list( user_files = list(
@ -273,14 +271,14 @@ class CreateHost(Resource):
data = request.json data = request.json
validator = schemas.CreateHostSchema(data) validator = schemas.CreateHostSchema(data)
if validator.is_valid(): if validator.is_valid():
host_key = join_path(config['etcd']['host_prefix'], uuid4().hex) host_key = join_path(settings['etcd']['host_prefix'], uuid4().hex)
host_entry = { host_entry = {
"specs": data["specs"], "specs": data["specs"],
"hostname": data["hostname"], "hostname": data["hostname"],
"status": "DEAD", "status": "DEAD",
"last_heartbeat": "", "last_heartbeat": "",
} }
etcd_client.put(host_key, host_entry, value_in_json=True) shared.etcd_client.put(host_key, host_entry, value_in_json=True)
return {"message": "Host Created"}, 200 return {"message": "Host Created"}, 200
@ -290,7 +288,7 @@ class CreateHost(Resource):
class ListHost(Resource): class ListHost(Resource):
@staticmethod @staticmethod
def get(): def get():
hosts = host_pool.hosts hosts = shared.host_pool.hosts
r = { r = {
host.key: { host.key: {
"status": host.status, "status": host.status,
@ -312,12 +310,12 @@ class GetSSHKeys(Resource):
# {user_prefix}/{realm}/{name}/key/ # {user_prefix}/{realm}/{name}/key/
etcd_key = join_path( etcd_key = join_path(
config['etcd']['user_prefix'], settings['etcd']['user_prefix'],
data["realm"], data["realm"],
data["name"], data["name"],
"key", "key",
) )
etcd_entry = etcd_client.get_prefix( etcd_entry = shared.etcd_client.get_prefix(
etcd_key, value_in_json=True etcd_key, value_in_json=True
) )
@ -329,13 +327,13 @@ class GetSSHKeys(Resource):
# {user_prefix}/{realm}/{name}/key/{key_name} # {user_prefix}/{realm}/{name}/key/{key_name}
etcd_key = join_path( etcd_key = join_path(
config['etcd']['user_prefix'], settings['etcd']['user_prefix'],
data["realm"], data["realm"],
data["name"], data["name"],
"key", "key",
data["key_name"], data["key_name"],
) )
etcd_entry = etcd_client.get(etcd_key, value_in_json=True) etcd_entry = shared.etcd_client.get(etcd_key, value_in_json=True)
if etcd_entry: if etcd_entry:
return { return {
@ -358,13 +356,13 @@ class AddSSHKey(Resource):
# {user_prefix}/{realm}/{name}/key/{key_name} # {user_prefix}/{realm}/{name}/key/{key_name}
etcd_key = join_path( etcd_key = join_path(
config['etcd']['user_prefix'], settings['etcd']['user_prefix'],
data["realm"], data["realm"],
data["name"], data["name"],
"key", "key",
data["key_name"], data["key_name"],
) )
etcd_entry = etcd_client.get(etcd_key, value_in_json=True) etcd_entry = shared.etcd_client.get(etcd_key, value_in_json=True)
if etcd_entry: if etcd_entry:
return { return {
"message": "Key with name '{}' already exists".format( "message": "Key with name '{}' already exists".format(
@ -373,7 +371,7 @@ class AddSSHKey(Resource):
} }
else: else:
# Key Not Found. It implies user' haven't added any key yet. # Key Not Found. It implies user' haven't added any key yet.
etcd_client.put(etcd_key, data["key"], value_in_json=True) shared.etcd_client.put(etcd_key, data["key"], value_in_json=True)
return {"message": "Key added successfully"} return {"message": "Key added successfully"}
else: else:
return validator.get_errors(), 400 return validator.get_errors(), 400
@ -388,15 +386,15 @@ class RemoveSSHKey(Resource):
# {user_prefix}/{realm}/{name}/key/{key_name} # {user_prefix}/{realm}/{name}/key/{key_name}
etcd_key = join_path( etcd_key = join_path(
config['etcd']['user_prefix'], settings['etcd']['user_prefix'],
data["realm"], data["realm"],
data["name"], data["name"],
"key", "key",
data["key_name"], data["key_name"],
) )
etcd_entry = etcd_client.get(etcd_key, value_in_json=True) etcd_entry = shared.etcd_client.get(etcd_key, value_in_json=True)
if etcd_entry: if etcd_entry:
etcd_client.client.delete(etcd_key) shared.etcd_client.client.delete(etcd_key)
return {"message": "Key successfully removed."} return {"message": "Key successfully removed."}
else: else:
return { return {
@ -418,22 +416,22 @@ class CreateNetwork(Resource):
network_entry = { network_entry = {
"id": counters.increment_etcd_counter( "id": counters.increment_etcd_counter(
etcd_client, "/v1/counter/vxlan" shared.etcd_client, "/v1/counter/vxlan"
), ),
"type": data["type"], "type": data["type"],
} }
if validator.user.value: if validator.user.value:
try: try:
nb = pynetbox.api( nb = pynetbox.api(
url=config['netbox']['url'], url=settings['netbox']['url'],
token=config['netbox']['token'], token=settings['netbox']['token'],
) )
nb_prefix = nb.ipam.prefixes.get( nb_prefix = nb.ipam.prefixes.get(
prefix=config['network']['prefix'] prefix=settings['network']['prefix']
) )
prefix = nb_prefix.available_prefixes.create( prefix = nb_prefix.available_prefixes.create(
data={ data={
"prefix_length": int(config['network']['prefix_length']), "prefix_length": int(settings['network']['prefix_length']),
"description": '{}\'s network "{}"'.format( "description": '{}\'s network "{}"'.format(
data["name"], data["network_name"] data["name"], data["network_name"]
), ),
@ -449,11 +447,11 @@ class CreateNetwork(Resource):
network_entry["ipv6"] = "fd00::/64" network_entry["ipv6"] = "fd00::/64"
network_key = join_path( network_key = join_path(
config['etcd']['network_prefix'], settings['etcd']['network_prefix'],
data['name'], data['name'],
data['network_name'], data['network_name'],
) )
etcd_client.put(network_key, network_entry, value_in_json=True) shared.etcd_client.put(network_key, network_entry, value_in_json=True)
return {"message": "Network successfully added."} return {"message": "Network successfully added."}
else: else:
return validator.get_errors(), 400 return validator.get_errors(), 400
@ -467,9 +465,9 @@ class ListUserNetwork(Resource):
if validator.is_valid(): if validator.is_valid():
prefix = join_path( prefix = join_path(
config['etcd']['network_prefix'], data["name"] settings['etcd']['network_prefix'], data["name"]
) )
networks = etcd_client.get_prefix(prefix, value_in_json=True) networks = shared.etcd_client.get_prefix(prefix, value_in_json=True)
user_networks = [] user_networks = []
for net in networks: for net in networks:
net.value["name"] = net.key.split("/")[-1] net.value["name"] = net.key.split("/")[-1]
@ -503,7 +501,7 @@ api.add_resource(CreateNetwork, "/network/create")
def main(): def main():
image_stores = list(etcd_client.get_prefix(config['etcd']['image_store_prefix'], value_in_json=True)) image_stores = list(shared.etcd_client.get_prefix(settings['etcd']['image_store_prefix'], value_in_json=True))
if len(image_stores) == 0: if len(image_stores) == 0:
data = { data = {
"is_public": True, "is_public": True,
@ -513,7 +511,7 @@ def main():
"attributes": {"list": [], "key": [], "pool": "images"}, "attributes": {"list": [], "key": [], "pool": "images"},
} }
etcd_client.put(join_path(config['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data)) shared.etcd_client.put(join_path(settings['etcd']['image_store_prefix'], uuid4().hex), json.dumps(data))
app.run(host="::", debug=True) app.run(host="::", debug=True)

View file

@ -21,7 +21,8 @@ import bitmath
from ucloud.common.host import HostStatus from ucloud.common.host import HostStatus
from ucloud.common.vm import VMStatus from ucloud.common.vm import VMStatus
from ucloud.config import etcd_client, config, vm_pool, host_pool from ucloud.shared import shared
from ucloud.settings import settings
from . import helper, logger from . import helper, logger
from .common_fields import Field, VmUUIDField from .common_fields import Field, VmUUIDField
from .helper import check_otp, resolve_vm_name from .helper import check_otp, resolve_vm_name
@ -102,14 +103,14 @@ class CreateImageSchema(BaseSchema):
super().__init__(data, fields) super().__init__(data, fields)
def file_uuid_validation(self): def file_uuid_validation(self):
file_entry = etcd_client.get(os.path.join(config['etcd']['file_prefix'], self.uuid.value)) file_entry = shared.etcd_client.get(os.path.join(settings['etcd']['file_prefix'], self.uuid.value))
if file_entry is None: if file_entry is None:
self.add_error( self.add_error(
"Image File with uuid '{}' Not Found".format(self.uuid.value) "Image File with uuid '{}' Not Found".format(self.uuid.value)
) )
def image_store_name_validation(self): def image_store_name_validation(self):
image_stores = list(etcd_client.get_prefix(config['etcd']['image_store_prefix'])) image_stores = list(shared.etcd_client.get_prefix(settings['etcd']['image_store_prefix']))
image_store = next( image_store = next(
filter( filter(
@ -218,7 +219,7 @@ class CreateVMSchema(OTPSchema):
def image_validation(self): def image_validation(self):
try: try:
image_uuid = helper.resolve_image_name(self.image.value, etcd_client) image_uuid = helper.resolve_image_name(self.image.value, shared.etcd_client)
except Exception as e: except Exception as e:
logger.exception("Cannot resolve image name = %s", self.image.value) logger.exception("Cannot resolve image name = %s", self.image.value)
self.add_error(str(e)) self.add_error(str(e))
@ -236,7 +237,7 @@ class CreateVMSchema(OTPSchema):
if _network: if _network:
for net in _network: for net in _network:
network = etcd_client.get(os.path.join(config['etcd']['network_prefix'], network = shared.etcd_client.get(os.path.join(settings['etcd']['network_prefix'],
self.name.value, self.name.value,
net), value_in_json=True) net), value_in_json=True)
if not network: if not network:
@ -310,7 +311,7 @@ class VMStatusSchema(OTPSchema):
super().__init__(data, fields) super().__init__(data, fields)
def validation(self): def validation(self):
vm = vm_pool.get(self.uuid.value) vm = shared.vm_pool.get(self.uuid.value)
if not ( if not (
vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin"
): ):
@ -343,7 +344,7 @@ class VmActionSchema(OTPSchema):
) )
def validation(self): def validation(self):
vm = vm_pool.get(self.uuid.value) vm = shared.vm_pool.get(self.uuid.value)
if not ( if not (
vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin"
): ):
@ -383,7 +384,7 @@ class VmMigrationSchema(OTPSchema):
def destination_validation(self): def destination_validation(self):
hostname = self.destination.value hostname = self.destination.value
host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) host = next(filter(lambda h: h.hostname == hostname, shared.host_pool.hosts), None)
if not host: if not host:
self.add_error("No Such Host ({}) exists".format(self.destination.value)) self.add_error("No Such Host ({}) exists".format(self.destination.value))
elif host.status != HostStatus.alive: elif host.status != HostStatus.alive:
@ -392,7 +393,7 @@ class VmMigrationSchema(OTPSchema):
self.destination.value = host.key self.destination.value = host.key
def validation(self): def validation(self):
vm = vm_pool.get(self.uuid.value) vm = shared.vm_pool.get(self.uuid.value)
if not ( if not (
vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin" vm.value["owner"] == self.name.value or self.realm.value == "ungleich-admin"
): ):
@ -401,7 +402,7 @@ class VmMigrationSchema(OTPSchema):
if vm.status != VMStatus.running: if vm.status != VMStatus.running:
self.add_error("Can't migrate non-running VM") self.add_error("Can't migrate non-running VM")
if vm.hostname == os.path.join(config['etcd']['host_prefix'], self.destination.value): if vm.hostname == os.path.join(settings['etcd']['host_prefix'], self.destination.value):
self.add_error("Destination host couldn't be same as Source Host") self.add_error("Destination host couldn't be same as Source Host")
@ -443,7 +444,7 @@ class CreateNetwork(OTPSchema):
super().__init__(data, fields=fields) super().__init__(data, fields=fields)
def network_name_validation(self): def network_name_validation(self):
network = etcd_client.get(os.path.join(config['etcd']['network_prefix'], network = shared.etcd_client.get(os.path.join(settings['etcd']['network_prefix'],
self.name.value, self.name.value,
self.network_name.value), self.network_name.value),
value_in_json=True) value_in_json=True)

View file

@ -4,40 +4,63 @@ import queue
import copy import copy
from collections import namedtuple from collections import namedtuple
from functools import wraps
from . import logger
PseudoEtcdMeta = namedtuple('PseudoEtcdMeta', ['key'])
PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"])
class EtcdEntry: class EtcdEntry:
# key: str # key: str
# value: str # value: str
def __init__(self, meta, value, value_in_json=False): def __init__(self, meta, value, value_in_json=False):
self.key = meta.key.decode("utf-8") self.key = meta.key.decode('utf-8')
self.value = value.decode("utf-8") self.value = value.decode('utf-8')
if value_in_json: if value_in_json:
self.value = json.loads(self.value) self.value = json.loads(self.value)
def readable_errors(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
func(*args, **kwargs)
except etcd3.exceptions.ConnectionFailedError as err:
raise etcd3.exceptions.ConnectionFailedError('etcd connection failed') from err
except etcd3.exceptions.ConnectionTimeoutError as err:
raise etcd3.exceptions.ConnectionTimeoutError('etcd connection timeout') from err
except Exception:
print('Some error occurred, most probably it is etcd that is erroring out.')
logger.exception('Some etcd error occurred')
return wrapper
class Etcd3Wrapper: class Etcd3Wrapper:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self.client = etcd3.client(*args, **kwargs) self.client = etcd3.client(*args, **kwargs)
@readable_errors
def get(self, *args, value_in_json=False, **kwargs): def get(self, *args, value_in_json=False, **kwargs):
_value, _key = self.client.get(*args, **kwargs) _value, _key = self.client.get(*args, **kwargs)
if _key is None or _value is None: if _key is None or _value is None:
return None return None
return EtcdEntry(_key, _value, value_in_json=value_in_json) return EtcdEntry(_key, _value, value_in_json=value_in_json)
@readable_errors
def put(self, *args, value_in_json=False, **kwargs): def put(self, *args, value_in_json=False, **kwargs):
_key, _value = args _key, _value = args
if value_in_json: if value_in_json:
_value = json.dumps(_value) _value = json.dumps(_value)
if not isinstance(_key, str): if not isinstance(_key, str):
_key = _key.decode("utf-8") _key = _key.decode('utf-8')
return self.client.put(_key, _value, **kwargs) return self.client.put(_key, _value, **kwargs)
@readable_errors
def get_prefix(self, *args, value_in_json=False, **kwargs): def get_prefix(self, *args, value_in_json=False, **kwargs):
r = self.client.get_prefix(*args, **kwargs) r = self.client.get_prefix(*args, **kwargs)
for entry in r: for entry in r:
@ -45,10 +68,11 @@ class Etcd3Wrapper:
if e.value: if e.value:
yield e yield e
@readable_errors
def watch_prefix(self, key, timeout=0, value_in_json=False): def watch_prefix(self, key, timeout=0, value_in_json=False):
timeout_event = EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), timeout_event = EtcdEntry(PseudoEtcdMeta(key=b'TIMEOUT'),
value=str.encode(json.dumps({"status": "TIMEOUT", value=str.encode(json.dumps({'status': 'TIMEOUT',
"type": "TIMEOUT"})), 'type': 'TIMEOUT'})),
value_in_json=value_in_json) value_in_json=value_in_json)
event_queue = queue.Queue() event_queue = queue.Queue()
@ -71,4 +95,4 @@ class Etcd3Wrapper:
class PsuedoEtcdEntry(EtcdEntry): class PsuedoEtcdEntry(EtcdEntry):
def __init__(self, key, value, value_in_json=False): def __init__(self, key, value, value_in_json=False):
super().__init__(PseudoEtcdMeta(key=key.encode("utf-8")), value, value_in_json=value_in_json) super().__init__(PseudoEtcdMeta(key=key.encode('utf-8')), value, value_in_json=value_in_json)

60
ucloud/common/network.py Normal file
View file

@ -0,0 +1,60 @@
import subprocess as sp
import random
import logging
import socket
from contextlib import closing
logger = logging.getLogger(__name__)
def random_bytes(num=6):
return [random.randrange(256) for _ in range(num)]
def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'):
mac = random_bytes()
if oui:
if type(oui) == str:
oui = [int(chunk) for chunk in oui.split(separator)]
mac = oui + random_bytes(num=6 - len(oui))
else:
if multicast:
mac[0] |= 1 # set bit 0
else:
mac[0] &= ~1 # clear bit 0
if uaa:
mac[0] &= ~(1 << 1) # clear bit 1
else:
mac[0] |= 1 << 1 # set bit 1
return separator.join(byte_fmt % b for b in mac)
def create_dev(script, _id, dev, ip=None):
command = [script, _id, dev]
if ip:
command.append(ip)
try:
output = sp.check_output(command, stderr=sp.PIPE)
except Exception as e:
print(e)
return None
else:
return output.decode('utf-8').strip()
def delete_network_interface(iface):
try:
sp.check_output(['ip', 'link', 'del', iface])
except Exception:
logger.exception('Interface Deletion failed')
def find_free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
try:
s.bind(('', 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except Exception:
return None
else:
return s.getsockname()[1]

View file

@ -7,6 +7,8 @@ from abc import ABC
from . import logger from . import logger
from os.path import join as join_path from os.path import join as join_path
from ucloud.settings import settings as config
class ImageStorageHandler(ABC): class ImageStorageHandler(ABC):
def __init__(self, image_base, vm_base): def __init__(self, image_base, vm_base):
@ -156,3 +158,19 @@ class CEPHBasedImageStorageHandler(ImageStorageHandler):
path = join_path(self.vm_base, path) path = join_path(self.vm_base, path)
command = ["rbd", "info", path] command = ["rbd", "info", path]
return self.execute_command(command, report=False) return self.execute_command(command, report=False)
def get_storage_handler():
__storage_backend = config['storage']['storage_backend']
if __storage_backend == 'filesystem':
return FileSystemBasedImageStorageHandler(
vm_base=config['storage']['vm_dir'],
image_base=config['storage']['image_dir']
)
elif __storage_backend == 'ceph':
return CEPHBasedImageStorageHandler(
vm_base=config['storage']['ceph_vm_pool'],
image_base=config['storage']['ceph_image_pool']
)
else:
raise Exception('Unknown Image Storage Handler')

View file

@ -1,39 +0,0 @@
import configparser
import os
import logging
from ucloud.common.host import HostPool
from ucloud.common.request import RequestPool
from ucloud.common.vm import VmPool
from ucloud.common.storage_handlers import (FileSystemBasedImageStorageHandler,
CEPHBasedImageStorageHandler)
from ucloud.common.etcd_wrapper import Etcd3Wrapper
from ucloud.settings import Settings
from os.path import join as join_path
logger = logging.getLogger('ucloud.config')
config = Settings()
etcd_client = config.get_etcd_client()
host_pool = HostPool(etcd_client, config['etcd']['host_prefix'])
vm_pool = VmPool(etcd_client, config['etcd']['vm_prefix'])
request_pool = RequestPool(etcd_client, config['etcd']['request_prefix'])
running_vms = []
__storage_backend = config['storage']['storage_backend']
if __storage_backend == 'filesystem':
image_storage_handler = FileSystemBasedImageStorageHandler(
vm_base=config['storage']['vm_dir'],
image_base=config['storage']['image_dir']
)
elif __storage_backend == 'ceph':
image_storage_handler = CEPHBasedImageStorageHandler(
vm_base=config['storage']['ceph_vm_pool'],
image_base=config['storage']['ceph_image_pool']
)
else:
raise Exception('Unknown Image Storage Handler')

View file

@ -2,21 +2,19 @@ import argparse
import sys import sys
import os import os
from ucloud.settings import Settings from ucloud.settings import settings
from ucloud.shared import shared
config = Settings()
etcd_client = config.get_etcd_client()
def update_config(section, kwargs): def update_config(section, kwargs):
uncloud_config = etcd_client.get(config.config_key, uncloud_config = shared.etcd_client.get(settings.config_key, value_in_json=True)
value_in_json=True)
if not uncloud_config: if not uncloud_config:
uncloud_config = {} uncloud_config = {}
else: else:
uncloud_config = uncloud_config.value uncloud_config = uncloud_config.value
uncloud_config[section] = kwargs uncloud_config[section] = kwargs
etcd_client.put(config.config_key, uncloud_config, value_in_json=True) shared.etcd_client.put(settings.config_key, uncloud_config, value_in_json=True)
def configure_parser(parser): def configure_parser(parser):

View file

@ -8,8 +8,8 @@ import sys
from uuid import uuid4 from uuid import uuid4
from . import logger from . import logger
from ucloud.config import config, etcd_client from ucloud.settings import settings
from ucloud.shared import shared
def getxattr(file, attr): def getxattr(file, attr):
"""Get specified user extended attribute (arg:attr) of a file (arg:file)""" """Get specified user extended attribute (arg:attr) of a file (arg:file)"""
@ -69,11 +69,10 @@ except Exception as e:
def main(): def main():
BASE_DIR = config['storage']['file_dir'] base_dir = settings['storage']['file_dir']
FILE_PREFIX = config['etcd']['file_prefix']
# Recursively Get All Files and Folder below BASE_DIR # Recursively Get All Files and Folder below BASE_DIR
files = glob.glob("{}/**".format(BASE_DIR), recursive=True) files = glob.glob("{}/**".format(base_dir), recursive=True)
# Retain only Files # Retain only Files
files = list(filter(os.path.isfile, files)) files = list(filter(os.path.isfile, files))
@ -89,7 +88,7 @@ def main():
file_id = uuid4() file_id = uuid4()
# Get Username # Get Username
owner = pathlib.Path(file).parts[len(pathlib.Path(BASE_DIR).parts)] owner = pathlib.Path(file).parts[len(pathlib.Path(base_dir).parts)]
# Get Creation Date of File # Get Creation Date of File
# Here, we are assuming that ctime is creation time # Here, we are assuming that ctime is creation time
@ -105,7 +104,7 @@ def main():
file_path = pathlib.Path(file).parts[-1] file_path = pathlib.Path(file).parts[-1]
# Create Entry # Create Entry
entry_key = os.path.join(FILE_PREFIX, str(file_id)) entry_key = os.path.join(settings['etcd']['file_prefix'], str(file_id))
entry_value = { entry_value = {
"filename": file_path, "filename": file_path,
"owner": owner, "owner": owner,
@ -115,8 +114,8 @@ def main():
} }
logger.info("Tracking %s", file) logger.info("Tracking %s", file)
# Insert Entry
etcd_client.put(entry_key, entry_value, value_in_json=True) shared.etcd_client.put(entry_key, entry_value, value_in_json=True)
setxattr(file, "utracked", True) setxattr(file, "utracked", True)

View file

@ -1,13 +0,0 @@
import socket
from contextlib import closing
def find_free_port():
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
try:
s.bind(('', 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except Exception:
return None
else:
return s.getsockname()[1]

View file

@ -3,22 +3,21 @@ import multiprocessing as mp
import time import time
import sys import sys
from os.path import isdir
from ucloud.common.etcd_wrapper import Etcd3Wrapper
from ucloud.common.request import RequestEntry, RequestType from ucloud.common.request import RequestEntry, RequestType
from ucloud.config import (vm_pool, request_pool, from ucloud.common.host import HostPool
etcd_client, running_vms, from ucloud.shared import shared
HostPool, config) from ucloud.settings import settings
from .helper import find_free_port
from . import virtualmachine, logger from . import virtualmachine, logger
vmm = virtualmachine.VMM()
def update_heartbeat(hostname): def update_heartbeat(hostname):
"""Update Last HeartBeat Time for :param hostname: in etcd""" """Update Last HeartBeat Time for :param hostname: in etcd"""
client = config.get_etcd_client() client = shared.etcd_client
host_pool = HostPool(client, config['etcd']['host_prefix']) host_pool = HostPool(client)
this_host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) this_host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
while True: while True:
@ -27,122 +26,55 @@ def update_heartbeat(hostname):
time.sleep(10) time.sleep(10)
def maintenance(host):
# To capture vm running according to running_vms list
# This is to capture successful migration of a VM.
# Suppose, this host is running "vm1" and user initiated
# request to migrate this "vm1" to some other host. On,
# successful migration the destination host would set
# the vm hostname to itself. Thus, we are checking
# whether this host vm is successfully migrated. If yes
# then we shutdown "vm1" on this host.
to_be_removed = []
for running_vm in running_vms:
with vm_pool.get_put(running_vm.key) as vm_entry:
if vm_entry.hostname != host.key and not vm_entry.in_migration:
running_vm.handle.shutdown()
logger.info("VM migration not completed successfully.")
to_be_removed.append(running_vm)
for r in to_be_removed:
running_vms.remove(r)
# To check vm running according to etcd entries
alleged_running_vms = vm_pool.by_status("RUNNING", vm_pool.by_host(host.key))
for vm_entry in alleged_running_vms:
_vm = virtualmachine.get_vm(running_vms, vm_entry.key)
# Whether, the allegedly running vm is in our
# running_vms list or not if it is said to be
# running on this host but it is not then we
# need to shut it down
# This is to capture poweroff/shutdown of a VM
# initiated by user inside VM. OR crash of VM by some
# user running process
if (_vm and not _vm.handle.is_running()) or not _vm:
logger.debug("_vm = %s, is_running() = %s" % (_vm, _vm.handle.is_running()))
vm_entry.add_log("""{} is not running but is said to be running.
So, shutting it down and declare it killed""".format(vm_entry.key))
vm_entry.declare_killed()
vm_pool.put(vm_entry)
if _vm:
running_vms.remove(_vm)
def check():
if config['storage']['storage_backend'] == 'filesystem' and \
not isdir(config['storage']['vm_dir']):
print("You have set STORAGE_BACKEND to filesystem. So, the vm directory mentioned"
" in /etc/ucloud/ucloud.conf file must exists. But, it don't.")
sys.exit(1)
def main(hostname): def main(hostname):
check() host_pool = HostPool(shared.etcd_client)
heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))
host_pool = HostPool(etcd_client, config['etcd']['host_prefix'])
host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
assert host is not None, "No such host with name = {}".format(hostname) assert host is not None, "No such host with name = {}".format(hostname)
try: try:
heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))
heartbeat_updating_process.start() heartbeat_updating_process.start()
except Exception as e: except Exception as e:
logger.info("No Need To Go Further. Our heartbeat updating mechanism is not working")
logger.exception(e) logger.exception(e)
exit(-1) sys.exit("No Need To Go Further. ucloud-host heartbeat updating mechanism is not working")
logger.info("%s Session Started %s", '*' * 5, '*' * 5)
# It is seen that under heavy load, timeout event doesn't come
# in a predictive manner (which is intentional because we give
# higher priority to customer's requests) which delays heart
# beat update which in turn misunderstood by scheduler that the
# host is dead when it is actually alive. So, to ensure that we
# update the heart beat in a predictive manner we start Heart
# beat updating mechanism in separated thread
for events_iterator in [ for events_iterator in [
etcd_client.get_prefix(config['etcd']['request_prefix'], value_in_json=True), shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True),
etcd_client.watch_prefix(config['etcd']['request_prefix'], timeout=10, value_in_json=True), shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], timeout=10, value_in_json=True),
]: ]:
for request_event in events_iterator: for request_event in events_iterator:
request_event = RequestEntry(request_event) request_event = RequestEntry(request_event)
if request_event.type == "TIMEOUT": if request_event.type == "TIMEOUT":
maintenance(host) vmm.maintenance(host)
continue continue
# If the event is directed toward me OR I am destination of a InitVMMigration # If the event is directed toward me OR I am destination of a InitVMMigration
if request_event.hostname == host.key or request_event.destination == host.key: if request_event.hostname == host.key or request_event.destination == host.key:
logger.debug("VM Request: %s", request_event) logger.debug("VM Request: %s", request_event)
request_pool.client.client.delete(request_event.key) shared.request_pool.client.client.delete(request_event.key)
vm_entry = vm_pool.get(request_event.uuid) vm_entry = shared.vm_pool.get(request_event.uuid)
if vm_entry: if vm_entry:
if request_event.type == RequestType.StartVM: if request_event.type == RequestType.StartVM:
virtualmachine.start(vm_entry) vmm.start(vm_entry)
elif request_event.type == RequestType.StopVM: elif request_event.type == RequestType.StopVM:
virtualmachine.stop(vm_entry) vmm.stop(vm_entry)
elif request_event.type == RequestType.DeleteVM: elif request_event.type == RequestType.DeleteVM:
virtualmachine.delete(vm_entry) vmm.delete(vm_entry)
elif request_event.type == RequestType.InitVMMigration: elif request_event.type == RequestType.InitVMMigration:
virtualmachine.start(vm_entry, host.key, find_free_port()) vmm.start(vm_entry, host.key)
elif request_event.type == RequestType.TransferVM: elif request_event.type == RequestType.TransferVM:
virtualmachine.transfer(request_event) vmm.transfer(request_event)
else: else:
logger.info("VM Entry missing") logger.info("VM Entry missing")
logger.info("Running VMs %s", running_vms) logger.info("Running VMs %s", vmm.running_vms)
if __name__ == "__main__": if __name__ == "__main__":

View file

@ -5,7 +5,6 @@
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor # https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
import os import os
import random
import subprocess as sp import subprocess as sp
import tempfile import tempfile
import time import time
@ -21,11 +20,12 @@ import sshtunnel
from ucloud.common.helpers import get_ipv6_address from ucloud.common.helpers import get_ipv6_address
from ucloud.common.request import RequestEntry, RequestType from ucloud.common.request import RequestEntry, RequestType
from ucloud.common.vm import VMEntry, VMStatus from ucloud.common.vm import VMEntry, VMStatus
from ucloud.config import (etcd_client, request_pool, from ucloud.common.network import create_dev, delete_network_interface, find_free_port
running_vms, vm_pool, config,
image_storage_handler)
from . import qmp
from ucloud.host import logger from ucloud.host import logger
from ucloud.shared import shared
from ucloud.settings import settings
from . import qmp
class VM: class VM:
@ -38,193 +38,22 @@ class VM:
return "VM({})".format(self.key) return "VM({})".format(self.key)
def delete_network_interface(iface): def capture_all_exception(func):
try: @wraps(func)
sp.check_output(['ip', 'link', 'del', iface]) def wrapper(*args, **kwargs):
except Exception: try:
pass func(*args, **kwargs)
except Exception:
logger.info("Exception absorbed by captual_all_exception()")
logger.exception(func.__name__)
return wrapper
def resolve_network(network_name, network_owner):
network = etcd_client.get(join_path(config['etcd']['network_prefix'],
network_owner,
network_name),
value_in_json=True)
return network
def delete_vm_network(vm_entry):
try:
for network in vm_entry.network:
network_name = network[0]
tap_mac = network[1]
tap_id = network[2]
delete_network_interface('tap{}'.format(tap_id))
owners_vms = vm_pool.by_owner(vm_entry.owner)
owners_running_vms = vm_pool.by_status(VMStatus.running,
_vms=owners_vms)
networks = map(lambda n: n[0],
map(lambda vm: vm.network, owners_running_vms)
)
networks_in_use_by_user_vms = [vm[0] for vm in networks]
if network_name not in networks_in_use_by_user_vms:
network_entry = resolve_network(network[0], vm_entry.owner)
if network_entry:
network_type = network_entry.value["type"]
network_id = network_entry.value["id"]
if network_type == "vxlan":
delete_network_interface('br{}'.format(network_id))
delete_network_interface('vxlan{}'.format(network_id))
except Exception:
logger.exception("Exception in network interface deletion")
def create_dev(script, _id, dev, ip=None):
command = [script, _id, dev]
if ip:
command.append(ip)
try:
output = sp.check_output(command, stderr=sp.PIPE)
except Exception as e:
print(e.stderr)
return None
else:
return output.decode("utf-8").strip()
def create_vxlan_br_tap(_id, _dev, tap_id, ip=None):
network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')
vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'),
_id=_id, dev=_dev)
if vxlan:
bridge = create_dev(script=os.path.join(network_script_base, 'create-bridge.sh'),
_id=_id, dev=vxlan, ip=ip)
if bridge:
tap = create_dev(script=os.path.join(network_script_base, 'create-tap.sh'),
_id=str(tap_id), dev=bridge)
if tap:
return tap
def random_bytes(num=6):
return [random.randrange(256) for _ in range(num)]
def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'):
mac = random_bytes()
if oui:
if type(oui) == str:
oui = [int(chunk) for chunk in oui.split(separator)]
mac = oui + random_bytes(num=6 - len(oui))
else:
if multicast:
mac[0] |= 1 # set bit 0
else:
mac[0] &= ~1 # clear bit 0
if uaa:
mac[0] &= ~(1 << 1) # clear bit 1
else:
mac[0] |= 1 << 1 # set bit 1
return separator.join(byte_fmt % b for b in mac)
def update_radvd_conf(etcd_client):
network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')
networks = {
net.value['ipv6']: net.value['id']
for net in etcd_client.get_prefix('/v1/network/', value_in_json=True)
if net.value.get('ipv6')
}
radvd_template = open(os.path.join(network_script_base,
'radvd-template.conf'), 'r').read()
radvd_template = Template(radvd_template)
content = [radvd_template.safe_substitute(bridge='br{}'.format(networks[net]),
prefix=net)
for net in networks if networks.get(net)]
with open('/etc/radvd.conf', 'w') as radvd_conf:
radvd_conf.writelines(content)
try:
sp.check_output(['systemctl', 'restart', 'radvd'])
except Exception:
sp.check_output(['service', 'radvd', 'restart'])
def get_start_command_args(vm_entry, vnc_sock_filename: str, migration=False, migration_port=None):
threads_per_core = 1
vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs["ram"]).to_MB())
vm_cpus = int(vm_entry.specs["cpu"])
vm_uuid = vm_entry.uuid
vm_networks = vm_entry.network
command = "-name {}_{}".format(vm_entry.owner, vm_entry.name)
command += " -drive file={},format=raw,if=virtio,cache=none".format(
image_storage_handler.qemu_path_string(vm_uuid)
)
command += " -device virtio-rng-pci -vnc unix:{}".format(vnc_sock_filename)
command += " -m {} -smp cores={},threads={}".format(
vm_memory, vm_cpus, threads_per_core
)
if migration:
command += " -incoming tcp:[::]:{}".format(migration_port)
tap = None
for network_mac_and_tap in vm_networks:
network_name, mac, tap = network_mac_and_tap
_key = os.path.join(config['etcd']['network_prefix'], vm_entry.owner, network_name)
network = etcd_client.get(_key, value_in_json=True)
network_type = network.value["type"]
network_id = str(network.value["id"])
network_ipv6 = network.value["ipv6"]
if network_type == "vxlan":
tap = create_vxlan_br_tap(_id=network_id,
_dev=config['network']['vxlan_phy_dev'],
tap_id=tap,
ip=network_ipv6)
update_radvd_conf(etcd_client)
command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no" \
" -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}" \
.format(tap=tap, net_id=network_id, mac=mac)
return command.split(" ")
def create_vm_object(vm_entry, migration=False, migration_port=None):
# NOTE: If migration suddenly stop working, having different
# VNC unix filename on source and destination host can
# be a possible cause of it.
# REQUIREMENT: Use Unix Socket instead of TCP Port for VNC
vnc_sock_file = tempfile.NamedTemporaryFile()
qemu_args = get_start_command_args(
vm_entry=vm_entry,
vnc_sock_filename=vnc_sock_file.name,
migration=migration,
migration_port=migration_port,
)
qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args)
return VM(vm_entry.key, qemu_machine, vnc_sock_file)
def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
return next((vm for vm in vm_list if vm.key == vm_key), None)
def need_running_vm(func): def need_running_vm(func):
@wraps(func) @wraps(func)
def wrapper(e): def wrapper(self, e):
vm = get_vm(running_vms, e.key) vm = self.get_vm(self.running_vms, e.key)
if vm: if vm:
try: try:
status = vm.handle.command("query-status") status = vm.handle.command("query-status")
@ -242,143 +71,336 @@ def need_running_vm(func):
return wrapper return wrapper
def create(vm_entry: VMEntry): class VMM:
if image_storage_handler.is_vm_image_exists(vm_entry.uuid): def __init__(self):
# File Already exists. No Problem Continue self.etcd_client = shared.etcd_client
logger.debug("Image for vm %s exists", vm_entry.uuid) self.storage_handler = shared.storage_handler
else: self.running_vms = []
vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB())
if image_storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid):
if not image_storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd):
vm_entry.status = VMStatus.error
else:
logger.info("New VM Created")
def get_start_command_args(self, vm_entry, vnc_sock_filename: str, migration=False, migration_port=None):
threads_per_core = 1
vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs['ram']).to_MB())
vm_cpus = int(vm_entry.specs['cpu'])
vm_uuid = vm_entry.uuid
vm_networks = vm_entry.network
def start(vm_entry: VMEntry, destination_host_key=None, migration_port=None): command = '-name {}_{}'.format(vm_entry.owner, vm_entry.name)
_vm = get_vm(running_vms, vm_entry.key)
# VM already running. No need to proceed further. command += ' -drive file={},format=raw,if=virtio,cache=none'.format(
if _vm: self.storage_handler.qemu_path_string(vm_uuid)
logger.info("VM %s already running" % vm_entry.uuid)
return
else:
logger.info("Trying to start %s" % vm_entry.uuid)
if destination_host_key:
launch_vm(vm_entry, migration=True, migration_port=migration_port,
destination_host_key=destination_host_key)
else:
create(vm_entry)
launch_vm(vm_entry)
@need_running_vm
def stop(vm_entry):
vm = get_vm(running_vms, vm_entry.key)
vm.handle.shutdown()
if not vm.handle.is_running():
vm_entry.add_log("Shutdown successfully")
vm_entry.declare_stopped()
vm_pool.put(vm_entry)
running_vms.remove(vm)
delete_vm_network(vm_entry)
def delete(vm_entry):
logger.info("Deleting VM | %s", vm_entry)
stop(vm_entry)
if image_storage_handler.is_vm_image_exists(vm_entry.uuid):
r_status = image_storage_handler.delete_vm_image(vm_entry.uuid)
if r_status:
etcd_client.client.delete(vm_entry.key)
else:
etcd_client.client.delete(vm_entry.key)
def transfer(request_event):
# This function would run on source host i.e host on which the vm
# is running initially. This host would be responsible for transferring
# vm state to destination host.
_host, _port = request_event.parameters["host"], request_event.parameters["port"]
_uuid = request_event.uuid
_destination = request_event.destination_host_key
vm = get_vm(running_vms, join_path(config['etcd']['vm_prefix'], _uuid))
if vm:
tunnel = sshtunnel.SSHTunnelForwarder(
_host,
ssh_username=config['ssh']['username'],
ssh_pkey=config['ssh']['private_key_path'],
remote_bind_address=("127.0.0.1", _port),
ssh_proxy_enabled=True,
ssh_proxy=(_host, 22)
) )
try: command += ' -device virtio-rng-pci -vnc unix:{}'.format(vnc_sock_filename)
tunnel.start() command += ' -m {} -smp cores={},threads={}'.format(
except sshtunnel.BaseSSHTunnelForwarderError: vm_memory, vm_cpus, threads_per_core
logger.exception("Couldn't establish connection to (%s, 22)", _host) )
if migration:
command += ' -incoming tcp:[::]:{}'.format(migration_port)
for network_mac_and_tap in vm_networks:
network_name, mac, tap = network_mac_and_tap
_key = os.path.join(settings['etcd']['network_prefix'], vm_entry.owner, network_name)
network = self.etcd_client.get(_key, value_in_json=True)
network_type = network.value["type"]
network_id = str(network.value["id"])
network_ipv6 = network.value["ipv6"]
if network_type == "vxlan":
tap = create_vxlan_br_tap(_id=network_id,
_dev=settings['network']['vxlan_phy_dev'],
tap_id=tap,
ip=network_ipv6)
all_networks = self.etcd_client.get_prefix('/v1/network/', value_in_json=True)
update_radvd_conf(all_networks)
command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no" \
" -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}" \
.format(tap=tap, net_id=network_id, mac=mac)
return command.split(" ")
def create_vm_object(self, vm_entry, migration=False, migration_port=None):
vnc_sock_file = tempfile.NamedTemporaryFile()
qemu_args = self.get_start_command_args(
vm_entry=vm_entry,
vnc_sock_filename=vnc_sock_file.name,
migration=migration,
migration_port=migration_port,
)
qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args)
return VM(vm_entry.key, qemu_machine, vnc_sock_file)
@staticmethod
def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
return next((vm for vm in vm_list if vm.key == vm_key), None)
@capture_all_exception
def create(self, vm_entry: VMEntry):
if self.storage_handler.is_vm_image_exists(vm_entry.uuid):
# File Already exists. No Problem Continue
logger.debug("Image for vm %s exists", vm_entry.uuid)
return None
else: else:
vm.handle.command( vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB())
"migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port) if self.storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid):
) if not self.storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd):
vm_entry.status = VMStatus.error
else:
logger.info("New VM Created")
status = vm.handle.command("query-migrate")["status"] @capture_all_exception
while status not in ["failed", "completed"]: def start(self, vm_entry: VMEntry, destination_host_key=None):
time.sleep(2) _vm = self.get_vm(self.running_vms, vm_entry.key)
status = vm.handle.command("query-migrate")["status"]
with vm_pool.get_put(request_event.uuid) as source_vm: # VM already running. No need to proceed further.
if status == "failed": if _vm:
source_vm.add_log("Migration Failed") logger.info("VM %s already running" % vm_entry.uuid)
elif status == "completed": return
# If VM is successfully migrated then shutdown the VM else:
# on this host and update hostname to destination host key logger.info("Trying to start %s" % vm_entry.uuid)
source_vm.add_log("Successfully migrated") if destination_host_key:
source_vm.hostname = _destination migration_port = find_free_port()
running_vms.remove(vm) self.launch_vm(vm_entry, migration=True, migration_port=migration_port,
vm.handle.shutdown() destination_host_key=destination_host_key)
source_vm.in_migration = False # VM transfer finished else:
finally: self.create(vm_entry)
tunnel.close() self.launch_vm(vm_entry)
@need_running_vm
def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_key=None): @capture_all_exception
logger.info("Starting %s" % vm_entry.key) def stop(self, vm_entry):
vm = self.get_vm(self.running_vms, vm_entry.key)
vm = create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
try:
vm.handle.launch()
except Exception:
logger.exception("Error Occured while starting VM")
vm.handle.shutdown() vm.handle.shutdown()
if not vm.handle.is_running():
vm_entry.add_log("Shutdown successfully")
vm_entry.declare_stopped()
shared.vm_pool.put(vm_entry)
self.running_vms.remove(vm)
delete_vm_network(vm_entry)
if migration: @capture_all_exception
# We don't care whether MachineError or any other error occurred def delete(self, vm_entry):
pass logger.info("Deleting VM | %s", vm_entry)
self.stop(vm_entry)
if self.storage_handler.is_vm_image_exists(vm_entry.uuid):
r_status = self.storage_handler.delete_vm_image(vm_entry.uuid)
if r_status:
shared.etcd_client.client.delete(vm_entry.key)
else: else:
# Error during typical launch of a vm shared.etcd_client.client.delete(vm_entry.key)
vm.handle.shutdown()
vm_entry.declare_killed()
vm_pool.put(vm_entry)
else:
vm_entry.vnc_socket = vm.vnc_socket_file.name
running_vms.append(vm)
if migration: @capture_all_exception
vm_entry.in_migration = True def transfer(self, request_event):
r = RequestEntry.from_scratch( # This function would run on source host i.e host on which the vm
type=RequestType.TransferVM, # is running initially. This host would be responsible for transferring
hostname=vm_entry.hostname, # vm state to destination host.
parameters={"host": get_ipv6_address(), "port": migration_port},
uuid=vm_entry.uuid, _host, _port = request_event.parameters["host"], request_event.parameters["port"]
destination_host_key=destination_host_key, _uuid = request_event.uuid
request_prefix=config['etcd']['request_prefix'] _destination = request_event.destination_host_key
vm = self.get_vm(self.running_vms, join_path(settings['etcd']['vm_prefix'], _uuid))
if vm:
tunnel = sshtunnel.SSHTunnelForwarder(
_host,
ssh_username=settings['ssh']['username'],
ssh_pkey=settings['ssh']['private_key_path'],
remote_bind_address=("127.0.0.1", _port),
ssh_proxy_enabled=True,
ssh_proxy=(_host, 22)
) )
request_pool.put(r) try:
else: tunnel.start()
# Typical launching of a vm except sshtunnel.BaseSSHTunnelForwarderError:
vm_entry.status = VMStatus.running logger.exception("Couldn't establish connection to (%s, 22)", _host)
vm_entry.add_log("Started successfully") else:
vm.handle.command(
"migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port)
)
vm_pool.put(vm_entry) status = vm.handle.command("query-migrate")["status"]
while status not in ["failed", "completed"]:
time.sleep(2)
status = vm.handle.command("query-migrate")["status"]
with shared.vm_pool.get_put(request_event.uuid) as source_vm:
if status == "failed":
source_vm.add_log("Migration Failed")
elif status == "completed":
# If VM is successfully migrated then shutdown the VM
# on this host and update hostname to destination host key
source_vm.add_log("Successfully migrated")
source_vm.hostname = _destination
self.running_vms.remove(vm)
vm.handle.shutdown()
source_vm.in_migration = False # VM transfer finished
finally:
tunnel.close()
@capture_all_exception
def launch_vm(self, vm_entry, migration=False, migration_port=None, destination_host_key=None):
logger.info("Starting %s" % vm_entry.key)
vm = self.create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
try:
vm.handle.launch()
except Exception:
logger.exception("Error Occured while starting VM")
vm.handle.shutdown()
if migration:
# We don't care whether MachineError or any other error occurred
pass
else:
# Error during typical launch of a vm
vm.handle.shutdown()
vm_entry.declare_killed()
shared.vm_pool.put(vm_entry)
else:
vm_entry.vnc_socket = vm.vnc_socket_file.name
self.running_vms.append(vm)
if migration:
vm_entry.in_migration = True
r = RequestEntry.from_scratch(
type=RequestType.TransferVM,
hostname=vm_entry.hostname,
parameters={"host": get_ipv6_address(), "port": migration_port},
uuid=vm_entry.uuid,
destination_host_key=destination_host_key,
request_prefix=settings['etcd']['request_prefix']
)
shared.request_pool.put(r)
else:
# Typical launching of a vm
vm_entry.status = VMStatus.running
vm_entry.add_log("Started successfully")
shared.vm_pool.put(vm_entry)
@capture_all_exception
def maintenance(self, host):
# To capture vm running according to running_vms list
# This is to capture successful migration of a VM.
# Suppose, this host is running "vm1" and user initiated
# request to migrate this "vm1" to some other host. On,
# successful migration the destination host would set
# the vm hostname to itself. Thus, we are checking
# whether this host vm is successfully migrated. If yes
# then we shutdown "vm1" on this host.
logger.debug("Starting Maintenance!!")
to_be_removed = []
for running_vm in self.running_vms:
with shared.vm_pool.get_put(running_vm.key) as vm_entry:
if vm_entry.hostname != host.key and not vm_entry.in_migration:
running_vm.handle.shutdown()
logger.info("VM migration not completed successfully.")
to_be_removed.append(running_vm)
for r in to_be_removed:
self.running_vms.remove(r)
# To check vm running according to etcd entries
alleged_running_vms = shared.vm_pool.by_status("RUNNING", shared.vm_pool.by_host(host.key))
for vm_entry in alleged_running_vms:
_vm = self.get_vm(self.running_vms, vm_entry.key)
# Whether, the allegedly running vm is in our
# running_vms list or not if it is said to be
# running on this host but it is not then we
# need to shut it down
# This is to capture poweroff/shutdown of a VM
# initiated by user inside VM. OR crash of VM by some
# user running process
if (_vm and not _vm.handle.is_running()) or not _vm:
logger.debug("_vm = %s, is_running() = %s" % (_vm, _vm.handle.is_running()))
vm_entry.add_log("""{} is not running but is said to be running.
So, shutting it down and declare it killed""".format(vm_entry.key))
vm_entry.declare_killed()
shared.vm_pool.put(vm_entry)
if _vm:
self.running_vms.remove(_vm)
def resolve_network(network_name, network_owner):
network = shared.etcd_client.get(join_path(settings['etcd']['network_prefix'],
network_owner,
network_name),
value_in_json=True)
return network
def delete_vm_network(vm_entry):
try:
for network in vm_entry.network:
network_name = network[0]
tap_mac = network[1]
tap_id = network[2]
delete_network_interface('tap{}'.format(tap_id))
owners_vms = shared.vm_pool.by_owner(vm_entry.owner)
owners_running_vms = shared.vm_pool.by_status(VMStatus.running,
_vms=owners_vms)
networks = map(
lambda n: n[0], map(lambda vm: vm.network, owners_running_vms)
)
networks_in_use_by_user_vms = [vm[0] for vm in networks]
if network_name not in networks_in_use_by_user_vms:
network_entry = resolve_network(network[0], vm_entry.owner)
if network_entry:
network_type = network_entry.value["type"]
network_id = network_entry.value["id"]
if network_type == "vxlan":
delete_network_interface('br{}'.format(network_id))
delete_network_interface('vxlan{}'.format(network_id))
except Exception:
logger.exception("Exception in network interface deletion")
def create_vxlan_br_tap(_id, _dev, tap_id, ip=None):
network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')
vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'),
_id=_id, dev=_dev)
if vxlan:
bridge = create_dev(script=os.path.join(network_script_base, 'create-bridge.sh'),
_id=_id, dev=vxlan, ip=ip)
if bridge:
tap = create_dev(script=os.path.join(network_script_base, 'create-tap.sh'),
_id=str(tap_id), dev=bridge)
if tap:
return tap
def update_radvd_conf(all_networks):
network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')
networks = {
net.value['ipv6']: net.value['id']
for net in all_networks
if net.value.get('ipv6')
}
radvd_template = open(os.path.join(network_script_base,
'radvd-template.conf'), 'r').read()
radvd_template = Template(radvd_template)
content = [
radvd_template.safe_substitute(
bridge='br{}'.format(networks[net]),
prefix=net
)
for net in networks if networks.get(net)
]
with open('/etc/radvd.conf', 'w') as radvd_conf:
radvd_conf.writelines(content)
try:
sp.check_output(['systemctl', 'restart', 'radvd'])
except Exception:
sp.check_output(['service', 'radvd', 'restart'])

View file

@ -5,7 +5,8 @@ import sys
from os.path import isdir from os.path import isdir
from os.path import join as join_path from os.path import join as join_path
from ucloud.config import etcd_client, config, image_storage_handler from ucloud.settings import settings
from ucloud.shared import shared
from ucloud.imagescanner import logger from ucloud.imagescanner import logger
@ -22,9 +23,9 @@ def qemu_img_type(path):
def check(): def check():
""" check whether settings are sane, refuse to start if they aren't """ """ check whether settings are sane, refuse to start if they aren't """
if config['storage']['storage_backend'] == 'filesystem' and not isdir(config['storage']['image_dir']): if settings['storage']['storage_backend'] == 'filesystem' and not isdir(settings['storage']['image_dir']):
sys.exit("You have set STORAGE_BACKEND to filesystem, but " sys.exit("You have set STORAGE_BACKEND to filesystem, but "
"{} does not exist. Refusing to start".format(config['storage']['image_dir']) "{} does not exist. Refusing to start".format(settings['storage']['image_dir'])
) )
try: try:
@ -36,7 +37,7 @@ def check():
def main(): def main():
# We want to get images entries that requests images to be created # We want to get images entries that requests images to be created
images = etcd_client.get_prefix(config['etcd']['image_prefix'], value_in_json=True) images = shared.etcd_client.get_prefix(settings['etcd']['image_prefix'], value_in_json=True)
images_to_be_created = list(filter(lambda im: im.value['status'] == 'TO_BE_CREATED', images)) images_to_be_created = list(filter(lambda im: im.value['status'] == 'TO_BE_CREATED', images))
for image in images_to_be_created: for image in images_to_be_created:
@ -45,9 +46,9 @@ def main():
image_owner = image.value['owner'] image_owner = image.value['owner']
image_filename = image.value['filename'] image_filename = image.value['filename']
image_store_name = image.value['store_name'] image_store_name = image.value['store_name']
image_full_path = join_path(config['storage']['file_dir'], image_owner, image_filename) image_full_path = join_path(settings['storage']['file_dir'], image_owner, image_filename)
image_stores = etcd_client.get_prefix(config['etcd']['image_store_prefix'], image_stores = shared.etcd_client.get_prefix(settings['etcd']['image_store_prefix'],
value_in_json=True) value_in_json=True)
user_image_store = next(filter( user_image_store = next(filter(
lambda s, store_name=image_store_name: s.value["name"] == store_name, lambda s, store_name=image_store_name: s.value["name"] == store_name,
@ -71,18 +72,18 @@ def main():
logger.exception(e) logger.exception(e)
else: else:
# Import and Protect # Import and Protect
r_status = image_storage_handler.import_image(src="image.raw", r_status = shared.storage_handler.import_image(src="image.raw",
dest=image_uuid, dest=image_uuid,
protect=True) protect=True)
if r_status: if r_status:
# Everything is successfully done # Everything is successfully done
image.value["status"] = "CREATED" image.value["status"] = "CREATED"
etcd_client.put(image.key, json.dumps(image.value)) shared.etcd_client.put(image.key, json.dumps(image.value))
else: else:
# The user provided image is either not found or of invalid format # The user provided image is either not found or of invalid format
image.value["status"] = "INVALID_IMAGE" image.value["status"] = "INVALID_IMAGE"
etcd_client.put(image.key, json.dumps(image.value)) shared.etcd_client.put(image.key, json.dumps(image.value))
try: try:
os.remove("image.raw") os.remove("image.raw")

View file

@ -2,15 +2,15 @@ import os
from flask import Flask, request from flask import Flask, request
from flask_restful import Resource, Api from flask_restful import Resource, Api
from ucloud.settings import settings
from ucloud.config import etcd_client, config, vm_pool from ucloud.shared import shared
app = Flask(__name__) app = Flask(__name__)
api = Api(app) api = Api(app)
def get_vm_entry(mac_addr): def get_vm_entry(mac_addr):
return next(filter(lambda vm: mac_addr in list(zip(*vm.network))[1], vm_pool.vms), None) return next(filter(lambda vm: mac_addr in list(zip(*vm.network))[1], shared.vm_pool.vms), None)
# https://stackoverflow.com/questions/37140846/how-to-convert-ipv6-link-local-address-to-mac-address-in-python # https://stackoverflow.com/questions/37140846/how-to-convert-ipv6-link-local-address-to-mac-address-in-python
@ -43,10 +43,10 @@ class Root(Resource):
if not data: if not data:
return {'message': 'Metadata for such VM does not exists.'}, 404 return {'message': 'Metadata for such VM does not exists.'}, 404
else: else:
etcd_key = os.path.join(config['etcd']['user_prefix'], etcd_key = os.path.join(settings['etcd']['user_prefix'],
data.value['owner_realm'], data.value['owner_realm'],
data.value['owner'], 'key') data.value['owner'], 'key')
etcd_entry = etcd_client.get_prefix(etcd_key, value_in_json=True) etcd_entry = shared.etcd_client.get_prefix(etcd_key, value_in_json=True)
user_personal_ssh_keys = [key.value for key in etcd_entry] user_personal_ssh_keys = [key.value for key in etcd_entry]
data.value['metadata']['ssh-keys'] += user_personal_ssh_keys data.value['metadata']['ssh-keys'] += user_personal_ssh_keys
return data.value['metadata'], 200 return data.value['metadata'], 200

View file

@ -6,7 +6,8 @@ import bitmath
from ucloud.common.host import HostStatus from ucloud.common.host import HostStatus
from ucloud.common.request import RequestEntry, RequestType from ucloud.common.request import RequestEntry, RequestType
from ucloud.common.vm import VMStatus from ucloud.common.vm import VMStatus
from ucloud.config import vm_pool, host_pool, request_pool, config from ucloud.shared import shared
from ucloud.settings import settings
def accumulated_specs(vms_specs): def accumulated_specs(vms_specs):
@ -46,14 +47,14 @@ class NoSuitableHostFound(Exception):
def get_suitable_host(vm_specs, hosts=None): def get_suitable_host(vm_specs, hosts=None):
if hosts is None: if hosts is None:
hosts = host_pool.by_status(HostStatus.alive) hosts = shared.host_pool.by_status(HostStatus.alive)
for host in hosts: for host in hosts:
# Filter them by host_name # Filter them by host_name
vms = vm_pool.by_host(host.key) vms = shared.vm_pool.by_host(host.key)
# Filter them by status # Filter them by status
vms = vm_pool.by_status(VMStatus.running, vms) vms = shared.vm_pool.by_status(VMStatus.running, vms)
running_vms_specs = [vm.specs for vm in vms] running_vms_specs = [vm.specs for vm in vms]
@ -75,7 +76,7 @@ def get_suitable_host(vm_specs, hosts=None):
def dead_host_detection(): def dead_host_detection():
# Bring out your dead! - Monty Python and the Holy Grail # Bring out your dead! - Monty Python and the Holy Grail
hosts = host_pool.by_status(HostStatus.alive) hosts = shared.host_pool.by_status(HostStatus.alive)
dead_hosts_keys = [] dead_hosts_keys = []
for host in hosts: for host in hosts:
@ -89,25 +90,25 @@ def dead_host_detection():
def dead_host_mitigation(dead_hosts_keys): def dead_host_mitigation(dead_hosts_keys):
for host_key in dead_hosts_keys: for host_key in dead_hosts_keys:
host = host_pool.get(host_key) host = shared.host_pool.get(host_key)
host.declare_dead() host.declare_dead()
vms_hosted_on_dead_host = vm_pool.by_host(host_key) vms_hosted_on_dead_host = shared.vm_pool.by_host(host_key)
for vm in vms_hosted_on_dead_host: for vm in vms_hosted_on_dead_host:
vm.declare_killed() vm.declare_killed()
vm_pool.put(vm) shared.vm_pool.put(vm)
host_pool.put(host) shared.host_pool.put(host)
def assign_host(vm): def assign_host(vm):
vm.hostname = get_suitable_host(vm.specs) vm.hostname = get_suitable_host(vm.specs)
vm_pool.put(vm) shared.vm_pool.put(vm)
r = RequestEntry.from_scratch(type=RequestType.StartVM, r = RequestEntry.from_scratch(type=RequestType.StartVM,
uuid=vm.uuid, uuid=vm.uuid,
hostname=vm.hostname, hostname=vm.hostname,
request_prefix=config['etcd']['request_prefix']) request_prefix=settings['etcd']['request_prefix'])
request_pool.put(r) shared.request_pool.put(r)
vm.log.append("VM scheduled for starting") vm.log.append("VM scheduled for starting")
return vm.hostname return vm.hostname

View file

@ -5,8 +5,8 @@
# maybe expose a prometheus compatible output # maybe expose a prometheus compatible output
from ucloud.common.request import RequestEntry, RequestType from ucloud.common.request import RequestEntry, RequestType
from ucloud.config import etcd_client from ucloud.shared import shared
from ucloud.config import host_pool, request_pool, vm_pool, config from ucloud.settings import settings
from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection, from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
assign_host, NoSuitableHostFound) assign_host, NoSuitableHostFound)
from . import logger from . import logger
@ -16,8 +16,8 @@ def main():
pending_vms = [] pending_vms = []
for request_iterator in [ for request_iterator in [
etcd_client.get_prefix(config['etcd']['request_prefix'], value_in_json=True), shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True),
etcd_client.watch_prefix(config['etcd']['request_prefix'], timeout=5, value_in_json=True), shared.etcd_client.watch_prefix(settings['etcd']['request_prefix'], timeout=5, value_in_json=True),
]: ]:
for request_event in request_iterator: for request_event in request_iterator:
request_entry = RequestEntry(request_event) request_entry = RequestEntry(request_event)
@ -44,17 +44,17 @@ def main():
r = RequestEntry.from_scratch(type="ScheduleVM", r = RequestEntry.from_scratch(type="ScheduleVM",
uuid=pending_vm_entry.uuid, uuid=pending_vm_entry.uuid,
hostname=pending_vm_entry.hostname, hostname=pending_vm_entry.hostname,
request_prefix=config['etcd']['request_prefix']) request_prefix=settings['etcd']['request_prefix'])
request_pool.put(r) shared.request_pool.put(r)
elif request_entry.type == RequestType.ScheduleVM: elif request_entry.type == RequestType.ScheduleVM:
logger.debug("%s, %s", request_entry.key, request_entry.value) logger.debug("%s, %s", request_entry.key, request_entry.value)
vm_entry = vm_pool.get(request_entry.uuid) vm_entry = shared.vm_pool.get(request_entry.uuid)
if vm_entry is None: if vm_entry is None:
logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid)) logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid))
continue continue
etcd_client.client.delete(request_entry.key) # consume Request shared.etcd_client.client.delete(request_entry.key) # consume Request
# If the Request is about a VM which is labelled as "migration" # If the Request is about a VM which is labelled as "migration"
# and has a destination # and has a destination
@ -62,7 +62,7 @@ def main():
and hasattr(request_entry, "destination") and request_entry.destination: and hasattr(request_entry, "destination") and request_entry.destination:
try: try:
get_suitable_host(vm_specs=vm_entry.specs, get_suitable_host(vm_specs=vm_entry.specs,
hosts=[host_pool.get(request_entry.destination)]) hosts=[shared.host_pool.get(request_entry.destination)])
except NoSuitableHostFound: except NoSuitableHostFound:
logger.info("Requested destination host doesn't have enough capacity" logger.info("Requested destination host doesn't have enough capacity"
"to hold %s" % vm_entry.uuid) "to hold %s" % vm_entry.uuid)
@ -70,8 +70,8 @@ def main():
r = RequestEntry.from_scratch(type=RequestType.InitVMMigration, r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
uuid=request_entry.uuid, uuid=request_entry.uuid,
destination=request_entry.destination, destination=request_entry.destination,
request_prefix=config['etcd']['request_prefix']) request_prefix=settings['etcd']['request_prefix'])
request_pool.put(r) shared.request_pool.put(r)
# If the Request is about a VM that just want to get started/created # If the Request is about a VM that just want to get started/created
else: else:
@ -81,7 +81,7 @@ def main():
assign_host(vm_entry) assign_host(vm_entry)
except NoSuitableHostFound: except NoSuitableHostFound:
vm_entry.add_log("Can't schedule VM. No Resource Left.") vm_entry.add_log("Can't schedule VM. No Resource Left.")
vm_pool.put(vm_entry) shared.vm_pool.put(vm_entry)
pending_vms.append(vm_entry) pending_vms.append(vm_entry)
logger.info("No Resource Left. Emailing admin....") logger.info("No Resource Left. Emailing admin....")

View file

@ -1,93 +0,0 @@
# TODO
# 1. send an email to an email address defined by env['admin-email']
# if resources are finished
# 2. Introduce a status endpoint of the scheduler -
# maybe expose a prometheus compatible output
from ucloud.common.request import RequestEntry, RequestType
from ucloud.config import etcd_client
from ucloud.config import host_pool, request_pool, vm_pool, env_vars
from .helper import (get_suitable_host, dead_host_mitigation, dead_host_detection,
assign_host, NoSuitableHostFound)
from . import logger
def main():
logger.info("%s SESSION STARTED %s", '*' * 5, '*' * 5)
pending_vms = []
for request_iterator in [
etcd_client.get_prefix(env_vars.get('REQUEST_PREFIX'), value_in_json=True),
etcd_client.watch_prefix(env_vars.get('REQUEST_PREFIX'), timeout=5, value_in_json=True),
]:
for request_event in request_iterator:
request_entry = RequestEntry(request_event)
# Never Run time critical mechanism inside timeout
# mechanism because timeout mechanism only comes
# when no other event is happening. It means under
# heavy load there would not be a timeout event.
if request_entry.type == "TIMEOUT":
# Detect hosts that are dead and set their status
# to "DEAD", and their VMs' status to "KILLED"
dead_hosts = dead_host_detection()
if dead_hosts:
logger.debug("Dead hosts: %s", dead_hosts)
dead_host_mitigation(dead_hosts)
# If there are VMs that weren't assigned a host
# because there wasn't a host available which
# meets requirement of that VM then we would
# create a new ScheduleVM request for that VM
# on our behalf.
while pending_vms:
pending_vm_entry = pending_vms.pop()
r = RequestEntry.from_scratch(type="ScheduleVM",
uuid=pending_vm_entry.uuid,
hostname=pending_vm_entry.hostname,
request_prefix=env_vars.get("REQUEST_PREFIX"))
request_pool.put(r)
elif request_entry.type == RequestType.ScheduleVM:
logger.debug("%s, %s", request_entry.key, request_entry.value)
vm_entry = vm_pool.get(request_entry.uuid)
if vm_entry is None:
logger.info("Trying to act on {} but it is deleted".format(request_entry.uuid))
continue
etcd_client.client.delete(request_entry.key) # consume Request
# If the Request is about a VM which is labelled as "migration"
# and has a destination
if hasattr(request_entry, "migration") and request_entry.migration \
and hasattr(request_entry, "destination") and request_entry.destination:
try:
get_suitable_host(vm_specs=vm_entry.specs,
hosts=[host_pool.get(request_entry.destination)])
except NoSuitableHostFound:
logger.info("Requested destination host doesn't have enough capacity"
"to hold %s" % vm_entry.uuid)
else:
r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
uuid=request_entry.uuid,
destination=request_entry.destination,
request_prefix=env_vars.get("REQUEST_PREFIX"))
request_pool.put(r)
# If the Request is about a VM that just want to get started/created
else:
# assign_host only returns None when we couldn't be able to assign
# a host to a VM because of resource constraints
try:
assign_host(vm_entry)
except NoSuitableHostFound:
vm_entry.add_log("Can't schedule VM. No Resource Left.")
vm_pool.put(vm_entry)
pending_vms.append(vm_entry)
logger.info("No Resource Left. Emailing admin....")
if __name__ == "__main__":
main()

View file

@ -23,28 +23,28 @@ class CustomConfigParser(configparser.RawConfigParser):
class Settings(object): class Settings(object):
def __init__(self, config_key='/uncloud/config/'): def __init__(self, config_key='/uncloud/config/'):
conf_name = 'ucloud.conf' conf_name = 'ucloud.conf'
conf_dir = os.environ.get('UCLOUD_CONF_DIR', '/etc/ucloud') conf_dir = os.environ.get('UCLOUD_CONF_DIR', os.path.expanduser('~/ucloud/'))
config_file = os.path.join(conf_dir, conf_name) self.config_file = os.path.join(conf_dir, conf_name)
self.config_parser = CustomConfigParser(allow_no_value=True) self.config_parser = CustomConfigParser(allow_no_value=True)
self.config_key = config_key self.config_key = config_key
self.read_internal_values() self.read_internal_values()
self.read_config_file_values(config_file) self.config_parser.read(self.config_file)
self.etcd_wrapper_args = tuple()
self.etcd_wrapper_kwargs = {
'host': self.config_parser['etcd']['url'],
'port': self.config_parser['etcd']['port'],
'ca_cert': self.config_parser['etcd']['ca_cert'],
'cert_cert': self.config_parser['etcd']['cert_cert'],
'cert_key': self.config_parser['etcd']['cert_key']
}
def get_etcd_client(self): def get_etcd_client(self):
args = self.etcd_wrapper_args args = tuple()
kwargs = self.etcd_wrapper_kwargs try:
kwargs = {
'host': self.config_parser.get('etcd', 'url'),
'port': self.config_parser.get('etcd', 'port'),
'ca_cert': self.config_parser.get('etcd', 'ca_cert'),
'cert_cert': self.config_parser.get('etcd','cert_cert'),
'cert_key': self.config_parser.get('etcd','cert_key')
}
except configparser.Error as err:
raise configparser.Error('{} in config file {}'.format(err.message, self.config_file)) from err
return Etcd3Wrapper(*args, **kwargs) return Etcd3Wrapper(*args, **kwargs)
def read_internal_values(self): def read_internal_values(self):
@ -78,9 +78,11 @@ class Settings(object):
if config_from_etcd: if config_from_etcd:
self.config_parser.read_dict(config_from_etcd.value) self.config_parser.read_dict(config_from_etcd.value)
else: else:
return raise KeyError("Key '{}' not found in etcd".format(self.config_key))
sys.exit("No settings found in etcd at key {}".format(self.config_key))
def __getitem__(self, key): def __getitem__(self, key):
self.read_values_from_etcd() self.read_values_from_etcd()
return self.config_parser[key] return self.config_parser[key]
settings = Settings()

30
ucloud/shared/__init__.py Normal file
View file

@ -0,0 +1,30 @@
from ucloud.settings import settings
from ucloud.common.vm import VmPool
from ucloud.common.host import HostPool
from ucloud.common.request import RequestPool
from ucloud.common.storage_handlers import get_storage_handler
class Shared:
@property
def etcd_client(self):
return settings.get_etcd_client()
@property
def host_pool(self):
return HostPool(self.etcd_client, settings['etcd']['host_prefix'])
@property
def vm_pool(self):
return VmPool(self.etcd_client, settings['etcd']['vm_prefix'])
@property
def request_pool(self):
return RequestPool(self.etcd_client, settings['etcd']['request_prefix'])
@property
def storage_handler(self):
return get_storage_handler()
shared = Shared()