Refactoring: VMM added, uncloud-host mostly rewritten, migration improved

ahmadbilalkhalid 2019-12-28 15:39:11 +05:00
parent cd9d4cb78c
commit ba515f0b48
12 changed files with 423 additions and 364 deletions

View file

@@ -40,7 +40,7 @@ setup(name='ucloud',
'colorama',
'sphinx-rtd-theme',
'etcd3 @ https://github.com/kragniz/python-etcd3/tarball/master#egg=etcd3',
'werkzeug'
'werkzeug', 'marshmallow'
],
scripts=['scripts/ucloud'],
data_files=[(os.path.expanduser('~/ucloud/'), ['conf/ucloud.conf'])],

View file

@@ -1,7 +1,6 @@
import json
import pynetbox
import logging
import urllib3
from uuid import uuid4
from os.path import join as join_path
@@ -78,6 +77,7 @@ class CreateVM(Resource):
"vnc_socket": "",
"network": list(zip(data["network"], macs, tap_ids)),
"metadata": {"ssh-keys": []},
"in_migration": False
}
shared.etcd_client.put(vm_key, vm_entry, value_in_json=True)
@@ -216,16 +216,13 @@ class VMMigration(Resource):
if validator.is_valid():
vm = shared.vm_pool.get(data["uuid"])
r = RequestEntry.from_scratch(
type=RequestType.ScheduleVM,
r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
uuid=vm.uuid,
destination=join_path(
hostname=join_path(
settings['etcd']['host_prefix'], validator.destination.value
),
migration=True,
request_prefix=settings['etcd']['request_prefix']
)
request_prefix=settings['etcd']['request_prefix'])
shared.request_pool.put(r)
return {"message": "VM Migration Initialization Queued"}, 200
else:

View file

@@ -30,7 +30,7 @@ def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='
def create_dev(script, _id, dev, ip=None):
command = [script, _id, dev]
command = [script, str(_id), dev]
if ip:
command.append(ip)
try:

View file

@@ -19,6 +19,7 @@ class RequestType:
class RequestEntry(SpecificEtcdEntryBase):
def __init__(self, e):
self.destination_host_key = None
self.type = None # type: str
self.migration = None # type: bool
self.destination = None # type: str

ucloud/common/schemas.py Normal file (39 additions)
View file

@@ -0,0 +1,39 @@
import bitmath
from marshmallow import fields, Schema
class StorageUnit(fields.Field):
def _serialize(self, value, attr, obj, **kwargs):
return str(value)
def _deserialize(self, value, attr, data, **kwargs):
return bitmath.parse_string_unsafe(value)
class SpecsSchema(Schema):
cpu = fields.Int()
ram = StorageUnit()
os_ssd = StorageUnit(data_key='os-ssd', attribute='os-ssd')
hdd = fields.List(StorageUnit())
class VMSchema(Schema):
name = fields.Str()
owner = fields.Str()
owner_realm = fields.Str()
specs = fields.Nested(SpecsSchema)
status = fields.Str()
log = fields.List(fields.Str())
vnc_socket = fields.Str()
image_uuid = fields.Str()
hostname = fields.Str()
metadata = fields.Dict()
network = fields.List(fields.Tuple((fields.Str(), fields.Str(), fields.Int())))
in_migration = fields.Bool()
class NetworkSchema(Schema):
_id = fields.Int(data_key='id', attribute='id')
_type = fields.Str(data_key='type', attribute='type')
ipv6 = fields.Str()
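
As a quick illustration of how these new schemas behave (a minimal sketch, not part of the commit; the field values below are invented), StorageUnit round-trips human-readable sizes through bitmath:

from ucloud.common.schemas import VMSchema

# Hypothetical VM entry as it would sit in etcd (all values are placeholders).
entry = {
    "name": "alpine", "owner": "meow", "owner_realm": "ungleich",
    "specs": {"cpu": 2, "ram": "1GB", "os-ssd": "10GB", "hdd": []},
    "status": "STOPPED", "log": [], "vnc_socket": "",
    "image_uuid": "uuid-of-image", "hostname": "", "metadata": {"ssh-keys": []},
    "network": [], "in_migration": False,
}

vm = VMSchema().load(entry)
print(vm["specs"]["ram"].to_MB())    # bitmath object: 1000.0 MB
print(VMSchema().dump(vm)["specs"])  # sizes serialized back to strings, e.g. '1.0 GB'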

View file

@@ -19,8 +19,8 @@ class ImageStorageHandler(ABC):
def import_image(self, image_src, image_dest, protect=False):
"""Put an image at the destination
:param src: An Image file
:param dest: A path where :param src: is to be put.
:param image_src: An Image file
:param image_dest: A path where :param image_src: is to be put.
:param protect: If protect is true then the destination is protected (read-only etc.)
The object must exist on the filesystem.
"""
@@ -30,8 +30,8 @@ class ImageStorageHandler(ABC):
def make_vm_image(self, image_path, path):
"""Copy image from src to dest
:param src: A path
:param dest: A path
:param image_path: A path
:param path: A path
Source and destination must be on the same storage system, i.e. both on the filesystem or both on Ceph etc.
"""

View file

@@ -12,6 +12,12 @@ class VMStatus:
error = "ERROR" # An error occurred that cannot be resolved automatically
def declare_stopped(vm):
vm['hostname'] = ''
vm['in_migration'] = False
vm['status'] = VMStatus.stopped
class VMEntry(SpecificEtcdEntryBase):
def __init__(self, e):

View file

@@ -1,17 +1,16 @@
import argparse
import multiprocessing as mp
import time
import sys
from ucloud.common.request import RequestEntry, RequestType
from ucloud.common.host import HostPool
from ucloud.shared import shared
from ucloud.settings import settings
from ucloud.common.vm import VMStatus
from ucloud.vmm import VMM
from os.path import join as join_path
from . import virtualmachine, logger
vmm = virtualmachine.VMM()
def update_heartbeat(hostname):
"""Update Last HeartBeat Time for :param hostname: in etcd"""
@@ -25,6 +24,16 @@ def update_heartbeat(hostname):
time.sleep(10)
def maintenance():
vmm = VMM()
running_vms = vmm.discover()
for vm_uuid in running_vms:
if vmm.is_running(vm_uuid) and vmm.get_status(vm_uuid) == 'running':
vm = shared.vm_pool.get(join_path(settings['etcd']['vm_prefix'], vm_uuid))
vm.status = VMStatus.running
shared.vm_pool.put(vm)
def main(hostname):
host_pool = shared.host_pool
host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None)
@@ -34,8 +43,7 @@ def main(hostname):
heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,))
heartbeat_updating_process.start()
except Exception as e:
logger.exception(e)
sys.exit("No Need To Go Further. ucloud-host heartbeat updating mechanism is not working")
raise e.__class__('ucloud-host heartbeat updating mechanism is not working') from e
for events_iterator in [
shared.etcd_client.get_prefix(settings['etcd']['request_prefix'], value_in_json=True),
@@ -45,36 +53,37 @@
request_event = RequestEntry(request_event)
if request_event.type == "TIMEOUT":
vmm.maintenance(host)
continue
maintenance()
# If the event is directed toward me OR I am the destination of an InitVMMigration
if request_event.hostname == host.key or request_event.destination == host.key:
if request_event.hostname == host.key:
logger.debug("VM Request: %s", request_event)
shared.request_pool.client.client.delete(request_event.key)
vm_entry = shared.vm_pool.get(request_event.uuid)
vm_entry = shared.etcd_client.get(join_path(settings['etcd']['vm_prefix'], request_event.uuid))
if vm_entry:
vm = virtualmachine.VM(vm_entry)
if request_event.type == RequestType.StartVM:
vmm.start(vm_entry)
vm.start()
elif request_event.type == RequestType.StopVM:
vmm.stop(vm_entry)
vm.stop()
elif request_event.type == RequestType.DeleteVM:
vmm.delete(vm_entry)
vm.delete()
elif request_event.type == RequestType.InitVMMigration:
vmm.start(vm_entry, host.key)
vm.start(destination_host_key=host.key)
elif request_event.type == RequestType.TransferVM:
vmm.transfer(request_event)
host = host_pool.get(request_event.destination_host_key)
if host:
vm.migrate(destination=host.hostname)
else:
logger.error('Host %s not found!', request_event.destination_host_key)
else:
logger.info("VM Entry missing")
logger.info("Running VMs %s", vmm.running_vms)
if __name__ == "__main__":
argparser = argparse.ArgumentParser()

View file

@@ -6,325 +6,138 @@
import os
import subprocess as sp
import tempfile
import time
import ipaddress
from functools import wraps
from string import Template
from typing import Union
from os.path import join as join_path
import bitmath
import sshtunnel
from ucloud.common.helpers import get_ipv6_address
from ucloud.common.request import RequestEntry, RequestType
from ucloud.common.vm import VMEntry, VMStatus
from ucloud.common.network import create_dev, delete_network_interface, find_free_port
from ucloud.common.vm import VMStatus, declare_stopped
from ucloud.common.network import create_dev, delete_network_interface
from ucloud.common.schemas import VMSchema, NetworkSchema
from ucloud.host import logger
from ucloud.shared import shared
from ucloud.settings import settings
from ucloud.vmm import VMM
from . import qmp
from marshmallow import ValidationError
def maintenance():
pass
class VM:
def __init__(self, key, handle, vnc_socket_file):
self.key = key # type: str
self.handle = handle # type: qmp.QEMUMachine
self.vnc_socket_file = vnc_socket_file # type: tempfile.NamedTemporaryFile
def __repr__(self):
return "VM({})".format(self.key)
def capture_all_exception(func):
@wraps(func)
def wrapper(*args, **kwargs):
def __init__(self, vm_entry):
self.schema = VMSchema()
self.vmm = VMM()
self.key = vm_entry.key
try:
func(*args, **kwargs)
except Exception:
logger.exception('Unhandled exception occur in %s. For more details see Syslog.', __name__)
return wrapper
class VMM:
def __init__(self):
self.etcd_client = shared.etcd_client
self.storage_handler = shared.storage_handler
self.running_vms = []
def get_start_command_args(self, vm_entry, vnc_sock_filename: str, migration=False, migration_port=None):
threads_per_core = 1
vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs['ram']).to_MB())
vm_cpus = int(vm_entry.specs['cpu'])
vm_uuid = vm_entry.uuid
vm_networks = vm_entry.network
command = '-name {}_{}'.format(vm_entry.owner, vm_entry.name)
command += ' -drive file={},format=raw,if=virtio,cache=none'.format(
self.storage_handler.qemu_path_string(vm_uuid)
)
command += ' -device virtio-rng-pci -vnc unix:{}'.format(vnc_sock_filename)
command += ' -m {} -smp cores={},threads={}'.format(
vm_memory, vm_cpus, threads_per_core
)
if migration:
command += ' -incoming tcp:[::]:{}'.format(migration_port)
for network_mac_and_tap in vm_networks:
network_name, mac, tap = network_mac_and_tap
_key = os.path.join(settings['etcd']['network_prefix'], vm_entry.owner, network_name)
network = self.etcd_client.get(_key, value_in_json=True)
network_type = network.value["type"]
network_id = str(network.value["id"])
network_ipv6 = network.value["ipv6"]
if network_type == "vxlan":
tap = create_vxlan_br_tap(_id=network_id,
_dev=settings['network']['vxlan_phy_dev'],
tap_id=tap,
ip=network_ipv6)
all_networks = self.etcd_client.get_prefix('/v1/network/', value_in_json=True)
if ipaddress.ip_network(network_ipv6).is_global:
update_radvd_conf(all_networks)
command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no" \
" -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}" \
.format(tap=tap, net_id=network_id, mac=mac)
return command.split(" ")
def create_vm_object(self, vm_entry, migration=False, migration_port=None):
vnc_sock_file = tempfile.NamedTemporaryFile()
qemu_args = self.get_start_command_args(
vm_entry=vm_entry,
vnc_sock_filename=vnc_sock_file.name,
migration=migration,
migration_port=migration_port,
)
qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args)
return VM(vm_entry.key, qemu_machine, vnc_sock_file)
@staticmethod
def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
return next((vm for vm in vm_list if vm.key == vm_key), None)
@capture_all_exception
def create(self, vm_entry: VMEntry):
if self.storage_handler.is_vm_image_exists(vm_entry.uuid):
# File already exists. No problem, continue.
logger.debug("Image for vm %s exists", vm_entry.uuid)
return None
self.vm = self.schema.loads(vm_entry.value)
except ValidationError:
logger.exception('Couldn\'t validate VM entry %s', vm_entry.value)
self.vm = None
else:
vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB())
if self.storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid):
if not self.storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd):
vm_entry.status = VMStatus.error
else:
logger.info("New VM Created")
self.uuid = vm_entry.key.split('/')[-1]
self.host_key = self.vm['hostname']
@capture_all_exception
def start(self, vm_entry: VMEntry, destination_host_key=None):
_vm = self.get_vm(self.running_vms, vm_entry.key)
def get_qemu_args(self):
command = (
'-name {owner}_{name}'
' -drive file={file},format=raw,if=virtio,cache=none'
' -device virtio-rng-pci'
' -m {memory} -smp cores={cores},threads={threads}'
).format(owner=self.vm['owner'], name=self.vm['name'],
memory=int(self.vm['specs']['ram'].to_MB()), cores=self.vm['specs']['cpu'],
threads=1, file=shared.storage_handler.qemu_path_string(self.uuid))
# VM already running. No need to proceed further.
if _vm:
logger.info("VM %s already running" % vm_entry.uuid)
return
else:
logger.info("Trying to start %s" % vm_entry.uuid)
return command.split(' ')
def start(self, destination_host_key=None):
migration = False
if destination_host_key:
migration_port = find_free_port()
self.launch_vm(vm_entry, migration=True, migration_port=migration_port,
destination_host_key=destination_host_key)
else:
self.create(vm_entry)
self.launch_vm(vm_entry)
migration = True
@capture_all_exception
def stop(self, vm_entry):
vm = self.get_vm(self.running_vms, vm_entry.key)
vm.handle.shutdown()
if not vm.handle.is_running():
vm_entry.add_log("Shutdown successfully")
vm_entry.declare_stopped()
shared.vm_pool.put(vm_entry)
self.running_vms.remove(vm)
delete_vm_network(vm_entry)
@capture_all_exception
def delete(self, vm_entry):
logger.info("Deleting VM | %s", vm_entry)
self.stop(vm_entry)
if self.storage_handler.is_vm_image_exists(vm_entry.uuid):
r_status = self.storage_handler.delete_vm_image(vm_entry.uuid)
if r_status:
shared.etcd_client.client.delete(vm_entry.key)
else:
shared.etcd_client.client.delete(vm_entry.key)
@capture_all_exception
def transfer(self, request_event):
# This function would run on source host i.e host on which the vm
# is running initially. This host would be responsible for transferring
# vm state to destination host.
_host, _port = request_event.parameters["host"], request_event.parameters["port"]
_uuid = request_event.uuid
_destination = request_event.destination_host_key
vm = self.get_vm(self.running_vms, join_path(settings['etcd']['vm_prefix'], _uuid))
if vm:
tunnel = sshtunnel.SSHTunnelForwarder(
_host,
ssh_username=settings['ssh']['username'],
ssh_pkey=settings['ssh']['private_key_path'],
remote_bind_address=("127.0.0.1", _port),
ssh_proxy_enabled=True,
ssh_proxy=(_host, 22)
)
self.create()
try:
tunnel.start()
except sshtunnel.BaseSSHTunnelForwarderError:
logger.exception("Couldn't establish connection to (%s, 22)", _host)
network_args = self.create_network_dev()
except Exception as err:
declare_stopped(self.vm)
self.vm['log'].append('Cannot Setup Network Properly')
logger.error('Cannot Setup Network Properly for vm %s', self.uuid, exc_info=err)
else:
vm.handle.command(
"migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port)
)
self.vmm.start(uuid=self.uuid, migration=migration,
*self.get_qemu_args(), *network_args)
status = vm.handle.command("query-migrate")["status"]
while status not in ["failed", "completed"]:
time.sleep(2)
status = vm.handle.command("query-migrate")["status"]
with shared.vm_pool.get_put(request_event.uuid) as source_vm:
if status == "failed":
source_vm.add_log("Migration Failed")
elif status == "completed":
# If VM is successfully migrated then shutdown the VM
# on this host and update hostname to destination host key
source_vm.add_log("Successfully migrated")
source_vm.hostname = _destination
self.running_vms.remove(vm)
vm.handle.shutdown()
source_vm.in_migration = False # VM transfer finished
finally:
tunnel.close()
@capture_all_exception
def launch_vm(self, vm_entry, migration=False, migration_port=None, destination_host_key=None):
logger.info("Starting %s" % vm_entry.key)
vm = self.create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
try:
vm.handle.launch()
except Exception:
logger.exception("Error Occured while starting VM")
vm.handle.shutdown()
if migration:
# We don't care whether MachineError or any other error occurred
pass
else:
# Error during typical launch of a vm
vm.handle.shutdown()
vm_entry.declare_killed()
shared.vm_pool.put(vm_entry)
else:
vm_entry.vnc_socket = vm.vnc_socket_file.name
self.running_vms.append(vm)
if migration:
vm_entry.in_migration = True
status = self.vmm.get_status(self.uuid)
if status == 'running':
self.vm['status'] = VMStatus.running
self.vm['vnc_socket'] = self.vmm.get_vnc(self.uuid)
elif status == 'inmigrate':
r = RequestEntry.from_scratch(
type=RequestType.TransferVM,
hostname=vm_entry.hostname,
parameters={"host": get_ipv6_address(), "port": migration_port},
uuid=vm_entry.uuid,
destination_host_key=destination_host_key,
type=RequestType.TransferVM,  # transfer the VM
hostname=self.host_key,  # which host should handle this request (the source host)
uuid=self.uuid,  # uuid of the VM
destination_host_key=destination_host_key,  # where the source host should transfer the VM
request_prefix=settings['etcd']['request_prefix']
)
shared.request_pool.put(r)
else:
# Typical launching of a vm
vm_entry.status = VMStatus.running
vm_entry.add_log("Started successfully")
self.stop()
declare_stopped(self.vm)
shared.vm_pool.put(vm_entry)
self.sync()
@capture_all_exception
def maintenance(self, host):
# To capture vm running according to running_vms list
def stop(self):
self.vmm.stop(self.uuid)
self.delete_network_dev()
declare_stopped(self.vm)
self.sync()
# This is to capture successful migration of a VM.
# Suppose, this host is running "vm1" and user initiated
# request to migrate this "vm1" to some other host. On,
# successful migration the destination host would set
# the vm hostname to itself. Thus, we are checking
# whether this host vm is successfully migrated. If yes
# then we shutdown "vm1" on this host.
logger.debug("Starting Maintenance!!")
to_be_removed = []
for running_vm in self.running_vms:
with shared.vm_pool.get_put(running_vm.key) as vm_entry:
if vm_entry.hostname != host.key and not vm_entry.in_migration:
running_vm.handle.shutdown()
logger.info("VM migration not completed successfully.")
to_be_removed.append(running_vm)
def migrate(self, destination):
self.vmm.transfer(src_uuid=self.uuid, dest_uuid=self.uuid, host=destination)
for r in to_be_removed:
self.running_vms.remove(r)
def create_network_dev(self):
command = ''
for network_mac_and_tap in self.vm['network']:
network_name, mac, tap = network_mac_and_tap
# To check vm running according to etcd entries
alleged_running_vms = shared.vm_pool.by_status("RUNNING", shared.vm_pool.by_host(host.key))
for vm_entry in alleged_running_vms:
_vm = self.get_vm(self.running_vms, vm_entry.key)
# Whether, the allegedly running vm is in our
# running_vms list or not if it is said to be
# running on this host but it is not then we
# need to shut it down
# This is to capture poweroff/shutdown of a VM
# initiated by user inside VM. OR crash of VM by some
# user running process
if (_vm and not _vm.handle.is_running()) or not _vm:
logger.debug("_vm = %s, is_running() = %s" % (_vm, _vm.handle.is_running()))
vm_entry.add_log("""{} is not running but is said to be running.
So, shutting it down and declare it killed""".format(vm_entry.key))
vm_entry.declare_killed()
shared.vm_pool.put(vm_entry)
if _vm:
self.running_vms.remove(_vm)
def resolve_network(network_name, network_owner):
network = shared.etcd_client.get(join_path(settings['etcd']['network_prefix'],
network_owner,
network_name),
value_in_json=True)
return network
def delete_vm_network(vm_entry):
_key = os.path.join(settings['etcd']['network_prefix'], self.vm['owner'], network_name)
network = shared.etcd_client.get(_key, value_in_json=True)
network_schema = NetworkSchema()
try:
for network in vm_entry.network:
network = network_schema.load(network.value)
except ValidationError:
continue
if network['type'] == "vxlan":
tap = create_vxlan_br_tap(_id=network['id'],
_dev=settings['network']['vxlan_phy_dev'],
tap_id=tap,
ip=network['ipv6'])
all_networks = shared.etcd_client.get_prefix(settings['etcd']['network_prefix'],
value_in_json=True)
if ipaddress.ip_network(network['ipv6']).is_global:
update_radvd_conf(all_networks)
command += '-netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no' \
' -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}' \
.format(tap=tap, net_id=network['id'], mac=mac)
return command.split(' ')
def delete_network_dev(self):
try:
for network in self.vm['network']:
network_name = network[0]
tap_mac = network[1]
_ = network[1] # tap_mac
tap_id = network[2]
delete_network_interface('tap{}'.format(tap_id))
owners_vms = shared.vm_pool.by_owner(vm_entry.owner)
owners_vms = shared.vm_pool.by_owner(self.vm['owner'])
owners_running_vms = shared.vm_pool.by_status(VMStatus.running,
_vms=owners_vms)
@@ -333,7 +146,7 @@ def delete_vm_network(vm_entry):
)
networks_in_use_by_user_vms = [vm[0] for vm in networks]
if network_name not in networks_in_use_by_user_vms:
network_entry = resolve_network(network[0], vm_entry.owner)
network_entry = resolve_network(network[0], self.vm['owner'])
if network_entry:
network_type = network_entry.value["type"]
network_id = network_entry.value["id"]
@@ -343,6 +156,38 @@ def delete_vm_network(vm_entry):
except Exception:
logger.exception("Exception in network interface deletion")
def create(self):
if shared.storage_handler.is_vm_image_exists(self.uuid):
# File already exists. No problem, continue.
logger.debug("Image for vm %s exists", self.uuid)
else:
if shared.storage_handler.make_vm_image(src=self.vm['image_uuid'], dest=self.uuid):
if not shared.storage_handler.resize_vm_image(path=self.uuid,
size=int(self.vm['specs']['os-ssd'].to_MB())):
self.vm['status'] = VMStatus.error
else:
logger.info("New VM Created")
def sync(self):
shared.etcd_client.put(self.key, self.schema.dump(self.vm), value_in_json=True)
def delete(self):
self.stop()
if shared.storage_handler.is_vm_image_exists(self.uuid):
r_status = shared.storage_handler.delete_vm_image(self.uuid)
if r_status:
shared.etcd_client.client.delete(self.key)
else:
shared.etcd_client.client.delete(self.key)
def resolve_network(network_name, network_owner):
network = shared.etcd_client.get(
join_path(settings['etcd']['network_prefix'], network_owner, network_name), value_in_json=True
)
return network
def create_vxlan_br_tap(_id, _dev, tap_id, ip=None):
network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')
@@ -377,10 +222,12 @@ def update_radvd_conf(all_networks):
)
for net in networks if networks.get(net)
]
with open('/etc/radvd.conf', 'w') as radvd_conf:
radvd_conf.writelines(content)
try:
sp.check_output(['systemctl', 'restart', 'radvd'])
except Exception:
except sp.CalledProcessError:
try:
sp.check_output(['service', 'radvd', 'restart'])
except sp.CalledProcessError as err:
raise err.__class__('Cannot start/restart radvd service', err.cmd) from err

View file

@@ -95,7 +95,7 @@ def dead_host_mitigation(dead_hosts_keys):
vms_hosted_on_dead_host = shared.vm_pool.by_host(host_key)
for vm in vms_hosted_on_dead_host:
vm.declare_killed()
vm.status = 'UNKNOWN'
shared.vm_pool.put(vm)
shared.host_pool.put(host)

View file

@@ -56,27 +56,6 @@ def main():
continue
shared.etcd_client.client.delete(request_entry.key) # consume Request
# If the Request is about a VM which is labelled as "migration"
# and has a destination
if hasattr(request_entry, "migration") and request_entry.migration \
and hasattr(request_entry, "destination") and request_entry.destination:
try:
get_suitable_host(vm_specs=vm_entry.specs,
hosts=[shared.host_pool.get(request_entry.destination)])
except NoSuitableHostFound:
logger.info("Requested destination host doesn't have enough capacity"
"to hold %s" % vm_entry.uuid)
else:
r = RequestEntry.from_scratch(type=RequestType.InitVMMigration,
uuid=request_entry.uuid,
destination=request_entry.destination,
request_prefix=settings['etcd']['request_prefix'])
shared.request_pool.put(r)
# If the Request is about a VM that just want to get started/created
else:
# assign_host only returns None when we couldn't be able to assign
# a host to a VM because of resource constraints
try:
assign_host(vm_entry)
except NoSuitableHostFound:

ucloud/vmm/__init__.py Normal file (181 additions)
View file

@@ -0,0 +1,181 @@
import os
import subprocess as sp
import logging
import socket
import json
import tempfile
import time
from contextlib import suppress
from multiprocessing import Process
from os.path import join as join_path
from os.path import isdir
logger = logging.getLogger(__name__)
class VMQMPHandles:
def __init__(self, path):
self.path = path
self.sock = socket.socket(socket.AF_UNIX)
self.file = self.sock.makefile()
def __enter__(self):
self.sock.connect(self.path)
# eat qmp greetings
self.file.readline()
# init qmp
self.sock.sendall(b'{ "execute": "qmp_capabilities" }')
self.file.readline()
return self.sock, self.file
def __exit__(self, exc_type, exc_val, exc_tb):
self.file.close()
self.sock.close()
if exc_type:
logger.error('Couldn\'t get handle for VM.', exc_info=(exc_type, exc_val, exc_tb))
raise exc_type("Couldn't get handle for VM.") from exc_val
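# Hedged usage sketch (not part of this file; the socket path and the
# helper name are placeholders). After the greeting/qmp_capabilities
# handshake the context manager yields a raw QMP channel, so arbitrary
# QMP commands can be sent as JSON:
def example_query_status():
    with VMQMPHandles('/home/ucloud/ucloud/vmm/some-vm-uuid') as (sock, file):
        sock.sendall(b'{ "execute": "query-status" }')
        return json.loads(file.readline())  # e.g. {'return': {'status': 'running'}}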
class TransferVM(Process):
def __init__(self, src_uuid, dest_uuid, host, socket_dir):
self.src_uuid = src_uuid
self.dest_uuid = dest_uuid
self.host = host
self.src_sock_path = os.path.join(socket_dir, self.src_uuid)
self.dest_sock_path = os.path.join(socket_dir, self.dest_uuid)
super().__init__()
def run(self):
with suppress(FileNotFoundError):
os.remove(self.src_sock_path)
command = ['ssh', '-nNT', '-L', '{}:{}'.format(self.src_sock_path, self.dest_sock_path),
'root@{}'.format(self.host)]
try:
p = sp.Popen(command)
except Exception as e:
logger.error('Couldn\'t forward unix sockets over ssh.', exc_info=e)
else:
time.sleep(2)
vmm = VMM()
logger.debug('Executing: ssh forwarding command: %s', command)
vmm.execute_command(self.src_uuid, command='migrate',
arguments={'uri': 'unix:{}'.format(self.src_sock_path)})
while p.poll() is None:
success, output = vmm.execute_command(self.src_uuid, command='query-migrate')
if success:
status = output['return']['status']
if status != 'active':
print('Migration Status: ', status)
return
else:
print('Migration Status: ', status)
else:
return
time.sleep(0.2)
class VMM:
# Virtual Machine Manager
def __init__(self, qemu_path='/usr/bin/qemu-system-x86_64',
vmm_backend=os.path.expanduser('~/ucloud/vmm/')):
self.qemu_path = qemu_path
self.vmm_backend = vmm_backend
self.socket_dir = os.path.join(self.vmm_backend, 'sock')
def is_running(self, uuid):
sock_path = os.path.join(self.vmm_backend, uuid)
try:
sock = socket.socket(socket.AF_UNIX)
sock.connect(sock_path)
recv = sock.recv(4096)
except Exception as err:
# unix sock doesn't exist or it is closed
logger.info('VM %s sock either doesn\'t exist or is closed. '
'This means the VM is stopped.', uuid, exc_info=err)
else:
# if we receive greetings from qmp it means the VM is running
if len(recv) > 0:
return True
with suppress(FileNotFoundError):
os.remove(sock_path)
return False
def start(self, *args, uuid, migration=False):
# start --> success?
migration_args = ()
if migration:
migration_args = ('-incoming', 'unix:{}'.format(os.path.join(self.socket_dir, uuid)))
if self.is_running(uuid):
logger.warning('Cannot start VM. It is already running.')
else:
qmp_arg = ('-qmp', 'unix:{}/{},server,nowait'.format(self.vmm_backend, uuid))
vnc_arg = ('-vnc', 'unix:{}'.format(tempfile.NamedTemporaryFile().name))
command = [self.qemu_path, *args, *qmp_arg, *migration_args, *vnc_arg, '-daemonize']
try:
sp.check_output(command, stderr=sp.PIPE)
except sp.CalledProcessError as err:
logger.exception('Error occurred while starting VM.\nDetails: %s', err.stderr.decode('utf-8'))
else:
time.sleep(2)
def execute_command(self, uuid, command, **kwargs):
# execute_command -> success?, output
try:
with VMQMPHandles(os.path.join(self.vmm_backend, uuid)) as (sock_handle, file_handle):
command_to_execute = {
'execute': command,
**kwargs
}
sock_handle.sendall(json.dumps(command_to_execute).encode('utf-8'))
output = file_handle.readline()
except Exception as err:
logger.exception('Error occurred while executing command and getting valid output from qmp')
else:
try:
output = json.loads(output)
except json.JSONDecodeError:
logger.exception('QMP Output isn\'t valid JSON. %s', output)
else:
return 'return' in output, output
return False, None
def stop(self, uuid):
success, output = self.execute_command(command='quit', uuid=uuid)
return success
def get_status(self, uuid):
success, output = self.execute_command(command='query-status', uuid=uuid)
if success:
return output['return']['status']
else:
return 'STOPPED'
def discover(self):
vms = [
uuid for uuid in os.listdir(self.vmm_backend)
if not isdir(join_path(self.vmm_backend, uuid))
]
return vms
def get_vnc(self, uuid):
success, output = self.execute_command(uuid, command='query-vnc')
if success:
return output['return']['service']
return None
def transfer(self, src_uuid, dest_uuid, host):
p = TransferVM(src_uuid, dest_uuid, socket_dir=self.socket_dir, host=host)
p.start()
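
Taken together, a minimal usage sketch of the new VMM API (not from the commit; the uuid, QEMU arguments, and hostname are placeholders):

from ucloud.vmm import VMM

vmm = VMM()
uuid = 'example-vm-uuid'  # placeholder

# Boot a VM: positional args are passed straight to qemu-system-x86_64;
# the uuid names the QMP control socket under the vmm backend directory.
vmm.start('-name', 'meow_alpine', '-m', '1024', uuid=uuid)

if vmm.is_running(uuid):
    print(vmm.get_status(uuid))  # e.g. 'running'
    print(vmm.get_vnc(uuid))     # path of the VNC unix socket
print(vmm.discover())            # uuids that have sockets under the backend dir

# Live-migrate: the destination must already have been started with
# migration=True so its QEMU listens on the '-incoming' unix socket.
vmm.transfer(src_uuid=uuid, dest_uuid=uuid, host='dest.example.com')

vmm.stop(uuid)  # QMP 'quit'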