# uncloud-mravi/uncloud/vmm/__init__.py
#
# 284 lines
# 8.8 KiB
# Python

import os
import subprocess as sp
import logging
import socket
import json
import tempfile
import time
from contextlib import suppress
from multiprocessing import Process
from os.path import join as join_path
from os.path import isdir
logger = logging.getLogger(__name__)
class VMQMPHandles:
    """Context manager yielding a negotiated QMP connection to a VM.

    Entering the context connects to the UNIX socket at ``path``, consumes the
    QMP greeting, sends ``qmp_capabilities`` and consumes its reply, then
    yields ``(sock, file)``: the raw socket for writing and a text-mode file
    wrapper for reading line-delimited replies.

    Both handles are closed when the context exits *and* when entering it
    fails, so no descriptor outlives a failed connection attempt.
    """

    def __init__(self, path):
        """Create the (not yet connected) socket for the QMP socket ``path``."""
        self.path = path
        self.sock = socket.socket(socket.AF_UNIX)
        self.file = self.sock.makefile()

    def __enter__(self):
        """Connect, perform the QMP handshake and return ``(sock, file)``.

        Raises whatever the socket layer raises (e.g. ``FileNotFoundError``,
        ``ConnectionRefusedError``) after releasing the handles.
        """
        try:
            self.sock.connect(self.path)
            # eat qmp greetings
            self.file.readline()
            # init qmp
            self.sock.sendall(b'{ "execute": "qmp_capabilities" }')
            self.file.readline()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so release the
            # handles here to avoid leaking the socket.
            self._close()
            raise
        return self.sock, self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the handles; log and propagate any exception from the body."""
        self._close()
        if exc_type:
            logger.error(
                "Couldn't get handle for VM.",
                exc_info=(exc_type, exc_val, exc_tb),
            )
        # Returning False re-raises the original exception unchanged, which
        # keeps its type, arguments and traceback intact for the caller.
        return False

    def _close(self):
        """Close the file wrapper first, then the underlying socket."""
        self.file.close()
        self.sock.close()
class TransferVM(Process):
    """Background process that migrates a running VM to another host.

    ``run`` forwards the local UNIX socket ``<socket_dir>/<src_uuid>`` to
    ``dest_sock_path`` on ``host`` through ``ssh -L``, asks the source VM
    (via QMP ``migrate``) to stream its state into that local socket and
    then polls ``query-migrate`` until the migration finishes.
    """

    # Seconds to wait for ssh to establish the forwarding before migrating.
    TUNNEL_SETUP_DELAY = 2
    # Seconds between two ``query-migrate`` polls.
    POLL_INTERVAL = 2
    # Seconds to wait for ssh to exit after SIGTERM before killing it.
    TUNNEL_STOP_TIMEOUT = 5

    def __init__(self, src_uuid, dest_sock_path, host, socket_dir):
        """Remember the migration endpoints.

        src_uuid: UUID of the VM to migrate.
        dest_sock_path: path of the incoming-migration socket on ``host``.
        host: destination host, reached as ``root@host`` over ssh.
        socket_dir: local directory in which the forwarded socket is created.
        """
        super().__init__()
        self.src_uuid = src_uuid
        self.host = host
        self.src_sock_path = os.path.join(socket_dir, self.src_uuid)
        self.dest_sock_path = dest_sock_path

    def run(self):
        """Open the ssh tunnel, drive the migration and tear the tunnel down."""
        # A stale socket file would stop ssh from binding the local end.
        with suppress(FileNotFoundError):
            os.remove(self.src_sock_path)

        command = [
            "ssh",
            "-nNT",  # no stdin, no remote command, no tty: forwarding only
            "-L",
            "{}:{}".format(self.src_sock_path, self.dest_sock_path),
            "root@{}".format(self.host),
        ]
        logger.debug("Executing: ssh forwarding command: %s", command)
        try:
            tunnel = sp.Popen(command)
        except Exception as e:
            logger.error(
                "Couldn' forward unix socks over ssh.", exc_info=e
            )
            return

        try:
            self._migrate(tunnel)
        finally:
            # ``ssh -N`` never exits on its own; without this the tunnel
            # would outlive the migration as an orphaned process.
            self._stop_tunnel(tunnel)

    def _migrate(self, tunnel):
        """Start the migration and poll it while the ssh ``tunnel`` is alive."""
        time.sleep(self.TUNNEL_SETUP_DELAY)
        vmm = VMM()
        started, _ = vmm.execute_command(
            self.src_uuid,
            command="migrate",
            arguments={"uri": "unix:{}".format(self.src_sock_path)},
        )
        if not started:
            # Polling a migration that never started would loop for as long
            # as the tunnel stays up.
            logger.error(
                "Couldn't start migration of VM %s", self.src_uuid
            )
            return

        while tunnel.poll() is None:
            success, output = vmm.execute_command(
                self.src_uuid, command="query-migrate"
            )
            if not success:
                logger.error("Couldn't be able to query VM {} that was in migration".format(self.src_uuid))
                return

            # ``status`` is optional in the query-migrate reply.
            status = output["return"].get("status")
            logger.info('Migration Status: {}'.format(status))
            if status == "completed":
                # The destination now owns the VM; shut the source down.
                vmm.stop(self.src_uuid)
                return
            if status in ("failed", "cancelled"):
                return
            time.sleep(self.POLL_INTERVAL)

        logger.error(
            "ssh tunnel exited before migration of VM %s finished",
            self.src_uuid,
        )

    def _stop_tunnel(self, tunnel):
        """Terminate the ssh ``tunnel`` process and reap it."""
        if tunnel.poll() is None:
            tunnel.terminate()
        try:
            tunnel.wait(timeout=self.TUNNEL_STOP_TIMEOUT)
        except sp.TimeoutExpired:
            tunnel.kill()
            tunnel.wait()
class VMM:
    """Virtual Machine Manager: start, stop and query QEMU VMs through QMP.

    Every VM is addressed by its UUID. The VM's QMP control socket lives at
    ``<vmm_backend>/<uuid>`` and the socket used for incoming migrations at
    ``<vmm_backend>/sock/<uuid>``.
    """

    def __init__(
        self,
        qemu_path="/usr/bin/qemu-system-x86_64",
        vmm_backend=os.path.expanduser("~/uncloud/vmm/"),
    ):
        """Record the paths and create the backend directories if missing.

        qemu_path: QEMU binary used to launch VMs.
        vmm_backend: directory holding the per-VM QMP sockets.
        """
        self.qemu_path = qemu_path
        self.vmm_backend = vmm_backend
        self.socket_dir = os.path.join(self.vmm_backend, "sock")

        for directory in (self.vmm_backend, self.socket_dir):
            if not os.path.isdir(directory):
                logger.info("%s does not exists. Creating it...", directory)
                os.makedirs(directory, exist_ok=True)

    def _qmp_socket_path(self, uuid):
        """Return the path of the QMP socket of VM ``uuid``."""
        return os.path.join(self.vmm_backend, uuid)

    @staticmethod
    def _read_reply_line(file_handle):
        """Return the first line from ``file_handle`` that is not a QMP event.

        QMP may deliver asynchronous event messages (JSON objects carrying an
        ``"event"`` key) before the reply to a command; those are skipped.
        A line that is not valid JSON (including the empty string read at
        EOF) is returned as-is so the caller can report it.
        """
        while True:
            line = file_handle.readline()
            try:
                message = json.loads(line)
            except ValueError:
                return line
            if not (isinstance(message, dict) and "event" in message):
                return line

    def is_running(self, uuid):
        """Return True when the QMP socket of VM ``uuid`` sends a greeting.

        When the VM is not reachable its leftover socket file is removed.
        """
        sock_path = self._qmp_socket_path(uuid)
        try:
            # The context manager closes the probe connection right away so
            # it does not linger on the VM's QMP socket.
            with socket.socket(socket.AF_UNIX) as sock:
                sock.connect(sock_path)
                greeting = sock.recv(4096)
        except Exception as err:
            # unix sock doesn't exists or it is closed
            logger.debug(
                "VM %s sock either don' exists or it is closed. It mean VM is stopped.",
                uuid,
                exc_info=err,
            )
        else:
            # if we receive greetings from qmp it mean VM is running
            if greeting:
                return True

        with suppress(FileNotFoundError):
            os.remove(sock_path)
        return False

    def start(self, *args, uuid, migration=False):
        """Launch VM ``uuid`` with QEMU and wait for it to come up.

        args: extra command-line arguments passed to QEMU.
        uuid: identifier of the VM (keyword-only).
        migration: when true, QEMU is started with ``-incoming`` so that it
            waits for a migration stream on ``<socket_dir>/<uuid>``.

        Returns the QMP status (``"running"`` or ``"inmigrate"``) on success
        and ``None`` when the VM is already running, QEMU fails to start, or
        the VM does not report one of those statuses in time (in which case
        it is asked to quit).
        """
        if self.is_running(uuid):
            logger.warning("Cannot start VM. It is already running.")
            return None

        migration_args = ()
        if migration:
            migration_args = (
                "-incoming",
                "unix:{}".format(os.path.join(self.socket_dir, uuid)),
            )

        qmp_arg = (
            "-qmp",
            "unix:{},server,nowait".format(self._qmp_socket_path(uuid)),
        )

        # Pick a unique path for the VNC unix socket: the placeholder file is
        # deleted when the context exits, leaving the name free for QEMU.
        with tempfile.NamedTemporaryFile() as placeholder:
            vnc_socket_path = placeholder.name
        vnc_arg = ("-vnc", "unix:{}".format(vnc_socket_path))

        command = [
            "sudo",
            "-p",
            "Enter password to start VM {}: ".format(uuid),
            self.qemu_path,
            *args,
            *qmp_arg,
            *migration_args,
            *vnc_arg,
            "-daemonize",
        ]
        try:
            sp.check_output(command, stderr=sp.PIPE)
        except sp.CalledProcessError as err:
            logger.exception(
                "Error occurred while starting VM.\nDetail %s",
                err.stderr.decode("utf-8"),
            )
            return None

        # QEMU ran under sudo, so the sockets it created are owned by root;
        # open them up so this (unprivileged) process can talk to them.
        # NOTE(review): this makes the whole backend world-writable.
        sp.check_output(
            ["sudo", "-p", "Enter password to correct permission for uncloud-vmm's directory",
             "chmod", "-R", "o=rwx,g=rwx", self.vmm_backend]
        )

        # TODO: Find some good way to check whether the virtual machine is up and
        #       running without relying on non-guaranteed ways.
        for _ in range(10):
            time.sleep(2)
            status = self.get_status(uuid)
            if status in ("running", "inmigrate"):
                return status

        logger.warning(
            "Timeout on VM's status. Shutting down VM %s", uuid
        )
        self.stop(uuid)
        # TODO: What should we do more. VM can still continue to run in background.
        #       If we have pid of vm we can kill it using OS.
        return None

    def execute_command(self, uuid, command, **kwargs):
        """Send QMP ``command`` to VM ``uuid`` and return ``(success, output)``.

        kwargs are merged into the QMP message next to ``"execute"`` (e.g.
        ``arguments={...}``). ``success`` is true when the decoded reply
        contains a ``"return"`` key; ``output`` is the decoded reply. Any
        failure is logged and reported as ``(False, None)``.
        """
        try:
            request = json.dumps({"execute": command, **kwargs})
            with VMQMPHandles(self._qmp_socket_path(uuid)) as (
                sock_handle,
                file_handle,
            ):
                sock_handle.sendall(request.encode("utf-8"))
                output = self._read_reply_line(file_handle)
        except Exception:
            logger.exception(
                "Error occurred while executing command and getting valid output from qmp"
            )
            return False, None

        try:
            reply = json.loads(output)
        except Exception:
            logger.exception(
                "QMP Output isn't valid JSON. %s", output
            )
            return False, None

        return isinstance(reply, dict) and "return" in reply, reply

    def stop(self, uuid):
        """Ask VM ``uuid`` to quit; return whether QMP acknowledged it."""
        success, _ = self.execute_command(command="quit", uuid=uuid)
        return success

    def get_status(self, uuid):
        """Return the QMP run state of VM ``uuid`` or ``"STOPPED"``."""
        success, output = self.execute_command(
            command="query-status", uuid=uuid
        )
        if success:
            return output["return"]["status"]
        # TODO: Think about this for a little more
        return "STOPPED"

    def discover(self):
        """Return the names of all non-directory entries in the backend.

        Those entries are the per-VM QMP sockets, so the names are VM UUIDs.
        """
        return [
            uuid
            for uuid in os.listdir(self.vmm_backend)
            if not os.path.isdir(os.path.join(self.vmm_backend, uuid))
        ]

    def get_vnc(self, uuid):
        """Return the ``service`` field of ``query-vnc`` or ``None``."""
        success, output = self.execute_command(
            uuid, command="query-vnc"
        )
        if success:
            return output["return"]["service"]
        return None

    def transfer(self, src_uuid, destination_sock_path, host):
        """Start migrating VM ``src_uuid`` to ``host`` in a child process."""
        p = TransferVM(
            src_uuid,
            destination_sock_path,
            socket_dir=self.socket_dir,
            host=host,
        )
        p.start()

    # TODO: the following method should clean things that went wrong
    #       e.g If VM migration fails or didn't start for long time
    #       i.e 15 minutes we should stop the waiting VM.
    def maintenace(self):
        """Placeholder, currently a no-op (name kept as-is for callers)."""
        pass