import etcd3 import json from functools import wraps from uncloud import UncloudException from uncloud.common import logger class EtcdEntry: def __init__(self, meta_or_key, value, value_in_json=False): if hasattr(meta_or_key, 'key'): # if meta has attr 'key' then get it self.key = meta_or_key.key.decode('utf-8') else: # otherwise meta is the 'key' self.key = meta_or_key self.value = value.decode('utf-8') if value_in_json: self.value = json.loads(self.value) def readable_errors(func): @wraps(func) def wrapper(*args, **kwargs): try: return func(*args, **kwargs) except etcd3.exceptions.ConnectionFailedError: raise UncloudException('Cannot connect to etcd: is etcd running as configured in uncloud.conf?') except etcd3.exceptions.ConnectionTimeoutError as err: raise etcd3.exceptions.ConnectionTimeoutError('etcd connection timeout.') from err except Exception: logger.exception('Some etcd error occured. See syslog for details.') return wrapper class Etcd3Wrapper: @readable_errors def __init__(self, *args, **kwargs): self.client = etcd3.client(*args, **kwargs) @readable_errors def get(self, *args, value_in_json=False, **kwargs): _value, _key = self.client.get(*args, **kwargs) if _key is None or _value is None: return None return EtcdEntry(_key, _value, value_in_json=value_in_json) @readable_errors def put(self, *args, value_in_json=False, **kwargs): _key, _value = args if value_in_json: _value = json.dumps(_value) if not isinstance(_key, str): _key = _key.decode('utf-8') return self.client.put(_key, _value, **kwargs) @readable_errors def get_prefix(self, *args, value_in_json=False, raise_exception=True, **kwargs): try: event_iterator = self.client.get_prefix(*args, **kwargs) for e in event_iterator: yield EtcdEntry(*e[::-1], value_in_json=value_in_json) except Exception as err: if raise_exception: raise Exception('Exception in etcd_wrapper.get_prefix') from err else: logger.exception('Error in etcd_wrapper') return iter([]) @readable_errors def watch_prefix(self, key, raise_exception=True, value_in_json=False): try: event_iterator, cancel = self.client.watch_prefix(key) for e in event_iterator: if hasattr(e, '_event'): e = e._event if e.type == e.PUT: yield EtcdEntry(e.kv.key, e.kv.value, value_in_json=value_in_json) except Exception as err: if raise_exception: raise Exception('Exception in etcd_wrapper.get_prefix') from err else: logger.exception('Error in etcd_wrapper.watch_prefix') try: cancel() except Exception: pass return iter([])