ucloud-{api,scheduler,host,filescanner,imagescanner,metadata} combined
This commit is contained in:
commit
da77ac65eb
29 changed files with 3941 additions and 0 deletions
310
host/virtualmachine.py
Executable file
310
host/virtualmachine.py
Executable file
|
|
@ -0,0 +1,310 @@
|
|||
# QEMU Manual
|
||||
# https://qemu.weilnetz.de/doc/qemu-doc.html
|
||||
|
||||
# For QEMU Monitor Protocol Commands Information, See
|
||||
# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
|
||||
|
||||
import errno
|
||||
import os
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
from functools import wraps
|
||||
from os.path import join
|
||||
from typing import Union
|
||||
|
||||
import bitmath
|
||||
import sshtunnel
|
||||
from decouple import config
|
||||
|
||||
import qmp
|
||||
from config import (WITHOUT_CEPH, VM_PREFIX, VM_DIR, IMAGE_DIR,
|
||||
etcd_client, logging, request_pool,
|
||||
running_vms, vm_pool)
|
||||
from ucloud_common.helpers import get_ipv4_address
|
||||
from ucloud_common.request import RequestEntry, RequestType
|
||||
from ucloud_common.vm import VMEntry, VMStatus
|
||||
|
||||
|
||||
class VM:
|
||||
def __init__(self, key, handle, vnc_socket_file):
|
||||
self.key = key # type: str
|
||||
self.handle = handle # type: qmp.QEMUMachine
|
||||
self.vnc_socket_file = vnc_socket_file # type: tempfile.NamedTemporaryFile
|
||||
|
||||
def __repr__(self):
|
||||
return "VM({})".format(self.key)
|
||||
|
||||
|
||||
def get_start_command_args(
|
||||
vm_entry, vnc_sock_filename: str, migration=False, migration_port=4444
|
||||
):
|
||||
threads_per_core = 1
|
||||
vm_memory = int(bitmath.parse_string(vm_entry.specs["ram"]).to_MB())
|
||||
vm_cpus = int(vm_entry.specs["cpu"])
|
||||
vm_uuid = vm_entry.uuid
|
||||
|
||||
if WITHOUT_CEPH:
|
||||
command = "-drive file={},format=raw,if=virtio,cache=none".format(
|
||||
os.path.join(VM_DIR, vm_uuid)
|
||||
)
|
||||
else:
|
||||
command = "-drive file=rbd:uservms/{},format=raw,if=virtio,cache=none".format(
|
||||
vm_uuid
|
||||
)
|
||||
|
||||
command += " -device virtio-rng-pci -vnc unix:{}".format(vnc_sock_filename)
|
||||
command += " -m {} -smp cores={},threads={}".format(
|
||||
vm_memory, vm_cpus, threads_per_core
|
||||
)
|
||||
command += " -name {}".format(vm_uuid)
|
||||
|
||||
if migration:
|
||||
command += " -incoming tcp:0:{}".format(migration_port)
|
||||
|
||||
command += " -nic tap,model=virtio,mac={}".format(vm_entry.mac)
|
||||
return command.split(" ")
|
||||
|
||||
|
||||
def create_vm_object(vm_entry, migration=False, migration_port=4444):
|
||||
# NOTE: If migration suddenly stop working, having different
|
||||
# VNC unix filename on source and destination host can
|
||||
# be a possible cause of it.
|
||||
|
||||
# REQUIREMENT: Use Unix Socket instead of TCP Port for VNC
|
||||
vnc_sock_file = tempfile.NamedTemporaryFile()
|
||||
|
||||
qemu_args = get_start_command_args(
|
||||
vm_entry=vm_entry,
|
||||
vnc_sock_filename=vnc_sock_file.name,
|
||||
migration=migration,
|
||||
migration_port=migration_port,
|
||||
)
|
||||
qemu_machine = qmp.QEMUMachine("/usr/bin/qemu-system-x86_64", args=qemu_args)
|
||||
return VM(vm_entry.key, qemu_machine, vnc_sock_file)
|
||||
|
||||
|
||||
def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
|
||||
return next((vm for vm in vm_list if vm.key == vm_key), None)
|
||||
|
||||
|
||||
def need_running_vm(func):
|
||||
@wraps(func)
|
||||
def wrapper(e):
|
||||
vm = get_vm(running_vms, e.key)
|
||||
if vm:
|
||||
try:
|
||||
status = vm.handle.command("query-status")
|
||||
logging.debug("VM Status Check - %s", status)
|
||||
except Exception as exception:
|
||||
logging.info("%s failed - VM %s %s", func.__name__, e, exception)
|
||||
else:
|
||||
return func(e)
|
||||
|
||||
return None
|
||||
else:
|
||||
logging.info("%s failed because VM %s is not running", func.__name__, e.key)
|
||||
return None
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def create(vm_entry: VMEntry):
|
||||
vm_hdd = int(bitmath.parse_string(vm_entry.specs["os-ssd"]).to_MB())
|
||||
|
||||
if WITHOUT_CEPH:
|
||||
_command_to_create = [
|
||||
"cp",
|
||||
os.path.join(IMAGE_DIR, vm_entry.image_uuid),
|
||||
os.path.join(VM_DIR, vm_entry.uuid),
|
||||
]
|
||||
|
||||
_command_to_extend = [
|
||||
"qemu-img",
|
||||
"resize",
|
||||
"-f", "raw",
|
||||
os.path.join(VM_DIR, vm_entry.uuid),
|
||||
"{}M".format(vm_hdd),
|
||||
]
|
||||
else:
|
||||
_command_to_create = [
|
||||
"rbd",
|
||||
"clone",
|
||||
"images/{}@protected".format(vm_entry.image_uuid),
|
||||
"uservms/{}".format(vm_entry.uuid),
|
||||
]
|
||||
|
||||
_command_to_extend = [
|
||||
"rbd",
|
||||
"resize",
|
||||
"uservms/{}".format(vm_entry.uuid),
|
||||
"--size",
|
||||
vm_hdd,
|
||||
]
|
||||
|
||||
try:
|
||||
subprocess.check_output(_command_to_create)
|
||||
except subprocess.CalledProcessError as e:
|
||||
if e.returncode == errno.EEXIST:
|
||||
logging.debug("Image for vm %s exists", vm_entry.uuid)
|
||||
# File Already exists. No Problem Continue
|
||||
return
|
||||
|
||||
# This exception catches all other exceptions
|
||||
# i.e FileNotFound (BaseImage), pool Does Not Exists etc.
|
||||
logging.exception(e)
|
||||
|
||||
vm_entry.status = "ERROR"
|
||||
else:
|
||||
try:
|
||||
subprocess.check_output(_command_to_extend)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
else:
|
||||
logging.info("New VM Created")
|
||||
|
||||
|
||||
def start(vm_entry: VMEntry):
|
||||
_vm = get_vm(running_vms, vm_entry.key)
|
||||
|
||||
# VM already running. No need to proceed further.
|
||||
if _vm:
|
||||
logging.info("VM %s already running", vm_entry.uuid)
|
||||
return
|
||||
else:
|
||||
create(vm_entry)
|
||||
launch_vm(vm_entry)
|
||||
|
||||
|
||||
@need_running_vm
|
||||
def stop(vm_entry):
|
||||
vm = get_vm(running_vms, vm_entry.key)
|
||||
vm.handle.shutdown()
|
||||
if not vm.handle.is_running():
|
||||
vm_entry.add_log("Shutdown successfully")
|
||||
vm_entry.declare_stopped()
|
||||
vm_pool.put(vm_entry)
|
||||
running_vms.remove(vm)
|
||||
|
||||
|
||||
def delete(vm_entry):
|
||||
logging.info("Deleting VM | %s", vm_entry)
|
||||
stop(vm_entry)
|
||||
path_without_protocol = vm_entry.path[vm_entry.path.find(":") + 1 :]
|
||||
|
||||
if WITHOUT_CEPH:
|
||||
vm_deletion_command = ["rm", os.path.join(VM_DIR, vm_entry.uuid)]
|
||||
else:
|
||||
vm_deletion_command = ["rbd", "rm", path_without_protocol]
|
||||
|
||||
try:
|
||||
subprocess.check_output(vm_deletion_command)
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
else:
|
||||
etcd_client.client.delete(vm_entry.key)
|
||||
|
||||
|
||||
def transfer(request_event):
|
||||
# This function would run on source host i.e host on which the vm
|
||||
# is running initially. This host would be responsible for transferring
|
||||
# vm state to destination host.
|
||||
|
||||
_host, _port = request_event.parameters["host"], request_event.parameters["port"]
|
||||
_uuid = request_event.uuid
|
||||
_destination = request_event.destination_host_key
|
||||
vm = get_vm(running_vms, join(VM_PREFIX, _uuid))
|
||||
|
||||
if vm:
|
||||
tunnel = sshtunnel.SSHTunnelForwarder(
|
||||
(_host, 22),
|
||||
ssh_username=config("ssh_username"),
|
||||
ssh_pkey=config("ssh_pkey"),
|
||||
ssh_private_key_password=config("ssh_private_key_password"),
|
||||
remote_bind_address=("127.0.0.1", _port),
|
||||
)
|
||||
try:
|
||||
tunnel.start()
|
||||
except sshtunnel.BaseSSHTunnelForwarderError:
|
||||
logging.exception("Couldn't establish connection to (%s, 22)", _host)
|
||||
else:
|
||||
vm.handle.command(
|
||||
"migrate", uri="tcp:{}:{}".format(_host, tunnel.local_bind_port)
|
||||
)
|
||||
|
||||
status = vm.handle.command("query-migrate")["status"]
|
||||
while status not in ["failed", "completed"]:
|
||||
time.sleep(2)
|
||||
status = vm.handle.command("query-migrate")["status"]
|
||||
|
||||
with vm_pool.get_put(request_event.uuid) as source_vm:
|
||||
if status == "failed":
|
||||
source_vm.add_log("Migration Failed")
|
||||
elif status == "completed":
|
||||
# If VM is successfully migrated then shutdown the VM
|
||||
# on this host and update hostname to destination host key
|
||||
source_vm.add_log("Successfully migrated")
|
||||
source_vm.hostname = _destination
|
||||
running_vms.remove(vm)
|
||||
vm.handle.shutdown()
|
||||
source_vm.in_migration = False # VM transfer finished
|
||||
finally:
|
||||
tunnel.close()
|
||||
|
||||
|
||||
def init_migration(vm_entry, destination_host_key):
|
||||
# This function would run on destination host i.e host on which the vm
|
||||
# would be transferred after migration.
|
||||
# This host would be responsible for starting VM that would receive
|
||||
# state of VM running on source host.
|
||||
|
||||
_vm = get_vm(running_vms, vm_entry.key)
|
||||
|
||||
if _vm:
|
||||
# VM already running. No need to proceed further.
|
||||
logging.info("%s Already running", _vm.key)
|
||||
return
|
||||
|
||||
launch_vm(vm_entry, migration=True, migration_port=4444,
|
||||
destination_host_key=destination_host_key)
|
||||
|
||||
|
||||
def launch_vm(vm_entry, migration=False, migration_port=None, destination_host_key=None):
|
||||
logging.info("Starting %s", vm_entry.key)
|
||||
|
||||
vm = create_vm_object(vm_entry, migration=migration, migration_port=migration_port)
|
||||
try:
|
||||
vm.handle.launch()
|
||||
except Exception as e:
|
||||
logging.exception(e)
|
||||
|
||||
if migration:
|
||||
# We don't care whether MachineError or any other error occurred
|
||||
vm.handle.shutdown()
|
||||
else:
|
||||
# Error during typical launch of a vm
|
||||
vm_entry.add_log("Error Occurred while starting VM")
|
||||
vm_entry.declare_killed()
|
||||
vm_pool.put(vm_entry)
|
||||
else:
|
||||
vm_entry.vnc_socket = vm.vnc_socket_file.name
|
||||
running_vms.append(vm)
|
||||
|
||||
if migration:
|
||||
vm_entry.in_migration = True
|
||||
r = RequestEntry.from_scratch(
|
||||
type=RequestType.TransferVM,
|
||||
hostname=vm_entry.hostname,
|
||||
parameters={"host": get_ipv4_address(), "port": 4444},
|
||||
uuid=vm_entry.uuid,
|
||||
destination_host_key=destination_host_key,
|
||||
)
|
||||
request_pool.put(r)
|
||||
else:
|
||||
# Typical launching of a vm
|
||||
vm_entry.status = VMStatus.running
|
||||
vm_entry.add_log("Started successfully")
|
||||
|
||||
vm_pool.put(vm_entry)
|
||||
|
||||
Loading…
Add table
Add a link
Reference in a new issue