import etcd3 import json import queue import copy from uncloud import UncloudException from collections import namedtuple from functools import wraps from . import logger PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"]) class EtcdEntry: # key: str # value: str def __init__(self, meta, value, value_in_json=False): self.key = meta.key.decode("utf-8") self.value = value.decode("utf-8") if value_in_json: self.value = json.loads(self.value) def readable_errors(func): @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except etcd3.exceptions.ConnectionFailedError as err: raise UncloudException( "Cannot connect to etcd: is etcd running as configured in uncloud.conf?" ) except etcd3.exceptions.ConnectionTimeoutError as err: raise etcd3.exceptions.ConnectionTimeoutError( "etcd connection timeout." ) from err except Exception: logger.exception( "Some etcd error occured. See syslog for details." ) return wrapper class Etcd3Wrapper: @readable_errors def __init__(self, *args, **kwargs): self.client = etcd3.client(*args, **kwargs) @readable_errors def get(self, *args, value_in_json=False, **kwargs): _value, _key = self.client.get(*args, **kwargs) if _key is None or _value is None: return None return EtcdEntry(_key, _value, value_in_json=value_in_json) @readable_errors def put(self, *args, value_in_json=False, **kwargs): _key, _value = args if value_in_json: _value = json.dumps(_value) if not isinstance(_key, str): _key = _key.decode("utf-8") return self.client.put(_key, _value, **kwargs) @readable_errors def get_prefix(self, *args, value_in_json=False, **kwargs): r = self.client.get_prefix(*args, **kwargs) for entry in r: e = EtcdEntry(*entry[::-1], value_in_json=value_in_json) if e.value: yield e @readable_errors def watch_prefix(self, key, timeout=0, value_in_json=False): timeout_event = EtcdEntry( PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode( json.dumps({"status": "TIMEOUT", "type": "TIMEOUT"}) ), value_in_json=value_in_json, ) event_queue = queue.Queue() def add_event_to_queue(event): if hasattr(event, "events"): for e in event.events: if e.value: event_queue.put( EtcdEntry( e, e.value, value_in_json=value_in_json ) ) self.client.add_watch_prefix_callback(key, add_event_to_queue) while True: try: while True: v = event_queue.get(timeout=timeout) yield v except queue.Empty: event_queue.put(copy.deepcopy(timeout_event)) class PsuedoEtcdEntry(EtcdEntry): def __init__(self, key, value, value_in_json=False): super().__init__( PseudoEtcdMeta(key=key.encode("utf-8")), value, value_in_json=value_in_json, )