Remove pending vm handling mechanism from scheduler + fixed issue that update VM's hostname even on migration failure
This commit is contained in:
		
					parent
					
						
							
								d2d6c6bf5c
							
						
					
				
			
			
				commit
				
					
						d13a4bcc37
					
				
			
		
					 5 changed files with 40 additions and 94 deletions
				
			
		|  | @ -24,7 +24,7 @@ sys.excepthook = exception_hook | ||||||
| if __name__ == '__main__': | if __name__ == '__main__': | ||||||
|     # Setting up root logger |     # Setting up root logger | ||||||
|     logger = logging.getLogger() |     logger = logging.getLogger() | ||||||
|     logger.setLevel(logging.INFO) |     logger.setLevel(logging.DEBUG) | ||||||
| 
 | 
 | ||||||
|     syslog_handler = SysLogHandler(address='/dev/log') |     syslog_handler = SysLogHandler(address='/dev/log') | ||||||
|     syslog_handler.setLevel(logging.DEBUG) |     syslog_handler.setLevel(logging.DEBUG) | ||||||
|  |  | ||||||
|  | @ -567,7 +567,7 @@ def main(): | ||||||
|             settings["etcd"]["image_store_prefix"], value_in_json=True |             settings["etcd"]["image_store_prefix"], value_in_json=True | ||||||
|         ) |         ) | ||||||
|     ) |     ) | ||||||
|     if len(image_stores) == 0: |     if not image_stores: | ||||||
|         data = { |         data = { | ||||||
|             "is_public": True, |             "is_public": True, | ||||||
|             "type": "ceph", |             "type": "ceph", | ||||||
|  | @ -583,7 +583,7 @@ def main(): | ||||||
|             json.dumps(data), |             json.dumps(data), | ||||||
|         ) |         ) | ||||||
| 
 | 
 | ||||||
|     app.run(host="::", debug=True) |     app.run(host="::", debug=False) | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|  |  | ||||||
|  | @ -28,10 +28,8 @@ def maintenance(host): | ||||||
|     vmm = VMM() |     vmm = VMM() | ||||||
|     running_vms = vmm.discover() |     running_vms = vmm.discover() | ||||||
|     for vm_uuid in running_vms: |     for vm_uuid in running_vms: | ||||||
|         if ( |         if vmm.is_running(vm_uuid) and vmm.get_status(vm_uuid) == "running": | ||||||
|             vmm.is_running(vm_uuid) |             logger.debug('VM {} is running on {}'.format(vm_uuid, host)) | ||||||
|             and vmm.get_status(vm_uuid) == "running" |  | ||||||
|         ): |  | ||||||
|             vm = shared.vm_pool.get( |             vm = shared.vm_pool.get( | ||||||
|                 join_path(settings["etcd"]["vm_prefix"], vm_uuid) |                 join_path(settings["etcd"]["vm_prefix"], vm_uuid) | ||||||
|             ) |             ) | ||||||
|  | @ -43,32 +41,18 @@ def maintenance(host): | ||||||
| 
 | 
 | ||||||
| def main(hostname): | def main(hostname): | ||||||
|     host_pool = shared.host_pool |     host_pool = shared.host_pool | ||||||
|     host = next( |     host = next(filter(lambda h: h.hostname == hostname, host_pool.hosts), None) | ||||||
|         filter(lambda h: h.hostname == hostname, host_pool.hosts), None |     assert host is not None, "No such host with name = {}".format(hostname) | ||||||
|     ) |  | ||||||
|     assert host is not None, "No such host with name = {}".format( |  | ||||||
|         hostname |  | ||||||
|     ) |  | ||||||
| 
 | 
 | ||||||
|     try: |     try: | ||||||
|         heartbeat_updating_process = mp.Process( |         heartbeat_updating_process = mp.Process(target=update_heartbeat, args=(hostname,)) | ||||||
|             target=update_heartbeat, args=(hostname,) |  | ||||||
|         ) |  | ||||||
|         heartbeat_updating_process.start() |         heartbeat_updating_process.start() | ||||||
|     except Exception as e: |     except Exception as e: | ||||||
|         raise Exception( |         raise Exception("ucloud-host heartbeat updating mechanism is not working") from e | ||||||
|             "ucloud-host heartbeat updating mechanism is not working" |  | ||||||
|         ) from e |  | ||||||
| 
 | 
 | ||||||
|     for events_iterator in [ |     for events_iterator in [ | ||||||
|         shared.etcd_client.get_prefix( |         shared.etcd_client.get_prefix(settings["etcd"]["request_prefix"], value_in_json=True), | ||||||
|             settings["etcd"]["request_prefix"], value_in_json=True |         shared.etcd_client.watch_prefix(settings["etcd"]["request_prefix"], timeout=10, value_in_json=True) | ||||||
|         ), |  | ||||||
|         shared.etcd_client.watch_prefix( |  | ||||||
|             settings["etcd"]["request_prefix"], |  | ||||||
|             timeout=10, |  | ||||||
|             value_in_json=True, |  | ||||||
|         ), |  | ||||||
|     ]: |     ]: | ||||||
|         for request_event in events_iterator: |         for request_event in events_iterator: | ||||||
|             request_event = RequestEntry(request_event) |             request_event = RequestEntry(request_event) | ||||||
|  | @ -76,52 +60,35 @@ def main(hostname): | ||||||
|             if request_event.type == "TIMEOUT": |             if request_event.type == "TIMEOUT": | ||||||
|                 maintenance(host.key) |                 maintenance(host.key) | ||||||
| 
 | 
 | ||||||
|             if request_event.hostname == host.key: |             elif request_event.hostname == host.key: | ||||||
|                 logger.debug("VM Request: %s", request_event) |                 logger.debug("VM Request: %s on Host %s", request_event, host.hostname) | ||||||
| 
 |                 shared.request_pool.client.client.delete(request_event.key) | ||||||
|                 shared.request_pool.client.client.delete( |  | ||||||
|                     request_event.key |  | ||||||
|                 ) |  | ||||||
|                 vm_entry = shared.etcd_client.get( |                 vm_entry = shared.etcd_client.get( | ||||||
|                     join_path( |                     join_path(settings["etcd"]["vm_prefix"], request_event.uuid) | ||||||
|                         settings["etcd"]["vm_prefix"], |  | ||||||
|                         request_event.uuid, |  | ||||||
|                     ) |  | ||||||
|                 ) |                 ) | ||||||
|  |                 logger.debug("VM hostname: {}".format(vm_entry.value)) | ||||||
|  |                 vm = virtualmachine.VM(vm_entry) | ||||||
|  |                 if request_event.type == RequestType.StartVM: | ||||||
|  |                     vm.start() | ||||||
| 
 | 
 | ||||||
|                 if vm_entry: |                 elif request_event.type == RequestType.StopVM: | ||||||
|                     vm = virtualmachine.VM(vm_entry) |                     vm.stop() | ||||||
|                     if request_event.type == RequestType.StartVM: |  | ||||||
|                         vm.start() |  | ||||||
| 
 | 
 | ||||||
|                     elif request_event.type == RequestType.StopVM: |                 elif request_event.type == RequestType.DeleteVM: | ||||||
|                         vm.stop() |                     vm.delete() | ||||||
| 
 | 
 | ||||||
|                     elif request_event.type == RequestType.DeleteVM: |                 elif request_event.type == RequestType.InitVMMigration: | ||||||
|                         vm.delete() |                     vm.start(destination_host_key=host.key) | ||||||
| 
 | 
 | ||||||
|                     elif ( |                 elif request_event.type == RequestType.TransferVM: | ||||||
|                         request_event.type |                     destination_host = host_pool.get(request_event.destination_host_key) | ||||||
|                         == RequestType.InitVMMigration |                     if destination_host: | ||||||
|                     ): |                         vm.migrate( | ||||||
|                         vm.start(destination_host_key=host.key) |                             destination_host=destination_host.hostname, | ||||||
| 
 |                             destination_sock_path=request_event.destination_sock_path, | ||||||
|                     elif request_event.type == RequestType.TransferVM: |  | ||||||
|                         host = host_pool.get( |  | ||||||
|                             request_event.destination_host_key |  | ||||||
|                         ) |                         ) | ||||||
|                         if host: |                     else: | ||||||
|                             vm.migrate( |                         logger.error("Host %s not found!", request_event.destination_host_key) | ||||||
|                                 destination_host=host.hostname, |  | ||||||
|                                 destination_sock_path=request_event.destination_sock_path, |  | ||||||
|                             ) |  | ||||||
|                         else: |  | ||||||
|                             logger.error( |  | ||||||
|                                 "Host %s not found!", |  | ||||||
|                                 request_event.destination_host_key, |  | ||||||
|                             ) |  | ||||||
|                 else: |  | ||||||
|                     logger.info("VM Entry missing") |  | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| if __name__ == "__main__": | if __name__ == "__main__": | ||||||
|  |  | ||||||
|  | @ -38,13 +38,14 @@ class VM: | ||||||
|         else: |         else: | ||||||
|             self.uuid = vm_entry.key.split("/")[-1] |             self.uuid = vm_entry.key.split("/")[-1] | ||||||
|             self.host_key = self.vm["hostname"] |             self.host_key = self.vm["hostname"] | ||||||
|  |         logger.debug('VM Hostname {}'.format(self.host_key)) | ||||||
| 
 | 
 | ||||||
|     def get_qemu_args(self): |     def get_qemu_args(self): | ||||||
|         command = ( |         command = ( | ||||||
|             "-name {owner}_{name}" |             "-drive file={file},format=raw,if=virtio,cache=none" | ||||||
|             " -drive file={file},format=raw,if=virtio,cache=none" |  | ||||||
|             " -device virtio-rng-pci" |             " -device virtio-rng-pci" | ||||||
|             " -m {memory} -smp cores={cores},threads={threads}" |             " -m {memory} -smp cores={cores},threads={threads}" | ||||||
|  |             " -name {owner}_{name}" | ||||||
|         ).format( |         ).format( | ||||||
|             owner=self.vm["owner"], |             owner=self.vm["owner"], | ||||||
|             name=self.vm["name"], |             name=self.vm["name"], | ||||||
|  | @ -67,11 +68,7 @@ class VM: | ||||||
|         except Exception as err: |         except Exception as err: | ||||||
|             declare_stopped(self.vm) |             declare_stopped(self.vm) | ||||||
|             self.vm["log"].append("Cannot Setup Network Properly") |             self.vm["log"].append("Cannot Setup Network Properly") | ||||||
|             logger.error( |             logger.error("Cannot Setup Network Properly for vm %s", self.uuid, exc_info=err) | ||||||
|                 "Cannot Setup Network Properly for vm %s", |  | ||||||
|                 self.uuid, |  | ||||||
|                 exc_info=err, |  | ||||||
|             ) |  | ||||||
|         else: |         else: | ||||||
|             self.vmm.start( |             self.vmm.start( | ||||||
|                 uuid=self.uuid, |                 uuid=self.uuid, | ||||||
|  | @ -81,6 +78,7 @@ class VM: | ||||||
|             ) |             ) | ||||||
| 
 | 
 | ||||||
|             status = self.vmm.get_status(self.uuid) |             status = self.vmm.get_status(self.uuid) | ||||||
|  |             logger.debug('VM {} status is {}'.format(self.uuid, status)) | ||||||
|             if status == "running": |             if status == "running": | ||||||
|                 self.vm["status"] = VMStatus.running |                 self.vm["status"] = VMStatus.running | ||||||
|                 self.vm["vnc_socket"] = self.vmm.get_vnc(self.uuid) |                 self.vm["vnc_socket"] = self.vmm.get_vnc(self.uuid) | ||||||
|  | @ -99,7 +97,7 @@ class VM: | ||||||
|             else: |             else: | ||||||
|                 self.stop() |                 self.stop() | ||||||
|                 declare_stopped(self.vm) |                 declare_stopped(self.vm) | ||||||
| 
 |         logger.debug('VM {} has hostname {}'.format(self.uuid, self.vm['hostname'])) | ||||||
|         self.sync() |         self.sync() | ||||||
| 
 | 
 | ||||||
|     def stop(self): |     def stop(self): | ||||||
|  |  | ||||||
|  | @ -17,8 +17,6 @@ from . import logger | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
| def main(): | def main(): | ||||||
|     pending_vms = [] |  | ||||||
| 
 |  | ||||||
|     for request_iterator in [ |     for request_iterator in [ | ||||||
|         shared.etcd_client.get_prefix( |         shared.etcd_client.get_prefix( | ||||||
|             settings["etcd"]["request_prefix"], value_in_json=True |             settings["etcd"]["request_prefix"], value_in_json=True | ||||||
|  | @ -44,24 +42,8 @@ def main(): | ||||||
|                     logger.debug("Dead hosts: %s", dead_hosts) |                     logger.debug("Dead hosts: %s", dead_hosts) | ||||||
|                     dead_host_mitigation(dead_hosts) |                     dead_host_mitigation(dead_hosts) | ||||||
| 
 | 
 | ||||||
|                 # If there are VMs that weren't assigned a host |  | ||||||
|                 # because there wasn't a host available which |  | ||||||
|                 # meets requirement of that VM then we would |  | ||||||
|                 # create a new ScheduleVM request for that VM |  | ||||||
|                 # on our behalf. |  | ||||||
|                 while pending_vms: |  | ||||||
|                     pending_vm_entry = pending_vms.pop() |  | ||||||
|                     r = RequestEntry.from_scratch( |  | ||||||
|                         type="ScheduleVM", |  | ||||||
|                         uuid=pending_vm_entry.uuid, |  | ||||||
|                         hostname=pending_vm_entry.hostname, |  | ||||||
|                         request_prefix=settings["etcd"][ |  | ||||||
|                             "request_prefix" |  | ||||||
|                         ], |  | ||||||
|                     ) |  | ||||||
|                     shared.request_pool.put(r) |  | ||||||
| 
 |  | ||||||
|             elif request_entry.type == RequestType.ScheduleVM: |             elif request_entry.type == RequestType.ScheduleVM: | ||||||
|  |                 print(request_event.value) | ||||||
|                 logger.debug( |                 logger.debug( | ||||||
|                     "%s, %s", request_entry.key, request_entry.value |                     "%s, %s", request_entry.key, request_entry.value | ||||||
|                 ) |                 ) | ||||||
|  | @ -86,7 +68,6 @@ def main(): | ||||||
|                     ) |                     ) | ||||||
|                     shared.vm_pool.put(vm_entry) |                     shared.vm_pool.put(vm_entry) | ||||||
| 
 | 
 | ||||||
|                     pending_vms.append(vm_entry) |  | ||||||
|                     logger.info("No Resource Left. Emailing admin....") |                     logger.info("No Resource Left. Emailing admin....") | ||||||
| 
 | 
 | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue