lorawan/python/lorautil.py

116 lines
3.1 KiB
Python
Raw Normal View History

2016-11-02 16:10:07 +00:00
# Helper functions for various Lora receivers
# Nico Schottelius <nico.schottelius -at- ungleich.ch>
# 2016-11-02
# GPLv3+
2016-11-02 16:13:04 +00:00
import psycopg2
2016-11-02 16:22:43 +00:00
import json
2016-11-02 16:42:39 +00:00
import logging
import select
import psycopg2
import psycopg2.extensions
import sys
import time
import websocket
2016-11-13 19:54:58 +00:00
logging.basicConfig(format='%(levelname)s: %(message)s')
2016-11-13 19:41:22 +00:00
log = logging.getLogger(__name__)
2016-11-02 16:13:04 +00:00
2016-11-02 16:10:07 +00:00
dbname="lorawan"
def db_notify(provider, payload='', deveui=''):
notify="{}:{}".format(deveui, payload)
2016-11-02 16:42:39 +00:00
log.debug("Notify: {} {}".format(provider, notify))
2016-11-02 16:10:07 +00:00
try:
conn = psycopg2.connect("dbname={}".format(dbname))
cursor = conn.cursor()
cursor.execute("select pg_notify (%s, %s)", (provider, notify))
cursor.connection.commit()
except Exception as e:
2016-11-02 16:42:39 +00:00
log.error("DB Notify failed: %s" % e)
2016-11-02 16:10:07 +00:00
def db_insert_json(provider, data, payload='', deveui=''):
try:
conn = psycopg2.connect("dbname={}".format(dbname))
cursor = conn.cursor()
cursor.execute("insert into packets values (DEFAULT, DEFAULT, %s, %s, %s, %s)", (provider, data, payload, deveui))
cursor.connection.commit()
conn.close()
except Exception as e:
2016-11-02 16:42:39 +00:00
log.error("DB Insert failed: %s" % e)
2016-11-02 16:22:43 +00:00
2016-11-15 15:39:12 +00:00
2016-11-02 16:44:01 +00:00
def jsonToDict(data):
2016-11-02 16:22:43 +00:00
return json.loads(data)
def nodered_from_stdin():
provider = sys.argv[1]
for line in sys.stdin:
print("{} -> {}".format(provider, line))
nodered_send(provider,line)
time.sleep(0.1)
2016-11-15 15:39:12 +00:00
def nodered_send(path, data):
ws = websocket.create_connection("ws://localhost:1880/{}".format(path))
ws.send("%s" % data)
ws.close()
channels = [ "loriot", "swisscom", "ttn" ]
def pg_conn_notify():
conns = []
for channel in channels:
conn = psycopg2.connect("dbname={}".format(dbname))
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
curs.execute("LISTEN {};".format(channel))
conns.append(conn)
log.debug("Waiting for notifications on channel {}".format(channel))
2016-11-12 16:49:49 +00:00
return conns
def pg_wait_for_pkg(conns, callback):
readable, writable, exceptional = select.select(conns,[],[])
for conn in readable:
conn.poll()
while conn.notifies:
notify = conn.notifies.pop(0)
log.debug("Got NOTIFY: {} {} {}".format(notify.pid, notify.channel, notify.payload))
callback(notify.channel, notify.payload)
2016-11-15 15:39:12 +00:00
class DB(object):
2016-11-15 16:15:01 +00:00
def __init__(self, query):
2016-11-15 15:39:12 +00:00
self.query = query
@classmethod
def gps_query(cls, since="1 day"):
2016-11-15 15:42:58 +00:00
return cls("select payload from packets where payload like 'lat%' and received_dt > NOW() - '{}'::INTERVAL".format(since))
2016-11-15 15:39:12 +00:00
def __iter__(self):
try:
self.conn = psycopg2.connect("dbname={}".format(dbname))
2016-11-15 16:19:12 +00:00
self.cursor = self.conn.cursor()
2016-11-15 15:39:12 +00:00
self.cursor.execute(self.query)
except Exception as e:
log.error("DB query failed: %s" % e)
raise
return self
def __next__(self):
data = self.cursor.fetchone()
if not data:
self.conn.close()
raise StopIteration
return data