cloak/x.py

161 lines
4.4 KiB
Python

from __future__ import print_function
import bottle as b
import base64
import ssl
import threading
import zlib
import sys
import functools
from Crypto.Cipher import AES
# Deployment configuration -- fill with your values.
# Upper bound, in bytes, on the upstream Content-Length the proxy route will
# relay; if less than or equal to zero the size is unlimited.
MAXSIZE = 104857600 # max 100MB
# Address and port bound when the script is started without arguments.
PRODUCTION_HOST = '188.226.169.172'
PRODUCTION_PORT = 80
# Address and port bound when the script is started with any argument other
# than -h, c or d (debug server).
DEV_HOST = 'localhost'
DEV_PORT = 8080
# First path segment of the proxy route: /<URL_PREFIX>/<encoded target>.
URL_PREFIX = 'x'
# AES encryption settings -- fill with your values.
# The IV literal is 16 characters and the key literal is 32 characters.
# NOTE(review): key and IV are hard-coded and the same IV is returned for
# every message (see the TODOs on aes_key/aes_iv) -- confirm this is
# acceptable for the deployment.
AES_MODE = AES.MODE_CFB
AES_IV = r'A123B890JKL @%#$'
AES_KEY = r')(*+AKIM 313 321kjah.;klk@sfsd%$'
# When true, the proxy route prints the fetched URL and its content length.
# Overwritten by the __main__ block depending on how the script is started.
debug = False
py3k = sys.version_info >= (3, 0, 0)

# Bind the urllib flavour and a bytes-capable stdout writer for the running
# interpreter, plus a small accessor that hides the differing header APIs.
if py3k:
    import urllib.request as urlreq
    import urllib.parse as urlpar
    stdout_write = sys.stdout.buffer.write

    def _raw_content_length(r):
        """Return the raw Content-Length header of *r*, or None."""
        return r.getheader('Content-Length')
else:
    import urllib2 as urlreq
    import urllib as urlpar
    stdout_write = sys.stdout.write

    def _raw_content_length(r):
        """Return the first raw Content-Length header of *r*, or None."""
        values = r.info().getheaders('Content-Length')
        return values[0] if values else None


def get_content_length(r):
    """Return the Content-Length of response *r* as an int.

    Returns None when the header is absent, empty, or not a valid integer,
    so that a misbehaving upstream is handled like one that sent no length
    at all instead of raising ValueError.
    """
    raw = _raw_content_length(r)
    if not raw:
        return None
    try:
        return int(raw)
    except ValueError:
        return None
# Percent-decoding helper from whichever urllib flavour was imported above.
urlunquote = urlpar.unquote

if not hasattr(ssl, 'create_default_context'):
    # ssl module without the context API: use the opener unchanged.
    urlopen = urlreq.urlopen
else:
    # Context with hostname checking and certificate verification turned
    # off; check_hostname must be cleared before verify_mode is relaxed.
    SSL_NO_VERIFY_CTX = ssl.create_default_context()
    SSL_NO_VERIFY_CTX.check_hostname = False
    SSL_NO_VERIFY_CTX.verify_mode = ssl.CERT_NONE
    urlopen = functools.partial(urlreq.urlopen, context=SSL_NO_VERIFY_CTX)

# Held by the proxy route for the duration of each request.
lock = threading.Lock()
def aes_key():
    """Return the AES key used for every cipher created by this module.

    Currently this is always the module-level AES_KEY constant.
    """
    # TODO: implement key rotation or something similar
    return AES_KEY
def aes_iv():
    """Return the AES initialisation vector used for every cipher.

    Currently this is always the module-level AES_IV constant.
    """
    # TODO: implement IV rotation or something similar
    return AES_IV
def aes_new():
    """Return a fresh AES cipher built from aes_key(), AES_MODE and aes_iv().

    A new object is needed for every encrypt or decrypt operation because
    the cipher keeps chaining state between calls.

    The key and IV are configured as plain string literals.  On Python 3
    those are text, which PyCryptodome refuses to accept, so text values are
    encoded to bytes first.  Values that are already bytes (always the case
    for Python 2 string literals) are passed through untouched.
    """
    def _as_bytes(value):
        # bytes/bytearray are already acceptable to the cipher constructor.
        if isinstance(value, (bytes, bytearray)):
            return value
        return value.encode('utf-8')

    return AES.new(_as_bytes(aes_key()), AES_MODE, _as_bytes(aes_iv()))
@b.route('/')
def x():
    """Index route: show the URL pattern the proxy route expects."""
    pieces = ("/", URL_PREFIX, "/TARGET")
    return "".join(pieces)
@b.route('/' + URL_PREFIX + '/<url:re:.+>')
def x(url):
    """Fetch the target hidden in *url* and return its body, encrypted.

    *url* is the path remainder after the prefix.  It is decoded by undoing
    the pipeline used by the ``c`` command-line mode: URL-safe base64
    decode, zlib decompress, AES decrypt, then percent-unquote.

    The response body is base64(zlib(AES(payload))), where payload is the
    upstream body, or a short notice when the upstream sends no usable
    Content-Length or one larger than MAXSIZE (when MAXSIZE > 0).

    Exceptions from decoding or from the upstream fetch are not caught here.
    """
    # allow only one request at a time
    with lock:
        token = base64.urlsafe_b64decode(str.encode(url))
        target = aes_new().decrypt(zlib.decompress(token))
        target = urlunquote(target.decode())
        if debug:
            print('fetching {}'.format(target))
        upstream = urlopen(target)
        try:
            content_length = get_content_length(upstream)
            if debug:
                print('content-length: {}'.format(content_length))
            if content_length is None:
                payload = str.encode('No Content-Length\n')
            elif MAXSIZE > 0 and content_length > MAXSIZE:
                payload = str.encode('Too big\n')
            else:
                payload = upstream.read()
        finally:
            # Always release the upstream connection, including on the
            # early-out paths that never read the body.
            upstream.close()
        b.response.set_header('Content-Type', "text/plain")
        b.response.set_header('Content-Disposition',
                              "attachment; filename=DATA.txt")
        # Fresh cipher: the one used for decryption has advanced its state.
        return base64.b64encode(zlib.compress(aes_new().encrypt(payload)))
if __name__ == "__main__":
    # Command-line entry point:
    #   -h        print usage and exit
    #   c TARGET  print the encoded form of TARGET and exit
    #   d         decode a server response read from stdin and exit
    #   ANY       any other argument: run the debug server
    #   (none)    run the production server
    prog = sys.argv[0]
    argc = len(sys.argv)
    command = sys.argv[1] if argc > 1 else None

    if command == '-h':
        print("usage: {0} -h print help".format(prog))
        print("usage: {0} c TARGET encode TARGET".format(prog))
        print("usage: {0} d decode stdin".format(prog))
        # Trailing space keeps the two literals from fusing into "on8080".
        print("usage: {0} ANY run server in debug mode on "
              "8080".format(prog))
        print("usage: {0} run server on 80".format(prog))
        sys.exit(0)

    if command == 'c':
        if argc < 3:
            print("missing target")
            sys.exit(1)
        # encode target url: encrypt -> compress -> URL-safe base64
        foo = aes_new().encrypt(str.encode(sys.argv[2]))
        foo = base64.urlsafe_b64encode(zlib.compress(foo))
        stdout_write(foo)
        print()
        sys.exit(0)

    if command == 'd':
        # decode data from stdin: base64 -> decompress -> decrypt
        foo = base64.b64decode(str.encode(sys.stdin.read()))
        foo = aes_new().decrypt(zlib.decompress(foo))
        stdout_write(foo)
        sys.exit(0)

    if command is None:
        # run production server
        host = PRODUCTION_HOST
        port = PRODUCTION_PORT
        debug = False
    else:
        # run dev server
        host = DEV_HOST
        port = DEV_PORT
        debug = True

    b.run(host=host, port=port, debug=debug, reloader=True)