uncloud-mravi/ucloud/host/virtualmachine.py

384 lines
13 KiB
Python
Executable file

# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html
# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
import os
import random
import subprocess as sp
import tempfile
import time
from functools import wraps
from string import Template
from typing import Union
from os.path import join as join_path
import bitmath
import sshtunnel
from ucloud.common.helpers import get_ipv6_address
from ucloud.common.request import RequestEntry, RequestType
from ucloud.common.vm import VMEntry, VMStatus
from ucloud.config import (etcd_client, request_pool,
running_vms, vm_pool, env_vars,
image_storage_handler)
from . import qmp
from ucloud.host import logger
class VM:
def __init__(self, key, handle, vnc_socket_file):
self.key = key # type: str
self.handle = handle # type: qmp.QEMUMachine
self.vnc_socket_file = vnc_socket_file # type: tempfile.NamedTemporaryFile
def __repr__(self):
return "VM({})".format(self.key)
def delete_network_interface(iface):
try:
sp.check_output(['ip', 'link', 'del', iface])
except Exception:
pass
def resolve_network(network_name, network_owner):
network = etcd_client.get(join_path(env_vars.get("NETWORK_PREFIX"),
network_owner,
network_name),
value_in_json=True)
return network
def delete_vm_network(vm_entry):
try:
for network in vm_entry.network:
network_name = network[0]
tap_mac = network[1]
tap_id = network[2]
delete_network_interface('tap{}'.format(tap_id))
owners_vms = vm_pool.by_owner(vm_entry.owner)
owners_running_vms = vm_pool.by_status(VMStatus.running,
_vms=owners_vms)
networks = map(lambda n: n[0],
map(lambda vm: vm.network, owners_running_vms)
)
networks_in_use_by_user_vms = [vm[0] for vm in networks]
if network_name not in networks_in_use_by_user_vms:
network_entry = resolve_network(network[0], vm_entry.owner)
if network_entry:
network_type = network_entry.value["type"]
network_id = network_entry.value["id"]
if network_type == "vxlan":
delete_network_interface('br{}'.format(network_id))
delete_network_interface('vxlan{}'.format(network_id))
except Exception:
logger.exception("Exception in network interface deletion")
def create_dev(script, _id, dev, ip=None):
command = [script, _id, dev]
if ip:
command.append(ip)
try:
output = sp.check_output(command, stderr=sp.PIPE)
except Exception as e:
print(e.stderr)
return None
else:
return output.decode("utf-8").strip()
def create_vxlan_br_tap(_id, _dev, tap_id, ip=None):
network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')
vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'),
_id=_id, dev=_dev)
if vxlan:
bridge = create_dev(script=os.path.join(network_script_base, 'create-bridge.sh'),
_id=_id, dev=vxlan, ip=ip)
if bridge:
tap = create_dev(script=os.path.join(network_script_base, 'create-tap.sh'),
_id=str(tap_id), dev=bridge)
if tap:
return tap
def random_bytes(num=6):
return [random.randrange(256) for _ in range(num)]
def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'):
mac = random_bytes()
if oui:
if type(oui) == str:
oui = [int(chunk) for chunk in oui.split(separator)]
mac = oui + random_bytes(num=6 - len(oui))
else:
if multicast:
mac[0] |= 1 # set bit 0
else:
mac[0] &= ~1 # clear bit 0
if uaa:
mac[0] &= ~(1 << 1) # clear bit 1
else:
mac[0] |= 1 << 1 # set bit 1
return separator.join(byte_fmt % b for b in mac)
def update_radvd_conf(etcd_client):
network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')
networks = {
net.value['ipv6']: net.value['id']
for net in etcd_client.get_prefix('/v1/network/', value_in_json=True)
if net.value.get('ipv6')
}
radvd_template = open(os.path.join(network_script_base,
'radvd-template.conf'), 'r').read()
radvd_template = Template(radvd_template)
content = [radvd_template.safe_substitute(bridge='br{}'.format(networks[net]),
prefix=net)
for net in networks if networks.get(net)]
with open('/etc/radvd.conf', 'w') as radvd_conf:
radvd_conf.writelines(content)
try:
sp.check_output(['systemctl', 'restart', 'radvd'])
except Exception:
sp.check_output(['service', 'radvd', 'restart'])
def get_start_command_args(vm_entry, vnc_sock_filename: str, migration=False, migration_port=None):
threads_per_core = 1
vm_memory = int(bitmath.parse_string_unsafe(vm_entry.specs["ram"]).to_MB())
vm_cpus = int(vm_entry.specs["cpu"])
vm_uuid = vm_entry.uuid
vm_networks = vm_entry.network
command = "-name {}_{}".format(vm_entry.owner, vm_entry.name)
command += " -drive file={},format=raw,if=virtio,cache=none".format(
image_storage_handler.qemu_path_string(vm_uuid)
)
command += " -device virtio-rng-pci -vnc unix:{}".format(vnc_sock_filename)
command += " -m {} -smp cores={},threads={}".format(
vm_memory, vm_cpus, threads_per_core
)
if migration:
command += " -incoming tcp:[::]:{}".format(migration_port)
tap = None
for network_mac_and_tap in vm_networks:
network_name, mac, tap = network_mac_and_tap
_key = os.path.join(env_vars.get('NETWORK_PREFIX'), vm_entry.owner, network_name)
network = etcd_client.get(_key, value_in_json=True)
network_type = network.value["type"]
network_id = str(network.value["id"])
network_ipv6 = network.value["ipv6"]
if network_type == "vxlan":
tap = create_vxlan_br_tap(_id=network_id,
_dev=env_vars.get("VXLAN_PHY_DEV"),
tap_id=tap,
ip=network_ipv6)
update_radvd_conf(etcd_client)
command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no" \
" -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}" \
.format(tap=tap, net_id=network_id, mac=mac)
return command.split(" ")
def create_vm_object(vm_entry, migration=False, migration_port=None):
# NOTE: If migration suddenly stop working, having different
# VNC unix filename on source and destination host can
# be a possible cause of it.
# REQUIREMENT: Use Unix Socket instead of TCP Port for VNC
vnc_sock_file = tempfile.NamedTemporaryFile()
qemu_args = get_start_command_args(
vm_entry=vm_entry,
vnc_sock_filename=vnc_sock_file.name,
migration=migration,
migration_port=migration_port,
)
qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args)
return VM(vm_entry.key, qemu_machine, vnc_sock_file)
def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
return next((vm for vm in vm_list if vm.key == vm_key), None)
def need_running_vm(func):
@wraps(func)
def wrapper(e):
vm = get_vm(running_vms, e.key)
if vm:
try:
status = vm.handle.command("query-status")
logger.debug("VM Status Check - %s", status)
except Exception as exception:
logger.info("%s failed - VM %s %s", func.__name__, e, exception)
else:
return func(e)
return None
else:
logger.info("%s failed because VM %s is not running", func.__name__, e.key)
return None
return wrapper
def create(vm_entry: VMEntry):
if image_storage_handler.is_vm_image_exists(vm_entry.uuid):
# File Already exists. No Problem Continue
logger.debug("Image for vm %s exists", vm_entry.uuid)
else:
vm_hdd = int(bitmath.parse_string_unsafe(vm_entry.specs["os-ssd"]).to_MB())
if image_storage_handler.make_vm_image(src=vm_entry.image_uuid, dest=vm_entry.uuid):
if not image_storage_handler.resize_vm_image(path=vm_entry.uuid, size=vm_hdd):
vm_entry.status = VMStatus.error
else:
logger.info("New VM Created")
def start(vm_entry: VMEntry, destination_host_key=None, migration_port=None):
_vm = get_vm(running_vms, vm_entry.key)
# VM already running. No need to proceed further.
if _vm:
logger.info("VM %s already running" % vm_entry.uuid)
return
else:
logger.info("Trying to start %s" % vm_entry.uuid)
if destination_host_key:
launch_vm(vm_entry, migration=True, migration_port=migration_port,
destination_host_key=destination_host_key)
else:
create(vm_entry)
launch_vm(vm_entry)
@need_running_vm
def stop(vm_entry):
vm = get_vm(running_vms, vm_entry.key)
vm.handle.shutdown()
if not vm.handle.is_running():
vm_entry.add_log("Shutdown successfully")
vm_entry.declare_stopped()
vm_pool.put(vm_entry)
running_vms.remove(vm)
delete_vm_network(vm_entry)
def delete(vm_entry):
logger.info("Deleting VM | %s", vm_entry)
stop(vm_entry)
if image_storage_handler.is_vm_image_exists(vm_entry.uuid):
r_status = image_storage_handler.delete_vm_image(vm_entry.uuid)
if r_status:
etcd_client.client.delete(vm_entry.key)
else:
etcd_client.client.delete(vm_entry.key)
def transfer(request_event):
# This function would run on source host i.e host on which the vm
# is running initially. This host would be responsible for transferring
# vm state to destination host.
_host, _port = request_event.parameters["host"], request_event.parameters["port"]
_uuid = request_event.uuid
_destination = request_event.destination_host_key
vm = get_vm(running_vms, join_path(env_vars.get('VM_PREFIX'), _uuid))
if vm:
tunnel = sshtunnel.SSHTunnelForwarder(
_host,
ssh_username=env_vars.get("ssh_username"),
ssh_pkey=env_vars.get("ssh_pkey"),
remote_bind_address=("127.0.0.1", _port),
ssh_proxy_enabled=True,
ssh_proxy=(_host, 22)
)
try:
tunnel.start()
except sshtunnel.BaseSSHTunnelForwarderError:
logger.exception("Couldn't establish connection to (%s, 22)", _host)
else:
vm.handle.command(
"migrate", uri="tcp:0.0.0.0:{}".format(tunnel.local_bind_port)
)
status = vm.handle.command("query-migrate")["status"]
while status not in ["failed", "completed"]:
time.sleep(2)
status = vm.handle.command("query-migrate")["status"]
with vm_pool.get_put(request_event.uuid) as source_vm:
if status == "failed":
source_vm.add_log("Migration Failed")
elif status == "completed":
# If VM is successfully migrated then shutdown the VM
# on this host and update hostname to destination host key
source_vm.add_log("Successfully migrated")
source_vm.hostname = _destination
running_vms.remove(vm)
vm.handle.shutdown()
source_vm.in_migration = False # VM transfer finished
finally:
tunnel.close()
def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_key=None):
logger.info("Starting %s" % vm_entry.key)
vm = create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
try:
vm.handle.launch()
except Exception:
logger.exception("Error Occured while starting VM")
vm.handle.shutdown()
if migration:
# We don't care whether MachineError or any other error occurred
pass
else:
# Error during typical launch of a vm
vm.handle.shutdown()
vm_entry.declare_killed()
vm_pool.put(vm_entry)
else:
vm_entry.vnc_socket = vm.vnc_socket_file.name
running_vms.append(vm)
if migration:
vm_entry.in_migration = True
r = RequestEntry.from_scratch(
type=RequestType.TransferVM,
hostname=vm_entry.hostname,
parameters={"host": get_ipv6_address(), "port": migration_port},
uuid=vm_entry.uuid,
destination_host_key=destination_host_key,
request_prefix=env_vars.get("REQUEST_PREFIX")
)
request_pool.put(r)
else:
# Typical launching of a vm
vm_entry.status = VMStatus.running
vm_entry.add_log("Started successfully")
vm_pool.put(vm_entry)