diff --git a/etcd_wrapper.py b/etcd_wrapper.py index a631aa3..7b6617e 100644 --- a/etcd_wrapper.py +++ b/etcd_wrapper.py @@ -3,6 +3,7 @@ import json import signal import threading import time +import queue from collections import namedtuple from dataclasses import dataclass @@ -52,41 +53,49 @@ class Etcd3Wrapper(object): yield EtcdEntry(*entry[::-1], value_in_json=value_in_json) # def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs): + # def raise_timeout(signum, frame): + # raise TimeoutError("timeout") + + # r, _ = self.client.watch_prefix(*args, **kwargs) + # if timeout > 0: # signal.signal(signal.SIGALRM, raise_timeout) + # while True: # try: - # if timeout > 0: - # signal.alarm(timeout) - # r, _ = self.client.watch_prefix(*args, **kwargs) + # signal.alarm(timeout) # for event in r: # signal.alarm(0) - # # if e.value is None don't propagate its value - # if event.value is None: - # continue - # event = EtcdEntry(event, event.value, value_in_json=value_in_json) - # signal.alarm(timeout) - # yield event + # if event.value: + # event = EtcdEntry(event, event.value, value_in_json=value_in_json) + # signal.alarm(timeout) + # yield event + # else: + # signal.alarm(timeout) # except TimeoutError: # _value = {"status": "TIMEOUT"} # yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value))) - - def watch_prefix(self, key_prefix, timeout=0, value_in_json=False, **kwargs): - r, _ = self.client.watch_prefix(key_prefix, **kwargs) - if timeout > 0: - t = threading.Thread(target=create_timeout_event, - args=(timeout, key_prefix, self.client)) - t.start() - - for event in r: - if event.value: - event = EtcdEntry(event, event.value, value_in_json=value_in_json) - yield event -def create_timeout_event(timeout, key_prefix, etcd_client): - while True: - time.sleep(timeout) - _value = {"status": "TIMEOUT", "last_timeout": datetime.utcnow().isoformat()} - _key = path.join(f"{key_prefix}", "TIMEOUT") - etcd_client.put(_key.encode("utf-8"), json.dumps(_value)) + def watch_prefix(self, key_prefix, timeout, + value_in_json=False, **kwargs): + q = queue.Queue() + r, _ = self.client.watch_prefix(key_prefix, **kwargs) + t = threading.Thread(target=watch_prefix_generator, + args=(value_in_json, r, q)) + t.start() + + while True: + try: + v = q.get(timeout=timeout) + yield v + except queue.Empty: + _value = {"status": "TIMEOUT"} + e = EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value))) + yield e + +def watch_prefix_generator(value_in_json, generator, q): + for event in generator: + if event.value: + event = EtcdEntry(event, event.value) + q.put(event)