ucloud-setup/app/helper.py

122 lines
3.2 KiB
Python
Raw Normal View History

2019-08-27 08:31:22 +00:00
import subprocess
2019-08-27 11:19:54 +00:00
import inspect
from dataclasses import dataclass
from abc import ABC
from enum import Enum
2019-08-27 08:31:22 +00:00
2019-08-28 11:42:50 +00:00
2019-08-27 11:19:54 +00:00
class ResultType(Enum):
    """Two-state outcome tag carried by ``Result`` objects."""

    # Numeric values are kept exactly as originally declared.
    success = 0
    failure = 1
2019-08-27 08:31:22 +00:00
2019-08-28 11:42:50 +00:00
2019-08-27 08:31:22 +00:00
def clone(repo):
    """Clone the git repository *repo* into the current working directory.

    Args:
        repo: Location (URL or local path) of the repository to clone.

    Returns:
        True if ``git clone`` exited with status 0; False if it exited with a
        non-zero status or the command could not be started.
    """
    # Build argv as a list: whitespace-splitting a formatted string would
    # tear apart a repository location that contains spaces.
    command = ["git", "clone", str(repo)]
    try:
        subprocess.check_output(command)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: git ran and reported failure.
        # OSError: git could not be executed (e.g. binary not installed).
        return False
    return True
2019-08-28 11:42:50 +00:00
2019-08-27 08:31:22 +00:00
def clone_common():
    """Clone the ``ucloud_common`` repository and return ``clone``'s result."""
    repo_url = "https://code.ungleich.ch/ungleich-public/ucloud_common"
    return clone(repo_url)
2019-08-28 11:42:50 +00:00
2019-08-27 08:31:22 +00:00
def clone_etcd_wrapper():
    """Clone the ``etcd3_wrapper`` repository and return ``clone``'s result."""
    repo_url = "https://code.ungleich.ch/ahmedbilal/etcd3_wrapper"
    return clone(repo_url)
2019-08-28 11:42:50 +00:00
2019-08-27 11:19:54 +00:00
class Result(object):
    """Outcome of an operation, chainable through :meth:`add`.

    Attributes are stored under underscore-prefixed names:
        _type:   the ``ResultType`` of the most recent operation.
        _result: ``str()`` of whatever payload the operation produced.
        _op:     label of the operation, printed when it failed.
    """

    def __init__(self, _result, _type: ResultType, _op=""):
        """Store the outcome and announce failures on stdout.

        Args:
            _result: Payload of the operation; it is converted with ``str()``.
            _type: Whether the operation succeeded or failed.
            _op: Label of the operation, used only in the failure message.
        """
        self._type = _type
        self._result = str(_result)
        self._op = _op
        if self._type == ResultType.failure:
            print(self._op, "failed")

    def add(self, operation, **kwargs):
        """Run *operation* only if every previous step succeeded.

        Args:
            operation: Callable returning a ``Result``.
            **kwargs: Keyword arguments forwarded to *operation*.

        Returns:
            This same object, updated with the type and payload of the new
            result, so calls can be chained.

        Raises:
            SystemExit: With exit code -1 when the current state is not a
                success, i.e. a previous step in the chain failed.
        """
        if self._type != ResultType.success:
            print("Dependency not satisfied")
            # Raise SystemExit directly: the ``exit`` builtin is injected by
            # the ``site`` module for interactive use and is not guaranteed
            # to exist (e.g. under ``python -S``).
            raise SystemExit(-1)

        r = operation(**kwargs)
        self._type = r._type
        self._result = r._result
        return self

    def __repr__(self):
        return f"{self._type}, {self._result}"
2019-08-27 08:31:22 +00:00
2019-08-28 11:42:50 +00:00
2019-08-27 11:19:54 +00:00
class Operation(ABC):
    """Empty abstract base class; it declares no methods or attributes."""
2019-08-27 08:31:22 +00:00
2019-08-27 11:19:54 +00:00
class GitOperation(object):
    """Git commands that report their outcome as a ``Result``."""

    @staticmethod
    def clone(url, path="."):
        """Run ``git clone <url>`` with *path* as the working directory.

        Args:
            url: Location of the repository to clone.
            path: Directory in which the command is executed.

        Returns:
            A success ``Result`` holding the command's captured stdout, or a
            failure ``Result`` holding the exception when git exits non-zero
            or cannot be started (missing binary, invalid *path*).
        """
        # Build argv as a list so a URL containing whitespace stays one
        # argument instead of being split apart.
        command = ["git", "clone", str(url)]
        try:
            output = subprocess.check_output(command, cwd=path)
        except (subprocess.CalledProcessError, OSError) as e:
            # co_name is this function's own name; it labels the failed step.
            return Result(e, ResultType.failure, inspect.currentframe().f_code.co_name)
        else:
            return Result(output, ResultType.success)
2019-08-28 11:42:50 +00:00
2019-08-27 11:19:54 +00:00
class PipenvOperation(object):
    """Pipenv commands that report their outcome as a ``Result``."""

    @staticmethod
    def install(path=".", package_name=None):
        """Run ``pipenv install`` with *path* as the working directory.

        Args:
            path: Directory in which the command is executed.
            package_name: Optional package specification. When falsy, a bare
                ``pipenv install`` is run. The value is split on whitespace,
                so several packages may be given in one string.

        Returns:
            A success ``Result`` holding the command's captured stdout, or a
            failure ``Result`` holding the exception when pipenv exits
            non-zero or cannot be started (missing binary, invalid *path*).
        """
        command = ["pipenv", "install"]
        if package_name:
            # Whitespace splitting is kept on purpose so that existing callers
            # passing "pkg_a pkg_b" keep installing both packages.
            command.extend(str(package_name).split())
        try:
            output = subprocess.check_output(command, cwd=path)
        except (subprocess.CalledProcessError, OSError) as e:
            # co_name is this function's own name; it labels the failed step.
            return Result(e, ResultType.failure, inspect.currentframe().f_code.co_name)
        else:
            return Result(output, ResultType.success)
class FileOperation(object):
    """File-system helpers that report their outcome as a ``Result``."""

    @staticmethod
    def write(path, content, mode="w"):
        """Open *path* with *mode* and write *content* to it.

        Returns a success ``Result`` with an empty payload, or a failure
        ``Result`` wrapping whatever exception opening or writing raised.
        """
        try:
            with open(path, mode) as handle:
                handle.write(content)
        except Exception as error:
            # The current function's name is used as the operation label.
            step_name = inspect.currentframe().f_code.co_name
            return Result(error, ResultType.failure, step_name)
        return Result("", ResultType.success)
2019-08-28 07:13:05 +00:00
def get_distro_name(path="/etc/os-release"):
    """Return the value of the ``ID`` key from an os-release style file.

    Args:
        path: File to read; defaults to ``/etc/os-release``.

    Returns:
        The ``ID`` value with surrounding whitespace and quotes removed, or
        None when the file has no ``ID=`` line.

    Raises:
        OSError: If *path* cannot be opened.
    """
    with open(path) as f:
        for line in f:
            # Compare the whole key so that VERSION_ID=, BUILD_ID= or
            # ID_LIKE= are never mistaken for ID=.
            key, sep, value = line.strip().partition("=")
            if sep and key == "ID":
                # Values may be quoted, e.g. ID="centos".
                return value.strip().strip("\"'")
    return None
2019-08-28 11:42:50 +00:00
2019-08-28 07:13:05 +00:00
class PackageManagementOperation(object):
    """System package installation that reports its outcome as a ``Result``."""

    @staticmethod
    def install(package_name):
        """Install *package_name* with the distribution's package manager.

        Only Alpine (``apk add``) is handled; any other distribution yields a
        failure ``Result``.

        Args:
            package_name: Package specification. It is split on whitespace,
                so several packages may be given in one string.

        Returns:
            A success ``Result`` with an empty payload, or a failure
            ``Result`` wrapping the exception that occurred. The exception
            is also printed.
        """
        try:
            distro = get_distro_name()
            if distro == "alpine":
                # Whitespace splitting is kept so "pkg_a pkg_b" installs both.
                command = ["apk", "add", *str(package_name).split()]
            else:
                # The former `assert "Unknown Distro"` could never fire (a
                # non-empty string is truthy), leaving `command` unbound.
                raise ValueError(f"Unknown Distro: {distro}")

            subprocess.check_output(command)
        except Exception as e:
            print(e)
            # co_name is this function's own name; it labels the failed step.
            return Result(e, ResultType.failure, inspect.currentframe().f_code.co_name)
        else:
            return Result("", ResultType.success)