working till migration

ahmadbilalkhalid 2019-07-27 14:05:35 +05:00
parent b8a44eca69
commit 1dddc2197b
10 changed files with 10857 additions and 3407 deletions

.gitignore (vendored; 2 lines changed)

@@ -6,3 +6,5 @@ venv/
 log.txt
 vm_socklog/
+etcd3_wrapper/
+ucloud_common/

.gitmodules (vendored; 3 lines removed)

@@ -1,3 +0,0 @@
-[submodule "etcd3_wrapper"]
-	path = etcd3_wrapper
-	url = https://code.ungleich.ch/ahmedbilal/etcd3_wrapper

Pipfile

@@ -5,12 +5,13 @@ verify_ssl = true

 [dev-packages]
 bandit = "*"
+pylama = "*"

 [packages]
 python-decouple = "*"
-etcd3 = "*"
 cython = "*"
 pylint = "*"
+python-etcd3 = {editable = true,git = "https://github.com/kragniz/python-etcd3"}

 [requires]
 python_version = "3.7"
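
The dependency change above drops the PyPI etcd3 client in favour of an editable checkout of kragniz/python-etcd3, consumed through the etcd3_wrapper package (replaced by a symlink further down). A minimal usage sketch in Python, hedged: it exercises only the calls that main.py below actually makes (get, put, get_prefix, watch_prefix with value_in_json), and the timeout behaviour is inferred from how main.py reacts to a "TIMEOUT" status.

from etcd3_wrapper import Etcd3Wrapper

client = Etcd3Wrapper()

# Read a single key, decoding its JSON value into a dict.
host = client.get("/v1/host/1", value_in_json=True)

# Enumerate existing VM entries under a prefix.
for entry in client.get_prefix("/v1/vm/", value_in_json=True):
    print(entry.key, entry.value.get("status"))

# Watch the same prefix; with timeout=10 the loop appears to also yield a
# synthetic "TIMEOUT" event periodically, which main.py uses to run maintenence().
for event in client.watch_prefix("/v1/vm/", timeout=10, value_in_json=True):
    event.value["status"] = "RUNNING"  # hypothetical write-back
    client.put(event.key, event.value, value_in_json=True)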

Pipfile.lock (generated; 138 lines changed)

@@ -1,7 +1,7 @@
 {
 "_meta": {
 "hash": {
-"sha256": "1473a03ca16049e1c2200b57b1403db764cd05a8c3a94d1d008b109127f72f87"
+"sha256": "6f0726e9d014ad330ab36d11b44a15c59316234e343697e46ea2a10ab0997a90"
 },
 "pipfile-spec": 6,
 "requires": {
@@ -25,44 +25,37 @@
 },
 "cython": {
 "hashes": [
-"sha256:04ebf16df9406d3279a2489c3327803c782d9e17637d525bfb44ecf5ec65850f",
-"sha256:1486ec88d1c73dea3846a5640054018b002608e04a791ccbd2082a47bce4440a",
-"sha256:20da832a5e9a8e93d1e1eb64650258956723940968eb585506531719b55b804f",
-"sha256:2464688b523d7a133b52cf1343c1c595b92fc6554af1015f74b9e49951e992d4",
-"sha256:27827b68a8359e9ab6bf683c68d8ee79863a0c94a577acf56aa02cc302e16f51",
-"sha256:27deeeeca0fd8933af07923e809c8fed0763d150a4fdd4082932a33b8c874ed6",
-"sha256:31f4da785d5e09deb852ea59795a629c5befb6040929e7880c6f63e6668246ce",
-"sha256:4828cf8fa638c35139e643f30201b240c0d156b1b9967a7321ae42d721d7224c",
-"sha256:48b365e32cc5639ae2c239d7bd4f8a1d920a13a7ae92113c4c938903c9400147",
-"sha256:4eb71856c1d1b33083df9318fd30143470ad6f0d1b9ad2ee61a120710842d28b",
-"sha256:5b06ef8422d27d8128f8f80bdefa111eadcab246fba1d668720af4f0b97b7a0e",
-"sha256:71c553640e1ddaaf143e38dbc6cd1863fa3c0738fb1830a9aaffba9a51838f30",
-"sha256:73e2742ee1f923c5f213183bf493901f9630e395634fce5b739a53b7dc5d64be",
-"sha256:82a632bc02063eff0b8e7ff3089aa3d912d1c7499709f51c8f04f57c8832cfe6",
-"sha256:977ca1ac059e4d4a4bf5fe2224986baf42b69290453eda44822606f4deae6515",
-"sha256:a7e6217d0dd864a7cc4f457172766864496efd64d24d4980df1521f75f992761",
-"sha256:ad0ed7dd5dff76eb3aae8c18d95b1c9f885a91a92132728051a704fb8060d08c",
-"sha256:b1b8eda9e931f0ca1aadb95a890811bdf530407e48c962643b85675329d99abf",
-"sha256:cec99c79205131da3ee75becea1f3f55c57bf6a1c500431de9ae7a32ac8a5cc4",
-"sha256:d4bbdaa6f61ce2ef26535a7d473d6ffa6e413013c5c580af999546bf1627ae11",
-"sha256:d8bdb4208975b12048bdace46e9dd8e3dda3872432f95b53789700a1330e6060",
-"sha256:dce0362ff9b61f8411d1efc9e16fc528dadbd3707a557561992457f5cb446297",
-"sha256:defbbbf5653629ce5cc54091ce49c6830da8d3104de53ed2169c9efcb0720f27",
-"sha256:e0c53a7e2b6d82ec3c26c009c937fc88eb8c7edf000c54334261beaf56bb08f2",
-"sha256:e1065bacfe5303f107896e63263537dee90920d26050f2e23c4af12c37da2db6",
-"sha256:e142837c4212c0b2c71e6773cb6740828922806b4c00ee4215be3ceb558671e6",
-"sha256:f4cbbab28c93ffee6ec929cf0826f0b11d2488e53a708d51142a5e62f8cd9806",
-"sha256:fa8f63b6551621eea9efea4db37ae401104352f0ebaee32f7d20be88cbe589c3"
+"sha256:07efba7b32c082c519b75e3b03821c2f32848e2b3e9986c784bbd8ffaf0666d7",
+"sha256:08db41daf18fabf7b7a85e39aa26954f6246994540043194af026c0df65a4942",
+"sha256:19bbe3caf885a1d2e2c30eacc10d1e45dbbefb156493fe1d5d1adc1668cc1269",
+"sha256:1c574f2f2ba760b82b2bcf6262e77e75589247dc5ef796a3ff1b2213e50ee452",
+"sha256:1dfe672c686e34598bdbaa93c3b30acb3720ae9258232a4f68ba04ee9969063d",
+"sha256:283faea84e6c4e54c3f5c8ff89aa2b6c1c3a813aad4f6d48ed3b9cc9043ef9f9",
+"sha256:2a145888d0942e7c36e86a7b7c7e2923cb9f7055805a3b72dcb137e3efdb0979",
+"sha256:3f75065936e16569d6e13dfd76de988f5eabeae460aa54770c9b961ab6f747fc",
+"sha256:4d78124f5f281f1d5d5b7919cbbc65a7073ff93562def81ee78a8307e6e72494",
+"sha256:5ba4d088b8e5d59b8a5911ca9c72952acf3c83296b57daf75af92fb2af1e8423",
+"sha256:6b19daeda1d5d1dfc973b291246f6a63a663b20c33980724d6d073c562719536",
+"sha256:790c7dc80fd1c3e38acefe06027e2f5a8466c128c7e47c6e140fd5316132574d",
+"sha256:7f8c4e648881454ba3ba0bcf3b21a9e1878a67d20ea2b8d9ec1c4c628592ab6b",
+"sha256:8bcd3f597290f9902548d6355898d7e376e7f3762f89db9cd50b2b58429df9e8",
+"sha256:8ffb18f71972a5c718a8600d9f52e3507f0d6fb72a978e03270d34a7035c98fb",
+"sha256:92f025df1cb391e09f65775598c7dfb7efad72d74713775db54e267f62ca94a1",
+"sha256:93cf1c72472a2fd0ef4c52f6074dab08fc28d475b9c824ba73a52701f7a48ae1",
+"sha256:9a7fa692cdc967fdbf6a053c1975137d01f6935dede2ef222c71840b290caf79",
+"sha256:a68eb0c1375f2401de881692b30370a51e550052b8e346b2f71bbdbdc74a214f",
+"sha256:ac3b7a12ddd52ea910ee3a041e6bc65df7a52f0ba7bd10fb7123502af482c152",
+"sha256:b402b700edaf571a0bae18ec35d5b71c266873a6616412b672435c10b6d8f041",
+"sha256:c29d069a4a30f472482343c866f7486731ad638ef9af92bfe5fca9c7323d638e",
+"sha256:d822311498f185db449b687336b4e5db7638c8d8b03bdf10ae91d74e23c7cc0c",
+"sha256:dccc8df9e1ac158b06777bbaaeb4516f245f9b147701ae25e6023960e4a0c2a3",
+"sha256:e31f4b946c2765b2f35440fdb4b00c496dfc5babc53c7ae61966b41171d1d59f",
+"sha256:eb43f9e582cc221ee2832e25ea6fe5c06f2acc9da6353c562e922f107db12af8",
+"sha256:f07822248110fd6213db8bc2745fdbbccef6f2b3d18ac91a7fba29c6bc575da5",
+"sha256:ff69854f123b959d4ae14bd5330714bb9ee4360052992dc0fbd0a3dee4261f95"
 ],
 "index": "pypi",
-"version": "==0.29.12"
+"version": "==0.29.13"
 },
-"etcd3": {
-"hashes": [
-"sha256:25a524b9f032c6631ff0097532907dea81243eaa63c3744510fd1598cc4e0e87"
-],
-"index": "pypi",
-"version": "==0.10.0"
-},
 "grpcio": {
 "hashes": [
@@ -176,6 +169,11 @@
 "index": "pypi",
 "version": "==3.1"
 },
+"python-etcd3": {
+"editable": true,
+"git": "https://github.com/kragniz/python-etcd3",
+"ref": "cdc4c48bde88a795230a02aa574df84ed9ccfa52"
+},
 "six": {
 "hashes": [
 "sha256:3350809f0555b11f552448330d0b52d5f24c91a322ea4a15ef22629740f3761c",
@@ -227,6 +225,19 @@
 "index": "pypi",
 "version": "==1.6.2"
 },
+"ddt": {
+"hashes": [
+"sha256:474546b4020ce8a2f9550ba8899c28aa2c284c7bbf175bddede98be949d1ca7c",
+"sha256:d13e6af8f36238e89d00f4ebccf2bda4f6d1878be560a6600689e42077e164e3"
+],
+"version": "==1.2.1"
+},
+"gitdb": {
+"hashes": [
+"sha256:a3ebbc27be035a2e874ed904df516e35f4a29a778a764385de09de9e0f139658"
+],
+"version": "==0.6.4"
+},
 "gitdb2": {
 "hashes": [
 "sha256:83361131a1836661a155172932a13c08bda2db3674e4caa32368aa6eb02f38c2",
@@ -236,10 +247,17 @@
 },
 "gitpython": {
 "hashes": [
-"sha256:563221e5a44369c6b79172f455584c9ebbb122a13368cc82cb4b5addff788f82",
-"sha256:8237dc5bfd6f1366abeee5624111b9d6879393d84745a507de0fda86043b65a8"
+"sha256:7428f1cc5e72d53e65c3259d5cebc22fb2b07f973c49d95b3c3d26c64890a3c3",
+"sha256:a0f744a4941eac5d99033eb0adcbec83bf443ee173fda4292d92a906aedce952"
 ],
-"version": "==2.1.11"
+"version": "==2.1.12"
 },
+"mccabe": {
+"hashes": [
+"sha256:ab8a6258860da4b6677da4bd2fe5dc2c659cff31b3ee4f7f5d64e79735b80d42",
+"sha256:dd8d182285a0fe56bace7f45b5e7d1a6ebcbf524e8f3bd87eb0f125271b8831f"
+],
+"version": "==0.6.1"
+},
 "pbr": {
 "hashes": [
@@ -248,6 +266,34 @@
 ],
 "version": "==5.4.1"
 },
+"pycodestyle": {
+"hashes": [
+"sha256:95a2219d12372f05704562a14ec30bc76b05a5b297b21a5dfe3f6fac3491ae56",
+"sha256:e40a936c9a450ad81df37f549d676d127b1b66000a6c500caa2b085bc0ca976c"
+],
+"version": "==2.5.0"
+},
+"pydocstyle": {
+"hashes": [
+"sha256:58c421dd605eec0bce65df8b8e5371bb7ae421582cdf0ba8d9435ac5b0ffc36a"
+],
+"version": "==4.0.0"
+},
+"pyflakes": {
+"hashes": [
+"sha256:17dbeb2e3f4d772725c777fabc446d5634d1038f234e77343108ce445ea69ce0",
+"sha256:d976835886f8c5b31d47970ed689944a0262b5f3afa00a5a7b4dc81e5449f8a2"
+],
+"version": "==2.1.1"
+},
+"pylama": {
+"hashes": [
+"sha256:9bae53ef9c1a431371d6a8dca406816a60d547147b60a4934721898f553b7d8f",
+"sha256:fd61c11872d6256b019ef1235be37b77c922ef37ac9797df6bd489996dddeb15"
+],
+"index": "pypi",
+"version": "==7.7.1"
+},
 "pyyaml": {
 "hashes": [
 "sha256:57acc1d8533cbe51f6662a55434f0dbecfa2b9eaf115bede8f6fd00115a0c0d3",
@@ -271,6 +317,12 @@
 ],
 "version": "==1.12.0"
 },
+"smmap": {
+"hashes": [
+"sha256:0e2b62b497bd5f0afebc002eda4d90df9d209c30ef257e8673c90a6b5c119d62"
+],
+"version": "==0.9.0"
+},
 "smmap2": {
 "hashes": [
 "sha256:0555a7bf4df71d1ef4218e4807bbf9b201f910174e6e08af2e138d4e517b4dde",
@@ -278,6 +330,12 @@
 ],
 "version": "==2.0.5"
 },
+"snowballstemmer": {
+"hashes": [
+"sha256:9f3b9ffe0809d174f7047e121431acf99c89a7040f0ca84f94ba53a498e6d0c9"
+],
+"version": "==1.9.0"
+},
 "stevedore": {
 "hashes": [
 "sha256:7be098ff53d87f23d798a7ce7ae5c31f094f3deb92ba18059b1aeb1ca9fec0a0",

etcd3_wrapper (submodule removed)

@@ -1 +0,0 @@
-Subproject commit d079acadf29e6df55c329574604148631d4ad4bc

etcd3_wrapper (new symbolic link, 1 line)

@@ -0,0 +1 @@
+../etcd3_wrapper/

log.txt (13811 lines changed)

File diff suppressed because it is too large.

main.py (270 lines changed)

@@ -6,47 +6,27 @@
 # For QEMU Monitor Protocol Commands Information, See
 # https://qemu.weilnetz.de/doc/qemu-doc.html#pcsys_005fmonitor
-import json
 import argparse
 import qmp
 import logging
-import os
 import subprocess
-import atexit
-import signal
+import threading
+import time
 from etcd3_wrapper import Etcd3Wrapper
 from dataclasses import dataclass
 from typing import Union
 from functools import wraps
-from decouple import config
-from datetime import datetime
-from ucloud_common.enums import VMStatus, RUNNING_VM_STATUSES
+from ucloud_common.rbd import RBD
+from ucloud_common.vm import VmPool, VMEntry, VMStatus
+from ucloud_common.host import HostPool, HostEntry
 running_vms = []
 vnc_port_pool = list(range(0, 100))
 client = Etcd3Wrapper()
-@dataclass
-class VM:
-    key: str
-    vm: qmp.QEMUMachine
-class RBD(object):
-    @staticmethod
-    def ls(pool):
-        output = ""
-        try:
-            output = subprocess.check_output(
-                ["rbd", "ls", pool], stderr=subprocess.PIPE
-            ).decode("utf-8").strip()
-        except subprocess.CalledProcessError as e:
-            raise Exception(e.stderr)
-        return output.split("\n")
+VM_POOL = None
+HOST_POOL = None
 logging.basicConfig(
     level=logging.DEBUG,
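
The RBD helper class removed above is now imported from ucloud_common.rbd, which lives outside this repository (ucloud_common becomes a symlink later in this commit). A hedged sketch of what that module presumably provides, mirroring the deleted in-file class:

# Hypothetical ucloud_common/rbd.py, reconstructed from the class deleted above.
import subprocess


class RBD(object):
    @staticmethod
    def ls(pool):
        """List image names in a Ceph RBD pool, e.g. RBD.ls("uservms")."""
        try:
            output = subprocess.check_output(
                ["rbd", "ls", pool], stderr=subprocess.PIPE
            ).decode("utf-8").strip()
        except subprocess.CalledProcessError as error:
            raise Exception(error.stderr)
        return output.split("\n")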
@@ -56,25 +36,19 @@ logging.basicConfig(
     datefmt="%d-%b-%y %H:%M:%S",
 )
-def goodbye(host):
-    vms = client.get_prefix("/v1/vm", value_in_json=True)
-    vms = filter(lambda v: v.value["hostname"] == host.key, vms)
-    for vm in vms:
-        vm.value["hostname"] = ""
-        if vm.value["status"] in RUNNING_VM_STATUSES:
-            vm.value["status"] = VMStatus.requested_start
-        client.put(vm.key, vm.value, value_in_json=True)
-    host.value["status"] = "DEAD"
-    host.value["last_heartbeat"] = datetime.utcnow().isoformat()
-    client.put(host.key, json.dumps(host.value))
-    logging.info(f"Host {host.key} dead! at {host.value['last_heartbeat']}")
-    print("Goodbye")
-    os.kill(os.getpid(), signal.SIGKILL)
+@dataclass
+class VM:
+    key: str
+    vm: qmp.QEMUMachine
+def update_heartbeat(host: HostEntry):
+    while True:
+        host.update_heartbeat()
+        HOST_POOL.put(host)
+        time.sleep(10)
+        logging.info(f"Updated last heartbeat time {host.last_heartbeat}")
 def need_running_vm(func):
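
HostEntry and HostPool come from ucloud_common.host, which is not part of this diff. A rough sketch of that interface, assuming only what update_heartbeat() above and main() below actually use; field names and the exact status/timestamp handling are guesses modelled on the inline code this commit removes:

# Hypothetical shape of ucloud_common/host.py, inferred from usage in main.py.
from datetime import datetime


class HostEntry:
    def __init__(self, entry):
        self.key = entry.key        # e.g. "/v1/host/1"
        self.value = entry.value    # dict holding "status", "last_heartbeat", ...

    @property
    def last_heartbeat(self):
        return self.value.get("last_heartbeat")

    def update_heartbeat(self):
        # Assumption: marks the host ALIVE and refreshes the timestamp,
        # as the deleted inline code in main() used to do.
        self.value["status"] = "ALIVE"
        self.value["last_heartbeat"] = datetime.utcnow().isoformat()


class HostPool:
    def __init__(self, client, prefix="/v1/host"):
        self.client = client
        self.prefix = prefix

    def get(self, key):
        entry = self.client.get(key, value_in_json=True)
        return HostEntry(entry) if entry else None

    def put(self, host):
        self.client.put(host.key, host.value, value_in_json=True)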
@@ -86,48 +60,55 @@ def need_running_vm(func):
                 status = vm.vm.command("query-status")
                 logging.debug(f"VM Status Check - {status}")
             except OSError:
-                logging.info(f"{func.__name__} failed - VM {e.key} - Unknown Error")
+                logging.info(
+                    f"{func.__name__} failed - VM {e.key} - Unknown Error"
+                )
             return func(e)
         else:
-            logging.info(f"{func.__name__} failed because VM {e.key} is not running")
+            logging.info(
+                f"{func.__name__} failed because VM {e.key} is not running"
+            )
             return
     return wrapper
-def create_vm(vm_uuid, e):
+def create_vm(vm):
     image = client.get(
-        f"/v1/image/{e.value['image_uuid']}", value_in_json=True
+        f"/v1/image/{vm.value['image_uuid']}", value_in_json=True
     )
     if image:
         logging.debug(image)
-        image_uuid = e.value["image_uuid"]
         logging.info("Creating New VM...")
-        _command_to_create = f"rbd clone images/{image_uuid}@protected uservms/{vm_uuid}"
+        _command_to_create = f"rbd clone images/{vm.image_uuid}@protected uservms/{vm.uuid}"
         try:
             subprocess.call(_command_to_create.split(" "))
-            # TODO: Make it specific
+            vm.status = VMStatus.requested_start
+            VM_POOL.put(vm)
         except:
-            pass
-        e.value["status"] = "REQUESTED_START"
-        client.put(e.key, json.dumps(e.value))
+            logging.exception("Can't clone image")
 def start_vm(vm_path, e):
-    if not vm_path.split("/")[-1] in RBD.ls("uservms"):
-        logging.info(f"Image file of vm {e.key} does not exists")
-        logging.info(f"Setting vm {e.key} status to DELETED")
-        e.value["status"] = "DELETED"
-        client.put(e.key, json.dumps(e.value))
+    try:
+        user_vms = RBD.ls("uservms")
+    except:
+        logging.info("Can't access uservms pool")
         return
+    if not vm_path.split("/")[-1] in user_vms:
+        logging.info(f"Image file of vm {e.key} does not exists")
+        logging.info(f"Deleting vm {e.key}")
+        client.client.delete(e.key)
+        return
     _vm = get_vm(running_vms, e.key)
     if _vm:
         logging.info(f"{e.key} already running")
-        e.value["status"] = "RUNNING"
-        client.put(e.key, json.dumps(e.value))
+        e.status = VMStatus.running
+        VM_POOL.put(e)
         return
     # FIXME: There should be better vnc port allocation scheme
@@ -136,10 +117,13 @@ def start_vm(vm_path, e):
         test_dir="vm_socklog",
         args=[
             vm_path,
-            "-boot", "c", # First Boot Hard drive
-            "-m", "1024", # RAM limit
+            "-boot",
+            "c", # First Boot Hard drive
+            "-m",
+            "1024", # RAM limit
             # Ever growing port number
-            "-vnc", f":{vnc_port_pool.pop(0)}", # Allow VNC
+            "-vnc",
+            f":{vnc_port_pool.pop(0)}", # Allow VNC
         ],
     )
     try:
@@ -147,16 +131,16 @@
         vm.launch()
         if vm.is_running():
             running_vms.append(VM(e.key, vm))
-            e.value["status"] = "RUNNING"
-            client.put(e.key, e.value, value_in_json=True)
+            e.status = VMStatus.running
+            VM_POOL.put(e)
         else:
-            e.value["status"] = "KILLED"
-            client.put(e.key, e.value, value_in_json=True)
+            e.declare_killed()
+            VM_POOL.put(e)
             return
     except (qmp.QEMUMachineError, TypeError):
         logging.exception(f"Machine Error Occurred on {e.key}")
-        e.value["status"] = "KILLED"
-        client.put(e.key, e.value, value_in_json=True)
+        e.declare_killed()
+        VM_POOL.put(e)
     else:
         logging.info(f"Started Successfully {e.key}")
@@ -166,8 +150,8 @@ def suspend_vm(e):
     vm = get_vm(running_vms, e.key)
     vm.vm.command("stop")
     if vm.vm.command("query-status")["status"] == "paused":
-        e.value["status"] = "SUSPENDED"
-        client.put(e.key, json.dumps(e.value))
+        e.status = VMStatus.suspended
+        VM_POOL.put(e)
         logging.info(f"Successfully suspended VM {e.key}")
     else:
         logging.info(f"Suspending VM {e.key} failed")
@@ -178,8 +162,8 @@ def resume_vm(e):
     vm = get_vm(running_vms, e.key)
     vm.vm.command("cont")
     if vm.vm.command("query-status")["status"] == "running":
-        e.value["status"] = "RUNNING"
-        client.put(e.key, json.dumps(e.value))
+        e.status = VMStatus.running
+        VM_POOL.put(e)
         logging.info(f"Successfully resumed VM {e.key}")
     else:
         logging.info(f"Resuming VM {e.key} failed")
@@ -191,146 +175,110 @@ def shutdown_vm(e):
     vm.vm.shutdown()
     if not vm.vm.is_running():
         logging.info(f"VM {e.key} shutdown successfully")
-        e.value["status"] = "STOPPED"
-        client.put(e.key, json.dumps(e.value))
+        e.status = VMStatus.stopped
+        VM_POOL.put(e)
         running_vms.remove(vm)
 def delete_vm(e):
-    #FIXME: Implementation Obseleted after CEPH Integeration
+    # FIXME: Implementation Obseleted after CEPH Integeration
     logging.info(f"Deleting VM {e.key}")
     shutdown_vm(e)
-    vm = client.get(e.key, value_in_json=True)
-    if vm:
-        vm_id = e.key.split('/')[-1]
-        vm_owner = e.value['owner']
-        vm_path = f"{config('BASE_DIR')}/{vm_owner}/.vm/{vm_id}"
-        if os.path.exists(vm_path):
-            os.remove(vm_path)
-        client.client.delete(e.key)
-        logging.info(f"VM {vm.key} deleted")
-    else:
-        logging.info(f"Cannot delete key {e.key} because it doesn't exists")
+    client.client.delete(e.key)
 def get_vm(vm_list: list, vm_key) -> Union[VM, None]:
     return next((vm for vm in vm_list if vm.key == vm_key), None)
-def maintenence(e, host):
-    # VMs on this Host
-    _vms = filter(lambda v: v.value["hostname"] == host.key, client.get_prefix("/v1/vm", value_in_json=True))
-    alleged_running_vms = filter(lambda v: v.value["status"] == "RUNNING", _vms)
-    # TODO: Delete this. This was intended to start VMs that
-    # requested to be started when ucloud-vm is not running.
-    # This is no longer needed as we check for pending requests
-    # at the start and handle them.
-    # should_be_running = filter(lambda v: v.value["status"] == "REQUESTED_START", _vms)
+def maintenence(host):
+    _vms = VM_POOL.by_host(host.key)
+    alleged_running_vms = VM_POOL.by_status("RUNNING", _vms)
     for vm in alleged_running_vms:
         _vm = get_vm(running_vms, vm.key)
         if (_vm and not _vm.vm.is_running()) or _vm is None:
             logging.debug(f"{_vm} {vm.key}")
-            logging.info(f"{vm.key} is not running but is said to be running")
+            logging.info(
+                f"{vm.key} is not running but is said to be running"
+            )
             logging.info(f"Updating {vm.key} status to KILLED")
-            vm.value["status"] = "KILLED"
-            client.put(vm.key, json.dumps(vm.value))
+            vm.declare_killed()
+            VM_POOL.put(vm)
+            running_vms.remove(vm)
-    # TODO: Delete this. This was intended to start VMs that
-    # requested to be started when ucloud-vm is not running.
-    # This is no longer needed as we check for pending requests
-    # at the start and handle them.
-    # for vm in should_be_running:
-    #     vm_path = f"rbd:uservms/{vm.key.split('/')[-1]}"
-    #     start_vm(vm_path, e)
-    # TODO: Check whether a vm is running on this host that
-    # is not supposed to be running on this host
-    host.value["status"] = "ALIVE"
-    host.value["last_heartbeat"] = datetime.utcnow().isoformat()
-    client.put(host.key, json.dumps(host.value))
-    logging.info(f"Updated last heartbeat time {host.value['last_heartbeat']}")
 def main():
     argparser = argparse.ArgumentParser()
-    argparser.add_argument("hostname", help="Name of this host. e.g /v1/host/1")
+    argparser.add_argument(
+        "hostname", help="Name of this host. e.g /v1/host/1"
+    )
     args = argparser.parse_args()
-    host = client.get(args.hostname, value_in_json=True)
+    global HOST_POOL, VM_POOL
+    HOST_POOL = HostPool(client, "/v1/host")
+    VM_POOL = VmPool(client, "/v1/vm")
+    host = HOST_POOL.get(args.hostname)
     if not host:
         print("No Such Host")
         exit(1)
-    host.value["status"] = "ALIVE"
-    host.value["last_heartbeat"] = datetime.utcnow().isoformat()
-    client.put(host.key, host.value, value_in_json=True)
-    atexit.register(goodbye, host=host)
+    # It is seen that under heavy load, timeout event doesn't come
+    # in a predictive manner which delays heart beat update which
+    # in turn misunderstood by scheduler that the host is dead
+    # when it is actually alive. So, to ensure that we update the
+    # heart beat in a predictive manner we start Heart beat updating
+    # mechanism in separated thread
+    heartbeat_updating_thread = threading.Thread(target=update_heartbeat, args=(host,))
+    heartbeat_updating_thread.start()
-    for events_iterator in [client.get_prefix("/v1/vm/"),
-                            client.watch_prefix("/v1/vm/", timeout=10)]:
+    for events_iterator in [
+        client.get_prefix("/v1/vm/", value_in_json=True),
+        client.watch_prefix("/v1/vm/", timeout=10, value_in_json=True),
+    ]:
         for e in events_iterator:
-            # TODO: Should we disable timeout alarm inside
-            # event handling code and enable it while going outside
-            try:
-                e.value = json.loads(e.value)
-            except json.JSONDecodeError:
-                logging.error(f"Invalid JSON {e.value}")
-                continue
-            e_status = e.value["status"]
-            if e_status == "TIMEOUT":
-                client.client.delete(e.key)
+            e = VMEntry(e)
+            if e.status == "TIMEOUT":
                 logging.info("Timeout")
-                maintenence(e, host)
+                maintenence(host)
                 continue
-            e_hostname = e.value["hostname"]
-            if hasattr(e.value, "migration_destination"):
-                e_migration_destination = e.value["migration_destination"]
+            if hasattr(e, "migration_destination"):
+                e_migration_destination = e.value[
+                    "migration_destination"
+                ]
             else:
                 e_migration_destination = ""
-            vm_uuid = e.key.split("/")[-1]
             # If the event is directed toward me or
             # I am destination of a REQUESTED_MIGRATION
-            if e_hostname == host.key or\
-               e_migration_destination == host.key:
+            if e.hostname == host.key or e_migration_destination == host.key:
                 logging.debug(f"EVENT: {e}")
-                if e_status == "SCHEDULED_DEPLOY":
-                    create_vm(vm_uuid, e)
+                if e.status == "SCHEDULED_DEPLOY":
+                    create_vm(e)
-                elif e_status == "REQUESTED_SUSPEND":
+                elif e.status == "REQUESTED_SUSPEND":
                     suspend_vm(e)
-                elif e_status == "REQUESTED_RESUME":
+                elif e.status == "REQUESTED_RESUME":
                     resume_vm(e)
-                elif e_status == "REQUESTED_START":
-                    vm_path = f"rbd:uservms/{vm_uuid}"
+                elif e.status == "REQUESTED_START":
+                    vm_path = f"rbd:uservms/{e.uuid}"
                     start_vm(vm_path, e)
-                elif e_status == "REQUESTED_SHUTDOWN":
+                elif e.status == "REQUESTED_SHUTDOWN":
                     shutdown_vm(e)
-                elif e_status == "DELETED":
+                elif e.status == "DELETED":
                     delete_vm(e)
                 # elif e_status == "REQUESTED_MIGRATION":
                 # if e.value["migration_destination"]
             logging.info(f"Running VMs {running_vms}")

ucloud_common (new symbolic link, 1 line)

@@ -0,0 +1 @@
+../ucloud_common/

ucloud_common/enums.py (deleted, 34 lines; the file name is missing from the capture and is inferred from the import removed in main.py)

@@ -1,34 +0,0 @@
-from enum import Enum
-class VMStatus(Enum):
-    # Must be only assigned to brand new VM
-    requested_new = "REQUESTED_NEW"
-    # Only Assigned to already created vm
-    requested_start = "REQUESTED_START"
-    # These all are for running vms
-    requested_shutdown = "REQUESTED_SHUTDOWN"
-    requested_suspend = "REQUESTED_SUSPEND"
-    requested_resume = "REQUESTED_RESUME"
-    requested_migrate = "REQUESTED_MIGRATE"
-    # either its image is not found or user requested
-    # to delete it
-    deleted = "DELETED"
-    stopped = "STOPPED" # After requested_shutdown
-    killed = "KILLED" # either host died or vm died itself
-    running = "RUNNING"
-    suspended = "SUSPENDED"
-class HostStatus(Enum):
-    alive = "ALIVE"
-    dead = "DEAD"
-RUNNING_VM_STATUSES = [VMStatus.requested_shutdown, VMStatus.requested_suspend,
-                       VMStatus.requested_resume, VMStatus.requested_migrate,
-                       VMStatus.running, VMStatus.suspended]
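
The enum module deleted above does not disappear from the code path: the new main.py imports VMStatus from ucloud_common.vm instead, so the members presumably moved there largely unchanged. A minimal hedged sketch limited to the members main.py still references:

# Hypothetical VMStatus now living in ucloud_common/vm.py; values copied from the deleted file.
from enum import Enum


class VMStatus(Enum):
    requested_start = "REQUESTED_START"
    running = "RUNNING"
    suspended = "SUSPENDED"
    stopped = "STOPPED"
    killed = "KILLED"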