diff --git a/etcd_wrapper.py b/etcd_wrapper.py index 23b6223..26d0edc 100644 --- a/etcd_wrapper.py +++ b/etcd_wrapper.py @@ -1,9 +1,13 @@ import etcd3 import json import signal +import threading +import time -from dataclasses import dataclass from collections import namedtuple +from dataclasses import dataclass +from datetime import datetime +from os import path PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"]) @@ -47,26 +51,42 @@ class Etcd3Wrapper(object): for entry in r: yield EtcdEntry(*entry[::-1], value_in_json=value_in_json) - def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs): + # def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs): + # if timeout > 0: + # signal.signal(signal.SIGALRM, raise_timeout) + # while True: + # try: + # if timeout > 0: + # signal.alarm(timeout) + # r, _ = self.client.watch_prefix(*args, **kwargs) + + # for event in r: + # signal.alarm(0) + # # if e.value is None don't propagate its value + # if event.value is None: + # continue + # event = EtcdEntry(event, event.value, value_in_json=value_in_json) + # signal.alarm(timeout) + # yield event + # except TimeoutError: + # _value = {"status": "TIMEOUT"} + # yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value))) + + def watch_prefix(self, key_prefix, timeout=0, value_in_json=False, **kwargs): + r, _ = self.client.watch_prefix(key_prefix, **kwargs) if timeout > 0: - signal.signal(signal.SIGALRM, raise_timeout) - while True: - try: - if timeout > 0: - signal.alarm(timeout) - r, _ = self.client.watch_prefix(*args, **kwargs) + t = threading.Thread(target=create_timeout_event, + args=(timeout, key_prefix, self.client)) + t.start() + + for event in r: + if event.value: + event = EtcdEntry(event, event.value, value_in_json=value_in_json) + yield event - for event in r: - # if e.value is None don't propagate its value - if event.value is None: - continue - event = EtcdEntry(event, event.value, value_in_json=value_in_json) - yield event - except TimeoutError: - _value = {"status": "TIMEOUT"} - yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value))) - - -def raise_timeout(signum, frame): - _ = signum, frame - raise TimeoutError() +def create_timeout_event(timeout, key_prefix, etcd_client): + while True: + time.sleep(timeout) + _value = {"status": "TIMEOUT", "last_timeout": datetime.utcnow().isoformat()} + _key = path.join(f"{key_prefix}", "TIMEOUT") + etcd_client.put(_key, _value, value_in_json=True)