uncloud-mravi/host/virtualmachine.py

403 lines
No EOL
13 KiB
Python
Executable file

# QEMU Manual
# https://qemu.weilnetz.de/doc/qemu-doc.html
# For QEMU Monitor Protocol Commands Information, See
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
import errno
import os
import subprocess as sp
import tempfile
import time
import random
import ipaddress
from functools import wraps
from os.path import join
from typing import Union
from string import Template
import bitmath
import sshtunnel
import qmp
from decouple import config
from ucloud_common.helpers import get_ipv4_address
from ucloud_common.request import RequestEntry, RequestType
from ucloud_common.vm import VMEntry, VMStatus
from config import (WITHOUT_CEPH, VM_PREFIX, VM_DIR, IMAGE_DIR,
NETWORK_PREFIX, etcd_client, logging,
request_pool, running_vms, vm_pool)
class VM:
def __init__(self, key, handle, vnc_socket_file):
self.key = key # type: str
self.handle = handle # type: qmp.QEMUMachine
self.vnc_socket_file = vnc_socket_file # type: tempfile.NamedTemporaryFile
def __repr__(self):
return "VM({})".format(self.key)
def create_dev(script, _id, dev, ip=None):
command = [script, _id, dev]
if ip:
command.append(ip)
try:
output = sp.check_output(command, stderr=sp.PIPE)
except Exception as e:
print(e.stderr)
return None
else:
return output.decode("utf-8").strip()
def create_vxlan_br_tap(_id, _dev, ip=None):
network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')
vxlan = create_dev(script=os.path.join(network_script_base, 'create-vxlan.sh'),
_id=_id, dev=_dev)
if vxlan:
bridge = create_dev(script=os.path.join(network_script_base, 'create-bridge.sh'),
_id=_id, dev=vxlan, ip=ip)
if bridge:
tap = create_dev(script=os.path.join(network_script_base, 'create-tap.sh'),
_id=str(random.randint(1, 100000)), dev=bridge)
if tap:
return tap
def random_bytes(num=6):
return [random.randrange(256) for _ in range(num)]
def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt='%02x'):
mac = random_bytes()
if oui:
if type(oui) == str:
oui = [int(chunk) for chunk in oui.split(separator)]
mac = oui + random_bytes(num=6-len(oui))
else:
if multicast:
mac[0] |= 1 # set bit 0
else:
mac[0] &= ~1 # clear bit 0
if uaa:
mac[0] &= ~(1 << 1) # clear bit 1
else:
mac[0] |= 1 << 1 # set bit 1
return separator.join(byte_fmt % b for b in mac)
def update_radvd_conf(etcd_client):
network_script_base = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'network')
networks = {
net.value['ipv6']:net.value['id']
for net in etcd_client.get_prefix('/v1/network/', value_in_json=True)
if net.value.get('ipv6')
}
radvd_template = open(os.path.join(network_script_base,
'radvd-template.conf'), 'r').read()
radvd_template = Template(radvd_template)
content = [radvd_template.safe_substitute(bridge='br{}'.format(networks[net]),
prefix=net)
for net in networks if networks.get(net)]
with open('/etc/radvd.conf', 'w') as radvd_conf:
radvd_conf.writelines(content)
sp.check_output(['systemctl', 'restart', 'radvd'])
def get_start_command_args(
vm_entry, vnc_sock_filename: str, migration=False, migration_port=4444,
):
threads_per_core = 1
vm_memory = int(bitmath.parse_string(vm_entry.specs["ram"]).to_MB())
vm_cpus = int(vm_entry.specs["cpu"])
vm_uuid = vm_entry.uuid
vm_networks = vm_entry.network
if WITHOUT_CEPH:
command = "-drive file={},format=raw,if=virtio,cache=none".format(
os.path.join(VM_DIR, vm_uuid)
)
else:
command = "-drive file=rbd:uservms/{},format=raw,if=virtio,cache=none".format(
vm_uuid
)
command += " -device virtio-rng-pci -vnc unix:{}".format(vnc_sock_filename)
command += " -m {} -smp cores={},threads={}".format(
vm_memory, vm_cpus, threads_per_core
)
command += " -name {}".format(vm_uuid)
if migration:
command += " -incoming tcp:0:{}".format(migration_port)
tap = None
for network_and_mac in vm_networks:
network_name, mac = network_and_mac
_key = os.path.join(NETWORK_PREFIX, vm_entry.owner, network_name)
network = etcd_client.get(_key, value_in_json=True)
network_type = network.value["type"]
network_id = str(network.value["id"])
network_ipv6 = network.value["ipv6"]
if network_type == "vxlan":
tap = create_vxlan_br_tap(network_id, config("VXLAN_PHY_DEV"), network_ipv6)
update_radvd_conf(etcd_client)
command += " -netdev tap,id=vmnet{net_id},ifname={tap},script=no,downscript=no"\
" -device virtio-net-pci,netdev=vmnet{net_id},mac={mac}"\
.format(tap=tap, net_id=network_id, mac=mac)
return command.split(" ")
def create_vm_object(vm_entry, migration=False, migration_port=4444):
# NOTE: If migration suddenly stop working, having different
# VNC unix filename on source and destination host can
# be a possible cause of it.
# REQUIREMENT: Use Unix Socket instead of TCP Port for VNC
vnc_sock_file = tempfile.NamedTemporaryFile()
qemu_args = get_start_command_args(
vm_entry=vm_entry,
vnc_sock_filename=vnc_sock_file.name,
migration=migration,
migration_port=migration_port,
)
qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args)
return VM(vm_entry.key, qemu_machine, vnc_sock_file)
def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
return next((vm for vm in vm_list if vm.key == vm_key), None)
def need_running_vm(func):
@wraps(func)
def wrapper(e):
vm = get_vm(running_vms, e.key)
if vm:
try:
status = vm.handle.command("query-status")
logging.debug("VM Status Check - %s", status)
except Exception as exception:
logging.info("%s failed - VM %s %s", func.__name__, e, exception)
else:
return func(e)
return None
else:
logging.info("%s failed because VM %s is not running", func.__name__, e.key)
return None
return wrapper
def create(vm_entry: VMEntry):
vm_hdd = int(bitmath.parse_string(vm_entry.specs["os-ssd"]).to_MB())
if WITHOUT_CEPH:
_command_to_create = [
"cp",
os.path.join(IMAGE_DIR, vm_entry.image_uuid),
os.path.join(VM_DIR, vm_entry.uuid),
]
_command_to_extend = [
"qemu-img",
"resize",
"-f", "raw",
os.path.join(VM_DIR, vm_entry.uuid),
"{}M".format(vm_hdd),
]
else:
_command_to_create = [
"rbd",
"clone",
"images/{}@protected".format(vm_entry.image_uuid),
"uservms/{}".format(vm_entry.uuid),
]
_command_to_extend = [
"rbd",
"resize",
"uservms/{}".format(vm_entry.uuid),
"--size",
vm_hdd,
]
try:
sp.check_output(_command_to_create)
except sp.CalledProcessError as e:
if e.returncode == errno.EEXIST:
logging.debug("Image for vm %s exists", vm_entry.uuid)
# File Already exists. No Problem Continue
return
# This exception catches all other exceptions
# i.e FileNotFound (BaseImage), pool Does Not Exists etc.
logging.exception(e)
vm_entry.status = "ERROR"
else:
try:
sp.check_output(_command_to_extend)
except Exception as e:
logging.exception(e)
else:
logging.info("New VM Created")
def start(vm_entry: VMEntry):
_vm = get_vm(running_vms, vm_entry.key)
# VM already running. No need to proceed further.
if _vm:
logging.info("VM %s already running", vm_entry.uuid)
return
else:
create(vm_entry)
launch_vm(vm_entry)
@need_running_vm
def stop(vm_entry):
vm = get_vm(running_vms, vm_entry.key)
vm.handle.shutdown()
if not vm.handle.is_running():
vm_entry.add_log("Shutdown successfully")
vm_entry.declare_stopped()
vm_pool.put(vm_entry)
running_vms.remove(vm)
def delete(vm_entry):
logging.info("Deleting VM | %s", vm_entry)
stop(vm_entry)
path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1 :]
if WITHOUT_CEPH:
vm_deletion_command = ["rm", os.path.join(VM_DIR, vm_entry.uuid)]
else:
vm_deletion_command = ["rbd", "rm", path_without_protocol]
try:
sp.check_output(vm_deletion_command)
except Exception as e:
logging.exception(e)
else:
etcd_client.client.delete(vm_entry.key)
def transfer(request_event):
# This function would run on source host i.e host on which the vm
# is running initially. This host would be responsible for transferring
# vm state to destination host.
_host, _port = request_event.parameters["host"], request_event.parameters["port"]
_uuid = request_event.uuid
_destination = request_event.destination_host_key
vm = get_vm(running_vms, join(VM_PREFIX, _uuid))
if vm:
tunnel = sshtunnel.SSHTunnelForwarder(
(_host, 22),
ssh_username=config("ssh_username"),
ssh_pkey=config("ssh_pkey"),
ssh_private_key_password=config("ssh_private_key_password"),
remote_bind_address=("127.0.0.1", _port),
)
try:
tunnel.start()
except sshtunnel.BaseSSHTunnelForwarderError:
logging.exception("Couldn't establish connection to (%s, 22)", _host)
else:
vm.handle.command(
"migrate", uri="tcp:{}:{}".format(_host, tunnel.local_bind_port)
)
status = vm.handle.command("query-migrate")["status"]
while status not in ["failed", "completed"]:
time.sleep(2)
status = vm.handle.command("query-migrate")["status"]
with vm_pool.get_put(request_event.uuid) as source_vm:
if status == "failed":
source_vm.add_log("Migration Failed")
elif status == "completed":
# If VM is successfully migrated then shutdown the VM
# on this host and update hostname to destination host key
source_vm.add_log("Successfully migrated")
source_vm.hostname = _destination
running_vms.remove(vm)
vm.handle.shutdown()
source_vm.in_migration = False # VM transfer finished
finally:
tunnel.close()
def init_migration(vm_entry, destination_host_key):
# This function would run on destination host i.e host on which the vm
# would be transferred after migration.
# This host would be responsible for starting VM that would receive
# state of VM running on source host.
_vm = get_vm(running_vms, vm_entry.key)
if _vm:
# VM already running. No need to proceed further.
logging.info("%s Already running", _vm.key)
return
launch_vm(vm_entry, migration=True, migration_port=4444,
destination_host_key=destination_host_key)
def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_key=None):
logging.info("Starting %s", vm_entry.key)
vm = create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
try:
vm.handle.launch()
except Exception as e:
logging.exception(e)
if migration:
# We don't care whether MachineError or any other error occurred
vm.handle.shutdown()
else:
# Error during typical launch of a vm
vm_entry.add_log("Error Occurred while starting VM")
vm_entry.declare_killed()
vm_pool.put(vm_entry)
else:
vm_entry.vnc_socket = vm.vnc_socket_file.name
running_vms.append(vm)
if migration:
vm_entry.in_migration = True
r = RequestEntry.from_scratch(
type=RequestType.TransferVM,
hostname=vm_entry.hostname,
parameters={"host": get_ipv4_address(), "port": 4444},
uuid=vm_entry.uuid,
destination_host_key=destination_host_key,
)
request_pool.put(r)
else:
# Typical launching of a vm
vm_entry.status = VMStatus.running
vm_entry.add_log("Started successfully")
vm_pool.put(vm_entry)