#!/usr/bin/env python3
"""Prometheus exporter for myStrom devices.

Every device URL given on the command line is polled at ``<url>/report``
each time ``/metrics`` is requested.  The ``power`` and ``temperature``
fields of the JSON reply are exposed as the gauges ``mystrom_usage_watts``
and ``mystrom_temperature_celsius``, labelled with a name derived from the
device URL.
"""

import argparse
import asyncio
import contextlib
import json
import logging
import re
from typing import Any, Iterable, List, Optional, Tuple

import aiohttp
from aiohttp import web
from prometheus_client import Gauge
from prometheus_client.exposition import generate_latest

logger = logging.getLogger(__name__)

# Compiled once at import time instead of on every fetch.
_SCHEME_RE = re.compile(r'(http|https)://')
_LABEL_UNSAFE_RE = re.compile(r'[/:.-]')

# Failures that affect a single device only: connection/HTTP-level errors,
# timeouts, a body that is not valid JSON (ValueError), or JSON that lacks
# the expected fields (KeyError / TypeError).
_FETCH_ERRORS = (
    aiohttp.ClientError,
    asyncio.TimeoutError,
    KeyError,
    TypeError,
    ValueError,
)

# (device_name, power, temperature) as returned by ``fetch``.
Reading = Tuple[str, Any, Any]


def _device_name(url: str) -> str:
    """Turn a device URL into the value used for the ``device`` label.

    The ``http://`` / ``https://`` scheme is dropped and every ``/``, ``:``,
    ``.`` and ``-`` is replaced by ``_``.
    """
    return _LABEL_UNSAFE_RE.sub('_', _SCHEME_RE.sub('', url))


async def fetch(session: aiohttp.ClientSession, url: str) -> Reading:
    """Read the current report of one device.

    Args:
        session: Open aiohttp client session used for the request.
        url: Base URL of the device; ``/report`` is appended to it.

    Returns:
        A ``(device_name, power, temperature)`` tuple, where the last two
        items are the ``power`` and ``temperature`` fields of the JSON reply.

    Raises:
        aiohttp.ClientError: The request itself failed.
        ValueError: The response body is not valid JSON.
        KeyError: The JSON reply has no ``power`` or ``temperature`` field.
    """
    url_report = f"{url}/report"
    async with session.get(url_report) as response:
        # The body is decoded by hand rather than via ``response.json()``,
        # which would additionally validate the Content-Type header.
        report = json.loads(await response.text())
    return (_device_name(url), report['power'], report['temperature'])


async def main(devices: Iterable[str]) -> List[Reading]:
    """Fetch all ``devices`` concurrently and return their readings.

    Unlike ``MyStrom.get`` this helper does not tolerate failures: the first
    exception raised by any fetch propagates to the caller.
    """
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *(fetch(session, device) for device in devices)
        )


class MyStrom:
    """aiohttp request handler that exports myStrom readings as gauges."""

    def __init__(self, devices):
        """Remember the device URLs and register the two gauges.

        Args:
            devices: Base URLs of the devices to poll on every scrape.
        """
        self.devices = devices
        self.usage = Gauge(
            'mystrom_usage_watts',
            'Electricity usage of the device',
            ['device'],
        )
        self.temperature = Gauge(
            'mystrom_temperature_celsius',
            'Temperature',
            ['device'],
        )

    async def fetch(self, session, url):
        """Read one device; thin wrapper around the module-level ``fetch``."""
        # Inside a method body the bare name resolves to the module-level
        # function, not to this method, so there is no recursion here.
        return await fetch(session, url)

    async def _fetch_or_none(self, session, url) -> Optional[Reading]:
        """Like ``fetch`` but log a per-device failure and return ``None``."""
        try:
            return await self.fetch(session, url)
        except _FETCH_ERRORS as err:
            logger.warning("Could not read %s: %r", url, err)
            return None

    def _forget(self, device_name: str) -> None:
        """Remove the series of ``device_name`` so no stale value is served."""
        for gauge in (self.usage, self.temperature):
            # Depending on the prometheus_client version, removing a label
            # set that was never created may raise KeyError.
            with contextlib.suppress(KeyError):
                gauge.remove(device_name)

    async def get(self, request):
        """Handle ``GET /metrics``.

        All configured devices are polled concurrently.  A device that cannot
        be read is logged and its series are removed from the output; the
        remaining devices are still reported, so one broken device no longer
        fails the whole scrape.

        Args:
            request: The incoming aiohttp request (not inspected).

        Returns:
            A ``web.Response`` whose text is the current content of the
            default Prometheus registry.
        """
        async with aiohttp.ClientSession() as session:
            results = await asyncio.gather(
                *(self._fetch_or_none(session, device)
                  for device in self.devices)
            )
        # gather() preserves argument order, so results line up with devices.
        for device, reading in zip(self.devices, results):
            if reading is None:
                self._forget(_device_name(device))
                continue
            name, power, temperature = reading
            self.usage.labels(device=name).set(power)
            self.temperature.labels(device=name).set(temperature)
        return web.Response(text=generate_latest().decode('utf-8'))


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Export usage data from mystrom devices via prometheus'
    )
    parser.add_argument(
        'device', nargs='+', help="URL to your mystrom device(s)"
    )
    args = parser.parse_args()

    mystrom = MyStrom(args.device)
    app = web.Application()
    app.add_routes([web.get('/metrics', mystrom.get)])
    # '::' is the IPv6 wildcard address; the port is aiohttp's default.
    web.run_app(app, host='::')