forked from uncloud/uncloud
284 lines
8.8 KiB
Python
284 lines
8.8 KiB
Python
import os
|
|
import subprocess as sp
|
|
import logging
|
|
import socket
|
|
import json
|
|
import tempfile
|
|
import time
|
|
|
|
from contextlib import suppress
|
|
from multiprocessing import Process
|
|
from os.path import join as join_path
|
|
from os.path import isdir
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class VMQMPHandles:
|
|
def __init__(self, path):
|
|
self.path = path
|
|
self.sock = socket.socket(socket.AF_UNIX)
|
|
self.file = self.sock.makefile()
|
|
|
|
def __enter__(self):
|
|
self.sock.connect(self.path)
|
|
|
|
# eat qmp greetings
|
|
self.file.readline()
|
|
|
|
# init qmp
|
|
self.sock.sendall(b'{ "execute": "qmp_capabilities" }')
|
|
self.file.readline()
|
|
|
|
return self.sock, self.file
|
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb):
|
|
self.file.close()
|
|
self.sock.close()
|
|
|
|
if exc_type:
|
|
logger.error(
|
|
"Couldn't get handle for VM.", exc_type, exc_val, exc_tb
|
|
)
|
|
raise exc_type("Couldn't get handle for VM.") from exc_type
|
|
|
|
|
|
class TransferVM(Process):
|
|
def __init__(self, src_uuid, dest_sock_path, host, socket_dir):
|
|
self.src_uuid = src_uuid
|
|
self.host = host
|
|
self.src_sock_path = os.path.join(socket_dir, self.src_uuid)
|
|
self.dest_sock_path = dest_sock_path
|
|
|
|
super().__init__()
|
|
|
|
def run(self):
|
|
with suppress(FileNotFoundError):
|
|
os.remove(self.src_sock_path)
|
|
|
|
command = [
|
|
"ssh",
|
|
"-nNT",
|
|
"-L",
|
|
"{}:{}".format(self.src_sock_path, self.dest_sock_path),
|
|
"root@{}".format(self.host),
|
|
]
|
|
|
|
try:
|
|
p = sp.Popen(command)
|
|
except Exception as e:
|
|
logger.error(
|
|
"Couldn' forward unix socks over ssh.", exc_info=e
|
|
)
|
|
else:
|
|
time.sleep(2)
|
|
vmm = VMM()
|
|
logger.debug("Executing: ssh forwarding command: %s", command)
|
|
vmm.execute_command(
|
|
self.src_uuid,
|
|
command="migrate",
|
|
arguments={"uri": "unix:{}".format(self.src_sock_path)},
|
|
)
|
|
|
|
while p.poll() is None:
|
|
success, output = vmm.execute_command(self.src_uuid, command="query-migrate")
|
|
if success:
|
|
status = output["return"]["status"]
|
|
logger.info('Migration Status: {}'.format(status))
|
|
if status == "completed":
|
|
vmm.stop(self.src_uuid)
|
|
return
|
|
elif status in ['failed', 'cancelled']:
|
|
return
|
|
else:
|
|
logger.error("Couldn't be able to query VM {} that was in migration".format(self.src_uuid))
|
|
return
|
|
|
|
time.sleep(2)
|
|
|
|
|
|
class VMM:
|
|
# Virtual Machine Manager
|
|
def __init__(
|
|
self,
|
|
qemu_path="/usr/bin/qemu-system-x86_64",
|
|
vmm_backend=os.path.expanduser("~/uncloud/vmm/"),
|
|
):
|
|
self.qemu_path = qemu_path
|
|
self.vmm_backend = vmm_backend
|
|
self.socket_dir = os.path.join(self.vmm_backend, "sock")
|
|
|
|
if not os.path.isdir(self.vmm_backend):
|
|
logger.info(
|
|
"{} does not exists. Creating it...".format(
|
|
self.vmm_backend
|
|
)
|
|
)
|
|
os.makedirs(self.vmm_backend, exist_ok=True)
|
|
|
|
if not os.path.isdir(self.socket_dir):
|
|
logger.info(
|
|
"{} does not exists. Creating it...".format(
|
|
self.socket_dir
|
|
)
|
|
)
|
|
os.makedirs(self.socket_dir, exist_ok=True)
|
|
|
|
def is_running(self, uuid):
|
|
sock_path = os.path.join(self.vmm_backend, uuid)
|
|
try:
|
|
sock = socket.socket(socket.AF_UNIX)
|
|
sock.connect(sock_path)
|
|
recv = sock.recv(4096)
|
|
except Exception as err:
|
|
# unix sock doesn't exists or it is closed
|
|
logger.debug(
|
|
"VM {} sock either don' exists or it is closed. It mean VM is stopped.".format(
|
|
uuid
|
|
),
|
|
exc_info=err,
|
|
)
|
|
else:
|
|
# if we receive greetings from qmp it mean VM is running
|
|
if len(recv) > 0:
|
|
return True
|
|
|
|
with suppress(FileNotFoundError):
|
|
os.remove(sock_path)
|
|
|
|
return False
|
|
|
|
def start(self, *args, uuid, migration=False):
|
|
# start --> sucess?
|
|
migration_args = ()
|
|
if migration:
|
|
migration_args = (
|
|
"-incoming",
|
|
"unix:{}".format(os.path.join(self.socket_dir, uuid)),
|
|
)
|
|
|
|
if self.is_running(uuid):
|
|
logger.warning("Cannot start VM. It is already running.")
|
|
else:
|
|
qmp_arg = (
|
|
"-qmp",
|
|
"unix:{},server,nowait".format(
|
|
join_path(self.vmm_backend, uuid)
|
|
),
|
|
)
|
|
vnc_arg = (
|
|
"-vnc",
|
|
"unix:{}".format(tempfile.NamedTemporaryFile().name),
|
|
)
|
|
|
|
command = [
|
|
"sudo",
|
|
"-p",
|
|
"Enter password to start VM {}: ".format(uuid),
|
|
self.qemu_path,
|
|
*args,
|
|
*qmp_arg,
|
|
*migration_args,
|
|
*vnc_arg,
|
|
"-daemonize",
|
|
]
|
|
try:
|
|
sp.check_output(command, stderr=sp.PIPE)
|
|
except sp.CalledProcessError as err:
|
|
logger.exception(
|
|
"Error occurred while starting VM.\nDetail %s",
|
|
err.stderr.decode("utf-8"),
|
|
)
|
|
else:
|
|
sp.check_output(
|
|
["sudo", "-p", "Enter password to correct permission for uncloud-vmm's directory",
|
|
"chmod", "-R", "o=rwx,g=rwx", self.vmm_backend]
|
|
)
|
|
|
|
# TODO: Find some good way to check whether the virtual machine is up and
|
|
# running without relying on non-guarenteed ways.
|
|
for _ in range(10):
|
|
time.sleep(2)
|
|
status = self.get_status(uuid)
|
|
if status in ["running", "inmigrate"]:
|
|
return status
|
|
logger.warning(
|
|
"Timeout on VM's status. Shutting down VM %s", uuid
|
|
)
|
|
self.stop(uuid)
|
|
# TODO: What should we do more. VM can still continue to run in background.
|
|
# If we have pid of vm we can kill it using OS.
|
|
|
|
def execute_command(self, uuid, command, **kwargs):
|
|
# execute_command -> sucess?, output
|
|
try:
|
|
with VMQMPHandles(os.path.join(self.vmm_backend, uuid)) as (
|
|
sock_handle,
|
|
file_handle,
|
|
):
|
|
command_to_execute = {"execute": command, **kwargs}
|
|
sock_handle.sendall(
|
|
json.dumps(command_to_execute).encode("utf-8")
|
|
)
|
|
output = file_handle.readline()
|
|
except Exception:
|
|
logger.exception(
|
|
"Error occurred while executing command and getting valid output from qmp"
|
|
)
|
|
else:
|
|
try:
|
|
output = json.loads(output)
|
|
except Exception:
|
|
logger.exception(
|
|
"QMP Output isn't valid JSON. %s", output
|
|
)
|
|
else:
|
|
return "return" in output, output
|
|
return False, None
|
|
|
|
def stop(self, uuid):
|
|
success, output = self.execute_command(
|
|
command="quit", uuid=uuid
|
|
)
|
|
return success
|
|
|
|
def get_status(self, uuid):
|
|
success, output = self.execute_command(
|
|
command="query-status", uuid=uuid
|
|
)
|
|
if success:
|
|
return output["return"]["status"]
|
|
else:
|
|
# TODO: Think about this for a little more
|
|
return "STOPPED"
|
|
|
|
def discover(self):
|
|
vms = [
|
|
uuid
|
|
for uuid in os.listdir(self.vmm_backend)
|
|
if not isdir(join_path(self.vmm_backend, uuid))
|
|
]
|
|
return vms
|
|
|
|
def get_vnc(self, uuid):
|
|
success, output = self.execute_command(
|
|
uuid, command="query-vnc"
|
|
)
|
|
if success:
|
|
return output["return"]["service"]
|
|
return None
|
|
|
|
def transfer(self, src_uuid, destination_sock_path, host):
|
|
p = TransferVM(
|
|
src_uuid,
|
|
destination_sock_path,
|
|
socket_dir=self.socket_dir,
|
|
host=host,
|
|
)
|
|
p.start()
|
|
|
|
# TODO: the following method should clean things that went wrong
|
|
# e.g If VM migration fails or didn't start for long time
|
|
# i.e 15 minutes we should stop the waiting VM.
|
|
def maintenace(self):
|
|
pass
|