import os import subprocess as sp import logging import socket import json import tempfile import time from contextlib import suppress from multiprocessing import Process from os.path import join as join_path from os.path import isdir logger = logging.getLogger(__name__) class VMQMPHandles: def __init__(self, path): self.path = path self.sock = socket.socket(socket.AF_UNIX) self.file = self.sock.makefile() def __enter__(self): self.sock.connect(self.path) # eat qmp greetings self.file.readline() # init qmp self.sock.sendall(b'{ "execute": "qmp_capabilities" }') self.file.readline() return self.sock, self.file def __exit__(self, exc_type, exc_val, exc_tb): self.file.close() self.sock.close() if exc_type: logger.error('Couldn\'t get handle for VM.', exc_type, exc_val, exc_tb) raise exc_type("Couldn't get handle for VM.") from exc_type class TransferVM(Process): def __init__(self, src_uuid, dest_uuid, host, socket_dir): self.src_uuid = src_uuid self.dest_uuid = dest_uuid self.host = host self.src_sock_path = os.path.join(socket_dir, self.src_uuid) self.dest_sock_path = os.path.join(socket_dir, self.dest_uuid) super().__init__() def run(self): with suppress(FileNotFoundError): os.remove(self.src_sock_path) command = ['ssh', '-nNT', '-L', '{}:{}'.format(self.src_sock_path, self.dest_sock_path), 'root@{}'.format(self.host)] try: p = sp.Popen(command) except Exception as e: logger.error('Couldn\' forward unix socks over ssh.', exc_info=e) else: time.sleep(2) vmm = VMM() logger.debug('Executing: ssh forwarding command: %s', command) vmm.execute_command(self.src_uuid, command='migrate', arguments={'uri': 'unix:{}'.format(self.src_sock_path)}) while p.poll() is None: success, output = vmm.execute_command(self.src_uuid, command='query-migrate') if success: status = output['return']['status'] if status != 'active': print('Migration Status: ', status) return else: print('Migration Status: ', status) else: return time.sleep(0.2) class VMM: # Virtual Machine Manager def __init__(self, qemu_path='/usr/bin/qemu-system-x86_64', vmm_backend=os.path.expanduser('~/ucloud/vmm/')): self.qemu_path = qemu_path self.vmm_backend = vmm_backend self.socket_dir = os.path.join(self.vmm_backend, 'sock') if not os.path.isdir(self.vmm_backend): logger.info('{} does not exists. Creating it...'.format(self.vmm_backend)) os.makedirs(self.vmm_backend, exist_ok=True) if not os.path.isdir(self.socket_dir): logger.info('{} does not exists. Creating it...'.format(self.socket_dir)) os.makedirs(self.socket_dir, exist_ok=True) def is_running(self, uuid): sock_path = os.path.join(self.vmm_backend, uuid) try: sock = socket.socket(socket.AF_UNIX) sock.connect(sock_path) recv = sock.recv(4096) except Exception as err: # unix sock doesn't exists or it is closed logger.debug('VM {} sock either don\' exists or it is closed. It mean VM is stopped.'.format(uuid), exc_info=err) else: # if we receive greetings from qmp it mean VM is running if len(recv) > 0: return True with suppress(FileNotFoundError): os.remove(sock_path) return False def start(self, *args, uuid, migration=False): # start --> sucess? migration_args = () if migration: migration_args = ('-incoming', 'unix:{}'.format(os.path.join(self.socket_dir, uuid))) if self.is_running(uuid): logger.warning('Cannot start VM. It is already running.') else: qmp_arg = ('-qmp', 'unix:{},server,nowait'.format(join_path(self.vmm_backend, uuid))) vnc_arg = ('-vnc', 'unix:{}'.format(tempfile.NamedTemporaryFile().name)) command = ['sudo', '-p', 'Enter password to start VM {}: '.format(uuid), self.qemu_path, *args, *qmp_arg, *migration_args, *vnc_arg, '-daemonize'] try: sp.check_output(command, stderr=sp.PIPE) except sp.CalledProcessError as err: logger.exception('Error occurred while starting VM.\nDetail %s', err.stderr.decode('utf-8')) else: with suppress(sp.CalledProcessError): sp.check_output([ 'sudo', '-p', 'Enter password to correct permission for uncloud-vmm\'s directory', 'chmod', '-R', 'o=rwx,g=rwx', self.vmm_backend ]) # TODO: Find some good way to check whether the virtual machine is up and # running without relying on non-guarenteed ways. for _ in range(10): time.sleep(2) status = self.get_status(uuid) if status in ['running', 'inmigrate']: return status logger.warning('Timeout on VM\'s status. Shutting down VM %s', uuid) self.stop(uuid) # TODO: What should we do more. VM can still continue to run in background. # If we have pid of vm we can kill it using OS. def execute_command(self, uuid, command, **kwargs): # execute_command -> sucess?, output try: with VMQMPHandles(os.path.join(self.vmm_backend, uuid)) as (sock_handle, file_handle): command_to_execute = { 'execute': command, **kwargs } sock_handle.sendall(json.dumps(command_to_execute).encode('utf-8')) output = file_handle.readline() except Exception: logger.exception('Error occurred while executing command and getting valid output from qmp') else: try: output = json.loads(output) except Exception: logger.exception('QMP Output isn\'t valid JSON. %s', output) else: return 'return' in output, output return False, None def stop(self, uuid): success, output = self.execute_command(command='quit', uuid=uuid) return success def get_status(self, uuid): success, output = self.execute_command(command='query-status', uuid=uuid) if success: return output['return']['status'] else: # TODO: Think about this for a little more return 'STOPPED' def discover(self): vms = [ uuid for uuid in os.listdir(self.vmm_backend) if not isdir(join_path(self.vmm_backend, uuid)) ] return vms def get_vnc(self, uuid): success, output = self.execute_command(uuid, command='query-vnc') if success: return output['return']['service'] return None def transfer(self, src_uuid, dest_uuid, host): p = TransferVM(src_uuid, dest_uuid, socket_dir=self.socket_dir, host=host) p.start()