2019-08-27 08:31:22 +00:00
|
|
|
import subprocess
|
2019-08-27 11:19:54 +00:00
|
|
|
import inspect
|
|
|
|
from dataclasses import dataclass
|
|
|
|
from abc import ABC
|
|
|
|
from enum import Enum
|
2019-08-27 08:31:22 +00:00
|
|
|
|
2019-08-27 11:19:54 +00:00
|
|
|
class ResultType(Enum):
|
|
|
|
success = 0
|
|
|
|
failure = 1
|
2019-08-27 08:31:22 +00:00
|
|
|
|
|
|
|
def clone(repo):
|
|
|
|
command = f"git clone {repo}"
|
|
|
|
try:
|
|
|
|
subprocess.check_output(command.split())
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
|
|
return False
|
|
|
|
else:
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
|
|
def clone_common():
|
|
|
|
return clone("https://code.ungleich.ch/ungleich-public/ucloud_common")
|
|
|
|
|
|
|
|
def clone_etcd_wrapper():
|
|
|
|
return clone("https://code.ungleich.ch/ahmedbilal/etcd3_wrapper")
|
|
|
|
|
2019-08-27 11:19:54 +00:00
|
|
|
class Result(object):
|
|
|
|
def __init__(self, _result, _type: ResultType, _op=""):
|
|
|
|
self._type = _type
|
|
|
|
self._result = str(_result)
|
|
|
|
self._op = _op
|
|
|
|
if self._type == ResultType.failure:
|
|
|
|
print(self._op, "failed")
|
|
|
|
|
|
|
|
|
|
|
|
def add(self, operation, **kwargs):
|
|
|
|
if self._type == ResultType.success:
|
|
|
|
r = operation(**kwargs)
|
|
|
|
self._type = r._type
|
|
|
|
self._result = r._result
|
|
|
|
return self
|
|
|
|
else:
|
|
|
|
print("Dependency not satisfied")
|
|
|
|
exit(-1)
|
|
|
|
|
2019-08-27 08:31:22 +00:00
|
|
|
|
2019-08-27 11:19:54 +00:00
|
|
|
def __repr__(self):
|
|
|
|
return f"{self._type}, {self._result}"
|
2019-08-27 08:31:22 +00:00
|
|
|
|
2019-08-27 11:19:54 +00:00
|
|
|
class Operation(ABC):
|
|
|
|
pass
|
2019-08-27 08:31:22 +00:00
|
|
|
|
2019-08-27 11:19:54 +00:00
|
|
|
|
|
|
|
class GitOperation(object):
|
|
|
|
@staticmethod
|
|
|
|
def clone(url, path="."):
|
|
|
|
command = f"git clone {url}"
|
|
|
|
try:
|
|
|
|
output = subprocess.check_output(command.split(), cwd=path)
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
|
|
return Result(e, ResultType.failure, inspect.currentframe().f_code.co_name)
|
|
|
|
else:
|
|
|
|
return Result(output, ResultType.success)
|
|
|
|
|
|
|
|
class PipenvOperation(object):
|
|
|
|
@staticmethod
|
|
|
|
def install(path=".", package_name=None):
|
|
|
|
if package_name:
|
|
|
|
command = f"pipenv install {package_name}"
|
|
|
|
else:
|
|
|
|
command = f"pipenv install"
|
|
|
|
try:
|
|
|
|
output = subprocess.check_output(command.split(), cwd=path)
|
|
|
|
except subprocess.CalledProcessError as e:
|
|
|
|
return Result(e, ResultType.failure, inspect.currentframe().f_code.co_name)
|
|
|
|
else:
|
|
|
|
return Result(output, ResultType.success)
|
|
|
|
|
|
|
|
|
|
|
|
class FileOperation(object):
|
|
|
|
@staticmethod
|
|
|
|
def write(path, content, mode="w"):
|
|
|
|
try:
|
|
|
|
with open(path, mode) as file:
|
|
|
|
file.write(content)
|
|
|
|
except Exception as e:
|
|
|
|
return Result(e, ResultType.failure, inspect.currentframe().f_code.co_name)
|
|
|
|
else:
|
|
|
|
return Result("", ResultType.success)
|
|
|
|
|