request mechanism helpers added and few other small changes

This commit is contained in:
ahmadbilalkhalid 2019-08-12 18:35:35 +05:00
parent 01296e998e
commit a014220f7a
7 changed files with 163 additions and 39 deletions

3
.gitignore vendored
View File

@ -1,4 +1,5 @@
*.png
__pycache__/
etcd3_wrapper
.idea
.vscode

View File

@ -0,0 +1,56 @@
digraph G{
rankdir=TB;
start [shape=point];
ucloud_api [style=filled,color=white];
start -> ucloud_api [label="User Requests to Start VM 'vm1'"];
subgraph cluster_common {
style=filled;
color=lightgrey;
RequestQueue [shape=rect, style=filled, color=plum1];
}
subgraph cluster_api {
style=filled;
color=lightgrey;
node [style=filled,color=white];
ucloud_api -> RequestQueue [label="RequestQueue.append(Request) "]
}
subgraph cluster_scheduler {
rankdir=RL;
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="ucloud-scheduler";
PENDING_REQUESTS [shape=rect, color=peachpuff]
RequestQueue -> FindHost [label="Get StartVM Requests Only"];
FindHost -> PENDING_REQUESTS [constraint=false, color=red, label="append this requests to pending requests"];
FindHost -> RequestQueue [label="RequestQueue.append(RunVmRequest(vm='vm1'))", color=darkgreen];
PENDING_REQUESTS -> FindHost [label="On Timeout Event.\n Try to fulfil the request again"]
}
subgraph cluster_vm {
rankdir=RL;
style=filled;
color=lightgrey;
node [style=filled,color=white];
label="ucloud-vm";
VmTrash [shape=point, color=red]
RequestQueue -> HandleRequest [label="Get Requests for VM Operations"]
HandleRequest -> Running [label="RunVMRequest", color=darkgreen]
HandleRequest -> Stopped [label="ShutdownVMRequest", color=darkgreen]
HandleRequest -> VmTrash [constraint=false, color=red, label="Put error message into VM's log"]
}
}

View File

@ -1,8 +1,10 @@
from .etcd3_wrapper import EtcdEntry
import socket
import logging
from etcd3_wrapper import EtcdEntry
class SpecificEtcdEntryBase(object):
def __init__(self, e: EtcdEntry):
self.key = e.key
@ -19,7 +21,22 @@ class SpecificEtcdEntryBase(object):
return self.original_keys()
def __repr__(self):
_return = f"""{self.key}"""
for key in self.original_keys():
_return += f",{key} = {self.__getattribute__(key)}"
return _return
return str(dict(self.__dict__))
def get_ipv4_address():
# If host is connected to internet
# Return IPv4 address of machine
# Otherwise, return 127.0.0.1
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
try:
s.connect(("8.8.8.8", 80))
except socket.timeout:
address = "127.0.0.1"
except Exception as e:
logging.getLogger().exception(e)
address = "127.0.0.1"
else:
address = s.getsockname()[0]
return address

10
host.py
View File

@ -1,6 +1,9 @@
from typing import List
from .helpers import SpecificEtcdEntryBase
from .etcd3_wrapper import EtcdEntry
from datetime import datetime
from os.path import join
class HostStatus(object):
@ -39,13 +42,16 @@ class HostPool(object):
self.prefix = host_prefix
@property
def hosts(self):
def hosts(self) -> List[HostEntry]:
_hosts = self.client.get_prefix(self.prefix, value_in_json=True)
return [HostEntry(host) for host in _hosts]
def get(self, key):
if not key.startswith(self.prefix):
key = join(self.prefix, key)
v = self.client.get(key, value_in_json=True)
return HostEntry(v)
if v:
return HostEntry(v)
def put(self, obj: HostEntry):
self.client.put(obj.key, obj.value, value_in_json=True)

39
request.py Normal file
View File

@ -0,0 +1,39 @@
import json
from etcd3_wrapper import Etcd3Wrapper, EtcdEntry, PsuedoEtcdEntry
from uuid import uuid4
from .helpers import SpecificEtcdEntryBase
from os.path import join
class RequestType(object):
CreateVM = "CreateVM"
ScheduleVM = "ScheduleVM"
StartVM = "StartVM"
StopVM = "StopVM"
InitVMMigration = "InitVMMigration"
TransferVM = "TransferVM"
DeleteVM = "DeleteVM"
class RequestEntry(SpecificEtcdEntryBase):
def __init__(self, e: EtcdEntry):
self.type = ""
super().__init__(e)
@classmethod
def from_scratch(cls, **kwargs):
e = PsuedoEtcdEntry(join("/v1/request/", uuid4().hex), value=json.dumps(kwargs).encode("utf-8"),
value_in_json=True)
return cls(e)
class RequestPool(object):
def __init__(self, etcd_client, request_prefix):
self.client = etcd_client
self.prefix = request_prefix
def put(self, obj: RequestEntry):
if not obj.key.startswith(self.prefix):
obj.key = join(self.prefix, obj.key)
self.client.put(obj.key, obj.value, value_in_json=True)

13
test.py
View File

@ -1,13 +0,0 @@
from vm import VmPool
from etcd3_wrapper import Etcd3Wrapper
client = Etcd3Wrapper()
vm_pool = VmPool(client, "/v1/vm")
# vms = vm_pool.by_status("REQUESTED_NEW")
# for vm in vms:
# print(vm)
# print()
v = vm_pool.get("/v1/vm/8402926c7a164966982eae48b2d6d1a9")
# print(dir(v))
print(v)

52
vm.py
View File

@ -1,23 +1,21 @@
from contextlib import contextmanager
from datetime import datetime
from etcd3_wrapper import EtcdEntry
from .helpers import SpecificEtcdEntryBase
from os.path import join
class VMStatus(object):
# Must be only assigned to brand new VM
requested_new = "REQUESTED_NEW"
# Host assigned to VM but not created yet.
scheduled_deploy = "SCHEDULED_DEPLOY"
# Only Assigned to already created vm
requested_start = "REQUESTED_START"
# These all are for running vms
requested_shutdown = "REQUESTED_SHUTDOWN"
requested_suspend = "REQUESTED_SUSPEND"
requested_resume = "REQUESTED_RESUME"
requested_migrate = "REQUESTED_MIGRATE"
requested_delete = "REQUESTED_DELETE"
# either its image is not found or user requested
# to delete it
deleted = "DELETED"
@ -26,12 +24,6 @@ class VMStatus(object):
killed = "KILLED" # either host died or vm died itself
running = "RUNNING"
suspended = "SUSPENDED"
RUNNING_VM_STATUSES = [VMStatus.requested_shutdown, VMStatus.requested_suspend,
VMStatus.requested_resume, VMStatus.requested_migrate,
VMStatus.running, VMStatus.suspended]
class VMEntry(SpecificEtcdEntryBase):
@ -41,18 +33,33 @@ class VMEntry(SpecificEtcdEntryBase):
self.hostname = ""
self.status = ""
self.image_uuid = ""
self.log = []
self.in_migration = False
super().__init__(e)
@property
def uuid(self):
return self.key.split("/")[-1]
def declare_killed(self):
self.hostname = ""
if self.status in RUNNING_VM_STATUSES:
self.in_migration = False
if self.status == VMStatus.running:
self.status = VMStatus.killed
def declare_stopped(self):
self.hostname = ""
self.in_migration = False
self.status = VMStatus.stopped
def add_log(self, msg):
self.log = self.log[:5]
self.log.append(f"{datetime.now().isoformat()} - {msg}")
@property
def path(self):
return f"rbd:uservms/{self.uuid}"
class VmPool(object):
def __init__(self, etcd_client, vm_prefix):
@ -80,8 +87,19 @@ class VmPool(object):
return list(filter(lambda x: x.status != status, _vms))
def get(self, key):
if not key.startswith(self.prefix):
key = join(self.prefix, key)
v = self.client.get(key, value_in_json=True)
return VMEntry(v)
if v:
return VMEntry(v)
def put(self, obj: VMEntry):
self.client.put(obj.key, obj.value, value_in_json=True)
@contextmanager
def get_put(self, key) -> VMEntry:
# Updates object at key on exit
obj = self.get(key)
yield obj
if obj:
self.put(obj)