ucloud-setup/app/helper.py

137 lines
3.8 KiB
Python
Raw Normal View History

2019-08-27 08:31:22 +00:00
import subprocess
2019-08-27 11:19:54 +00:00
import inspect
2019-08-29 18:32:43 +00:00
import os
2019-08-27 11:19:54 +00:00
from dataclasses import dataclass
from abc import ABC
from enum import Enum
2019-08-27 08:31:22 +00:00
2019-08-28 11:42:50 +00:00
2019-08-27 11:19:54 +00:00
class ResultType(Enum):
    """Outcome marker stored on a ``Result``."""

    # The operation completed.
    success = 0
    # The operation did not complete.
    failure = 1
2019-08-27 08:31:22 +00:00
2019-08-28 11:42:50 +00:00
2019-08-27 08:31:22 +00:00
def clone(repo):
    """Clone the git repository *repo* into the current working directory.

    Args:
        repo: URL or local path of the repository to clone.

    Returns:
        True when ``git clone`` exits with status 0, False otherwise
        (including when the ``git`` executable cannot be started).
    """
    # Build the argument vector directly instead of splitting a formatted
    # string, so a repository path containing whitespace stays one argument.
    command = ["git", "clone", str(repo)]
    try:
        subprocess.check_output(command)
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: git ran and exited non-zero.
        # OSError: git could not be executed at all (e.g. not installed).
        return False
    return True
2019-08-28 11:42:50 +00:00
2019-08-27 08:31:22 +00:00
def clone_common():
    """Clone the ``ucloud_common`` repository and return ``clone``'s result."""
    common_repo_url = "https://code.ungleich.ch/ungleich-public/ucloud_common"
    return clone(common_repo_url)
2019-08-28 11:42:50 +00:00
2019-08-27 08:31:22 +00:00
def clone_etcd_wrapper():
    """Clone the ``etcd3_wrapper`` repository and return ``clone``'s result."""
    wrapper_repo_url = "https://code.ungleich.ch/ahmedbilal/etcd3_wrapper"
    return clone(wrapper_repo_url)
2019-08-28 11:42:50 +00:00
2019-08-27 11:19:54 +00:00
class Result(object):
    """Outcome of an operation, chainable through :meth:`add`.

    Attributes are stored with a leading underscore:
    ``_type`` is the ``ResultType``, ``_result`` is the payload converted
    with ``str()``, and ``_op`` is the name of the operation that produced it.
    """

    def __init__(self, _result, _type: ResultType, _op=""):
        """Store the outcome and announce failures on stdout.

        Args:
            _result: Payload of the operation (output or exception); it is
                stored as ``str(_result)``.
            _type: Whether the operation succeeded or failed.
            _op: Name of the operation, used only in the failure message.
        """
        self._type = _type
        self._result = str(_result)
        self._op = _op
        if self._type == ResultType.failure:
            print(self._op, "failed")

    def add(self, operation, **kwargs):
        """Run *operation* only if everything so far has succeeded.

        Args:
            operation: Callable returning a ``Result``-like object exposing
                ``_type`` and ``_result``.
            **kwargs: Keyword arguments forwarded to *operation*.

        Returns:
            This same instance, updated with the type and payload of the
            operation's result, so calls can be chained.

        Raises:
            SystemExit: With status -1 when this result is not a success;
                "Dependency not satisfied" is printed first.
        """
        if self._type != ResultType.success:
            print("Dependency not satisfied")
            # Raise SystemExit directly: the bare ``exit`` helper is injected
            # by the ``site`` module and is not guaranteed to be defined.
            raise SystemExit(-1)

        outcome = operation(**kwargs)
        self._type = outcome._type
        self._result = outcome._result
        return self

    def __repr__(self):
        """Return ``"<type>, <result>"``."""
        return f"{self._type}, {self._result}"
2019-08-27 08:31:22 +00:00
2019-08-28 11:42:50 +00:00
2019-08-27 11:19:54 +00:00
class Operation(ABC):
    """Empty abstract base class; it declares no methods or attributes."""
2019-08-27 08:31:22 +00:00
2019-08-27 11:19:54 +00:00
class GitOperation(object):
    """Git commands that report their outcome as a ``Result``."""

    @staticmethod
    def clone(url, path="."):
        """Run ``git clone <url>`` with *path* as the working directory.

        Args:
            url: Repository to clone.
            path: Directory in which ``git clone`` is executed.

        Returns:
            A success ``Result`` built from git's captured stdout, or a
            failure ``Result`` built from the exception that was raised
            (non-zero exit status, or the command could not be started).
        """
        # An explicit argument list keeps a URL/path containing whitespace
        # as a single argument instead of splitting it apart.
        command = ["git", "clone", str(url)]
        try:
            output = subprocess.check_output(command, cwd=path)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing ``git`` binary or a bad ``cwd``.
            # The operation name is spelled out because
            # ``inspect.currentframe()`` may return None on interpreters
            # without stack-frame support.
            return Result(e, ResultType.failure, "clone")
        return Result(output, ResultType.success)
2019-08-28 11:42:50 +00:00
2019-08-27 11:19:54 +00:00
class PipenvOperation(object):
    """pipenv commands that report their outcome as a ``Result``."""

    @staticmethod
    def create(path=".", site_packages=False, python_version="3.7"):
        """Create a pipenv environment in *path*.

        Args:
            path: Directory in which ``pipenv`` is executed.
            site_packages: When true, pass ``--site-packages`` to pipenv.
            python_version: Value passed to ``--python``; defaults to the
                previously hard-coded "3.7".

        Returns:
            A success ``Result`` built from the captured stdout, or a
            failure ``Result`` built from the exception that was raised.
        """
        command = ["pipenv"]
        if site_packages:
            command.append("--site-packages")
        command += ["--python", str(python_version)]
        try:
            output = subprocess.check_output(command, cwd=path)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing ``pipenv`` binary or a bad ``cwd``.
            return Result(e, ResultType.failure, "create")
        return Result(output, ResultType.success)

    @staticmethod
    def install(path=".", package_name=None):
        """Run ``pipenv install`` in *path*.

        Args:
            path: Directory in which ``pipenv`` is executed.
            package_name: Optional package specification. It is split on
                whitespace, so several space-separated words become
                separate arguments. When falsy, a plain ``pipenv install``
                is run.

        Returns:
            A success ``Result`` built from the captured stdout, or a
            failure ``Result`` built from the exception that was raised.
        """
        command = ["pipenv", "install"]
        if package_name:
            # Whitespace splitting is kept on purpose: it matches how the
            # command string used to be tokenised.
            command += str(package_name).split()
        # Inherit the current environment and add PIP_INSTALL_OPTION on top.
        env = {**os.environ, "PIP_INSTALL_OPTION": "-- --jobs=8"}
        try:
            output = subprocess.check_output(command, cwd=path, env=env)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError covers a missing ``pipenv`` binary or a bad ``cwd``.
            return Result(e, ResultType.failure, "install")
        return Result(output, ResultType.success)
class FileOperation(object):
    """File helpers that report their outcome as a ``Result``."""

    @staticmethod
    def write(path, content, mode="w"):
        """Write *content* to the file at *path*, opened with *mode*.

        Returns:
            A success ``Result`` with an empty payload, or a failure
            ``Result`` built from whatever exception was raised while
            opening or writing the file.
        """
        try:
            with open(path, mode) as handle:
                handle.write(content)
        except Exception as error:
            return Result(error, ResultType.failure, inspect.currentframe().f_code.co_name)
        return Result("", ResultType.success)
2019-08-28 07:13:05 +00:00
def get_distro_name(path="/etc/os-release"):
    """Return the value of the ``ID`` field of an os-release file.

    The file is read line by line and only a line whose key is exactly
    ``ID`` is accepted, so fields such as ``VERSION_ID`` or ``BUILD_ID``
    that merely contain the text "ID=" are never mistaken for it.

    Args:
        path: os-release file to read; defaults to ``/etc/os-release``.

    Returns:
        The ``ID`` value with surrounding whitespace and quotes removed,
        or None when the file has no ``ID`` line.

    Raises:
        OSError: If *path* cannot be opened.
    """
    with open(path) as f:
        for line in f:
            key, separator, value = line.strip().partition("=")
            if separator and key == "ID":
                # Values may be wrapped in single or double quotes.
                return value.strip().strip("\"'")
    return None
2019-08-28 11:42:50 +00:00
2019-08-28 07:13:05 +00:00
class PackageManagementOperation(object):
    """System package installation that reports its outcome as a ``Result``."""

    @staticmethod
    def install(package_name):
        """Install *package_name* with the distribution's package manager.

        Only Alpine (``apk add``) is supported; any other distribution is
        reported as a failure.

        Args:
            package_name: Package specification. It is split on whitespace,
                so several space-separated names become separate arguments.

        Returns:
            A success ``Result`` with an empty payload, or a failure
            ``Result`` built from the exception that was raised (the
            exception is also printed).
        """
        try:
            distro = get_distro_name()
            if distro == "alpine":
                command = ["apk", "add", *str(package_name).split()]
            else:
                # Raise explicitly: an ``assert`` on a non-empty string never
                # fires, which left ``command`` unbound and produced a
                # misleading UnboundLocalError instead of this message.
                raise RuntimeError(f"Unknown Distro: {distro}")

            subprocess.check_output(command)
        except Exception as e:
            print(e)
            return Result(e, ResultType.failure, "install")
        else:
            return Result("", ResultType.success)