TIMEOUT event handling shifted to function ;maintenence'
This commit is contained in:
parent
6b14a3462e
commit
5429f2d168
1 changed files with 25 additions and 38 deletions
81
main.py
81
main.py
|
@ -94,25 +94,11 @@ def create_vm(vm_uuid, e):
|
||||||
_command_to_create = f"rbd clone images/{image_uuid}@protected uservms/{vm_uuid}"
|
_command_to_create = f"rbd clone images/{image_uuid}@protected uservms/{vm_uuid}"
|
||||||
subprocess.call(_command_to_create.split(" "))
|
subprocess.call(_command_to_create.split(" "))
|
||||||
|
|
||||||
# DELETEME: Delete when CEPH integeration is complete
|
|
||||||
|
|
||||||
# os.makedirs(f"{owner_dir}/.vm", exist_ok=True)
|
|
||||||
|
|
||||||
# if not os.path.isfile(f"{owner_dir}/.vm/{vm_uuid}.raw"):
|
|
||||||
# shutil.copy(
|
|
||||||
# f"/var/vm/{image_uuid}.raw", f"{owner_dir}/.vm/{vm_uuid}.raw"
|
|
||||||
# )
|
|
||||||
|
|
||||||
e.value["status"] = "REQUESTED_START"
|
e.value["status"] = "REQUESTED_START"
|
||||||
client.put(e.key, json.dumps(e.value))
|
client.put(e.key, json.dumps(e.value))
|
||||||
|
|
||||||
|
|
||||||
def start_vm(vm_path, e):
|
def start_vm(vm_path, e):
|
||||||
# FIXME: Assume for the moment that the image exists
|
|
||||||
# Use librados to list files that exists in
|
|
||||||
# uservms pool then checkwhether the e.key.split("/").pop()
|
|
||||||
# exists in rbd_ls(uservms_pool)
|
|
||||||
|
|
||||||
if not vm_path.split("/")[-1] in RBD.ls("uservms"):
|
if not vm_path.split("/")[-1] in RBD.ls("uservms"):
|
||||||
logging.info(f"Image file of vm {e.key} does not exists")
|
logging.info(f"Image file of vm {e.key} does not exists")
|
||||||
logging.info(f"Setting vm {e.key} status to DELETED")
|
logging.info(f"Setting vm {e.key} status to DELETED")
|
||||||
|
@ -216,35 +202,8 @@ def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
|
||||||
return next((vm for vm in vm_list if vm.key == vm_key), None)
|
return next((vm for vm in vm_list if vm.key == vm_key), None)
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def maintenence(e, host):
|
||||||
argparser = argparse.ArgumentParser()
|
_vms = filter(lambda v: v.value["hostname"] == host.key, client.get_prefix("/v1/vm", value_in_json=True))
|
||||||
argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
|
|
||||||
args = argparser.parse_args()
|
|
||||||
|
|
||||||
host = client.get(args.hostname, value_in_json=True)
|
|
||||||
if not host:
|
|
||||||
print("No Such Host")
|
|
||||||
exit(1)
|
|
||||||
|
|
||||||
host.value["status"] = "ALIVE"
|
|
||||||
host.value["last_heartbeat"] = datetime.utcnow().isoformat()
|
|
||||||
|
|
||||||
atexit.register(goodbye, host=host)
|
|
||||||
|
|
||||||
for events_iterator in [client.get_prefix("/v1/vm/"),
|
|
||||||
client.watch_prefix("/v1/vm/", timeout=10)]:
|
|
||||||
for e in events_iterator:
|
|
||||||
try:
|
|
||||||
e.value = json.loads(e.value)
|
|
||||||
except json.JSONDecodeError:
|
|
||||||
logging.error(f"Invalid JSON {e.value}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
e_status = e.value["status"]
|
|
||||||
|
|
||||||
if e_status == "TIMEOUT":
|
|
||||||
logging.info("Timeout")
|
|
||||||
_vms = filter(lambda v: v.value["hostname"] == args.hostname, client.get_prefix("/v1/vm", value_in_json=True))
|
|
||||||
alleged_running_vms = filter(lambda v: v.value["status"] == "RUNNING", _vms)
|
alleged_running_vms = filter(lambda v: v.value["status"] == "RUNNING", _vms)
|
||||||
should_be_running = filter(lambda v: v.value["status"] == "REQUESTED_START", _vms)
|
should_be_running = filter(lambda v: v.value["status"] == "REQUESTED_START", _vms)
|
||||||
for vm in alleged_running_vms:
|
for vm in alleged_running_vms:
|
||||||
|
@ -264,6 +223,38 @@ def main():
|
||||||
host.value["last_heartbeat"] = datetime.utcnow().isoformat()
|
host.value["last_heartbeat"] = datetime.utcnow().isoformat()
|
||||||
client.put(host.key, json.dumps(host.value))
|
client.put(host.key, json.dumps(host.value))
|
||||||
logging.info(f"Updated last heartbeat time {host.value['last_heartbeat']}")
|
logging.info(f"Updated last heartbeat time {host.value['last_heartbeat']}")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
argparser = argparse.ArgumentParser()
|
||||||
|
argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
|
||||||
|
args = argparser.parse_args()
|
||||||
|
|
||||||
|
host = client.get(args.hostname, value_in_json=True)
|
||||||
|
if not host:
|
||||||
|
print("No Such Host")
|
||||||
|
exit(1)
|
||||||
|
|
||||||
|
host.value["status"] = "ALIVE"
|
||||||
|
host.value["last_heartbeat"] = datetime.utcnow().isoformat()
|
||||||
|
client.put(host.key, host.value, value_in_json=True)
|
||||||
|
|
||||||
|
atexit.register(goodbye, host=host)
|
||||||
|
|
||||||
|
for events_iterator in [client.get_prefix("/v1/vm/"),
|
||||||
|
client.watch_prefix("/v1/vm/", timeout=10)]:
|
||||||
|
for e in events_iterator:
|
||||||
|
try:
|
||||||
|
e.value = json.loads(e.value)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
logging.error(f"Invalid JSON {e.value}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
e_status = e.value["status"]
|
||||||
|
|
||||||
|
if e_status == "TIMEOUT":
|
||||||
|
logging.info("Timeout")
|
||||||
|
maintenence(e, host)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
e_hostname = e.value["hostname"]
|
e_hostname = e.value["hostname"]
|
||||||
|
@ -285,10 +276,6 @@ def main():
|
||||||
resume_vm(e)
|
resume_vm(e)
|
||||||
|
|
||||||
elif e_status == "REQUESTED_START":
|
elif e_status == "REQUESTED_START":
|
||||||
|
|
||||||
# DELETEME: Delete when CEPH integeration is complete
|
|
||||||
# vm_path = f"{owner_dir}/.vm/{vm_uuid}.raw"
|
|
||||||
|
|
||||||
vm_path = f"rbd:uservms/{vm_uuid}"
|
vm_path = f"rbd:uservms/{vm_uuid}"
|
||||||
start_vm(vm_path, e)
|
start_vm(vm_path, e)
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue