diff --git a/main.py b/main.py index 34b0d47..03d26d3 100644 --- a/main.py +++ b/main.py @@ -212,75 +212,74 @@ def main(): atexit.register(goodbye, host=host) - events = client.watch_prefix("/v1/vm/", timeout=10) + for events_iterator in [client.get_prefix("/v1/vm/"), + client.watch_prefix("/v1/vm/", timeout=10)]: + for e in events: + try: + e.value = json.loads(e.value) + except json.JSONDecodeError: + logging.error(f"Invalid JSON {e.value}") + continue - # events = client.get_prefix("/v1/vm/") - for e in events: - try: - e.value = json.loads(e.value) - except json.JSONDecodeError: - logging.error(f"Invalid JSON {e.value}") - continue + e_status = e.value["status"] - e_status = e.value["status"] + if e_status == "TIMEOUT": + logging.info("Timeout") + _vms = filter(lambda v: v.value["hostname"] == args.hostname, client.get_prefix("/v1/vm", value_in_json=True)) + alleged_running_vms = filter(lambda v: v.value["status"] == "RUNNING", _vms) + should_be_running = filter(lambda v: v.value["status"] == "REQUESTED_START", _vms) + for vm in alleged_running_vms: + _vm = get_vm(running_vms, vm.key) + if (_vm and not _vm.vm.is_running()) or _vm is None: + logging.debug(f"{_vm} {vm.key}") + logging.info(f"{vm.key} is not running but is said to be running") + logging.info(f"Updating {vm.key} status to KILLED") + vm.value["status"] = "KILLED" + client.put(vm.key, json.dumps(vm.value)) - if e_status == "TIMEOUT": - logging.info("Timeout") - _vms = filter(lambda v: v.value["hostname"] == args.hostname, client.get_prefix("/v1/vm", value_in_json=True)) - alleged_running_vms = filter(lambda v: v.value["status"] == "RUNNING", _vms) - should_be_running = filter(lambda v: v.value["status"] == "REQUESTED_START", _vms) - for vm in alleged_running_vms: - _vm = get_vm(running_vms, vm.key) - if (_vm and not _vm.vm.is_running()) or _vm is None: - logging.debug(f"{_vm} {vm.key}") - logging.info(f"{vm.key} is not running but is said to be running") - logging.info(f"Updating {vm.key} status to KILLED") - vm.value["status"] = "KILLED" - client.put(vm.key, json.dumps(vm.value)) + for vm in should_be_running: + vm_path = f"rbd:uservms/{vm.key.split('/')[-1]}" + start_vm(vm_path, e) - for vm in should_be_running: - vm_path = f"rbd:uservms/{vm.key.split('/')[-1]}" + host.value["status"] = "ALIVE" + host.value["last_heartbeat"] = datetime.utcnow().isoformat() + client.put(host.key, json.dumps(host.value)) + logging.info(f"Updated last heartbeat time {host.value['last_heartbeat']}") + continue + + e_hostname = e.value["hostname"] + vm_uuid = e.key.split("/")[-1] + + # If it is not for me then skip it + if e_hostname != args.hostname: + continue + + logging.debug(f"EVENT: {e}") + + if e_status == "SCHEDULED_DEPLOY": + create_vm(vm_uuid, e) + + elif e_status == "REQUESTED_SUSPEND": + suspend_vm(e) + + elif e_status == "REQUESTED_RESUME": + resume_vm(e) + + elif e_status == "REQUESTED_START": + + # DELETEME: Delete when CEPH integeration is complete + # vm_path = f"{owner_dir}/.vm/{vm_uuid}.raw" + + vm_path = f"rbd:uservms/{vm_uuid}" start_vm(vm_path, e) - host.value["status"] = "ALIVE" - host.value["last_heartbeat"] = datetime.utcnow().isoformat() - client.put(host.key, json.dumps(host.value)) - logging.info(f"Updated last heartbeat time {host.value['last_heartbeat']}") - continue + elif e_status == "REQUESTED_SHUTDOWN": + shutdown_vm(e) - e_hostname = e.value["hostname"] - vm_uuid = e.key.split("/")[-1] + elif e_status == "DELETED": + delete_vm(e) - # If it is not for me then skip it - if e_hostname != args.hostname: - continue - - logging.debug(f"EVENT: {e}") - - if e_status == "SCHEDULED_DEPLOY": - create_vm(vm_uuid, e) - - elif e_status == "REQUESTED_SUSPEND": - suspend_vm(e) - - elif e_status == "REQUESTED_RESUME": - resume_vm(e) - - elif e_status == "REQUESTED_START": - - # DELETEME: Delete when CEPH integeration is complete - # vm_path = f"{owner_dir}/.vm/{vm_uuid}.raw" - - vm_path = f"rbd:uservms/{vm_uuid}" - start_vm(vm_path, e) - - elif e_status == "REQUESTED_SHUTDOWN": - shutdown_vm(e) - - elif e_status == "DELETED": - delete_vm(e) - - logging.info(f"Running VMs {running_vms}") + logging.info(f"Running VMs {running_vms}") main()