"""Start, stop, query and live-migrate QEMU virtual machines over QMP sockets."""
import os
import subprocess as sp
import logging
import socket
import json
import tempfile
import time

from contextlib import suppress
from multiprocessing import Process
from os.path import join as join_path
from os.path import isdir

logger = logging.getLogger(__name__)


class VMQMPHandles:
    """Context manager yielding a negotiated QMP connection to one VM.

    Entering connects to the unix socket at ``path``, consumes the QMP
    greeting line, sends ``qmp_capabilities`` and consumes its reply line,
    then yields ``(sock, file)`` where ``file`` is a text-mode reader over
    ``sock``. Both are closed when the ``with`` block ends.
    """

    def __init__(self, path):
        self.path = path
        self.sock = socket.socket(socket.AF_UNIX)
        self.file = self.sock.makefile()

    def _close(self):
        """Close the reader and the underlying socket."""
        self.file.close()
        self.sock.close()

    def __enter__(self):
        try:
            self.sock.connect(self.path)
            # eat qmp greetings
            self.file.readline()
            # init qmp: leave capabilities negotiation so commands are accepted
            self.sock.sendall(b'{ "execute": "qmp_capabilities" }')
            self.file.readline()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so release here.
            self._close()
            raise
        return self.sock, self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()
        if exc_type:
            logger.error(
                "Couldn't get handle for VM.",
                exc_info=(exc_type, exc_val, exc_tb),
            )
        # Returning False re-raises the original exception with its traceback
        # intact, instead of constructing a new one of the same type.
        return False


class TransferVM(Process):
    """Background process that live-migrates a local VM to a remote host.

    The local unix socket ``<socket_dir>/<src_uuid>`` is forwarded with
    ``ssh -L`` to ``dest_sock_path`` on ``root@<host>``. The source VM is then
    told to ``migrate`` into that socket and polled until the migration
    completes, fails or is cancelled. On completion the source VM is stopped.

    Args:
        src_uuid: UUID of the local (source) VM.
        dest_sock_path: incoming-migration socket path on the destination.
        host: destination host; the tunnel logs in as ``root``.
        socket_dir: local directory in which the forwarded socket is created.
        vmm_backend: directory holding the source VM's QMP socket; ``None``
            uses ``VMM``'s default directory.
    """

    def __init__(self, src_uuid, dest_sock_path, host, socket_dir,
                 vmm_backend=None):
        self.src_uuid = src_uuid
        self.host = host
        self.src_sock_path = os.path.join(socket_dir, self.src_uuid)
        self.dest_sock_path = dest_sock_path
        self.vmm_backend = vmm_backend
        super().__init__()

    def run(self):
        # ssh refuses to bind a forwarded unix socket over an existing path.
        with suppress(FileNotFoundError):
            os.remove(self.src_sock_path)

        command = [
            "ssh",
            "-nNT",
            "-L",
            "{}:{}".format(self.src_sock_path, self.dest_sock_path),
            "root@{}".format(self.host),
        ]
        logger.debug("Executing: ssh forwarding command: %s", command)
        try:
            tunnel = sp.Popen(command)
        except Exception as e:
            logger.error(
                "Couldn' forward unix socks over ssh.", exc_info=e
            )
            return

        try:
            self._migrate(tunnel)
        finally:
            # `ssh -N` never exits on its own; don't leak it once we're done.
            if tunnel.poll() is None:
                tunnel.terminate()
                with suppress(sp.TimeoutExpired):
                    tunnel.wait(timeout=10)

    def _migrate(self, tunnel):
        """Start the migration and poll it while the ssh tunnel is alive."""
        # Crude wait for ssh to create the forwarded local socket.
        time.sleep(2)
        if self.vmm_backend is None:
            vmm = VMM()
        else:
            vmm = VMM(vmm_backend=self.vmm_backend)

        started, _ = vmm.execute_command(
            self.src_uuid,
            command="migrate",
            arguments={"uri": "unix:{}".format(self.src_sock_path)},
        )
        if not started:
            logger.error("Couldn't start migration of VM %s", self.src_uuid)
            return

        while tunnel.poll() is None:
            success, output = vmm.execute_command(
                self.src_uuid, command="query-migrate"
            )
            if not success:
                logger.error(
                    "Couldn't be able to query VM {} that was in migration".format(
                        self.src_uuid
                    )
                )
                return
            # "status" is absent when no migration is in progress; treat that
            # as terminal instead of crashing with KeyError / polling forever.
            status = output["return"].get("status")
            logger.info("Migration Status: {}".format(status))
            if status == "completed":
                vmm.stop(self.src_uuid)
                return
            if status is None or status in ("failed", "cancelled"):
                return
            time.sleep(2)

        logger.error(
            "ssh tunnel for VM %s exited before migration finished",
            self.src_uuid,
        )


class VMM:  # Virtual Machine Manager
    """Manage QEMU VMs whose QMP sockets live in ``vmm_backend``.

    Each VM's QMP socket is ``<vmm_backend>/<uuid>``; incoming-migration
    sockets are created under ``<vmm_backend>/sock``. Both directories are
    created on construction if missing.
    """

    def __init__(
        self,
        qemu_path="/usr/bin/qemu-system-x86_64",
        vmm_backend=os.path.expanduser("~/ucloud/vmm/"),
    ):
        self.qemu_path = qemu_path
        self.vmm_backend = vmm_backend
        self.socket_dir = os.path.join(self.vmm_backend, "sock")

        for directory in (self.vmm_backend, self.socket_dir):
            if not os.path.isdir(directory):
                logger.info(
                    "{} does not exists. Creating it...".format(directory)
                )
                os.makedirs(directory, exist_ok=True)

    def is_running(self, uuid):
        """Return True if the VM's QMP socket answers with a greeting.

        A socket path that cannot be connected to (other than for lack of
        permission) or that closes without a greeting is considered stale and
        is removed.
        """
        sock_path = os.path.join(self.vmm_backend, uuid)
        try:
            with socket.socket(socket.AF_UNIX) as sock:
                sock.connect(sock_path)
                recv = sock.recv(4096)
        except PermissionError as err:
            # The socket exists but we may not use it (e.g. root-owned because
            # the chmod in start() failed). Never delete it: the VM may be up.
            logger.debug(
                "No permission to connect to VM %s sock %s.",
                uuid,
                sock_path,
                exc_info=err,
            )
            return False
        except Exception as err:
            # unix sock doesn't exists or it is closed
            logger.debug(
                "VM {} sock either don' exists or it is closed. It mean VM is stopped.".format(
                    uuid
                ),
                exc_info=err,
            )
        else:
            # if we receive greetings from qmp it mean VM is running
            if recv:
                return True

        with suppress(FileNotFoundError):
            os.remove(sock_path)
        return False

    def start(self, *args, uuid, migration=False):
        """Launch a VM as a daemonized QEMU process via sudo.

        Args:
            *args: extra QEMU command-line arguments.
            uuid: VM identifier; names the QMP socket (and migration socket).
            migration: if True, start QEMU waiting for an incoming migration.

        Returns:
            "running" or "inmigrate" once QMP reports either; None if the VM
            was already running, failed to launch, or never reached those
            states within ~20 s (in which case a QMP ``quit`` is sent).
        """
        if self.is_running(uuid):
            logger.warning("Cannot start VM. It is already running.")
            return None

        migration_args = ()
        if migration:
            migration_args = (
                "-incoming",
                "unix:{}".format(os.path.join(self.socket_dir, uuid)),
            )
        qmp_arg = (
            "-qmp",
            "unix:{},server,nowait".format(join_path(self.vmm_backend, uuid)),
        )
        # The NamedTemporaryFile is deleted as soon as it is garbage-collected;
        # this only borrows a unique, currently-unused path for the VNC socket.
        vnc_arg = (
            "-vnc",
            "unix:{}".format(tempfile.NamedTemporaryFile().name),
        )
        command = [
            "sudo",
            "-p",
            "Enter password to start VM {}: ".format(uuid),
            self.qemu_path,
            *args,
            *qmp_arg,
            *migration_args,
            *vnc_arg,
            "-daemonize",
        ]
        try:
            sp.check_output(command, stderr=sp.PIPE)
        except sp.CalledProcessError as err:
            logger.exception(
                "Error occurred while starting VM.\nDetail %s",
                err.stderr.decode("utf-8", errors="replace"),
            )
            return None

        # NOTE(review): QEMU runs as root, so its sockets are root-owned; this
        # makes everything under vmm_backend group/other read-write-executable
        # so the unprivileged manager can connect. Revisit for security.
        with suppress(sp.CalledProcessError):
            sp.check_output(
                [
                    "sudo",
                    "-p",
                    "Enter password to correct permission for uncloud-vmm's directory",
                    "chmod",
                    "-R",
                    "o=rwx,g=rwx",
                    self.vmm_backend,
                ]
            )

        # TODO: Find some good way to check whether the virtual machine is up
        # and running without relying on non-guarenteed ways.
        for _ in range(10):
            time.sleep(2)
            status = self.get_status(uuid)
            if status in ["running", "inmigrate"]:
                return status
        logger.warning("Timeout on VM's status. Shutting down VM %s", uuid)
        self.stop(uuid)
        # TODO: What should we do more. VM can still continue to run in
        # background. If we have pid of vm we can kill it using OS.
        return None

    @staticmethod
    def _read_qmp_reply(file_handle):
        """Return the next non-event line from a QMP stream.

        QMP may interleave asynchronous ``{"event": ...}`` messages before a
        command's reply; those are skipped. Returns "" on EOF, or the raw line
        if it is not valid JSON (the caller reports it).
        """
        while True:
            line = file_handle.readline()
            try:
                message = json.loads(line)
            except ValueError:
                return line
            if not (isinstance(message, dict) and "event" in message):
                return line

    def execute_command(self, uuid, command, **kwargs):
        """Send one QMP command to VM ``uuid`` and return its reply.

        ``kwargs`` are merged into the request next to ``"execute"`` (e.g.
        ``arguments={...}``).

        Returns:
            ``(success, reply)`` where ``success`` is whether the reply holds
            a ``"return"`` key; ``(False, None)`` if the connection, the
            exchange or JSON decoding failed.
        """
        try:
            with VMQMPHandles(os.path.join(self.vmm_backend, uuid)) as (
                sock_handle,
                file_handle,
            ):
                command_to_execute = {"execute": command, **kwargs}
                sock_handle.sendall(
                    json.dumps(command_to_execute).encode("utf-8")
                )
                output = self._read_qmp_reply(file_handle)
        except Exception:
            logger.exception(
                "Error occurred while executing command and getting valid output from qmp"
            )
            return False, None

        try:
            output = json.loads(output)
        except Exception:
            logger.exception("QMP Output isn't valid JSON. %s", output)
            return False, None
        return "return" in output, output

    def stop(self, uuid):
        """Send QMP ``quit`` to the VM; return whether QMP acknowledged it."""
        success, _ = self.execute_command(command="quit", uuid=uuid)
        return success

    def get_status(self, uuid):
        """Return the QMP run status of the VM, or "STOPPED" if unreachable."""
        success, output = self.execute_command(
            command="query-status", uuid=uuid
        )
        if success:
            return output["return"]["status"]
        # TODO: Think about this for a little more
        return "STOPPED"

    def discover(self):
        """List non-directory entries of vmm_backend (the per-VM sockets)."""
        return [
            uuid
            for uuid in os.listdir(self.vmm_backend)
            if not isdir(join_path(self.vmm_backend, uuid))
        ]

    def get_vnc(self, uuid):
        """Return the ``service`` field of QMP ``query-vnc``, or None."""
        success, output = self.execute_command(uuid, command="query-vnc")
        if success:
            return output["return"]["service"]
        return None

    def transfer(self, src_uuid, destination_sock_path, host):
        """Start migrating ``src_uuid`` to ``host`` in a background process.

        Returns the started ``TransferVM`` process so callers may join it.
        """
        process = TransferVM(
            src_uuid,
            destination_sock_path,
            socket_dir=self.socket_dir,
            host=host,
            vmm_backend=self.vmm_backend,
        )
        process.start()
        return process

    # TODO: the following method should clean things that went wrong
    # e.g If VM migration fails or didn't start for long time
    # i.e 15 minutes we should stop the waiting VM.
    def maintenace(self):
        pass