timeout option added in etcd_wrapper.watch_prefix()
This commit is contained in:
parent
cb2a416a17
commit
615a070918
1 changed files with 28 additions and 8 deletions
|
@ -1,6 +1,11 @@
|
||||||
import etcd3
|
import etcd3
|
||||||
import json
|
import json
|
||||||
|
import signal
|
||||||
|
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
from collections import namedtuple
|
||||||
|
|
||||||
|
PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"])
|
||||||
|
|
||||||
|
|
||||||
@dataclass(init=False)
|
@dataclass(init=False)
|
||||||
|
@ -42,11 +47,26 @@ class Etcd3Wrapper(object):
|
||||||
for entry in r:
|
for entry in r:
|
||||||
yield EtcdEntry(*entry[::-1], value_in_json=value_in_json)
|
yield EtcdEntry(*entry[::-1], value_in_json=value_in_json)
|
||||||
|
|
||||||
def watch_prefix(self, *args, value_in_json=False, **kwargs):
|
def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs):
|
||||||
r, _ = self.client.watch_prefix(*args, **kwargs)
|
if timeout > 0:
|
||||||
for event in r:
|
signal.signal(signal.SIGALRM, raise_timeout)
|
||||||
# if e.value is None don't propagate its value
|
while True:
|
||||||
if event.value is None:
|
try:
|
||||||
continue
|
if timeout > 0:
|
||||||
event = EtcdEntry(event, event.value, value_in_json=value_in_json)
|
signal.alarm(timeout)
|
||||||
yield event
|
r, _ = self.client.watch_prefix(*args, **kwargs)
|
||||||
|
|
||||||
|
for event in r:
|
||||||
|
# if e.value is None don't propagate its value
|
||||||
|
if event.value is None:
|
||||||
|
continue
|
||||||
|
event = EtcdEntry(event, event.value, value_in_json=value_in_json)
|
||||||
|
yield event
|
||||||
|
except TimeoutError:
|
||||||
|
_value = {"status": "TIMEOUT"}
|
||||||
|
yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value)))
|
||||||
|
|
||||||
|
|
||||||
|
def raise_timeout(signum, frame):
|
||||||
|
_ = signum, frame
|
||||||
|
raise TimeoutError()
|
||||||
|
|
Loading…
Reference in a new issue