watch_prefix better implementation

This commit is contained in:
ahmadbilalkhalid 2019-08-12 21:52:53 +05:00
parent 9abc74e387
commit e63399ea5b
5 changed files with 30 additions and 48 deletions

3
.gitignore vendored Normal file → Executable file
View file

@ -1 +1,2 @@
__pycache__/ __pycache__/
.idea

0
Pipfile Normal file → Executable file
View file

0
Pipfile.lock generated Normal file → Executable file
View file

0
__init__.py Normal file → Executable file
View file

75
etcd_wrapper.py Normal file → Executable file
View file

@ -1,14 +1,10 @@
import etcd3 import etcd3
import json import json
import signal
import threading
import time
import queue import queue
import copy
from collections import namedtuple from collections import namedtuple
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime
from os import path
PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"]) PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"])
@ -21,7 +17,10 @@ class EtcdEntry:
self.value = value.decode("utf-8") self.value = value.decode("utf-8")
if value_in_json: if value_in_json:
self.value = json.loads(self.value) try:
self.value = json.loads(self.value)
except json.JSONDecodeError as e:
print(f"Json Error: {e}, value={value}")
key: str key: str
value: str value: str
@ -33,7 +32,7 @@ class Etcd3Wrapper(object):
def get(self, *args, value_in_json=False, **kwargs): def get(self, *args, value_in_json=False, **kwargs):
_value, _key = self.client.get(*args, **kwargs) _value, _key = self.client.get(*args, **kwargs)
if _key is None: if _key is None or _value is None:
return None return None
return EtcdEntry(_key, _value, value_in_json=value_in_json) return EtcdEntry(_key, _value, value_in_json=value_in_json)
@ -50,52 +49,34 @@ class Etcd3Wrapper(object):
def get_prefix(self, *args, value_in_json=False, **kwargs): def get_prefix(self, *args, value_in_json=False, **kwargs):
r = self.client.get_prefix(*args, **kwargs) r = self.client.get_prefix(*args, **kwargs)
for entry in r: for entry in r:
yield EtcdEntry(*entry[::-1], value_in_json=value_in_json) e = EtcdEntry(*entry[::-1], value_in_json=value_in_json)
if e.value:
yield e
# def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs): def watch_prefix(self, key, timeout=0, value_in_json=False):
# def raise_timeout(signum, frame): timeout_event = EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"),
# raise TimeoutError("timeout") value=str.encode(json.dumps({"status": "TIMEOUT",
"type": "TIMEOUT"})),
# r, _ = self.client.watch_prefix(*args, **kwargs) value_in_json=value_in_json)
# if timeout > 0: event_queue = queue.Queue()
# signal.signal(signal.SIGALRM, raise_timeout)
# while True: def add_event_to_queue(event):
# try: for e in event.events:
# signal.alarm(timeout) if e.value:
event_queue.put(EtcdEntry(e, e.value, value_in_json=value_in_json))
# for event in r: self.client.add_watch_prefix_callback(key, add_event_to_queue)
# signal.alarm(0)
# if event.value:
# event = EtcdEntry(event, event.value, value_in_json=value_in_json)
# signal.alarm(timeout)
# yield event
# else:
# signal.alarm(timeout)
# except TimeoutError:
# _value = {"status": "TIMEOUT"}
# yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value)))
def watch_prefix(self, key_prefix, timeout,
value_in_json=False, **kwargs):
q = queue.Queue()
r, _ = self.client.watch_prefix(key_prefix, **kwargs)
t = threading.Thread(target=watch_prefix_generator,
args=(value_in_json, r, q))
t.start()
while True: while True:
try: try:
v = q.get(timeout=timeout) while True:
yield v v = event_queue.get(timeout=timeout)
yield v
except queue.Empty: except queue.Empty:
_value = {"status": "TIMEOUT"} event_queue.put(copy.deepcopy(timeout_event))
e = EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value)))
yield e
def watch_prefix_generator(value_in_json, generator, q):
for event in generator: class PsuedoEtcdEntry(EtcdEntry):
if event.value: def __init__(self, key, value, value_in_json=False):
event = EtcdEntry(event, event.value) super().__init__(PseudoEtcdMeta(key=key.encode("utf-8")), value, value_in_json=value_in_json)
q.put(event)