uncloud/ucloud/host/virtualmachine.py

386 lines
15 KiB
Python
Executable file

# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html
# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
import os
import subprocess as sp
import tempfile
import time
import ipaddress
from functools import wraps
from string import Template
from typing import Union
from os.path import join as join_path
import bitmath
import sshtunnel
from ucloud.common.helpers import get_ipv6_address
from ucloud.common.request import RequestEntry, RequestType
from ucloud.common.vm import VMEntry, VMStatus
from ucloud.common.network import create_dev, delete_network_interface, find_free_port
from ucloud.host import logger
from ucloud.shared import shared
from ucloud.settings import settings
from . import qmp
class VM:
    """Bundle a VM's key with its QEMU handle and its VNC socket file.

    Attributes are plain and public: ``key``, ``handle`` and
    ``vnc_socket_file`` are stored exactly as passed to the constructor.
    """

    def __init__(self, key, handle, vnc_socket_file):
        # type: (str, qmp.QEMUMachine, tempfile.NamedTemporaryFile) -> None
        self.vnc_socket_file = vnc_socket_file
        self.handle = handle
        self.key = key

    def __repr__(self):
        """Return ``VM(<key>)`` so log lines identify the machine by its key."""
        text = "VM({})".format(self.key)
        return text
def capture_all_exception(func):
    """Decorate *func* so that any ``Exception`` it raises is logged, not propagated.

    The wrapper forwards all positional and keyword arguments to *func* and
    returns whatever *func* returns. If *func* raises an ``Exception`` the
    traceback is logged through ``logger.exception`` and ``None`` is returned.
    """
    # Callables such as functools.partial objects have no __qualname__.
    func_name = getattr(func, '__qualname__', repr(func))

    @wraps(func)
    def wrapper(*args, **kwargs):
        try:
            # Propagate the result; previously it was silently dropped.
            return func(*args, **kwargs)
        except Exception:
            # Name the failing callable rather than this module, so the log
            # line actually says which operation broke.
            logger.exception('Unhandled exception occur in %s. For more details see Syslog.', func_name)
            return None

    return wrapper
class VMM:
    """Create, start, stop, migrate and supervise QEMU VMs on this host.

    ``running_vms`` holds one ``VM`` object per QEMU process this instance
    launched; the persistent VM state lives in ``shared.vm_pool``.
    """

    def __init__(self):
        self.etcd_client = shared.etcd_client
        self.storage_handler = shared.storage_handler
        # VM objects for the QEMU processes launched by this instance.
        self.running_vms = []

    def get_start_command_args(self, vm_entry, vnc_sock_filename: str, migration=False, migration_port=None):
        """Build the QEMU argument list for *vm_entry*.

        As a side effect, for every ``vxlan`` network of the VM the
        vxlan/bridge/tap devices are created, and radvd is reconfigured when
        the network's IPv6 prefix is global.

        :param vm_entry: entry providing ``specs``, ``uuid``, ``owner``,
            ``name`` and ``network`` (an iterable of ``(name, mac, tap)``).
        :param vnc_sock_filename: path of the unix socket QEMU serves VNC on.
        :param migration: when true, add ``-incoming`` so QEMU waits for an
            incoming migration on *migration_port*.
        :param migration_port: TCP port used with ``-incoming``.
        :return: list of argument strings (without the QEMU binary itself).
        """
        threads_per_core = 1
        vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs['ram']).to_MB())
        vm_cpus = int(vm_entry.specs['cpu'])

        # The argv is assembled as a list instead of joining and re-splitting
        # one string, so a space inside an owner, VM name or path can no
        # longer break an argument in two.
        args = [
            '-name', '{}_{}'.format(vm_entry.owner, vm_entry.name),
            '-drive', 'file={},format=raw,if=virtio,cache=none'.format(
                self.storage_handler.qemu_path_string(vm_entry.uuid)
            ),
            '-device', 'virtio-rng-pci',
            '-vnc', 'unix:{}'.format(vnc_sock_filename),
            '-m', str(vm_memory),
            '-smp', 'cores={},threads={}'.format(vm_cpus, threads_per_core),
        ]

        if migration:
            args += ['-incoming', 'tcp:[::]:{}'.format(migration_port)]

        for network_name, mac, tap in vm_entry.network:
            _key = os.path.join(settings['etcd']['network_prefix'], vm_entry.owner, network_name)
            network = self.etcd_client.get(_key, value_in_json=True)
            network_type = network.value["type"]
            network_id = str(network.value["id"])
            network_ipv6 = network.value["ipv6"]

            if network_type == "vxlan":
                # NOTE(review): create_vxlan_br_tap returns None when any
                # device could not be created; the tap name then ends up as
                # "None" in the arguments below.
                tap = create_vxlan_br_tap(_id=network_id,
                                          _dev=settings['network']['vxlan_phy_dev'],
                                          tap_id=tap,
                                          ip=network_ipv6)

                if ipaddress.ip_network(network_ipv6).is_global:
                    # Only query etcd when radvd is actually going to be
                    # rewritten.
                    all_networks = self.etcd_client.get_prefix('/v1/network/', value_in_json=True)
                    update_radvd_conf(all_networks)

            args += [
                '-netdev',
                'tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no'.format(
                    net_id=network_id, tap=tap
                ),
                '-device',
                'virtio-net-pci,netdev=vmnet{net_id},mac={mac}'.format(
                    net_id=network_id, mac=mac
                ),
            ]

        return args

    def create_vm_object(self, vm_entry, migration=False, migration_port=None):
        """Return a not-yet-launched ``VM`` wrapping a ``qmp.QEMUMachine``.

        A ``NamedTemporaryFile`` is created and its name is handed to QEMU as
        the VNC unix-socket path; the file object is kept on the ``VM``.
        """
        vnc_sock_file = tempfile.NamedTemporaryFile()

        qemu_args = self.get_start_command_args(
            vm_entry=vm_entry,
            vnc_sock_filename=vnc_sock_file.name,
            migration=migration,
            migration_port=migration_port,
        )
        qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args)
        return VM(vm_entry.key, qemu_machine, vnc_sock_file)

    @staticmethod
    def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
        """Return the first VM in *vm_list* whose ``key`` equals *vm_key*, else ``None``."""
        return next((vm for vm in vm_list if vm.key == vm_key), None)

    @capture_all_exception
    def create(self, vm_entry: VMEntry):
        """Create and resize the disk image of *vm_entry* unless it already exists.

        If resizing fails, ``vm_entry.status`` is set to ``VMStatus.error``
        (the entry is not written back to the pool here).
        """
        if self.storage_handler.is_vm_image_exists(vm_entry.uuid):
            # Image already exists; nothing to do.
            logger.debug("Image for vm %s exists", vm_entry.uuid)
            return None

        vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB())
        if self.storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid):
            if not self.storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd):
                vm_entry.status = VMStatus.error
            else:
                logger.info("New VM Created")
        return None

    @capture_all_exception
    def start(self, vm_entry: VMEntry, destination_host_key=None):
        """Start *vm_entry* on this host unless it is already in ``running_vms``.

        With *destination_host_key* set, the VM is launched in incoming
        migration mode on a free port; otherwise its image is created if
        needed and the VM is launched normally.
        """
        # VM already running. No need to proceed further.
        if self.get_vm(self.running_vms, vm_entry.key):
            logger.info("VM %s already running", vm_entry.uuid)
            return

        logger.info("Trying to start %s", vm_entry.uuid)
        if destination_host_key:
            migration_port = find_free_port()
            self.launch_vm(vm_entry, migration=True, migration_port=migration_port,
                           destination_host_key=destination_host_key)
        else:
            self.create(vm_entry)
            self.launch_vm(vm_entry)

    @capture_all_exception
    def stop(self, vm_entry):
        """Shut down the QEMU process of *vm_entry* and clean up after it.

        When the process is confirmed stopped, the entry is declared stopped
        and stored, the VM is dropped from ``running_vms`` and its network
        interfaces are deleted.
        """
        vm = self.get_vm(self.running_vms, vm_entry.key)
        if vm is None:
            # Nothing to shut down; previously this crashed with an
            # AttributeError on ``None.handle``.
            logger.warning("VM %s is not running on this host; nothing to stop", vm_entry.key)
            return

        vm.handle.shutdown()
        if not vm.handle.is_running():
            vm_entry.add_log("Shutdown successfully")
            vm_entry.declare_stopped()
            shared.vm_pool.put(vm_entry)
            self.running_vms.remove(vm)
            delete_vm_network(vm_entry)

    @capture_all_exception
    def delete(self, vm_entry):
        """Stop *vm_entry*, delete its disk image and remove its etcd key.

        The etcd key is removed when there is no image, or when the image
        was deleted successfully; it is kept if image deletion fails.
        """
        logger.info("Deleting VM | %s", vm_entry)
        self.stop(vm_entry)

        image_gone = (
            not self.storage_handler.is_vm_image_exists(vm_entry.uuid)
            or self.storage_handler.delete_vm_image(vm_entry.uuid)
        )
        if image_gone:
            shared.etcd_client.client.delete(vm_entry.key)

    @capture_all_exception
    def transfer(self, request_event):
        """Migrate a locally running VM to the host named in *request_event*.

        This runs on the source host, i.e. the host the VM is currently
        running on, which is responsible for pushing the VM state to the
        destination. An SSH tunnel to the destination is opened, QEMU is told
        to migrate through it, and ``query-migrate`` is polled every two
        seconds until the status is ``failed`` or ``completed``.
        """
        _host, _port = request_event.parameters["host"], request_event.parameters["port"]
        _uuid = request_event.uuid
        _destination = request_event.destination_host_key
        vm = self.get_vm(self.running_vms, join_path(settings['etcd']['vm_prefix'], _uuid))

        if not vm:
            logger.warning("VM %s is not running on this host; nothing to transfer", _uuid)
            return

        tunnel = sshtunnel.SSHTunnelForwarder(
            _host,
            ssh_username=settings['ssh']['username'],
            ssh_pkey=settings['ssh']['private_key_path'],
            remote_bind_address=("127.0.0.1", _port),
            ssh_proxy_enabled=True,
            ssh_proxy=(_host, 22)
        )
        try:
            tunnel.start()
        except sshtunnel.BaseSSHTunnelForwarderError:
            logger.exception("Couldn't establish connection to (%s, 22)", _host)
        else:
            vm.handle.command(
                "migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port)
            )

            status = vm.handle.command("query-migrate")["status"]
            while status not in ["failed", "completed"]:
                time.sleep(2)
                status = vm.handle.command("query-migrate")["status"]

            with shared.vm_pool.get_put(request_event.uuid) as source_vm:
                if status == "failed":
                    source_vm.add_log("Migration Failed")
                elif status == "completed":
                    # The VM now lives on the destination: record the new
                    # host and shut down the local copy.
                    source_vm.add_log("Successfully migrated")
                    source_vm.hostname = _destination
                    self.running_vms.remove(vm)
                    vm.handle.shutdown()
                source_vm.in_migration = False  # VM transfer finished
        finally:
            tunnel.close()

    @capture_all_exception
    def launch_vm(self, vm_entry, migration=False, migration_port=None, destination_host_key=None):
        """Launch the QEMU process for *vm_entry* and record the outcome.

        On failure the process is shut down and, for a normal (non-migration)
        launch, the entry is declared killed and stored. On success the VM is
        added to ``running_vms``; a migration launch additionally queues a
        ``TransferVM`` request addressed to ``vm_entry.hostname``, while a
        normal launch marks the entry as running.
        """
        logger.info("Starting %s", vm_entry.key)

        vm = self.create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
        try:
            vm.handle.launch()
        except Exception:
            logger.exception("Error Occured while starting VM")
            # One shutdown is enough (it used to be issued twice on the
            # non-migration path).
            vm.handle.shutdown()

            # For a failed migration launch we don't care which error
            # occurred; only a typical launch marks the VM as killed.
            if not migration:
                vm_entry.declare_killed()
                shared.vm_pool.put(vm_entry)
        else:
            vm_entry.vnc_socket = vm.vnc_socket_file.name
            self.running_vms.append(vm)

            if migration:
                vm_entry.in_migration = True
                r = RequestEntry.from_scratch(
                    type=RequestType.TransferVM,
                    hostname=vm_entry.hostname,
                    parameters={"host": get_ipv6_address(), "port": migration_port},
                    uuid=vm_entry.uuid,
                    destination_host_key=destination_host_key,
                    request_prefix=settings['etcd']['request_prefix']
                )
                shared.request_pool.put(r)
            else:
                # Typical launching of a vm
                vm_entry.status = VMStatus.running
                vm_entry.add_log("Started successfully")

            shared.vm_pool.put(vm_entry)

    @capture_all_exception
    def maintenance(self, host):
        """Reconcile ``running_vms`` with the VM entries stored for *host*.

        First pass: a VM we are running whose entry names another host and
        is not marked ``in_migration`` has been migrated away, so the local
        process is shut down and forgotten.

        Second pass: an entry that claims to be RUNNING on this host but has
        no live QEMU process here (user powered it off inside the guest, or
        it crashed) is declared killed.
        """
        logger.debug("Starting Maintenance!!")

        to_be_removed = []
        for running_vm in self.running_vms:
            with shared.vm_pool.get_put(running_vm.key) as vm_entry:
                if vm_entry.hostname != host.key and not vm_entry.in_migration:
                    running_vm.handle.shutdown()
                    logger.info("VM migration not completed successfully.")
                    to_be_removed.append(running_vm)

        # Removal is deferred so the list is not mutated while iterating it.
        for r in to_be_removed:
            self.running_vms.remove(r)

        # To check vm running according to etcd entries
        alleged_running_vms = shared.vm_pool.by_status("RUNNING", shared.vm_pool.by_host(host.key))

        for vm_entry in alleged_running_vms:
            _vm = self.get_vm(self.running_vms, vm_entry.key)
            # ``_vm`` may be None; evaluate the liveness once and never touch
            # ``_vm.handle`` in that case (this used to raise AttributeError
            # inside the debug log and abort the whole maintenance pass).
            is_running = bool(_vm) and _vm.handle.is_running()
            if is_running:
                continue

            logger.debug("_vm = %s, is_running() = %s", _vm, is_running)
            vm_entry.add_log(
                (
                    "{} is not running but is said to be running.\n"
                    "So, shutting it down and declare it killed"
                ).format(vm_entry.key)
            )
            vm_entry.declare_killed()
            shared.vm_pool.put(vm_entry)
            if _vm:
                self.running_vms.remove(_vm)
def resolve_network(network_name, network_owner):
    """Fetch the etcd entry of *network_owner*'s network called *network_name*.

    The key is ``<network_prefix>/<owner>/<name>`` and the value is requested
    as JSON; whatever ``shared.etcd_client.get`` returns is passed through.
    """
    network_key = join_path(
        settings['etcd']['network_prefix'], network_owner, network_name
    )
    return shared.etcd_client.get(network_key, value_in_json=True)
def delete_vm_network(vm_entry):
    """Delete the network interfaces that belonged to *vm_entry*.

    Every tap interface of the VM is removed. The bridge and vxlan
    interfaces of a ``vxlan`` network are removed as well, but only when no
    other running VM of the same owner is attached to that network.

    This is best-effort: any exception is logged and swallowed.
    """
    try:
        # Names of all networks used by the owner's running VMs. Every
        # network of every VM counts (not just each VM's first one), and a
        # VM without networks no longer raises IndexError.
        owners_vms = shared.vm_pool.by_owner(vm_entry.owner)
        owners_running_vms = shared.vm_pool.by_status(VMStatus.running,
                                                      _vms=owners_vms)
        networks_in_use_by_user_vms = {
            net[0] for vm in owners_running_vms for net in vm.network
        }

        for network in vm_entry.network:
            # Each network is a (name, mac, tap id) sequence.
            network_name = network[0]
            tap_id = network[2]

            delete_network_interface('tap{}'.format(tap_id))

            if network_name in networks_in_use_by_user_vms:
                continue

            network_entry = resolve_network(network_name, vm_entry.owner)
            if not network_entry:
                continue

            network_type = network_entry.value["type"]
            network_id = network_entry.value["id"]
            if network_type == "vxlan":
                delete_network_interface('br{}'.format(network_id))
                delete_network_interface('vxlan{}'.format(network_id))
    except Exception:
        logger.exception("Exception in network interface deletion")
def create_vxlan_br_tap(_id, _dev, tap_id, ip=None):
    """Create a vxlan device, a bridge on top of it and a tap on the bridge.

    Each step runs the matching script from the sibling ``network``
    directory through ``create_dev``. The chain stops at the first step
    that yields a falsy result.

    :return: the value ``create_dev`` returned for the tap, or ``None`` if
        any of the three steps failed.
    """
    package_dir = os.path.dirname(os.path.dirname(__file__))
    scripts = os.path.join(package_dir, 'network')

    vxlan = create_dev(script=os.path.join(scripts, 'create-vxlan.sh'),
                       _id=_id, dev=_dev)
    if not vxlan:
        return None

    bridge = create_dev(script=os.path.join(scripts, 'create-bridge.sh'),
                        _id=_id, dev=vxlan, ip=ip)
    if not bridge:
        return None

    tap = create_dev(script=os.path.join(scripts, 'create-tap.sh'),
                     _id=str(tap_id), dev=bridge)
    return tap if tap else None
def update_radvd_conf(all_networks):
    """Regenerate ``/etc/radvd.conf`` from *all_networks* and restart radvd.

    One section is rendered from ``network/radvd-template.conf`` for every
    network that has a global IPv6 prefix and a truthy id, substituting
    ``bridge`` (``br<id>``) and ``prefix``. radvd is restarted with
    ``systemctl``, falling back to ``service`` if that fails.

    :param all_networks: iterable of entries whose ``value`` is a dict with
        ``ipv6`` and ``id`` keys.
    :raises Exception: whatever the fallback ``service radvd restart`` call
        raises, if both restart attempts fail.
    """
    network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')

    # Map prefix -> network id for every network with a global IPv6 prefix.
    networks = {
        net.value['ipv6']: net.value['id']
        for net in all_networks
        if net.value.get('ipv6') and ipaddress.ip_network(net.value.get('ipv6')).is_global
    }

    # Read the template inside a context manager so the file handle is
    # closed deterministically (it used to be leaked).
    template_path = os.path.join(network_script_base, 'radvd-template.conf')
    with open(template_path, 'r') as template_file:
        radvd_template = Template(template_file.read())

    content = [
        radvd_template.safe_substitute(
            bridge='br{}'.format(network_id),
            prefix=prefix
        )
        for prefix, network_id in networks.items() if network_id
    ]

    with open('/etc/radvd.conf', 'w') as radvd_conf:
        radvd_conf.writelines(content)

    try:
        sp.check_output(['systemctl', 'restart', 'radvd'])
    except Exception:
        # Deliberately broad: systemctl may be missing or may fail, and in
        # either case the SysV-style service command is tried instead.
        sp.check_output(['service', 'radvd', 'restart'])