experimental watch_prefix with new timeout mechanism added
This commit is contained in:
parent
d079acadf2
commit
9abc74e387
1 changed files with 36 additions and 27 deletions
|
@ -3,6 +3,7 @@ import json
|
||||||
import signal
|
import signal
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
import queue
|
||||||
|
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
|
@ -52,41 +53,49 @@ class Etcd3Wrapper(object):
|
||||||
yield EtcdEntry(*entry[::-1], value_in_json=value_in_json)
|
yield EtcdEntry(*entry[::-1], value_in_json=value_in_json)
|
||||||
|
|
||||||
# def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs):
|
# def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs):
|
||||||
|
# def raise_timeout(signum, frame):
|
||||||
|
# raise TimeoutError("timeout")
|
||||||
|
|
||||||
|
# r, _ = self.client.watch_prefix(*args, **kwargs)
|
||||||
|
|
||||||
# if timeout > 0:
|
# if timeout > 0:
|
||||||
# signal.signal(signal.SIGALRM, raise_timeout)
|
# signal.signal(signal.SIGALRM, raise_timeout)
|
||||||
|
|
||||||
# while True:
|
# while True:
|
||||||
# try:
|
# try:
|
||||||
# if timeout > 0:
|
# signal.alarm(timeout)
|
||||||
# signal.alarm(timeout)
|
|
||||||
# r, _ = self.client.watch_prefix(*args, **kwargs)
|
|
||||||
|
|
||||||
# for event in r:
|
# for event in r:
|
||||||
# signal.alarm(0)
|
# signal.alarm(0)
|
||||||
# # if e.value is None don't propagate its value
|
# if event.value:
|
||||||
# if event.value is None:
|
# event = EtcdEntry(event, event.value, value_in_json=value_in_json)
|
||||||
# continue
|
# signal.alarm(timeout)
|
||||||
# event = EtcdEntry(event, event.value, value_in_json=value_in_json)
|
# yield event
|
||||||
# signal.alarm(timeout)
|
# else:
|
||||||
# yield event
|
# signal.alarm(timeout)
|
||||||
# except TimeoutError:
|
# except TimeoutError:
|
||||||
# _value = {"status": "TIMEOUT"}
|
# _value = {"status": "TIMEOUT"}
|
||||||
# yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value)))
|
# yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value)))
|
||||||
|
|
||||||
def watch_prefix(self, key_prefix, timeout=0, value_in_json=False, **kwargs):
|
|
||||||
r, _ = self.client.watch_prefix(key_prefix, **kwargs)
|
|
||||||
if timeout > 0:
|
|
||||||
t = threading.Thread(target=create_timeout_event,
|
|
||||||
args=(timeout, key_prefix, self.client))
|
|
||||||
t.start()
|
|
||||||
|
|
||||||
for event in r:
|
|
||||||
if event.value:
|
|
||||||
event = EtcdEntry(event, event.value, value_in_json=value_in_json)
|
|
||||||
yield event
|
|
||||||
|
|
||||||
def create_timeout_event(timeout, key_prefix, etcd_client):
|
def watch_prefix(self, key_prefix, timeout,
|
||||||
while True:
|
value_in_json=False, **kwargs):
|
||||||
time.sleep(timeout)
|
q = queue.Queue()
|
||||||
_value = {"status": "TIMEOUT", "last_timeout": datetime.utcnow().isoformat()}
|
r, _ = self.client.watch_prefix(key_prefix, **kwargs)
|
||||||
_key = path.join(f"{key_prefix}", "TIMEOUT")
|
t = threading.Thread(target=watch_prefix_generator,
|
||||||
etcd_client.put(_key.encode("utf-8"), json.dumps(_value))
|
args=(value_in_json, r, q))
|
||||||
|
t.start()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
v = q.get(timeout=timeout)
|
||||||
|
yield v
|
||||||
|
except queue.Empty:
|
||||||
|
_value = {"status": "TIMEOUT"}
|
||||||
|
e = EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value)))
|
||||||
|
yield e
|
||||||
|
|
||||||
|
def watch_prefix_generator(value_in_json, generator, q):
|
||||||
|
for event in generator:
|
||||||
|
if event.value:
|
||||||
|
event = EtcdEntry(event, event.value)
|
||||||
|
q.put(event)
|
||||||
|
|
Loading…
Reference in a new issue