From 080933b1404ab78deecd9990d9f8f393d783f6dc Mon Sep 17 00:00:00 2001 From: meow Date: Sun, 11 Aug 2019 22:01:27 +0500 Subject: [PATCH] Rolled out new request mechanism, vm migration view added --- .gitmodules | 3 -- etcd3_wrapper | 1 - main.py | 117 ++++++++++++++++++++++++++++++++++++++------------ schemas.py | 53 ++++++++++++++++++++--- 4 files changed, 137 insertions(+), 37 deletions(-) delete mode 160000 etcd3_wrapper diff --git a/.gitmodules b/.gitmodules index 316b9ea..e69de29 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +0,0 @@ -[submodule "etcd3_wrapper"] - path = etcd3_wrapper - url = https://code.ungleich.ch/ahmedbilal/etcd3_wrapper diff --git a/etcd3_wrapper b/etcd3_wrapper deleted file mode 160000 index cb2a416..0000000 --- a/etcd3_wrapper +++ /dev/null @@ -1 +0,0 @@ -Subproject commit cb2a416a17d6789e613ba3b9957917770f4211e1 diff --git a/main.py b/main.py index 8bad54e..5132372 100644 --- a/main.py +++ b/main.py @@ -3,45 +3,63 @@ # any user vm. import json +import subprocess from flask import Flask, request from flask_restful import Resource, Api from uuid import uuid4 from etcd3_wrapper import Etcd3Wrapper +from os.path import join from ucloud_common.vm import VmPool, VMStatus +from ucloud_common.host import HostPool +from ucloud_common.request import RequestEntry, RequestPool, RequestType from schemas import (CreateVMSchema, VMStatusSchema, CreateImageSchema, VmActionSchema, - OTPSchema, CreateHostSchema) + OTPSchema, CreateHostSchema, + VmMigrationSchema) app = Flask(__name__) api = Api(app) client = Etcd3Wrapper() vm_pool = VmPool(client, "/v1/vm") +host_pool = HostPool(client, "/v1/host") +request_pool = RequestPool(client, "/v1/request") class CreateVM(Resource): - def post(self): + @staticmethod + def post(): data = request.json validator = CreateVMSchema(data) if validator.is_valid(): - vm_key = f"/v1/vm/{uuid4().hex}" + # Create VM Entry under /v1/vm/ + vm_uuid = uuid4().hex + vm_key = f"/v1/vm/{vm_uuid}" vm_entry = { "owner": data["name"], "specs": data["specs"], "hostname": "", - "status": "REQUESTED_NEW", + "status": "", "image_uuid": data["image_uuid"], + "log": [] } client.put(vm_key, vm_entry, value_in_json=True) + + # Create ScheduleVM Request + r = RequestEntry.from_scratch(type=RequestType.ScheduleVM, + uuid=vm_uuid) + request_pool.put(r) + return {"message": "VM Creation Queued"}, 200 else: return validator.get_errors(), 400 class VmStatus(Resource): - def get(self): + @staticmethod + def get(): data = request.json validator = VMStatusSchema(data) if validator.is_valid(): @@ -52,7 +70,8 @@ class VmStatus(Resource): class CreateImage(Resource): - def post(self): + @staticmethod + def post(): data = request.json validator = CreateImageSchema(data) if validator.is_valid(): @@ -69,13 +88,14 @@ class CreateImage(Resource): } client.put(f"/v1/image/{data['uuid']}", json.dumps(image_entry_json)) - return {"Message": "Image successfully created"} + return {"message": "Image successfully created"} else: return validator.get_errors(), 400 class ListPublicImages(Resource): - def get(self): + @staticmethod + def get(): images = client.get_prefix("/v1/image/") r = {} for image in images: @@ -85,32 +105,64 @@ class ListPublicImages(Resource): class VMAction(Resource): - def post(self): + @staticmethod + def post(): data = request.json validator = VmActionSchema(data) - action = data["action"] if validator.is_valid(): - vm = vm_pool.get(f"/v1/vm/{data['uuid']}") - if action == "start": - vm.status = VMStatus.requested_start - elif action == "suspend": - vm.status = VMStatus.requested_suspend - elif action == "resume": - vm.status = VMStatus.requested_resume - elif action == "shutdown": - vm.status = VMStatus.requested_shutdown - elif action == "delete": - vm.status = VMStatus.requested_delete + vm_entry = vm_pool.get(f"/v1/vm/{data['uuid']}") + action = data["action"] - client.put(vm.key, json.dumps(vm.value)) - return {"message": f"VM Start Queued"} + if action == "start": + vm_entry.status = VMStatus.requested_start + vm_pool.put(vm_entry) + action = "schedule" + + if action == "delete" and vm_entry.hostname == "": + try: + path_without_protocol = vm_entry.path[vm_entry.path.find(":")+1:] + rc = subprocess.call(f"rbd rm {path_without_protocol}".split(" ")) + except FileNotFoundError: + return {"message": "VM image does not exists"} + else: + if rc == 0: + client.client.delete(vm_entry.key) + return {"message": "VM successfully deleted"} + else: + return {"message": "Some error occurred while deleting VM"} + + r = RequestEntry.from_scratch(type=f"{action.title()}VM", + uuid=data['uuid'], + hostname=vm_entry.hostname) + request_pool.put(r) + return {"message": f"VM {action.title()} Queued"}, 200 + else: + return validator.get_errors(), 400 + + +class VMMigration(Resource): + @staticmethod + def post(): + data = request.json + validator = VmMigrationSchema(data) + + if validator.is_valid(): + vm = vm_pool.get(data['uuid']) + + r = RequestEntry.from_scratch(type=RequestType.ScheduleVM, + uuid=vm.uuid, + destination=join("/v1/host", data["destination"]), + migration=True) + request_pool.put(r) + return {"message": f"VM Migration Initialization Queued"}, 200 else: return validator.get_errors(), 400 class ListUserVM(Resource): - def get(self): + @staticmethod + def get(): data = request.json validator = OTPSchema(data) @@ -135,7 +187,8 @@ class ListUserVM(Resource): class ListUserFiles(Resource): - def get(self): + @staticmethod + def get(): data = request.json validator = OTPSchema(data) @@ -159,7 +212,8 @@ class ListUserFiles(Resource): class CreateHost(Resource): - def post(self): + @staticmethod + def post(): data = request.json validator = CreateHostSchema(data) if validator.is_valid(): @@ -177,10 +231,19 @@ class CreateHost(Resource): return validator.get_errors(), 400 +class ListHost(Resource): + @staticmethod + def get(): + hosts = host_pool.hosts + r = {host.key: {"status": host.status, "specs": host.specs, "hostname": host.hostname} for host in hosts} + return r, 200 + + api.add_resource(CreateVM, "/vm/create") api.add_resource(VmStatus, "/vm/status") api.add_resource(VMAction, "/vm/action") +api.add_resource(VMMigration, "/vm/migrate") api.add_resource(CreateImage, "/image/create") api.add_resource(ListPublicImages, "/image/list-public") @@ -189,7 +252,7 @@ api.add_resource(ListUserVM, "/user/vms") api.add_resource(ListUserFiles, "/user/files") api.add_resource(CreateHost, "/host/create") - +api.add_resource(ListHost, "/host/list") if __name__ == "__main__": app.run(host="::", debug=True) diff --git a/schemas.py b/schemas.py index a11d900..37f2f8a 100644 --- a/schemas.py +++ b/schemas.py @@ -1,10 +1,15 @@ import json from common_fields import Field, VmUUIDField, SpecsField +from ucloud_common.host import HostPool, HostStatus +from ucloud_common.vm import VmPool, VMStatus from helper import check_otp from etcd3_wrapper import Etcd3Wrapper +from os.path import join client = Etcd3Wrapper() +host_pool = HostPool(client, "/v1/host") +vm_pool = VmPool(client, "/v1/vm") class BaseSchema(object): @@ -39,7 +44,7 @@ class BaseSchema(object): return True def get_errors(self): - return {"Message": self.__errors} + return {"message": self.__errors} def add_field_errors(self, field: Field): self.__errors += field.get_errors() @@ -125,19 +130,55 @@ class VmActionSchema(OTPSchema): self.action.validation = self.action_validation - fields = [self.uuid, self.action] - super().__init__(data=data, fields=fields) + _fields = [self.uuid, self.action] + super().__init__(data=data, fields=_fields) def action_validation(self): - allowed_actions = ["start", "shutdown", "suspend", "resume", "delete"] + allowed_actions = ["start", "stop", "delete"] if self.action.value not in allowed_actions: self.add_error(f"Invalid Action. Allowed Actions are {allowed_actions}") def validation(self): - vm = client.get(f"/v1/vm/{self.uuid.value}", value_in_json=True) - if vm.value["owner"] != self.name: + vm = vm_pool.get(self.uuid.value) + if vm.value["owner"] != self.name.value: self.add_error("Invalid User") + if self.action.value == "start" and vm.status == VMStatus.running and vm.hostname != "": + self.add_error("VM Already Running") + + if self.action.value == "stop" and vm.status == VMStatus.stopped: + self.add_error("VM Already Stopped") + + +class VmMigrationSchema(OTPSchema): + def __init__(self, data): + self.uuid = VmUUIDField(data) + self.destination = Field("destination", str, data.get("destination", KeyError)) + + self.destination.validation = self.destination_validation + + fields = [self.destination] + super().__init__(data=data, fields=fields) + + def destination_validation(self): + host_key = self.destination.value + host = host_pool.get(host_key) + if not host: + self.add_error(f"No Such Host ({self.destination.value}) exists") + elif host.status != HostStatus.alive: + self.add_error("Destination Host is dead") + + def validation(self): + vm = vm_pool.get(self.uuid.value) + if vm.owner != self.name.value: + self.add_error("Invalid User") + + if vm.status != VMStatus.running: + self.add_error("Can't migrate non-running VM") + + if vm.hostname == join("/v1/host", self.destination.value): + self.add_error("Destination host couldn't be same as Source Host") + class CreateHostSchema(OTPSchema): def __init__(self, data):