ungleich-otp/nameko1.py
2018-10-26 18:27:58 +02:00

119 lines
2.7 KiB
Python

from nameko.events import EventDispatcher, event_handler
from nameko.rpc import rpc
import json
from nameko.web.handlers import http
from nameko.timer import timer
from nameko.rpc import rpc, RpcProxy
import pyotp
class ServiceA:
""" Event dispatching service. """
name = "service_a"
dispatch = EventDispatcher()
@rpc
def dispatching_method(self, payload):
self.dispatch("event_type", payload)
class ServiceB:
""" Event listening service. """
name = "service_b"
@event_handler("service_a", "event_type")
def handle_event(self, payload):
print("service b received:", payload)
class HttpService:
name = "http_service"
@http('GET', '/get/<int:value>')
def get_method(self, request, value):
return json.dumps({'value': value})
@http('POST', '/post')
def do_post(self, request):
return u"received: {}".format(request.get_data(as_text=True))
@http('GET,PUT,POST,DELETE', '/multi')
def do_multi(self, request):
return request.method
class ServiceTimer:
name ="servicetimer"
dispatch = EventDispatcher()
@timer(interval=3)
def ping(self):
# method executed every second
print("pong")
self.dispatch("ping", "pong")
class LoggerService:
name = "loggerpoint"
@event_handler("servicetimer", "ping")
def handle_event(self, payload):
print("timing receive in logger: ", payload)
class OTPClient:
name = "generic-service-using-otp"
totp = pyotp.TOTP("JBSWY3DPEHPK3PXP")
otp = RpcProxy("otp")
@timer(interval=3)
def auth(self):
token = self.totp.now()
print("Verifying using {}".format(token))
print("Auth1: {}".format(self.otp.verify("app1", token)))
print("Auth-wrongapp: {}".format(self.otp.verify("app2", token)))
print("Auth-noapp: {}".format(self.otp.verify("appNOAPP", token)))
class OTPSeed:
name = "generic-service-using-otp-seed"
otp = RpcProxy("otp")
@timer(interval=10)
def auth(self):
seed = otp.get_seed("app1")
totp = pyotp.TOTP(seed)
print("seed / token: {} {}".format(seed, token))
class OTPService:
name = "otp"
otp_tokens = {
'app1': 'JBSWY3DPEHPK3PXP',
'app2': 'AIEIU3IAAA'
}
@rpc
def get_seed(self, appid):
if appid in self.otp_tokens:
return self.otp_tokens[appid]
else:
return "NO SEED"
@rpc
def verify(self, appid, token):
if not appid in self.otp_tokens:
return "NO SUCH APP {}".format(appid)
totp = pyotp.TOTP(self.otp_tokens[appid])
if totp.verify(token, valid_window=3):
return "OK"
else:
return "FAIL"