From 93dee1c9fcd18e095d355f026409afa6e43f5b91 Mon Sep 17 00:00:00 2001 From: meow Date: Sat, 2 Nov 2019 20:42:24 +0500 Subject: [PATCH] New Features + Refactoring 1. User can now use image name instead of image uuid when creation vm. For Example, now user can create an alpine vm using the following command ```shell ucloud-cli vm create --vm-name myvm --cpu 2 --ram '2GB' \ --os-ssd '10GB' --image images:alpine ``` 2. Instead of directly running code, code is now placed under a function main and is called using the following code ```python if __name__ == "__main__": main() ``` 3. Multiprocess (Process) is used instead of threading (Thread) to update heart beat of host. 4. IP Address of vm is included in vm's status which is retrieved by the following command ```shell ucloud-cli vm status --vm-name myvm ``` --- .gitignore | 2 + Pipfile.lock | 60 ++++++++------- TODO.md | 3 +- api/helper.py | 76 ++++++++++++++++++- api/main.py | 56 +++++++------- api/schemas.py | 20 +++-- filescanner/main.py | 125 ++++++++++++++++++------------- host/config.py | 5 +- host/main.py | 55 ++++++++------ imagescanner/main.py | 172 ++++++++++++++++++++++--------------------- metadata/config.py | 1 + metadata/main.py | 5 +- scheduler/main.py | 7 +- 13 files changed, 354 insertions(+), 233 deletions(-) diff --git a/.gitignore b/.gitignore index 5430a7a..bfdbce1 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,6 @@ .vscode .env +__pycache__ + */log.txt \ No newline at end of file diff --git a/Pipfile.lock b/Pipfile.lock index aaa7369..62c17ca 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -83,10 +83,12 @@ "sha256:7cfcfda59ef1f95b9f729c56fe8a4041899f96b72685d36ef16a3440a0f85da8", "sha256:819f8d5197c2684524637f940445c06e003c4a541f9983fd30d6deaa2a5487d8", "sha256:825ecffd9574557590e3225560a8a9d751f6ffe4a49e3c40918c9969b93395fa", + "sha256:8a2bcae2258d00fcfc96a9bde4a6177bc4274fe033f79311c5dd3d3148c26518", "sha256:9009e917d8f5ef780c2626e29b6bc126f4cb2a4d43ca67aa2b40f2a5d6385e78", "sha256:9c77564a51d4d914ed5af096cd9843d90c45b784b511723bd46a8a9d09cf16fc", "sha256:a19089fa74ed19c4fe96502a291cfdb89223a9705b1d73b3005df4256976142e", "sha256:a40ed527bffa2b7ebe07acc5a3f782da072e262ca994b4f2085100b5a444bbb2", + "sha256:b8f09f21544b9899defb09afbdaeb200e6a87a2b8e604892940044cf94444644", "sha256:bb75ba21d5716abc41af16eac1145ab2e471deedde1f22c6f99bd9f995504df0", "sha256:e22a00c0c81ffcecaf07c2bfb3672fa372c50e2bd1024ffee0da191c1b27fc71", "sha256:e55b5a746fb77f10c83e8af081979351722f6ea48facea79d470b3731c7b2891", @@ -293,7 +295,6 @@ }, "pycparser": { "hashes": [ - "sha256:9d97450dc26e1d2581c18881d8d1c0a92e84c9ac074961e3dc66e70d745a0643", "sha256:a988718abfad80b6b157acce7bf130a30876d27603738ac39f140993246b25b3" ], "version": "==2.19" @@ -375,15 +376,15 @@ }, "tenacity": { "hashes": [ - "sha256:6a7511a59145c2e319b7d04ddd93c12d48cc3d3c8fa42c2846d33a620ee91f57", - "sha256:a4eb168dbf55ed2cae27e7c6b2bd48ab54dabaf294177d998330cf59f294c112" + "sha256:3a916e734559f1baa2cab965ee00061540c41db71c3bf25375b81540a19758fc", + "sha256:e664bd94f088b17f46da33255ae33911ca6a0fe04b156d334b601a4ef66d3c5f" ], - "version": "==5.1.1" + "version": "==5.1.5" }, "ucloud-common": { "editable": true, "git": "https://code.ungleich.ch/ucloud/ucloud_common.git", - "ref": "9f229eae27f9007e9c6c1021d3d5b12452863763" + "ref": "eba92e5d6723093a3cc2999ae1f5c284e65dc809" }, "urllib3": { "hashes": [ @@ -509,26 +510,29 @@ }, "lazy-object-proxy": { "hashes": [ - "sha256:02b260c8deb80db09325b99edf62ae344ce9bc64d68b7a634410b8e9a568edbf", - "sha256:18f9c401083a4ba6e162355873f906315332ea7035803d0fd8166051e3d402e3", - "sha256:1f2c6209a8917c525c1e2b55a716135ca4658a3042b5122d4e3413a4030c26ce", - "sha256:2f06d97f0ca0f414f6b707c974aaf8829c2292c1c497642f63824119d770226f", - "sha256:616c94f8176808f4018b39f9638080ed86f96b55370b5a9463b2ee5c926f6c5f", - "sha256:63b91e30ef47ef68a30f0c3c278fbfe9822319c15f34b7538a829515b84ca2a0", - "sha256:77b454f03860b844f758c5d5c6e5f18d27de899a3db367f4af06bec2e6013a8e", - "sha256:83fe27ba321e4cfac466178606147d3c0aa18e8087507caec78ed5a966a64905", - "sha256:84742532d39f72df959d237912344d8a1764c2d03fe58beba96a87bfa11a76d8", - "sha256:874ebf3caaf55a020aeb08acead813baf5a305927a71ce88c9377970fe7ad3c2", - "sha256:9f5caf2c7436d44f3cec97c2fa7791f8a675170badbfa86e1992ca1b84c37009", - "sha256:a0c8758d01fcdfe7ae8e4b4017b13552efa7f1197dd7358dc9da0576f9d0328a", - "sha256:a4def978d9d28cda2d960c279318d46b327632686d82b4917516c36d4c274512", - "sha256:ad4f4be843dace866af5fc142509e9b9817ca0c59342fdb176ab6ad552c927f5", - "sha256:ae33dd198f772f714420c5ab698ff05ff900150486c648d29951e9c70694338e", - "sha256:b4a2b782b8a8c5522ad35c93e04d60e2ba7f7dcb9271ec8e8c3e08239be6c7b4", - "sha256:c462eb33f6abca3b34cdedbe84d761f31a60b814e173b98ede3c81bb48967c4f", - "sha256:fd135b8d35dfdcdb984828c84d695937e58cc5f49e1c854eb311c4d6aa03f4f1" + "sha256:0c4b206227a8097f05c4dbdd323c50edf81f15db3b8dc064d08c62d37e1a504d", + "sha256:194d092e6f246b906e8f70884e620e459fc54db3259e60cf69a4d66c3fda3449", + "sha256:1be7e4c9f96948003609aa6c974ae59830a6baecc5376c25c92d7d697e684c08", + "sha256:4677f594e474c91da97f489fea5b7daa17b5517190899cf213697e48d3902f5a", + "sha256:48dab84ebd4831077b150572aec802f303117c8cc5c871e182447281ebf3ac50", + "sha256:5541cada25cd173702dbd99f8e22434105456314462326f06dba3e180f203dfd", + "sha256:59f79fef100b09564bc2df42ea2d8d21a64fdcda64979c0fa3db7bdaabaf6239", + "sha256:8d859b89baf8ef7f8bc6b00aa20316483d67f0b1cbf422f5b4dc56701c8f2ffb", + "sha256:9254f4358b9b541e3441b007a0ea0764b9d056afdeafc1a5569eee1cc6c1b9ea", + "sha256:9651375199045a358eb6741df3e02a651e0330be090b3bc79f6d0de31a80ec3e", + "sha256:97bb5884f6f1cdce0099f86b907aa41c970c3c672ac8b9c8352789e103cf3156", + "sha256:9b15f3f4c0f35727d3a0fba4b770b3c4ebbb1fa907dbcc046a1d2799f3edd142", + "sha256:a2238e9d1bb71a56cd710611a1614d1194dc10a175c1e08d75e1a7bcc250d442", + "sha256:a6ae12d08c0bf9909ce12385803a543bfe99b95fe01e752536a60af2b7797c62", + "sha256:ca0a928a3ddbc5725be2dd1cf895ec0a254798915fb3a36af0964a0a4149e3db", + "sha256:cb2c7c57005a6804ab66f106ceb8482da55f5314b7fcb06551db1edae4ad1531", + "sha256:d74bb8693bf9cf75ac3b47a54d716bbb1a92648d5f781fc799347cfc95952383", + "sha256:d945239a5639b3ff35b70a88c5f2f491913eb94871780ebfabb2568bd58afc5a", + "sha256:eba7011090323c1dadf18b3b689845fd96a61ba0a1dfbd7f24b921398affc357", + "sha256:efa1909120ce98bbb3777e8b6f92237f5d5c8ea6758efea36a473e1d38f7d3e4", + "sha256:f3900e8a5de27447acbf900b4750b0ddfd7ec1ea7fbaf11dfa911141bc522af0" ], - "version": "==1.4.2" + "version": "==1.4.3" }, "mccabe": { "hashes": [ @@ -743,11 +747,11 @@ }, "typing-extensions": { "hashes": [ - "sha256:2ed632b30bb54fc3941c382decfd0ee4148f5c591651c9272473fea2c6397d95", - "sha256:b1edbbf0652660e32ae780ac9433f4231e7339c7f9a8057d0f042fcbcea49b87", - "sha256:d8179012ec2c620d3791ca6fe2bf7979d979acdbef1fca0bc56b37411db682ed" + "sha256:091ecc894d5e908ac75209f10d5b4f118fbdb2eb1ede6a63544054bb1edb41f2", + "sha256:910f4656f54de5993ad9304959ce9bb903f90aadc7c67a0bef07e678014e892d", + "sha256:cf8b63fedea4d89bab840ecbb93e75578af28f76f66c35889bd7065f5af88575" ], - "version": "==3.7.4" + "version": "==3.7.4.1" }, "urllib3": { "hashes": [ diff --git a/TODO.md b/TODO.md index 09f6205..20be658 100644 --- a/TODO.md +++ b/TODO.md @@ -2,5 +2,4 @@ - Check for `etcd3.exceptions.ConnectionFailedError` when calling some etcd operation to avoid crashing whole application -- Throw KeyError instead of returning None when some key is not found in etcd -- Specify image format when using qemu-img when creating virtual machine \ No newline at end of file +- Throw KeyError instead of returning None when some key is not found in etcd \ No newline at end of file diff --git a/api/helper.py b/api/helper.py index 67a0379..06b45b1 100755 --- a/api/helper.py +++ b/api/helper.py @@ -1,9 +1,12 @@ import binascii import requests +import random +import subprocess as sp +import ipaddress from decouple import config from pyotp import TOTP -from config import VM_POOL +from config import VM_POOL, etcd_client, IMAGE_PREFIX def check_otp(name, realm, token): @@ -47,7 +50,47 @@ def resolve_vm_name(name, owner): return None -import random + +def resolve_image_name(name, etcd_client): + """Return image uuid given its name and its store + + * If the provided name is not in correct format + i.e {store_name}:{image_name} return ValueError + * If no such image found then return KeyError + + """ + + seperator = ":" + + # Ensure, user/program passed valid name that is of type string + try: + store_name_and_image_name = name.split(seperator) + + """ + Examples, where it would work and where it would raise exception + "images:alpine" --> ["images", "alpine"] + + "images" --> ["images"] it would raise Exception as non enough value to unpack + + "images:alpine:meow" --> ["images", "alpine", "meow"] it would raise Exception + as too many values to unpack + """ + store_name, image_name = store_name_and_image_name + except Exception: + raise ValueError("Image name not in correct format i.e {store_name}:{image_name}") + + images = etcd_client.get_prefix(IMAGE_PREFIX, value_in_json=True) + + # Try to find image with name == image_name and store_name == store_name + try: + image = next(filter(lambda im: im.value['name'] == image_name \ + and im.value['store_name'] == store_name, images)) + except StopIteration: + raise KeyError("No image with name {} found.".format(name)) + else: + image_uuid = image.key.split('/')[-1] + + return image_uuid def random_bytes(num=6): return [random.randrange(256) for _ in range(num)] @@ -68,3 +111,32 @@ def generate_mac(uaa=False, multicast=False, oui=None, separator=':', byte_fmt=' else: mac[0] |= 1 << 1 # set bit 1 return separator.join(byte_fmt % b for b in mac) + +def get_ip_addr(mac_address, device): + """Return IP address of a device provided its mac address / link local address + and the device with which it is connected. + + For Example, if we call get_ip_addr(mac_address="52:54:00:12:34:56", device="br0") + the following two scenarios can happen + 1. It would return None if we can't be able to find device whose mac_address is equal + to the arg:mac_address or the mentioned arg:device does not exists or the ip address + we found is local. + 2. It would return ip_address of device whose mac_address is equal to arg:mac_address + and is connected/neighbor of arg:device + """ + try: + output = sp.check_output(['ip','-6','neigh', 'show', 'dev', device], stderr=sp.PIPE) + except sp.CalledProcessError: + return None + else: + result = [] + output = output.strip().decode("utf-8") + output = output.split("\n") + for entry in output: + entry = entry.split() + if entry: + ip = ipaddress.ip_address(entry[0]) + mac = entry[2] + if ip.is_global and mac_address == mac: + result.append(ip) + return result diff --git a/api/main.py b/api/main.py index e8082d5..1b25802 100644 --- a/api/main.py +++ b/api/main.py @@ -1,6 +1,9 @@ import json import subprocess import os + +import schemas + from uuid import uuid4 from flask import Flask, request @@ -9,7 +12,7 @@ from flask_restful import Resource, Api from ucloud_common.vm import VMStatus from ucloud_common.request import RequestEntry, RequestType -from helper import generate_mac +from helper import generate_mac, get_ip_addr from config import ( etcd_client, @@ -23,18 +26,6 @@ from config import ( VM_POOL, HOST_POOL, ) -from schemas import ( - CreateVMSchema, - VMStatusSchema, - CreateImageSchema, - VmActionSchema, - OTPSchema, - CreateHostSchema, - VmMigrationSchema, - AddSSHSchema, - RemoveSSHSchema, - GetSSHSchema -) app = Flask(__name__) api = Api(app) @@ -45,7 +36,7 @@ class CreateVM(Resource): def post(): data = request.json print(data) - validator = CreateVMSchema(data) + validator = schemas.CreateVMSchema(data) if validator.is_valid(): vm_uuid = uuid4().hex vm_key = os.path.join(VM_PREFIX, vm_uuid) @@ -63,7 +54,7 @@ class CreateVM(Resource): "specs": specs, "hostname": "", "status": "", - "image_uuid": data["image_uuid"], + "image_uuid": validator.image_uuid, "log": [], "vnc_socket": "", "mac": str(generate_mac()), @@ -85,10 +76,14 @@ class VmStatus(Resource): @staticmethod def get(): data = request.json - validator = VMStatusSchema(data) + validator = schemas.VMStatusSchema(data) if validator.is_valid(): vm = VM_POOL.get(os.path.join(VM_PREFIX, data["uuid"])) - return json.dumps(str(vm)) + vm_value = vm.value.copy() + vm_value["ip"] = list(map(str, get_ip_addr(vm.mac, "br0"))) + vm.value = vm_value + print(vm.value) + return vm.value else: return validator.get_errors(), 400 @@ -97,7 +92,7 @@ class CreateImage(Resource): @staticmethod def post(): data = request.json - validator = CreateImageSchema(data) + validator = schemas.CreateImageSchema(data) if validator.is_valid(): file_entry = etcd_client.get(os.path.join(FILE_PREFIX, data["uuid"])) file_entry_value = json.loads(file_entry.value) @@ -121,10 +116,15 @@ class CreateImage(Resource): class ListPublicImages(Resource): @staticmethod def get(): - images = etcd_client.get_prefix(IMAGE_PREFIX) + images = etcd_client.get_prefix(IMAGE_PREFIX, value_in_json=True) r = {} + r["images"] = [] for image in images: - r[image.key.split("/")[-1]] = json.loads(image.value) + image_key = "{}:{}".format(image.value["store_name"], image.value["name"]) + r["images"].append({ + "name":image_key, + "status": image.value["status"] + }) return r, 200 @@ -132,7 +132,7 @@ class VMAction(Resource): @staticmethod def post(): data = request.json - validator = VmActionSchema(data) + validator = schemas.VmActionSchema(data) if validator.is_valid(): vm_entry = VM_POOL.get(os.path.join(VM_PREFIX, data["uuid"])) @@ -182,7 +182,7 @@ class VMMigration(Resource): @staticmethod def post(): data = request.json - validator = VmMigrationSchema(data) + validator = schemas.VmMigrationSchema(data) if validator.is_valid(): vm = VM_POOL.get(data["uuid"]) @@ -203,7 +203,7 @@ class ListUserVM(Resource): @staticmethod def get(): data = request.json - validator = OTPSchema(data) + validator = schemas.OTPSchema(data) if validator.is_valid(): vms = etcd_client.get_prefix(VM_PREFIX, value_in_json=True) @@ -235,7 +235,7 @@ class ListUserFiles(Resource): @staticmethod def get(): data = request.json - validator = OTPSchema(data) + validator = schemas.OTPSchema(data) if validator.is_valid(): files = etcd_client.get_prefix(FILE_PREFIX, value_in_json=True) @@ -257,7 +257,7 @@ class CreateHost(Resource): @staticmethod def post(): data = request.json - validator = CreateHostSchema(data) + validator = schemas.CreateHostSchema(data) if validator.is_valid(): host_key = os.path.join(HOST_PREFIX, uuid4().hex) host_entry = { @@ -292,7 +292,7 @@ class GetSSHKeys(Resource): @staticmethod def get(): data = request.json - validator = GetSSHSchema(data) + validator = schemas.GetSSHSchema(data) if validator.is_valid(): if not validator.key_name.value: @@ -321,7 +321,7 @@ class AddSSHKey(Resource): @staticmethod def post(): data = request.json - validator = AddSSHSchema(data) + validator = schemas.AddSSHSchema(data) if validator.is_valid(): # {user_prefix}/{realm}/{name}/key/{key_name} @@ -342,7 +342,7 @@ class RemoveSSHKey(Resource): @staticmethod def get(): data = request.json - validator = RemoveSSHSchema(data) + validator = schemas.RemoveSSHSchema(data) if validator.is_valid(): # {user_prefix}/{realm}/{name}/key/{key_name} diff --git a/api/schemas.py b/api/schemas.py index 8f366bf..b0e49b7 100755 --- a/api/schemas.py +++ b/api/schemas.py @@ -18,6 +18,8 @@ import json import os import bitmath +import helper + from ucloud_common.host import HostPool, HostStatus from ucloud_common.vm import VmPool, VMStatus @@ -207,22 +209,24 @@ class CreateVMSchema(OTPSchema): # Fields self.specs = Field("specs", dict, data.get("specs", KeyError)) self.vm_name = Field("vm_name", str, data.get("vm_name", KeyError)) - self.image_uuid = Field("image_uuid", str, data.get("image_uuid", KeyError)) + self.image = Field("image", str, data.get("image", KeyError)) # Validation - self.image_uuid.validation = self.image_uuid_validation + self.image.validation = self.image_validation self.vm_name.validation = self.vm_name_validation self.specs.validation = self.specs_validation - fields = [self.vm_name, self.image_uuid, self.specs] + fields = [self.vm_name, self.image, self.specs] super().__init__(data=data, fields=fields) - def image_uuid_validation(self): - images = client.get_prefix(IMAGE_PREFIX) - - if self.image_uuid.value not in [i.key.split("/")[-1] for i in images]: - self.add_error("Image UUID not valid") + def image_validation(self): + try: + image_uuid = helper.resolve_image_name(self.image.value, client) + except Exception as e: + self.add_error(str(e)) + else: + self.image_uuid = image_uuid def vm_name_validation(self): if resolve_vm_name(name=self.vm_name.value, owner=self.name.value): diff --git a/filescanner/main.py b/filescanner/main.py index 84ac53e..6495886 100755 --- a/filescanner/main.py +++ b/filescanner/main.py @@ -12,6 +12,7 @@ from uuid import uuid4 def getxattr(file, attr): + """Get specified user extended attribute (arg:attr) of a file (arg:file)""" try: attr = "user." + attr value = sp.check_output(['getfattr', file, @@ -24,25 +25,39 @@ def getxattr(file, attr): return value + def setxattr(file, attr, value): + """Set specified user extended attribute (arg:attr) equal to (arg:value) + of a file (arg:file)""" + attr = "user." + attr sp.check_output(['setfattr', file, '--name', attr, '--value', str(value)]) -def sha512sum(filename): - _sum = hashlib.sha512() - buffer_size = 2**16 - - with open(filename, "rb") as f: - while True: - data = f.read(buffer_size) - if not data: - break - _sum.update(data) - - return _sum.hexdigest() +def sha512sum(file: str): + """Use sha512sum utility to compute sha512 sum of arg:file + + IF arg:file does not exists: + raise FileNotFoundError exception + ELSE IF sum successfully computer: + return computed sha512 sum + ELSE: + return None + """ + if not isinstance(file, str): raise TypeError + try: + output = sp.check_output(["sha512sum", file], stderr=sp.PIPE) + except sp.CalledProcessError as e: + error = e.stderr.decode("utf-8") + if "No such file or directory" in error: + raise FileNotFoundError from None + else: + output = output.decode("utf-8").strip() + output = output.split(" ") + return output[0] + return None try: @@ -53,57 +68,61 @@ except Exception as e: print('Make sure you have getfattr and setfattr available') exit(1) +def main(): + BASE_DIR = config("BASE_DIR") -BASE_DIR = config("BASE_DIR") + FILE_PREFIX = config("FILE_PREFIX") -FILE_PREFIX = config("FILE_PREFIX") + etcd_client = Etcd3Wrapper(host=config("ETCD_URL")) -etcd_client = Etcd3Wrapper(host=config("ETCD_URL")) + # Recursively Get All Files and Folder below BASE_DIR + files = glob.glob("{}/**".format(BASE_DIR), recursive=True) -# Recursively Get All Files and Folder below BASE_DIR -files = glob.glob("{}/**".format(BASE_DIR), recursive=True) + # Retain only Files + files = list(filter(os.path.isfile, files)) -# Retain only Files -files = list(filter(os.path.isfile, files)) + untracked_files = list( + filter(lambda f: not bool(getxattr(f, "user.utracked")), files) + ) -untracked_files = list( - filter(lambda f: not bool(getxattr(f, "user.utracked")), files) -) + tracked_files = list( + filter(lambda f: f not in untracked_files, files) + ) + for file in untracked_files: + file_id = uuid4() -tracked_files = list( - filter(lambda f: f not in untracked_files, files) -) -for file in untracked_files: - file_id = uuid4() + # Get Username + owner = pathlib.Path(file).parts[3] + # Get Creation Date of File + # Here, we are assuming that ctime is creation time + # which is mostly not true. + creation_date = time.ctime(os.stat(file).st_ctime) - # Get Username - owner = pathlib.Path(file).parts[3] - # Get Creation Date of File - # Here, we are assuming that ctime is creation time - # which is mostly not true. - creation_date = time.ctime(os.stat(file).st_ctime) + # Get File Size + size = os.path.getsize(file) - # Get File Size - size = os.path.getsize(file) + # Compute sha512 sum + sha_sum = sha512sum(file) - # Compute sha512 sum - sha_sum = sha512sum(file) + # File Path excluding base and username + file_path = pathlib.Path(file).parts[4:] + file_path = os.path.join(*file_path) - # File Path excluding base and username - file_path = pathlib.Path(file).parts[4:] - file_path = os.path.join(*file_path) + # Create Entry + entry_key = os.path.join(FILE_PREFIX, str(file_id)) + entry_value = { + "filename": file_path, + "owner": owner, + "sha512sum": sha_sum, + "creation_date": creation_date, + "size": size + } - # Create Entry - entry_key = os.path.join(FILE_PREFIX, str(file_id)) - entry_value = { - "filename": file_path, - "owner": owner, - "sha512sum": sha_sum, - "creation_date": creation_date, - "size": size - } + print("Tracking {}".format(file)) + # Insert Entry + etcd_client.put(entry_key, entry_value, value_in_json=True) + setxattr(file, "user.utracked", True) - print("Tracking {}".format(file)) - # Insert Entry - etcd_client.put(entry_key, entry_value, value_in_json=True) - setxattr(file, "user.utracked", True) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/host/config.py b/host/config.py index 4191828..03cee29 100755 --- a/host/config.py +++ b/host/config.py @@ -16,7 +16,10 @@ logging.basicConfig( datefmt="%d-%b-%y %H:%M:%S", ) -etcd_client = Etcd3Wrapper(host=config("ETCD_URL")) +etcd_wrapper_args = () +etcd_wrapper_kwargs = {"host": config("ETCD_URL")} + +etcd_client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs) HOST_PREFIX = config("HOST_PREFIX") VM_PREFIX = config("VM_PREFIX") diff --git a/host/main.py b/host/main.py index 8fe73c9..b86e88e 100755 --- a/host/main.py +++ b/host/main.py @@ -1,27 +1,32 @@ import argparse -import threading +# import threading import time import os import sys import virtualmachine +import multiprocessing as mp from ucloud_common.host import HostEntry from ucloud_common.request import RequestEntry, RequestType from config import (vm_pool, host_pool, request_pool, etcd_client, logging, running_vms, - REQUEST_PREFIX, WITHOUT_CEPH) + etcd_wrapper_args, etcd_wrapper_kwargs, + REQUEST_PREFIX, HOST_PREFIX, + WITHOUT_CEPH, VM_DIR, HostPool) +from etcd3_wrapper import Etcd3Wrapper +import etcd3 +def update_heartbeat(host): + client = Etcd3Wrapper(*etcd_wrapper_args, **etcd_wrapper_kwargs) + host_pool = HostPool(client, HOST_PREFIX) + this_host = host_pool.get(host) -def update_heartbeat(host: HostEntry): while True: - host.update_heartbeat() - host_pool.put(host) + this_host.update_heartbeat() + host_pool.put(this_host) time.sleep(10) - logging.info("Updated last heartbeat time %s", host.last_heartbeat) - - def maintenance(host): # To capture vm running according to running_vms list @@ -66,16 +71,25 @@ def main(): argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1") args = argparser.parse_args() + assert WITHOUT_CEPH and os.path.isdir(VM_DIR), ( + "You have set WITHOUT_CEPH to True. So, the vm directory mentioned" + " in .env file must exists. But, it don't." ) + + mp.set_start_method('spawn') + heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(args.hostname,)) + + host_pool = HostPool(etcd_client, HOST_PREFIX) host = host_pool.get(args.hostname) - if not host: - print("No Such Host") - exit(1) - - if WITHOUT_CEPH and not os.path.isdir("/var/vm"): - print("You have set WITHOUT_CEPH to True. So, the /var/vm must exists. But, it don't") - sys.exit(1) + assert host, "No such host" + try: + heartbeat_updating_process.start() + except Exception as e: + logging.info("No Need To Go Further. Our heartbeat updating mechanism is not working") + logging.exception(e) + exit(-1) + logging.info("%s Session Started %s", '*' * 5, '*' * 5) # It is seen that under heavy load, timeout event doesn't come @@ -86,13 +100,6 @@ def main(): # update the heart beat in a predictive manner we start Heart # beat updating mechanism in separated thread - heartbeat_updating_thread = threading.Thread(target=update_heartbeat, args=(host,)) - try: - heartbeat_updating_thread.start() - except Exception as e: - logging.info("No Need To Go Further. Our heartbeat updating mechanism is not working") - logging.exception(e) - exit(-1) for events_iterator in [ etcd_client.get_prefix(REQUEST_PREFIX, value_in_json=True), @@ -134,4 +141,6 @@ def main(): logging.info("Running VMs %s", running_vms) -main() +if __name__ == "__main__": + main() + diff --git a/imagescanner/main.py b/imagescanner/main.py index f0956ac..146e756 100755 --- a/imagescanner/main.py +++ b/imagescanner/main.py @@ -19,90 +19,96 @@ def qemu_img_type(path): qemu_img_info = json.loads(qemu_img_info.decode("utf-8")) return qemu_img_info["format"] -# If you are using WITHOUT_CEPH FLAG in .env -# then please make sure that IMAGE_DIR directory -# exists otherwise this script would fail -if WITHOUT_CEPH and not os.path.isdir(IMAGE_DIR): - print("You have set WITHOUT_CEPH to True. So," - "the {} must exists. But, it don't".format(IMAGE_DIR)) - sys.exit(1) - -try: - subprocess.check_output(['which', 'qemu-img']) -except Exception: - print("qemu-img missing") - sys.exit(1) - -# We want to get images entries that requests images to be created -images = client.get_prefix(IMAGE_PREFIX, value_in_json=True) -images_to_be_created = list(filter(lambda im: im.value['status'] == 'TO_BE_CREATED', images)) - -for image in images_to_be_created: - try: - image_uuid = image.key.split('/')[-1] - image_owner = image.value['owner'] - image_filename = image.value['filename'] - image_store_name = image.value['store_name'] - image_full_path = os.path.join(BASE_PATH, image_owner, image_filename) - - image_stores = client.get_prefix(IMAGE_STORE_PREFIX, value_in_json=True) - user_image_store = next(filter( - lambda s, store_name=image_store_name: s.value["name"] == store_name, - image_stores - )) - - image_store_pool = user_image_store.value['attributes']['pool'] - - except Exception as e: - logging.exception(e) - else: - # At least our basic data is available - - qemu_img_convert_command = ["qemu-img", "convert", "-f", "qcow2", - "-O", "raw", image_full_path, "image.raw"] - - - if WITHOUT_CEPH: - image_import_command = ["mv", "image.raw", os.path.join(IMAGE_DIR, image_uuid)] - snapshot_creation_command = ["true"] - snapshot_protect_command = ["true"] - else: - image_import_command = ["rbd", "import", "image.raw", - "{}/{}".format(image_store_pool, image_uuid)] - snapshot_creation_command = ["rbd", "snap", "create", - "{}/{}@protected".format(image_store_pool, image_uuid)] - snapshot_protect_command = ["rbd", "snap", "protect", - "{}/{}@protected".format(image_store_pool, image_uuid)] - - - # First check whether the image is qcow2 - - if qemu_img_type(image_full_path) == "qcow2": - try: - # Convert .qcow2 to .raw - subprocess.check_output(qemu_img_convert_command) - - # Import image either to ceph/filesystem - subprocess.check_output(image_import_command) - - # Create and Protect Snapshot - subprocess.check_output(snapshot_creation_command) - subprocess.check_output(snapshot_protect_command) - - except Exception as e: - logging.exception(e) - - else: - # Everything is successfully done - image.value["status"] = "CREATED" - client.put(image.key, json.dumps(image.value)) - else: - # The user provided image is either not found or of invalid format - image.value["status"] = "INVALID_IMAGE" - client.put(image.key, json.dumps(image.value)) +def main(): + # If you are using WITHOUT_CEPH FLAG in .env + # then please make sure that IMAGE_DIR directory + # exists otherwise this script would fail + if WITHOUT_CEPH and not os.path.isdir(IMAGE_DIR): + print("You have set WITHOUT_CEPH to True. So," + "the {} must exists. But, it don't".format(IMAGE_DIR)) + sys.exit(1) try: - os.remove("image.raw") + subprocess.check_output(['which', 'qemu-img']) except Exception: - pass + print("qemu-img missing") + sys.exit(1) + + # We want to get images entries that requests images to be created + images = client.get_prefix(IMAGE_PREFIX, value_in_json=True) + images_to_be_created = list(filter(lambda im: im.value['status'] == 'TO_BE_CREATED', images)) + + for image in images_to_be_created: + try: + image_uuid = image.key.split('/')[-1] + image_owner = image.value['owner'] + image_filename = image.value['filename'] + image_store_name = image.value['store_name'] + image_full_path = os.path.join(BASE_PATH, image_owner, image_filename) + + image_stores = client.get_prefix(IMAGE_STORE_PREFIX, value_in_json=True) + user_image_store = next(filter( + lambda s, store_name=image_store_name: s.value["name"] == store_name, + image_stores + )) + + image_store_pool = user_image_store.value['attributes']['pool'] + + except Exception as e: + logging.exception(e) + else: + # At least our basic data is available + + qemu_img_convert_command = ["qemu-img", "convert", "-f", "qcow2", + "-O", "raw", image_full_path, "image.raw"] + + + if WITHOUT_CEPH: + image_import_command = ["mv", "image.raw", os.path.join(IMAGE_DIR, image_uuid)] + snapshot_creation_command = ["true"] + snapshot_protect_command = ["true"] + else: + image_import_command = ["rbd", "import", "image.raw", + "{}/{}".format(image_store_pool, image_uuid)] + snapshot_creation_command = ["rbd", "snap", "create", + "{}/{}@protected".format(image_store_pool, image_uuid)] + snapshot_protect_command = ["rbd", "snap", "protect", + "{}/{}@protected".format(image_store_pool, image_uuid)] + + + # First check whether the image is qcow2 + + if qemu_img_type(image_full_path) == "qcow2": + try: + # Convert .qcow2 to .raw + subprocess.check_output(qemu_img_convert_command) + + # Import image either to ceph/filesystem + subprocess.check_output(image_import_command) + + # Create and Protect Snapshot + subprocess.check_output(snapshot_creation_command) + subprocess.check_output(snapshot_protect_command) + + except Exception as e: + logging.exception(e) + + else: + # Everything is successfully done + image.value["status"] = "CREATED" + client.put(image.key, json.dumps(image.value)) + else: + # The user provided image is either not found or of invalid format + image.value["status"] = "INVALID_IMAGE" + client.put(image.key, json.dumps(image.value)) + + + try: + os.remove("image.raw") + except Exception: + pass + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/metadata/config.py b/metadata/config.py index 0df4102..05cc113 100644 --- a/metadata/config.py +++ b/metadata/config.py @@ -15,6 +15,7 @@ logging.basicConfig( VM_PREFIX = config("VM_PREFIX") +USER_PREFIX = config("USER_PREFIX") etcd_client = Etcd3Wrapper(host=config("ETCD_URL")) diff --git a/metadata/main.py b/metadata/main.py index d9a0bb7..1c67768 100644 --- a/metadata/main.py +++ b/metadata/main.py @@ -1,6 +1,8 @@ +import os + from flask import Flask, request from flask_restful import Resource, Api -from config import etcd_client, VM_POOL +from config import etcd_client, VM_POOL, USER_PREFIX app = Flask(__name__) api = Api(app) @@ -29,7 +31,6 @@ def ipv62mac(ipv6): mac_parts[0] = '%02x' % (int(mac_parts[0], 16) ^ 2) del mac_parts[4] del mac_parts[3] - return ':'.join(mac_parts) diff --git a/scheduler/main.py b/scheduler/main.py index 4ce178f..6e0ba90 100755 --- a/scheduler/main.py +++ b/scheduler/main.py @@ -14,8 +14,9 @@ from helper import (get_suitable_host, dead_host_mitigation, dead_host_detection assign_host, NoSuitableHostFound) - def main(): + logging.info("%s SESSION STARTED %s", '*' * 5, '*' * 5) + pending_vms = [] for request_iterator in [ @@ -85,5 +86,5 @@ def main(): logging.info("No Resource Left. Emailing admin....") -logging.info("%s SESSION STARTED %s", '*' * 5, '*' * 5) -main() +if __name__ == "__main__": + main()