import subprocess
import logging

logging.basicConfig()
log = logging.getLogger("checks")


class CheckException(Exception):
    """Base class for all exceptions raised by the checks in this module."""


class WrongParameterException(CheckException):
    """Raised when a check is constructed with an unsupported parameter."""


class BaseCheck(object):
    """Common interface shared by all checks."""

    def check(self):
        """
        Should return tuple: (true/false, data)
        data is the raw result and can be saved in a database

        The base implementation does nothing and returns None; subclasses
        override it.
        """
        pass


class DNSCheck(BaseCheck):
    """Check a DNS record by running ``dig +short`` in a subprocess."""

    def __init__(self, name, rr_type="AAAA", expected_result=None, server=None):
        """
        name:            the name to look up (for PTR: the address handed
                         to ``dig -x``)
        rr_type:         one of the entries of ``self.rr_types``
        expected_result: if set, the command output must equal this value
                         (surrounding whitespace is ignored); if unset,
                         any non-empty output counts as success
        server:          optional server, passed to dig as ``@server``

        Raises WrongParameterException if rr_type is not supported.
        """
        self.rr_types = ["AAAA", "A", "PTR", "TXT"]

        self.name = name
        self.rr_type = rr_type
        self.server = server
        self.expected_result = expected_result

        if self.rr_type not in self.rr_types:
            raise WrongParameterException(
                "Unsupported rr_type: {}".format(self.rr_type))

        self.command = self.rr_type_to_command()

    def rr_type_to_command(self):
        """
        Build the dig command line as an argument list for subprocess.run.

        The arguments are assembled directly as a list (no string
        formatting followed by split()), so name and server are each
        passed to dig as exactly one argument.
        """
        command = ["dig", "+short"]

        if self.rr_type == "PTR":
            # Reverse lookups use "-x <address>" instead of "<name> <type>".
            command.extend(["-x", self.name])
        else:
            # AAAA / A / TXT: "<name> <lower-cased type>"
            command.extend([self.name, self.rr_type.lower()])

        if self.server:
            command.append("@{}".format(self.server))

        return command

    def check(self):
        """
        Run the command and return a tuple (success, raw stdout).

        A non-zero exit status yields (False, ""). Otherwise success is
        decided by expected_result if one was given, or by the output
        being non-empty if not. The data element is always the unmodified
        stdout of the command.
        """
        res = subprocess.run(self.command, capture_output=True, encoding="utf-8")

        if res.returncode != 0:
            return (False, "")

        if self.expected_result:
            # Command output normally ends with a newline, which callers
            # rarely include in expected_result; compare without the
            # surrounding whitespace so "1.2.3.4" matches "1.2.3.4\n".
            expected = str(self.expected_result).strip()
            matched = expected == res.stdout.strip()
        else:
            matched = res.stdout != ""

        return (matched, res.stdout)

    def __repr__(self):
        return "<DNSCheck: {}>".format(self.__str__())

    def __str__(self):
        """Return "name/rr_type", followed by "@server" if a server is set."""
        name = "{}/{}".format(self.name, self.rr_type)
        if self.server:
            name = "{}@{}".format(name, self.server)
        return name


class DFCheck(BaseCheck):
    """Check that ``df <path>`` can be run on a host via ssh."""

    def __init__(self, name, username="root", path="/"):
        """
        name:     host to connect to
        username: user for the ssh login
        path:     path handed to df on the remote side
        """
        self.name = name
        self.username = username
        self.path = path
        self.command = self.create_command()
        log.info("Command = %s", self.command)

    def create_command(self):
        """
        Build the ssh command line as an argument list for subprocess.run.

        The remote command ("df <path>") is a single ssh argument.
        NOTE(review): path is inserted into the remote command unquoted --
        confirm callers never pass untrusted values.
        """
        return [
            "ssh",
            "{}@{}".format(self.username, self.name),
            "df {}".format(self.path),
        ]

    def check(self):
        """
        Run the command and return a tuple (success, raw stdout).

        A non-zero exit status yields (False, ""); otherwise the result is
        (True, stdout).
        """
        res = subprocess.run(self.command, capture_output=True, encoding="utf-8")

        if res.returncode != 0:
            return (False, "")

        return (True, res.stdout)

    def __repr__(self):
        return "<DFCheck: {}>".format(self.__str__())

    def __str__(self):
        """Return "name:path"."""
        return "{}:{}".format(self.name, self.path)