"""Tests for accumulated_specs, remaining_resources and VmPool from main.

The test case connects to etcd through etcd3.client().
"""
import json
import sys
import unittest

import etcd3

# A relative sys.path entry is resolved against the current working
# directory, so `main` is presumably located one directory above the
# directory the tests are started from.
sys.path.insert(0, "../")

from main import VmPool, accumulated_specs, remaining_resources  # noqa: E402
class TestFunctions(unittest.TestCase):
    """Tests for ``accumulated_specs``, ``remaining_resources`` and ``VmPool``.

    ``setUp`` opens a connection with ``etcd3.client()`` and rewrites every
    key under the host and VM prefixes, so an etcd server must be reachable
    and its data under those prefixes is overwritten by each test.
    """

    # Host fixtures; the n-th entry (1-based) is stored at "<host_prefix>/<n>".
    # NOTE(review): the fixtures use the key "sdd" while the pure-function
    # tests below use "ssd" -- confirm which spelling `main` expects.
    _HOST_SPECS = (
        {"cpu": 32, "ram": 128, "hdd": 1024, "sdd": 0},
        {"cpu": 16, "ram": 64, "hdd": 512, "sdd": 0},
        {"cpu": 16, "ram": 32, "hdd": 256, "sdd": 256},
    )

    # VM spec fixtures; the n-th entry (1-based) is stored at "<vm_prefix>/<n>".
    _VM_SPECS = (
        {"cpu": 4, "ram": 8, "hdd": 100, "sdd": 256},
        {"cpu": 16, "ram": 64, "hdd": 512, "sdd": 0},
        {"cpu": 16, "ram": 32, "hdd": 128, "sdd": 0},
        {"cpu": 16, "ram": 64, "hdd": 512, "sdd": 0},
        {"cpu": 2, "ram": 2, "hdd": 10, "sdd": 0},
        {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0},
        {"cpu": 10, "ram": 22, "hdd": 146, "sdd": 0},
    )

    def setUp(self):
        """Connect to etcd, wipe both prefixes and store fresh fixtures."""
        self.client = etcd3.client()
        self.host_prefix = "/v1/host"
        self.vm_prefix = "/v1/vm"

        # These deletions could also live in tearDown(), but doing them here
        # leaves the etcd store intact after a test has run, so it can still
        # be inspected.
        self.client.delete_prefix(self.host_prefix)
        self.client.delete_prefix(self.vm_prefix)

        self.create_hosts()
        self.create_vms()

    def create_hosts(self):
        """Store every host fixture as JSON while holding the "lock" lock.

        The values are serialized with ``json.dumps`` (as ``create_vms``
        does) instead of hand-written JSON strings; the decoded content is
        unchanged.
        """
        with self.client.lock("lock"):
            for number, specs in enumerate(self._HOST_SPECS, start=1):
                self.client.put(
                    f"{self.host_prefix}/{number}", json.dumps(specs)
                )

    def create_vms(self):
        """Store every VM fixture as JSON under the VM prefix.

        Each VM is written with owner "meow", an empty hostname and the
        status "REQUESTED_NEW"; only the specs differ between entries.
        """
        for number, specs in enumerate(self._VM_SPECS, start=1):
            vm = {
                "owner": "meow",
                "specs": specs,
                "hostname": "",
                "status": "REQUESTED_NEW",
            }
            self.client.put(f"{self.vm_prefix}/{number}", json.dumps(vm))

    def test_accumulated_specs(self):
        """Specs are summed per key, including keys missing from some VMs."""
        vms = [
            {"ssd": 10, "cpu": 4, "ram": 8},
            {"hdd": 10, "cpu": 4, "ram": 8},
            {"cpu": 8, "ram": 32},
        ]
        self.assertEqual(
            accumulated_specs(vms),
            {"ssd": 10, "cpu": 16, "ram": 48, "hdd": 10},
        )

    def test_remaining_resources(self):
        """VM specs are subtracted from host specs; results may be negative."""
        host_specs = {"ssd": 10, "cpu": 16, "ram": 48, "hdd": 10}
        vms_specs = {"ssd": 10, "cpu": 32, "ram": 12, "hdd": 0}
        resultant_specs = {"ssd": 0, "cpu": -16, "ram": 36, "hdd": 10}
        self.assertEqual(
            remaining_resources(host_specs, vms_specs), resultant_specs
        )

    def test_vmpool(self):
        """``VmPool.by_host`` returns the VMs assigned to the first host."""
        vm_pool = VmPool(self.client, self.vm_prefix)
        # NOTE(review): the fixtures are written with an empty hostname and
        # status "REQUESTED_NEW", yet this expects VM 1 on host 1 with status
        # "SCHEDULED_DEPLOY" -- presumably something outside this file updates
        # the entry first; verify.
        expected = [
            (
                f"{self.vm_prefix}/1",
                {
                    "owner": "meow",
                    "specs": {"cpu": 4, "ram": 8, "hdd": 100, "sdd": 256},
                    "hostname": f"{self.host_prefix}/1",
                    "status": "SCHEDULED_DEPLOY",
                },
            )
        ]
        self.assertEqual(
            vm_pool.by_host(vm_pool.vms, f"{self.host_prefix}/1"),
            expected,
        )
if __name__ == "__main__":
    # Hand control to unittest's command-line runner when run as a script.
    unittest.main()