uncloud/uncloud/common/etcd_wrapper.py

105 lines
3.5 KiB
Python

import etcd3
import json
from functools import wraps
from uncloud import UncloudException
from uncloud.common import logger
from typing import Iterator
class EtcdEntry:
def __init__(self, meta_or_key, value, value_in_json=False):
if hasattr(meta_or_key, 'key'):
# if meta has attr 'key' then get it
self.key = meta_or_key.key.decode('utf-8')
else:
# otherwise meta is the 'key'
self.key = meta_or_key
self.value = value.decode('utf-8')
if value_in_json:
self.value = json.loads(self.value)
def readable_errors(func):
@wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except etcd3.exceptions.ConnectionFailedError:
raise UncloudException('Cannot connect to etcd: is etcd running as configured in uncloud.conf?')
except etcd3.exceptions.ConnectionTimeoutError as err:
raise etcd3.exceptions.ConnectionTimeoutError('etcd connection timeout.') from err
except Exception:
logger.exception('Some etcd error occured. See syslog for details.')
return wrapper
class Etcd3Wrapper:
@readable_errors
def __init__(self, *args, **kwargs):
self.client = etcd3.client(*args, **kwargs)
@readable_errors
def get(self, *args, value_in_json=False, **kwargs) -> EtcdEntry:
"""Get a key/value pair from etcd
:return:
EtcdEntry: if a key/value pair is found in etcd
:raises:
KeyError: If key is not found in etcd
Exception: Different type of exception can be raised depending on
situation
"""
_value, _key = self.client.get(*args, **kwargs)
if _key is None or _value is None:
raise KeyError
return EtcdEntry(_key, _value, value_in_json=value_in_json)
@readable_errors
def put(self, *args, value_in_json=False, **kwargs):
"""Put key/value pair in etcd
:return: a response containing a header and the prev_kv
:raises:
Exception: Different type of exception can be raised depending on
situation
"""
_key, _value = args
if value_in_json:
_value = json.dumps(_value)
if not isinstance(_key, str):
_key = _key.decode('utf-8')
return self.client.put(_key, _value, **kwargs)
@readable_errors
def get_prefix(self, *args, value_in_json=False, **kwargs) -> \
Iterator[EtcdEntry]:
event_iterator = self.client.get_prefix(*args, **kwargs)
for e in event_iterator:
yield EtcdEntry(*e[::-1], value_in_json=value_in_json)
@readable_errors
def watch_prefix(self, key, raise_exception=True, value_in_json=False) -> Iterator[EtcdEntry]:
try:
event_iterator, cancel = self.client.watch_prefix(key)
for e in event_iterator:
if hasattr(e, '_event'):
e = e._event
if e.type == e.PUT:
yield EtcdEntry(e.kv.key, e.kv.value, value_in_json=value_in_json)
except Exception as err:
if raise_exception:
raise Exception('Exception in etcd_wrapper.get_prefix') from err
else:
logger.exception('Error in etcd_wrapper.watch_prefix')
try:
cancel()
except Exception:
pass
return iter([])