ucloud-setup/app/helper.py

90 lines
2.5 KiB
Python
Raw Normal View History

2019-08-27 08:31:22 +00:00
import subprocess
2019-08-27 11:19:54 +00:00
import inspect
from dataclasses import dataclass
from abc import ABC
from enum import Enum
2019-08-27 08:31:22 +00:00
2019-08-27 11:19:54 +00:00
class ResultType(Enum):
    """Outcome flag attached to the result of a setup step."""

    # The step completed without error.
    success = 0
    # The step raised or the underlying command exited non-zero.
    failure = 1
2019-08-27 08:31:22 +00:00
def clone(repo):
    """Clone *repo* into the current working directory using ``git clone``.

    Args:
        repo: URL or local path of the repository to clone.

    Returns:
        True when ``git clone`` exits with status 0; False when it exits
        non-zero or when the ``git`` executable cannot be launched.
    """
    # Build the argument vector directly instead of whitespace-splitting a
    # formatted string, so a repository path containing spaces stays a
    # single argument.
    command = ["git", "clone", str(repo)]
    try:
        subprocess.check_output(command)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing/non-executable git binary; the function
        # promises a boolean, so report that as a failed clone too.
        return False
    return True
def clone_common():
    """Clone the ucloud_common repository and return clone()'s result."""
    common_repo_url = "https://code.ungleich.ch/ungleich-public/ucloud_common"
    return clone(common_repo_url)
def clone_etcd_wrapper():
    """Clone the etcd3_wrapper repository and return clone()'s result."""
    wrapper_repo_url = "https://code.ungleich.ch/ahmedbilal/etcd3_wrapper"
    return clone(wrapper_repo_url)
2019-08-27 11:19:54 +00:00
class Result(object):
    """Outcome of one setup step, chainable through :meth:`add`.

    Attributes are stored on the instance as ``_type`` (a ``ResultType``),
    ``_result`` (text describing the outcome) and ``_op`` (name of the
    operation that produced it).
    """

    def __init__(self, _result, _type: ResultType, _op=""):
        """Store the outcome and announce failures on stdout.

        Args:
            _result: Payload of the step (command output, exception, ...).
                It is stored as text.
            _type: Whether the step succeeded or failed.
            _op: Name of the operation; printed when the step failed.
        """
        self._type = _type
        # Subprocess output arrives as bytes. str(bytes) would store the
        # "b'...'" repr (and emits BytesWarning under ``python -b``), so
        # decode it into real text instead.
        if isinstance(_result, (bytes, bytearray)):
            self._result = bytes(_result).decode("utf-8", errors="replace")
        else:
            self._result = str(_result)
        self._op = _op
        if self._type == ResultType.failure:
            print(self._op, "failed")

    def add(self, operation, **kwargs):
        """Run *operation* only if every previous step succeeded.

        Args:
            operation: Callable invoked as ``operation(**kwargs)``; it must
                return an object exposing ``_type`` and ``_result``.
            **kwargs: Keyword arguments forwarded to *operation*.

        Returns:
            This instance, updated with the new step's type and result, so
            calls can be chained.

        Raises:
            SystemExit: With status -1 when the current state is not a
                success (a previous step failed).
        """
        if self._type != ResultType.success:
            print("Dependency not satisfied")
            # Raise SystemExit directly: the ``exit`` builtin is injected by
            # the ``site`` module and is not guaranteed to exist (e.g. under
            # ``python -S`` or in frozen executables).
            raise SystemExit(-1)

        r = operation(**kwargs)
        self._type = r._type
        self._result = r._result
        return self

    def __repr__(self):
        """Return ``"<type>, <result>"`` for debugging output."""
        return f"{self._type}, {self._result}"
2019-08-27 08:31:22 +00:00
2019-08-27 11:19:54 +00:00
class Operation(ABC):
    """Empty abstract base class; it declares no methods or attributes."""
2019-08-27 08:31:22 +00:00
2019-08-27 11:19:54 +00:00
class GitOperation(object):
    """Git commands that report their outcome as a ``Result``."""

    @staticmethod
    def clone(url, path="."):
        """Run ``git clone <url>`` with *path* as the working directory.

        Args:
            url: URL or local path of the repository to clone.
            path: Directory in which ``git clone`` is executed.

        Returns:
            A ``Result`` of type ``ResultType.success`` carrying the
            command's captured stdout, or of type ``ResultType.failure``
            carrying the exception when git exits non-zero or cannot be
            started (git missing, *path* not a usable directory).
        """
        # Pass the URL as one argv element rather than whitespace-splitting
        # a formatted command string, so spaces in it cannot split it up.
        command = ["git", "clone", str(url)]
        try:
            output = subprocess.check_output(command, cwd=path)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError is raised when the process cannot be launched at all;
            # report it as a failed step instead of letting it propagate.
            return Result(e, ResultType.failure, inspect.currentframe().f_code.co_name)
        else:
            return Result(output, ResultType.success)
class PipenvOperation(object):
    """Pipenv commands that report their outcome as a ``Result``."""

    @staticmethod
    def install(path=".", package_name=None):
        """Run ``pipenv install`` with *path* as the working directory.

        Args:
            path: Directory in which pipenv is executed.
            package_name: Optional package to install. When falsy, a bare
                ``pipenv install`` is run. Whitespace-separated names are
                passed as separate arguments.

        Returns:
            A ``Result`` of type ``ResultType.success`` carrying the
            command's captured stdout, or of type ``ResultType.failure``
            carrying the exception when pipenv exits non-zero or cannot be
            started (pipenv missing, *path* not a usable directory).
        """
        command = ["pipenv", "install"]
        if package_name:
            # Keeps the original argv: the formatted command string used to
            # be whitespace-split, so several names may be given at once.
            command.extend(str(package_name).split())
        try:
            output = subprocess.check_output(command, cwd=path)
        except (subprocess.CalledProcessError, OSError) as e:
            # OSError is raised when the process cannot be launched at all;
            # report it as a failed step instead of letting it propagate.
            return Result(e, ResultType.failure, inspect.currentframe().f_code.co_name)
        else:
            return Result(output, ResultType.success)
class FileOperation(object):
    """File helpers that report their outcome as a ``Result``."""

    @staticmethod
    def write(path, content, mode="w"):
        """Write *content* to the file at *path*, opened with *mode*.

        Returns a ``Result`` of type ``ResultType.success`` with an empty
        payload, or of type ``ResultType.failure`` carrying the exception
        raised while opening or writing the file.
        """
        try:
            with open(path, mode) as handle:
                handle.write(content)
        except Exception as error:
            # Name of this function, used as the failed operation's label.
            operation_name = inspect.currentframe().f_code.co_name
            return Result(error, ResultType.failure, operation_name)
        return Result("", ResultType.success)