diff --git a/etcd_wrapper.py b/etcd_wrapper.py index 7c86771..23b6223 100644 --- a/etcd_wrapper.py +++ b/etcd_wrapper.py @@ -1,6 +1,11 @@ import etcd3 import json +import signal + from dataclasses import dataclass +from collections import namedtuple + +PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"]) @dataclass(init=False) @@ -42,11 +47,26 @@ class Etcd3Wrapper(object): for entry in r: yield EtcdEntry(*entry[::-1], value_in_json=value_in_json) - def watch_prefix(self, *args, value_in_json=False, **kwargs): - r, _ = self.client.watch_prefix(*args, **kwargs) - for event in r: - # if e.value is None don't propagate its value - if event.value is None: - continue - event = EtcdEntry(event, event.value, value_in_json=value_in_json) - yield event \ No newline at end of file + def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs): + if timeout > 0: + signal.signal(signal.SIGALRM, raise_timeout) + while True: + try: + if timeout > 0: + signal.alarm(timeout) + r, _ = self.client.watch_prefix(*args, **kwargs) + + for event in r: + # if e.value is None don't propagate its value + if event.value is None: + continue + event = EtcdEntry(event, event.value, value_in_json=value_in_json) + yield event + except TimeoutError: + _value = {"status": "TIMEOUT"} + yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value))) + + +def raise_timeout(signum, frame): + _ = signum, frame + raise TimeoutError()