etcd3_wrapper/etcd_wrapper.py

92 lines
3.1 KiB
Python

import etcd3
import json
import signal
import threading
import time
from collections import namedtuple
from dataclasses import dataclass
from datetime import datetime
from os import path
PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"])
@dataclass(init=False)
class EtcdEntry:
def __init__(self, meta, value, value_in_json=False):
self.key = meta.key.decode("utf-8")
self.value = value.decode("utf-8")
if value_in_json:
self.value = json.loads(self.value)
key: str
value: str
class Etcd3Wrapper(object):
def __init__(self, *args, **kwargs):
self.client = etcd3.client(*args, **kwargs)
def get(self, *args, value_in_json=False, **kwargs):
_value, _key = self.client.get(*args, **kwargs)
if _key is None:
return None
return EtcdEntry(_key, _value, value_in_json=value_in_json)
def put(self, *args, value_in_json=False, **kwargs):
_key, _value = args
if value_in_json:
_value = json.dumps(_value)
if not isinstance(_key, str):
_key = _key.decode("utf-8")
return self.client.put(_key, _value, **kwargs)
def get_prefix(self, *args, value_in_json=False, **kwargs):
r = self.client.get_prefix(*args, **kwargs)
for entry in r:
yield EtcdEntry(*entry[::-1], value_in_json=value_in_json)
# def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs):
# if timeout > 0:
# signal.signal(signal.SIGALRM, raise_timeout)
# while True:
# try:
# if timeout > 0:
# signal.alarm(timeout)
# r, _ = self.client.watch_prefix(*args, **kwargs)
# for event in r:
# signal.alarm(0)
# # if e.value is None don't propagate its value
# if event.value is None:
# continue
# event = EtcdEntry(event, event.value, value_in_json=value_in_json)
# signal.alarm(timeout)
# yield event
# except TimeoutError:
# _value = {"status": "TIMEOUT"}
# yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value)))
def watch_prefix(self, key_prefix, timeout=0, value_in_json=False, **kwargs):
r, _ = self.client.watch_prefix(key_prefix, **kwargs)
if timeout > 0:
t = threading.Thread(target=create_timeout_event,
args=(timeout, key_prefix, self.client))
t.start()
for event in r:
if event.value:
event = EtcdEntry(event, event.value, value_in_json=value_in_json)
yield event
def create_timeout_event(timeout, key_prefix, etcd_client):
while True:
time.sleep(timeout)
_value = {"status": "TIMEOUT", "last_timeout": datetime.utcnow().isoformat()}
_key = path.join(f"{key_prefix}", "TIMEOUT")
etcd_client.put(_key.encode("utf-8"), json.dumps(_value))