122 lines
3 KiB
Python
122 lines
3 KiB
Python
import subprocess
|
|
import logging
|
|
|
|
logging.basicConfig()
|
|
log = logging.getLogger("checks")
|
|
|
|
class CheckException(Exception):
|
|
pass
|
|
|
|
class WrongParameterException(CheckException):
|
|
pass
|
|
|
|
class BaseCheck(object):
|
|
|
|
def check(self):
|
|
""" Should return tuple:
|
|
(true/false, data)
|
|
|
|
data is the raw result and can be saved in a database
|
|
"""
|
|
pass
|
|
|
|
|
|
|
|
class DNSCheck(BaseCheck):
|
|
def __init__(self, name, rr_type="AAAA", expected_result=None, server=None):
|
|
|
|
self.rr_types = [ "AAAA", "A", "PTR", "TXT" ]
|
|
self.name = name
|
|
self.rr_type = rr_type
|
|
self.server = server
|
|
|
|
self.expected_result = expected_result
|
|
|
|
if not self.rr_type in self.rr_types:
|
|
raise WrongParameterException("Unsupported rr_type: {}".format(self.rr_type))
|
|
|
|
self.command = self.rr_type_to_command()
|
|
|
|
def rr_type_to_command(self):
|
|
base_cmd="dig +short"
|
|
|
|
if self.rr_type == "AAAA":
|
|
command = "{} {} aaaa".format(base_cmd, self.name)
|
|
elif self.rr_type == "A":
|
|
command = "{} {} a".format(base_cmd, self.name)
|
|
elif self.rr_type == "PTR":
|
|
command = "-x {}".format(base_cmd, self.name)
|
|
elif self.rr_type == "TXT":
|
|
command = "{} {} txt".format(base_cmd, self.name)
|
|
|
|
if self.server:
|
|
command = "{} @{}".format(command, self.server)
|
|
|
|
return command.split()
|
|
|
|
|
|
def check(self):
|
|
res = subprocess.run(self.command,
|
|
capture_output=True,
|
|
encoding="utf-8")
|
|
|
|
if not res.returncode == 0:
|
|
return (False, "")
|
|
|
|
|
|
if self.expected_result:
|
|
if not self.expected_result == res.stdout:
|
|
return (False, res.stdout)
|
|
|
|
else:
|
|
if res.stdout == "":
|
|
return (False, res.stdout)
|
|
|
|
# Has (correct) data
|
|
|
|
return (True, res.stdout)
|
|
|
|
def __repr__(self):
|
|
return "<Check {}>".format(self.__str__())
|
|
|
|
def __str__(self):
|
|
name = "{}/{}".format(self.name, self.rr_type)
|
|
|
|
if self.server:
|
|
name = "{}@{}".format(name, self.server)
|
|
|
|
return name
|
|
|
|
|
|
class DFCheck(BaseCheck):
|
|
def __init__(self, name, username="root", path="/"):
|
|
self.name = name
|
|
self.username = username
|
|
self.path = path
|
|
|
|
self.command = self.create_command()
|
|
log.info("Command = {}".format(self.command))
|
|
|
|
def create_command(self):
|
|
base_command ='ssh {}@{}'.format(self.username, self.name).split()
|
|
base_command.append("df {}".format(self.path))
|
|
|
|
return base_command
|
|
|
|
def check(self):
|
|
res = subprocess.run(self.command,
|
|
capture_output=True,
|
|
encoding="utf-8")
|
|
|
|
if not res.returncode == 0:
|
|
return (False, "")
|
|
|
|
return (True, res.stdout)
|
|
|
|
def __repr__(self):
|
|
return "<df {}>".format(self.__str__())
|
|
|
|
def __str__(self):
|
|
name = "{}:{}".format(self.name, self.path)
|
|
|
|
return name
|