import os
import subprocess as sp
import logging
import socket
import json
import tempfile
import time

from contextlib import suppress
from multiprocessing import Process
from os.path import join as join_path
from os.path import isdir

logger = logging.getLogger(__name__)


class VMQMPHandles:
    """Context manager that yields a connected QMP socket and its file wrapper.

    On entry the UNIX socket at ``path`` is connected, the greeting line is
    consumed and ``qmp_capabilities`` is sent so that the monitor accepts
    commands. On exit both handles are closed.

    Usage::

        with VMQMPHandles(path) as (sock, file):
            sock.sendall(...)
            file.readline()
    """

    def __init__(self, path):
        self.path = path
        self.sock = socket.socket(socket.AF_UNIX)
        self.file = self.sock.makefile()

    def _close(self):
        """Close the file wrapper and the underlying socket."""
        self.file.close()
        self.sock.close()

    def __enter__(self):
        try:
            self.sock.connect(self.path)

            # eat qmp greetings
            self.file.readline()

            # init qmp
            self.sock.sendall(b'{ "execute": "qmp_capabilities" }')
            self.file.readline()
        except Exception:
            # __exit__ is not called when __enter__ raises, so release the
            # handles here to avoid leaking the socket.
            self._close()
            raise

        return self.sock, self.file

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._close()

        if exc_type:
            logger.error("Couldn't get handle for VM.",
                         exc_info=(exc_type, exc_val, exc_tb))

        # Returning False lets the original exception (same type, original
        # message and traceback) propagate to the caller.
        return False


class TransferVM(Process):
    """Background process that migrates a VM to another host over ssh.

    Args:
        src_uuid: UUID of the VM to migrate (also the name of its sockets).
        dest_uuid: UUID used for the socket name on the destination host.
        host: Destination host; ssh connects as ``root@host``.
        socket_dir: Directory holding the migration UNIX sockets.
        vmm: Optional ``VMM`` used to send QMP commands to the source VM.
            When omitted, a ``VMM()`` with default settings is created in
            ``run``.
    """

    def __init__(self, src_uuid, dest_uuid, host, socket_dir, vmm=None):
        self.src_uuid = src_uuid
        self.dest_uuid = dest_uuid
        self.host = host
        self.vmm = vmm
        self.src_sock_path = os.path.join(socket_dir, self.src_uuid)
        self.dest_sock_path = os.path.join(socket_dir, self.dest_uuid)

        super().__init__()

    def run(self):
        """Forward the migration socket over ssh and drive the migration.

        Polls ``query-migrate`` every 0.2 seconds and prints the status. It
        returns when the status is no longer ``active``, when a query fails,
        or when the ssh process exits.
        """
        # ssh -L needs to create the local socket itself; remove a stale one.
        with suppress(FileNotFoundError):
            os.remove(self.src_sock_path)

        command = ['ssh', '-nNT', '-L',
                   '{}:{}'.format(self.src_sock_path, self.dest_sock_path),
                   'root@{}'.format(self.host)]
        logger.debug('Executing: ssh forwarding command: %s', command)

        try:
            p = sp.Popen(command)
        except Exception as e:
            logger.error("Couldn't forward unix socks over ssh.", exc_info=e)
            return

        # Presumably gives ssh time to establish the forwarding before the
        # migration is started.
        time.sleep(2)

        vmm = self.vmm if self.vmm is not None else VMM()
        vmm.execute_command(self.src_uuid, command='migrate',
                            arguments={'uri': 'unix:{}'.format(self.src_sock_path)})

        # NOTE(review): the ssh process is left running when this method
        # returns, and any status other than 'active' ends the polling --
        # confirm whether the tunnel should be terminated on completion.
        while p.poll() is None:
            success, output = vmm.execute_command(self.src_uuid, command='query-migrate')
            if not success:
                return

            # 'status' may be absent from the reply; treat that as "not active".
            status = output['return'].get('status')
            print('Migration Status: ', status)
            if status != 'active':
                return

            time.sleep(0.2)


class VMM:
    """Virtual Machine Manager.

    Starts QEMU virtual machines and talks to them through QMP UNIX sockets.
    Each VM's QMP socket is the file ``<vmm_backend>/<uuid>``; migration
    sockets live in ``<vmm_backend>/sock``.

    Args:
        qemu_path: Path of the QEMU binary to execute.
        vmm_backend: Directory in which the per-VM QMP sockets are kept.
    """

    def __init__(self, qemu_path='/usr/bin/qemu-system-x86_64',
                 vmm_backend=os.path.expanduser('~/ucloud/vmm/')):
        self.qemu_path = qemu_path
        self.vmm_backend = vmm_backend
        self.socket_dir = os.path.join(self.vmm_backend, 'sock')

    def is_running(self, uuid):
        """Return True if the VM's QMP socket accepts a connection and sends data.

        When the VM is not running, any leftover socket file is removed.
        """
        sock_path = os.path.join(self.vmm_backend, uuid)
        try:
            # The context manager closes the probe socket on every path.
            with socket.socket(socket.AF_UNIX) as sock:
                sock.connect(sock_path)
                recv = sock.recv(4096)
        except Exception as err:
            # unix sock doesn't exist or it is closed
            logger.info("VM %s sock either doesn't exist or it is closed. "
                        "It means VM is stopped.", uuid, exc_info=err)
        else:
            # if we receive greetings from qmp it means VM is running
            if recv:
                return True

        with suppress(FileNotFoundError):
            os.remove(sock_path)
        return False

    def start(self, *args, uuid, migration=False):
        """Start a daemonized QEMU process for ``uuid``.

        Args:
            *args: Extra command line arguments passed to QEMU.
            uuid: Identifier of the VM; also the file name of its QMP socket.
            migration: When true, QEMU is started with ``-incoming`` on the
                VM's socket inside ``socket_dir``.

        Nothing is returned; a failure to start is only logged.
        """
        if self.is_running(uuid):
            logger.warning('Cannot start VM. It is already running.')
            return

        migration_args = ()
        if migration:
            migration_args = ('-incoming',
                              'unix:{}'.format(os.path.join(self.socket_dir, uuid)))

        qmp_arg = ('-qmp',
                   'unix:{},server,nowait'.format(os.path.join(self.vmm_backend, uuid)))

        # The temporary file only serves to obtain a unique path: it is
        # deleted when the ``with`` block exits, leaving the name free to be
        # used as the VNC UNIX socket.
        with tempfile.NamedTemporaryFile() as tmp:
            vnc_sock_path = tmp.name
        vnc_arg = ('-vnc', 'unix:{}'.format(vnc_sock_path))

        command = [self.qemu_path, *args, *qmp_arg, *migration_args, *vnc_arg, '-daemonize']
        try:
            sp.check_output(command, stderr=sp.PIPE)
        except sp.CalledProcessError as err:
            logger.exception('Error occurred while starting VM.\nDetail %s',
                             err.stderr.decode('utf-8'))
        else:
            time.sleep(2)

    def execute_command(self, uuid, command, **kwargs):
        """Send a QMP command to the VM and read one line of reply.

        Args:
            uuid: Identifier of the VM whose QMP socket is used.
            command: Value of the ``execute`` key of the QMP request.
            **kwargs: Extra top-level keys of the request, e.g. ``arguments``.

        Returns:
            A ``(success, output)`` tuple. ``output`` is the decoded JSON
            reply and ``success`` tells whether it contains a ``return`` key.
            ``(False, None)`` is returned when the socket exchange fails or
            the reply is not valid JSON.
        """
        try:
            with VMQMPHandles(os.path.join(self.vmm_backend, uuid)) as (sock_handle, file_handle):
                command_to_execute = {
                    'execute': command,
                    **kwargs
                }
                sock_handle.sendall(json.dumps(command_to_execute).encode('utf-8'))
                output = file_handle.readline()
        except Exception:
            logger.exception('Error occurred while executing command and getting valid output from qmp')
            return False, None

        try:
            parsed = json.loads(output)
        except ValueError:
            # json.JSONDecodeError is a subclass of ValueError.
            logger.exception('QMP Output isn\'t valid JSON. %s', output)
            return False, None

        return 'return' in parsed, parsed

    def stop(self, uuid):
        """Send ``quit`` to the VM and return whether the command succeeded."""
        success, _ = self.execute_command(command='quit', uuid=uuid)
        return success

    def get_status(self, uuid):
        """Return the status reported by ``query-status``, or ``'STOPPED'`` on failure."""
        success, output = self.execute_command(command='query-status', uuid=uuid)
        if success:
            return output['return']['status']
        return 'STOPPED'

    def discover(self):
        """Return the names of all non-directory entries in ``vmm_backend``."""
        return [
            uuid for uuid in os.listdir(self.vmm_backend)
            if not isdir(join_path(self.vmm_backend, uuid))
        ]

    def get_vnc(self, uuid):
        """Return the ``service`` field of ``query-vnc``, or None on failure."""
        success, output = self.execute_command(uuid, command='query-vnc')
        if success:
            return output['return']['service']
        return None

    def transfer(self, src_uuid, dest_uuid, host):
        """Start a ``TransferVM`` process migrating ``src_uuid`` to ``host``."""
        # Pass this manager along so the transfer queries the same backend.
        p = TransferVM(src_uuid, dest_uuid, socket_dir=self.socket_dir, host=host, vmm=self)
        p.start()