import json import multiprocessing import sys import unittest from datetime import datetime from os.path import dirname BASE_DIR = dirname(dirname(__file__)) sys.path.insert(0, BASE_DIR) from main import ( accumulated_specs, remaining_resources, VmPool, main, ) from ucloud.config import etcd_client class TestFunctions(unittest.TestCase): @classmethod def setUpClass(cls): cls.client = etcd_client cls.host_prefix = "/test/host" cls.vm_prefix = "/test/vm" # These deletion could also be in # tearDown() but it is more appropriate here # as it enable us to check the ETCD store # even after test is run cls.client.client.delete_prefix(cls.host_prefix) cls.client.client.delete_prefix(cls.vm_prefix) cls.create_hosts(cls) cls.create_vms(cls) cls.p = multiprocessing.Process( target=main, args=[cls.vm_prefix, cls.host_prefix] ) cls.p.start() @classmethod def tearDownClass(cls): cls.p.terminate() def create_hosts(self): host1 = { "cpu": 32, "ram": 128, "hdd": 1024, "sdd": 0, "status": "ALIVE", "last_heartbeat": datetime.utcnow().isoformat(), } host2 = { "cpu": 16, "ram": 64, "hdd": 512, "sdd": 0, "status": "ALIVE", "last_heartbeat": datetime.utcnow().isoformat(), } host3 = { "cpu": 16, "ram": 32, "hdd": 256, "sdd": 256, "status": "ALIVE", "last_heartbeat": datetime.utcnow().isoformat(), } with self.client.client.lock("lock"): self.client.put( f"{self.host_prefix}/1", host1, value_in_json=True ) self.client.put( f"{self.host_prefix}/2", host2, value_in_json=True ) self.client.put( f"{self.host_prefix}/3", host3, value_in_json=True ) def create_vms(self): vm1 = json.dumps( { "owner": "meow", "specs": {"cpu": 4, "ram": 8, "hdd": 100, "sdd": 256}, "hostname": "", "status": "REQUESTED_NEW", } ) vm2 = json.dumps( { "owner": "meow", "specs": {"cpu": 16, "ram": 64, "hdd": 512, "sdd": 0}, "hostname": "", "status": "REQUESTED_NEW", } ) vm3 = json.dumps( { "owner": "meow", "specs": {"cpu": 16, "ram": 32, "hdd": 128, "sdd": 0}, "hostname": "", "status": "REQUESTED_NEW", } ) vm4 = json.dumps( { "owner": "meow", "specs": {"cpu": 16, "ram": 64, "hdd": 512, "sdd": 0}, "hostname": "", "status": "REQUESTED_NEW", } ) vm5 = json.dumps( { "owner": "meow", "specs": {"cpu": 2, "ram": 2, "hdd": 10, "sdd": 0}, "hostname": "", "status": "REQUESTED_NEW", } ) vm6 = json.dumps( { "owner": "meow", "specs": {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0}, "hostname": "", "status": "REQUESTED_NEW", } ) vm7 = json.dumps( { "owner": "meow", "specs": {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0}, "hostname": "", "status": "REQUESTED_NEW", } ) self.client.put(f"{self.vm_prefix}/1", vm1) self.client.put(f"{self.vm_prefix}/2", vm2) self.client.put(f"{self.vm_prefix}/3", vm3) self.client.put(f"{self.vm_prefix}/4", vm4) self.client.put(f"{self.vm_prefix}/5", vm5) self.client.put(f"{self.vm_prefix}/6", vm6) self.client.put(f"{self.vm_prefix}/7", vm7) def test_accumulated_specs(self): vms = [ {"ssd": 10, "cpu": 4, "ram": 8}, {"hdd": 10, "cpu": 4, "ram": 8}, {"cpu": 8, "ram": 32}, ] self.assertEqual( accumulated_specs(vms), {"ssd": 10, "cpu": 16, "ram": 48, "hdd": 10}, ) def test_remaining_resources(self): host_specs = {"ssd": 10, "cpu": 16, "ram": 48, "hdd": 10} vms_specs = {"ssd": 10, "cpu": 32, "ram": 12, "hdd": 0} resultant_specs = {"ssd": 0, "cpu": -16, "ram": 36, "hdd": 10} self.assertEqual( remaining_resources(host_specs, vms_specs), resultant_specs ) def test_vmpool(self): self.p.join(1) vm_pool = VmPool(self.client, self.vm_prefix) # vm_pool by host actual = vm_pool.by_host(vm_pool.vms, f"{self.host_prefix}/3") ground_truth = [ ( f"{self.vm_prefix}/1", { "owner": "meow", "specs": { "cpu": 4, "ram": 8, "hdd": 100, "sdd": 256, }, "hostname": f"{self.host_prefix}/3", "status": "SCHEDULED_DEPLOY", }, ) ] self.assertEqual(actual[0], ground_truth[0]) # vm_pool by status actual = vm_pool.by_status(vm_pool.vms, "REQUESTED_NEW") ground_truth = [ ( f"{self.vm_prefix}/7", { "owner": "meow", "specs": { "cpu": 10, "ram": 22, "hdd": 146, "sdd": 0, }, "hostname": "", "status": "REQUESTED_NEW", }, ) ] self.assertEqual(actual[0], ground_truth[0]) # vm_pool by except status actual = vm_pool.except_status(vm_pool.vms, "SCHEDULED_DEPLOY") ground_truth = [ ( f"{self.vm_prefix}/7", { "owner": "meow", "specs": { "cpu": 10, "ram": 22, "hdd": 146, "sdd": 0, }, "hostname": "", "status": "REQUESTED_NEW", }, ) ] self.assertEqual(actual[0], ground_truth[0]) if __name__ == "__main__": unittest.main()