commit f479b961b031d497b9902f47ea4090d26ebacbc0 Author: Ahmad Bilal Khalid Date: Wed Jul 3 18:02:21 2019 +0500 initial code added diff --git a/.env b/.env new file mode 100644 index 0000000..0c1a50c --- /dev/null +++ b/.env @@ -0,0 +1 @@ +HOST_NAME=/v1/host/1 \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..e0e2034 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.vscode/ +.idea/ +__pycache__/ +venv/ diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..316b9ea --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "etcd3_wrapper"] + path = etcd3_wrapper + url = https://code.ungleich.ch/ahmedbilal/etcd3_wrapper diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..c8444f0 --- /dev/null +++ b/Pipfile @@ -0,0 +1,13 @@ +[[source]] +name = "pypi" +url = "https://pypi.org/simple" +verify_ssl = true + +[dev-packages] + +[packages] +python-decouple = "*" +etcd3 = "*" + +[requires] +python_version = "3.7" diff --git a/etcd3_wrapper b/etcd3_wrapper new file mode 160000 index 0000000..cb2a416 --- /dev/null +++ b/etcd3_wrapper @@ -0,0 +1 @@ +Subproject commit cb2a416a17d6789e613ba3b9957917770f4211e1 diff --git a/main.py b/main.py new file mode 100644 index 0000000..1aaa914 --- /dev/null +++ b/main.py @@ -0,0 +1,115 @@ +# TODO +# Implement Monitoring of VM + +import json +import shutil +import os +import subprocess + +# For Commands Information +# https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor + + +import qmp + +from etcd3_wrapper import Etcd3Wrapper +from decouple import config + + +def get_vm_start_cmd(owner_dir, vm_uuid): + vm_sock_flags = f"-qmp unix:{owner_dir}/.vm/{vm_uuid}-sock,server,nowait" + vm_start_command_flags = f"-boot c -net nic -net user -m 256 {vm_sock_flags} -daemonize" + vm_start_command = f"""qemu-system-x86_64 {owner_dir}/.vm/{vm_uuid}.raw {vm_start_command_flags}""" + return vm_start_command + + +def get_qemu_mon(sock_file): + m = qmp.QEMUMonitorProtocol(sock_file) + try: + m.connect() + except FileNotFoundError as _: + return None + return m + + +def main(): + client = Etcd3Wrapper() + hostname = config("HOST_NAME") + + events = client.watch_prefix("/v1/vm/") + + # events = client.get_prefix("/v1/vm/") + for e in events: + e.value = json.loads(e.value) + e_hostname = e.value["hostname"] + e_status = e.value["status"] + vm_uuid = e.key.split("/")[-1] + owner_dir = f"/var/www/{e.value['owner']}" + + # If it is not for me then skip it + if e_hostname != hostname: + continue + print(e_status, e) + + if e_status == "SCHEDULED_DEPLOY": + image = client.get( + f"/v1/image/{e.value['image_uuid']}", value_in_json=True + ) + if image: + image_uuid = e.value["image_uuid"] + print(image) + print("Creating New VM...") + + os.makedirs(f"{owner_dir}/.vm", exist_ok=True) + + if not os.path.isfile(f"{owner_dir}/.vm/{vm_uuid}.raw"): + shutil.copy( + f"/var/vm/{image_uuid}.raw", + f"{owner_dir}/.vm/{vm_uuid}.raw", + ) + + e.value["status"] = "REQUESTED_START" + client.put(e.key, json.dumps(e.value)) + + elif e_status == "REQUESTED_SUSPEND": + m = get_qemu_mon(f"{owner_dir}/.vm/{vm_uuid}-sock") + + if m: + print("Suspending") + m.command("stop") + m.close() + e.value["status"] = "SUSPENDED" + client.put(e.key, json.dumps(e.value)) + else: + print("VM Not Running") + + elif e_status == "REQUESTED_RESUME": + m = get_qemu_mon(f"{owner_dir}/.vm/{vm_uuid}-sock") + if m: + print("Resuming") + + m.command("cont") + m.close() + + e.value["status"] = "RESUMED" + client.put(e.key, json.dumps(e.value)) + else: + print("VM Not Running") + + elif e_status == "REQUESTED_START": + m = get_qemu_mon(f"{owner_dir}/.vm/{vm_uuid}-sock") + if m: + m.close() + print("VM already running") + e.value["status"] = "RUNNING" + client.put(e.key, e.value, value_in_json=True) + else: + print("Starting VM") + subprocess.run(get_vm_start_cmd(owner_dir, vm_uuid).split(" ")) + e.value["status"] = "RUNNING" + client.put(e.key, e.value, value_in_json=True) + else: + continue + + +main() diff --git a/qmp.py b/qmp.py new file mode 100644 index 0000000..5c8cf6a --- /dev/null +++ b/qmp.py @@ -0,0 +1,256 @@ +# QEMU Monitor Protocol Python class +# +# Copyright (C) 2009, 2010 Red Hat Inc. +# +# Authors: +# Luiz Capitulino +# +# This work is licensed under the terms of the GNU GPL, version 2. See +# the COPYING file in the top-level directory. + +import json +import errno +import socket +import logging + + +class QMPError(Exception): + pass + + +class QMPConnectError(QMPError): + pass + + +class QMPCapabilitiesError(QMPError): + pass + + +class QMPTimeoutError(QMPError): + pass + + +class QEMUMonitorProtocol(object): + + #: Logger object for debugging messages + logger = logging.getLogger('QMP') + #: Socket's error class + error = socket.error + #: Socket's timeout + timeout = socket.timeout + + def __init__(self, address, server=False): + """ + Create a QEMUMonitorProtocol class. + + @param address: QEMU address, can be either a unix socket path (string) + or a tuple in the form ( address, port ) for a TCP + connection + @param server: server mode listens on the socket (bool) + @raise socket.error on socket connection errors + @note No connection is established, this is done by the connect() or + accept() methods + """ + self.__events = [] + self.__address = address + self.__sock = self.__get_sock() + self.__sockfile = None + if server: + self.__sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + self.__sock.bind(self.__address) + self.__sock.listen(1) + + def __get_sock(self): + if isinstance(self.__address, tuple): + family = socket.AF_INET + else: + family = socket.AF_UNIX + return socket.socket(family, socket.SOCK_STREAM) + + def __negotiate_capabilities(self): + greeting = self.__json_read() + if greeting is None or "QMP" not in greeting: + raise QMPConnectError + # Greeting seems ok, negotiate capabilities + resp = self.cmd('qmp_capabilities') + if "return" in resp: + return greeting + raise QMPCapabilitiesError + + def __json_read(self, only_event=False): + while True: + data = self.__sockfile.readline() + if not data: + return + resp = json.loads(data) + if 'event' in resp: + self.logger.debug("<<< %s", resp) + self.__events.append(resp) + if not only_event: + continue + return resp + + def __get_events(self, wait=False): + """ + Check for new events in the stream and cache them in __events. + + @param wait (bool): block until an event is available. + @param wait (float): If wait is a float, treat it as a timeout value. + + @raise QMPTimeoutError: If a timeout float is provided and the timeout + period elapses. + @raise QMPConnectError: If wait is True but no events could be + retrieved or if some other error occurred. + """ + + # Check for new events regardless and pull them into the cache: + self.__sock.setblocking(0) + try: + self.__json_read() + except socket.error as err: + if err[0] == errno.EAGAIN: + # No data available + pass + self.__sock.setblocking(1) + + # Wait for new events, if needed. + # if wait is 0.0, this means "no wait" and is also implicitly false. + if not self.__events and wait: + if isinstance(wait, float): + self.__sock.settimeout(wait) + try: + ret = self.__json_read(only_event=True) + except socket.timeout: + raise QMPTimeoutError("Timeout waiting for event") + except: + raise QMPConnectError("Error while reading from socket") + if ret is None: + raise QMPConnectError("Error while reading from socket") + self.__sock.settimeout(None) + + def connect(self, negotiate=True): + """ + Connect to the QMP Monitor and perform capabilities negotiation. + + @return QMP greeting dict + @raise socket.error on socket connection errors + @raise QMPConnectError if the greeting is not received + @raise QMPCapabilitiesError if fails to negotiate capabilities + """ + self.__sock.connect(self.__address) + self.__sockfile = self.__sock.makefile() + if negotiate: + return self.__negotiate_capabilities() + + def accept(self): + """ + Await connection from QMP Monitor and perform capabilities negotiation. + + @return QMP greeting dict + @raise socket.error on socket connection errors + @raise QMPConnectError if the greeting is not received + @raise QMPCapabilitiesError if fails to negotiate capabilities + """ + self.__sock.settimeout(15) + self.__sock, _ = self.__sock.accept() + self.__sockfile = self.__sock.makefile() + return self.__negotiate_capabilities() + + def cmd_obj(self, qmp_cmd): + """ + Send a QMP command to the QMP Monitor. + + @param qmp_cmd: QMP command to be sent as a Python dict + @return QMP response as a Python dict or None if the connection has + been closed + """ + self.logger.debug(">>> %s", qmp_cmd) + try: + self.__sock.sendall(json.dumps(qmp_cmd).encode('utf-8')) + except socket.error as err: + if err[0] == errno.EPIPE: + return + raise socket.error(err) + resp = self.__json_read() + self.logger.debug("<<< %s", resp) + return resp + + def cmd(self, name, args=None, cmd_id=None): + """ + Build a QMP command and send it to the QMP Monitor. + + @param name: command name (string) + @param args: command arguments (dict) + @param cmd_id: command id (dict, list, string or int) + """ + qmp_cmd = {'execute': name} + if args: + qmp_cmd['arguments'] = args + if cmd_id: + qmp_cmd['id'] = cmd_id + return self.cmd_obj(qmp_cmd) + + def command(self, cmd, **kwds): + """ + Build and send a QMP command to the monitor, report errors if any + """ + ret = self.cmd(cmd, kwds) + if "error" in ret: + raise Exception(ret['error']['desc']) + return ret['return'] + + def pull_event(self, wait=False): + """ + Pulls a single event. + + @param wait (bool): block until an event is available. + @param wait (float): If wait is a float, treat it as a timeout value. + + @raise QMPTimeoutError: If a timeout float is provided and the timeout + period elapses. + @raise QMPConnectError: If wait is True but no events could be + retrieved or if some other error occurred. + + @return The first available QMP event, or None. + """ + self.__get_events(wait) + + if self.__events: + return self.__events.pop(0) + return None + + def get_events(self, wait=False): + """ + Get a list of available QMP events. + + @param wait (bool): block until an event is available. + @param wait (float): If wait is a float, treat it as a timeout value. + + @raise QMPTimeoutError: If a timeout float is provided and the timeout + period elapses. + @raise QMPConnectError: If wait is True but no events could be + retrieved or if some other error occurred. + + @return The list of available QMP events. + """ + self.__get_events(wait) + return self.__events + + def clear_events(self): + """ + Clear current list of pending events. + """ + self.__events = [] + + def close(self): + self.__sock.close() + self.__sockfile.close() + + def settimeout(self, timeout): + self.__sock.settimeout(timeout) + + def get_sock_fd(self): + return self.__sock.fileno() + + def is_scm_available(self): + return self.__sock.family == socket.AF_UNIX