From e63399ea5b2e2377874f140c81b83196edf3ea44 Mon Sep 17 00:00:00 2001 From: Ahmed Bilal Khalid Date: Mon, 12 Aug 2019 21:52:53 +0500 Subject: [PATCH] watch_prefix better implementation --- .gitignore | 3 +- Pipfile | 0 Pipfile.lock | 0 __init__.py | 0 etcd_wrapper.py | 75 ++++++++++++++++++------------------------------- 5 files changed, 30 insertions(+), 48 deletions(-) mode change 100644 => 100755 .gitignore mode change 100644 => 100755 Pipfile mode change 100644 => 100755 Pipfile.lock mode change 100644 => 100755 __init__.py mode change 100644 => 100755 etcd_wrapper.py diff --git a/.gitignore b/.gitignore old mode 100644 new mode 100755 index ba0430d..a031648 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -__pycache__/ \ No newline at end of file +__pycache__/ +.idea \ No newline at end of file diff --git a/Pipfile b/Pipfile old mode 100644 new mode 100755 diff --git a/Pipfile.lock b/Pipfile.lock old mode 100644 new mode 100755 diff --git a/__init__.py b/__init__.py old mode 100644 new mode 100755 diff --git a/etcd_wrapper.py b/etcd_wrapper.py old mode 100644 new mode 100755 index 7b6617e..8559b34 --- a/etcd_wrapper.py +++ b/etcd_wrapper.py @@ -1,14 +1,10 @@ import etcd3 import json -import signal -import threading -import time import queue +import copy from collections import namedtuple from dataclasses import dataclass -from datetime import datetime -from os import path PseudoEtcdMeta = namedtuple("PseudoEtcdMeta", ["key"]) @@ -21,7 +17,10 @@ class EtcdEntry: self.value = value.decode("utf-8") if value_in_json: - self.value = json.loads(self.value) + try: + self.value = json.loads(self.value) + except json.JSONDecodeError as e: + print(f"Json Error: {e}, value={value}") key: str value: str @@ -33,7 +32,7 @@ class Etcd3Wrapper(object): def get(self, *args, value_in_json=False, **kwargs): _value, _key = self.client.get(*args, **kwargs) - if _key is None: + if _key is None or _value is None: return None return EtcdEntry(_key, _value, value_in_json=value_in_json) @@ -50,52 +49,34 @@ class Etcd3Wrapper(object): def get_prefix(self, *args, value_in_json=False, **kwargs): r = self.client.get_prefix(*args, **kwargs) for entry in r: - yield EtcdEntry(*entry[::-1], value_in_json=value_in_json) + e = EtcdEntry(*entry[::-1], value_in_json=value_in_json) + if e.value: + yield e - # def watch_prefix(self, *args, timeout=0, value_in_json=False, **kwargs): - # def raise_timeout(signum, frame): - # raise TimeoutError("timeout") - - # r, _ = self.client.watch_prefix(*args, **kwargs) + def watch_prefix(self, key, timeout=0, value_in_json=False): + timeout_event = EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), + value=str.encode(json.dumps({"status": "TIMEOUT", + "type": "TIMEOUT"})), + value_in_json=value_in_json) - # if timeout > 0: - # signal.signal(signal.SIGALRM, raise_timeout) + event_queue = queue.Queue() - # while True: - # try: - # signal.alarm(timeout) + def add_event_to_queue(event): + for e in event.events: + if e.value: + event_queue.put(EtcdEntry(e, e.value, value_in_json=value_in_json)) - # for event in r: - # signal.alarm(0) - # if event.value: - # event = EtcdEntry(event, event.value, value_in_json=value_in_json) - # signal.alarm(timeout) - # yield event - # else: - # signal.alarm(timeout) - # except TimeoutError: - # _value = {"status": "TIMEOUT"} - # yield EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value))) - - def watch_prefix(self, key_prefix, timeout, - value_in_json=False, **kwargs): - q = queue.Queue() - r, _ = self.client.watch_prefix(key_prefix, **kwargs) - t = threading.Thread(target=watch_prefix_generator, - args=(value_in_json, r, q)) - t.start() + self.client.add_watch_prefix_callback(key, add_event_to_queue) while True: try: - v = q.get(timeout=timeout) - yield v + while True: + v = event_queue.get(timeout=timeout) + yield v except queue.Empty: - _value = {"status": "TIMEOUT"} - e = EtcdEntry(PseudoEtcdMeta(key=b"TIMEOUT"), value=str.encode(json.dumps(_value))) - yield e + event_queue.put(copy.deepcopy(timeout_event)) -def watch_prefix_generator(value_in_json, generator, q): - for event in generator: - if event.value: - event = EtcdEntry(event, event.value) - q.put(event) + +class PsuedoEtcdEntry(EtcdEntry): + def __init__(self, key, value, value_in_json=False): + super().__init__(PseudoEtcdMeta(key=key.encode("utf-8")), value, value_in_json=value_in_json)