new, simpler, cleaner, experimental timeout mechanism
This commit is contained in:
parent
615a070918
commit
b198048ed8
1 changed files with 42 additions and 22 deletions
|
@ -1,9 +1,13 @@
|
|||
import etcd3
|
||||
import json
|
||||
import signal
|
||||
import threading
|
||||
import time
|
||||
|
||||
from dataclasses import dataclass
|
||||
from collections import namedtuple
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from os import path
|
||||
|
||||
PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"])
|
||||
|
||||
|
@ -47,26 +51,42 @@ class Etcd3Wrapper(object):
|
|||
for entry in r:
|
||||
yield EtcdEntry(*entry[::-1], value_in_json=value_in_json)
|
||||
|
||||
def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs):
|
||||
# def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs):
|
||||
# if timeout > 0:
|
||||
# signal.signal(signal.SIGALRM, raise_timeout)
|
||||
# while True:
|
||||
# try:
|
||||
# if timeout > 0:
|
||||
# signal.alarm(timeout)
|
||||
# r, _ = self.client.watch_prefix(*args, **kwargs)
|
||||
|
||||
# for event in r:
|
||||
# signal.alarm(0)
|
||||
# # if e.value is None don't propagate its value
|
||||
# if event.value is None:
|
||||
# continue
|
||||
# event = EtcdEntry(event, event.value, value_in_json=value_in_json)
|
||||
# signal.alarm(timeout)
|
||||
# yield event
|
||||
# except TimeoutError:
|
||||
# _value = {"status": "TIMEOUT"}
|
||||
# yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value)))
|
||||
|
||||
def watch_prefix(self, key_prefix, timeout=0, value_in_json=False, **kwargs):
|
||||
r, _ = self.client.watch_prefix(key_prefix, **kwargs)
|
||||
if timeout > 0:
|
||||
signal.signal(signal.SIGALRM, raise_timeout)
|
||||
while True:
|
||||
try:
|
||||
if timeout > 0:
|
||||
signal.alarm(timeout)
|
||||
r, _ = self.client.watch_prefix(*args, **kwargs)
|
||||
t = threading.Thread(target=create_timeout_event,
|
||||
args=(timeout, key_prefix, self.client))
|
||||
t.start()
|
||||
|
||||
for event in r:
|
||||
if event.value:
|
||||
event = EtcdEntry(event, event.value, value_in_json=value_in_json)
|
||||
yield event
|
||||
|
||||
for event in r:
|
||||
# if e.value is None don't propagate its value
|
||||
if event.value is None:
|
||||
continue
|
||||
event = EtcdEntry(event, event.value, value_in_json=value_in_json)
|
||||
yield event
|
||||
except TimeoutError:
|
||||
_value = {"status": "TIMEOUT"}
|
||||
yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value)))
|
||||
|
||||
|
||||
def raise_timeout(signum, frame):
|
||||
_ = signum, frame
|
||||
raise TimeoutError()
|
||||
def create_timeout_event(timeout, key_prefix, etcd_client):
|
||||
while True:
|
||||
time.sleep(timeout)
|
||||
_value = {"status": "TIMEOUT", "last_timeout": datetime.utcnow().isoformat()}
|
||||
_key = path.join(f"{key_prefix}", "TIMEOUT")
|
||||
etcd_client.put(_key, _value, value_in_json=True)
|
||||
|
|
Loading…
Reference in a new issue