diff --git a/.gitignore b/.gitignore index 93056d5..f51cc79 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.png __pycache__/ etcd3_wrapper - +.idea +.vscode diff --git a/docs/ucloud-general-working (current).dot b/docs/ucloud-general-working (current).dot new file mode 100644 index 0000000..e826ea9 --- /dev/null +++ b/docs/ucloud-general-working (current).dot @@ -0,0 +1,56 @@ +digraph G{ + + rankdir=TB; + start [shape=point]; + ucloud_api [style=filled,color=white]; + start -> ucloud_api [label="User Requests to Start VM 'vm1'"]; + + subgraph cluster_common { + style=filled; + color=lightgrey; + RequestQueue [shape=rect, style=filled, color=plum1]; + + } + + + subgraph cluster_api { + style=filled; + color=lightgrey; + node [style=filled,color=white]; + + ucloud_api -> RequestQueue [label="RequestQueue.append(Request) "] + } + + + subgraph cluster_scheduler { + rankdir=RL; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + label="ucloud-scheduler"; + + PENDING_REQUESTS [shape=rect, color=peachpuff] + + RequestQueue -> FindHost [label="Get StartVM Requests Only"]; + FindHost -> PENDING_REQUESTS [constraint=false, color=red, label="append this requests to pending requests"]; + FindHost -> RequestQueue [label="RequestQueue.append(RunVmRequest(vm='vm1'))", color=darkgreen]; + PENDING_REQUESTS -> FindHost [label="On Timeout Event.\n Try to fulfil the request again"] + + } + + subgraph cluster_vm { + rankdir=RL; + style=filled; + color=lightgrey; + node [style=filled,color=white]; + label="ucloud-vm"; + + VmTrash [shape=point, color=red] + + RequestQueue -> HandleRequest [label="Get Requests for VM Operations"] + HandleRequest -> Running [label="RunVMRequest", color=darkgreen] + HandleRequest -> Stopped [label="ShutdownVMRequest", color=darkgreen] + HandleRequest -> VmTrash [constraint=false, color=red, label="Put error message into VM's log"] + } + +} \ No newline at end of file diff --git a/helpers.py b/helpers.py index 951bf18..ec5726d 100644 --- a/helpers.py +++ b/helpers.py @@ -1,8 +1,10 @@ -from .etcd3_wrapper import EtcdEntry +import socket +import logging + +from etcd3_wrapper import EtcdEntry class SpecificEtcdEntryBase(object): - def __init__(self, e: EtcdEntry): self.key = e.key @@ -19,7 +21,22 @@ class SpecificEtcdEntryBase(object): return self.original_keys() def __repr__(self): - _return = f"""{self.key}""" - for key in self.original_keys(): - _return += f",{key} = {self.__getattribute__(key)}" - return _return + return str(dict(self.__dict__)) + + +def get_ipv4_address(): + # If host is connected to internet + # Return IPv4 address of machine + # Otherwise, return 127.0.0.1 + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s: + try: + s.connect(("8.8.8.8", 80)) + except socket.timeout: + address = "127.0.0.1" + except Exception as e: + logging.getLogger().exception(e) + address = "127.0.0.1" + else: + address = s.getsockname()[0] + + return address diff --git a/host.py b/host.py index 03ee057..5d23397 100644 --- a/host.py +++ b/host.py @@ -1,6 +1,9 @@ +from typing import List + from .helpers import SpecificEtcdEntryBase from .etcd3_wrapper import EtcdEntry from datetime import datetime +from os.path import join class HostStatus(object): @@ -39,13 +42,16 @@ class HostPool(object): self.prefix = host_prefix @property - def hosts(self): + def hosts(self) -> List[HostEntry]: _hosts = self.client.get_prefix(self.prefix, value_in_json=True) return [HostEntry(host) for host in _hosts] def get(self, key): + if not key.startswith(self.prefix): + key = join(self.prefix, key) v = self.client.get(key, value_in_json=True) - return HostEntry(v) + if v: + return HostEntry(v) def put(self, obj: HostEntry): self.client.put(obj.key, obj.value, value_in_json=True) diff --git a/request.py b/request.py new file mode 100644 index 0000000..c30406f --- /dev/null +++ b/request.py @@ -0,0 +1,39 @@ +import json +from etcd3_wrapper import Etcd3Wrapper, EtcdEntry, PsuedoEtcdEntry +from uuid import uuid4 +from .helpers import SpecificEtcdEntryBase +from os.path import join + + +class RequestType(object): + CreateVM = "CreateVM" + ScheduleVM = "ScheduleVM" + StartVM = "StartVM" + StopVM = "StopVM" + InitVMMigration = "InitVMMigration" + TransferVM = "TransferVM" + DeleteVM = "DeleteVM" + + +class RequestEntry(SpecificEtcdEntryBase): + def __init__(self, e: EtcdEntry): + self.type = "" + super().__init__(e) + + @classmethod + def from_scratch(cls, **kwargs): + e = PsuedoEtcdEntry(join("/v1/request/", uuid4().hex), value=json.dumps(kwargs).encode("utf-8"), + value_in_json=True) + return cls(e) + + +class RequestPool(object): + def __init__(self, etcd_client, request_prefix): + self.client = etcd_client + self.prefix = request_prefix + + def put(self, obj: RequestEntry): + if not obj.key.startswith(self.prefix): + obj.key = join(self.prefix, obj.key) + + self.client.put(obj.key, obj.value, value_in_json=True) \ No newline at end of file diff --git a/test.py b/test.py deleted file mode 100644 index 5549645..0000000 --- a/test.py +++ /dev/null @@ -1,13 +0,0 @@ -from vm import VmPool -from etcd3_wrapper import Etcd3Wrapper - -client = Etcd3Wrapper() -vm_pool = VmPool(client, "/v1/vm") -# vms = vm_pool.by_status("REQUESTED_NEW") -# for vm in vms: -# print(vm) -# print() - -v = vm_pool.get("/v1/vm/8402926c7a164966982eae48b2d6d1a9") -# print(dir(v)) -print(v) \ No newline at end of file diff --git a/vm.py b/vm.py index b544166..25f94fd 100644 --- a/vm.py +++ b/vm.py @@ -1,23 +1,21 @@ +from contextlib import contextmanager +from datetime import datetime from etcd3_wrapper import EtcdEntry from .helpers import SpecificEtcdEntryBase +from os.path import join class VMStatus(object): # Must be only assigned to brand new VM requested_new = "REQUESTED_NEW" - # Host assigned to VM but not created yet. - scheduled_deploy = "SCHEDULED_DEPLOY" - # Only Assigned to already created vm requested_start = "REQUESTED_START" # These all are for running vms requested_shutdown = "REQUESTED_SHUTDOWN" - requested_suspend = "REQUESTED_SUSPEND" - requested_resume = "REQUESTED_RESUME" requested_migrate = "REQUESTED_MIGRATE" - + requested_delete = "REQUESTED_DELETE" # either its image is not found or user requested # to delete it deleted = "DELETED" @@ -26,12 +24,6 @@ class VMStatus(object): killed = "KILLED" # either host died or vm died itself running = "RUNNING" - suspended = "SUSPENDED" - - -RUNNING_VM_STATUSES = [VMStatus.requested_shutdown, VMStatus.requested_suspend, - VMStatus.requested_resume, VMStatus.requested_migrate, - VMStatus.running, VMStatus.suspended] class VMEntry(SpecificEtcdEntryBase): @@ -41,18 +33,33 @@ class VMEntry(SpecificEtcdEntryBase): self.hostname = "" self.status = "" self.image_uuid = "" - + self.log = [] + self.in_migration = False super().__init__(e) - + @property def uuid(self): return self.key.split("/")[-1] def declare_killed(self): self.hostname = "" - if self.status in RUNNING_VM_STATUSES: + self.in_migration = False + if self.status == VMStatus.running: self.status = VMStatus.killed + def declare_stopped(self): + self.hostname = "" + self.in_migration = False + self.status = VMStatus.stopped + + def add_log(self, msg): + self.log = self.log[:5] + self.log.append(f"{datetime.now().isoformat()} - {msg}") + + @property + def path(self): + return f"rbd:uservms/{self.uuid}" + class VmPool(object): def __init__(self, etcd_client, vm_prefix): @@ -80,8 +87,19 @@ class VmPool(object): return list(filter(lambda x: x.status != status, _vms)) def get(self, key): + if not key.startswith(self.prefix): + key = join(self.prefix, key) v = self.client.get(key, value_in_json=True) - return VMEntry(v) - + if v: + return VMEntry(v) + def put(self, obj: VMEntry): self.client.put(obj.key, obj.value, value_in_json=True) + + @contextmanager + def get_put(self, key) -> VMEntry: + # Updates object at key on exit + obj = self.get(key) + yield obj + if obj: + self.put(obj)