Refactoring, logging

This commit is contained in:
Oleg Lavrovsky 2025-05-07 23:40:27 +02:00
commit 63f9218309
No known key found for this signature in database
GPG key ID: 31E523030632FF4B
8 changed files with 493 additions and 426 deletions

View file

@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
"""Collect events from remote repositories."""
import logging
import requests
from dateutil import parser
from flask import current_app
# In seconds, how long to wait for API response
REQUEST_TIMEOUT = 10
@ -12,34 +12,39 @@ REQUEST_TIMEOUT = 10
def fetch_commits_gitea(full_name, limit=10):
    """Parse data about Gitea commits.

    Requests the commit list of a repository from the Codeberg API and
    reduces every entry to the fields used for activity tracking.

    :param full_name: repository path, interpolated into the API URL.
    :param limit: maximum number of commits to ask the API for.
    :returns: list of dicts with ``url``, ``date``, ``author`` and
        ``message`` keys; an empty list when the API answers with a
        non-200 status or with an error message.
    """
    apiurl = "https://codeberg.org/api/v1/repos/%s/commits?limit=%d" % (
        full_name,
        limit,
    )
    data = requests.get(apiurl, timeout=REQUEST_TIMEOUT)
    if data.status_code != 200:
        current_app.logger.warning("Could not sync Gitea commits on %s", full_name)
        return []
    payload = data.json()
    # A dict carrying "message" is treated as an error report; the commit
    # log itself is iterated as a sequence of entries below.
    if isinstance(payload, dict) and "message" in payload:
        current_app.logger.warning(
            "Could not sync Gitea commits on %s: %s", full_name, payload["message"]
        )
        return []
    commitlog = []
    for entry in payload:
        if "commit" not in entry:
            continue
        commit = entry["commit"]
        # Both signatures live inside the nested commit object; either may
        # be absent or null, so fall back to an empty mapping.
        committer = commit.get("committer") or {}
        commit_author = commit.get("author") or {}
        author = ""
        if "name" in committer:
            author = committer["name"]
        elif "name" in commit_author:
            author = commit_author["name"]
        commitlog.append(
            {
                "url": entry["html_url"],
                "date": parser.parse(entry["created"]),
                "author": author,
                # Keep stored messages short
                "message": commit["message"][:256],
            }
        )
    return commitlog
@ -52,12 +57,13 @@ def fetch_commits_github(full_name, since=None, until=None):
apiurl += "&until=%s" % until.replace(microsecond=0).isoformat()
data = requests.get(apiurl, timeout=REQUEST_TIMEOUT)
if data.status_code != 200:
logging.warning("Could not sync GitHub commits on %s" % full_name)
current_app.logger.warning("Could not sync GitHub commits on %s" % full_name)
return []
json = data.json()
if 'message' in json:
logging.warning("Could not sync GitHub commits on %s: %s"
% (full_name, json['message']))
if "message" in json:
current_app.logger.warning(
"Could not sync GitHub commits on %s: %s" % (full_name, json["message"])
)
return []
return parse_github_commits(json, full_name)
@ -66,32 +72,36 @@ def parse_github_commits(json, full_name):
"""Standardize data from a GitHub commit log."""
commitlog = []
for entry in json:
if 'commit' not in entry:
if "commit" not in entry:
continue
commit = entry['commit']
datestamp = parser.parse(commit['committer']['date'])
author = ''
if 'author' in entry and \
entry['author'] is not None and \
'login' in entry['author']:
author = entry['author']['login']
elif 'committer' in commit:
author = commit['committer']['name'][:100]
commit = entry["commit"]
datestamp = parser.parse(commit["committer"]["date"])
author = ""
if (
"author" in entry
and entry["author"] is not None
and "login" in entry["author"]
):
author = entry["author"]["login"]
elif "committer" in commit:
author = commit["committer"]["name"][:100]
url = "https://github.com/%s" % full_name
if 'html_url' in entry:
url = entry['html_url']
commitlog.append({
'url': url,
'date': datestamp,
'author': author,
'message': commit['message'][:256],
})
if "html_url" in entry:
url = entry["html_url"]
commitlog.append(
{
"url": url,
"date": datestamp,
"author": author,
"message": commit["message"][:256],
}
)
return commitlog
def fetch_commits_gitlab(project_id: int, since=None, until=None):
"""Parse data about GitLab commits."""
apiurl = 'https://gitlab.com/api/v4/'
apiurl = "https://gitlab.com/api/v4/"
apiurl = apiurl + "projects/%d/repository/commits?" % project_id
if since is not None:
apiurl += "&since=%s" % since.replace(microsecond=0).isoformat()
@ -99,25 +109,26 @@ def fetch_commits_gitlab(project_id: int, since=None, until=None):
apiurl += "&until=%s" % until.replace(microsecond=0).isoformat()
# Collect basic data
data = requests.get(apiurl, timeout=REQUEST_TIMEOUT)
if data.text.find('{') < 0:
if data.text.find("{") < 0:
return []
json = data.json()
if 'message' in json:
logging.warning("Could not sync GitLab commits", json['message'])
if "message" in json:
current_app.logger.warning("Could not sync GitLab commits", json["message"])
return []
commitlog = []
for commit in json:
if 'message' not in commit:
if "message" not in commit:
continue
datestamp = parser.parse(commit['created_at'])
author = ''
if 'author_name' in commit and \
commit['author_name'] is not None:
author = commit['author_name']
commitlog.append({
'url': commit['web_url'],
'date': datestamp,
'author': author,
'message': commit['message'][:256],
})
datestamp = parser.parse(commit["created_at"])
author = ""
if "author_name" in commit and commit["author_name"] is not None:
author = commit["author_name"]
commitlog.append(
{
"url": commit["web_url"],
"date": datestamp,
"author": author,
"message": commit["message"][:256],
}
)
return commitlog

View file

@ -1,10 +1,9 @@
# -*- coding: utf-8 -*-
"""Collecting data from third party API repositories."""
import re
import requests
import bleach
import logging
from flask import current_app
from pyquery import PyQuery as pq # noqa: N813
from base64 import b64decode
from bleach.sanitizer import ALLOWED_ATTRIBUTES
@ -15,23 +14,29 @@ from .apievents import (
fetch_commits_gitea,
)
from .utils import (
sanitize_url, load_presets, load_yaml_presets, fix_relative_links, markdownit
sanitize_url,
load_presets,
load_yaml_presets,
fix_relative_links,
markdownit,
)
from future.standard_library import install_aliases
install_aliases()
# In seconds, how long to wait for API response
REQUEST_TIMEOUT = 10
def FetchStageConfig(url, top_element="stages", by_col="name"):
    """Download a remote YAML stages configuration.

    :param url: an ``http:``/``https:`` address, or anything else, which is
        handed to ``load_yaml_presets`` as a file location.
    :param top_element: forwarded to the preset loaders.
    :param by_col: forwarded to the preset loaders.
    :returns: the result of the preset loader, or an empty dict when the
        downloaded text does not contain ``stages:``.
    """
    if not url.startswith(("http:", "https:")):
        current_app.logger.info("Loading stages from file")
        return load_yaml_presets(top_element, by_col, url)
    current_app.logger.info("Loading stages from URL")
    data = requests.get(url, timeout=REQUEST_TIMEOUT)
    blob = data.text
    # NOTE(review): the marker is the literal "stages:" even when a different
    # top_element is requested - confirm whether that is intended.
    if "stages:" not in blob:
        current_app.logger.debug("No stage data: %s", blob)
        return {}
    return load_presets(blob, top_element, by_col)
@ -41,44 +46,44 @@ def FetchGiteaProject(project_url):
"""Download data from Codeberg, a large Gitea site."""
# Docs: https://codeberg.org/api/swagger
site_root = "https://codeberg.org"
url_q = quote_plus(project_url, '/')
url_q = quote_plus(project_url, "/")
api_repos = site_root + "/api/v1/repos/%s" % url_q
api_content = api_repos + "/contents"
# Collect basic data
logging.info("Fetching Gitea: %s", url_q)
current_app.logger.info("Fetching Gitea: %s", url_q)
data = requests.get(api_repos, timeout=REQUEST_TIMEOUT)
if data.text.find('{') < 0:
logging.debug("No data: %s", data.text)
if data.text.find("{") < 0:
current_app.logger.debug("No data: %s", data.text)
return {}
json = data.json()
if 'name' not in json:
logging.debug("Invalid data: %s", data.text)
if "name" not in json:
current_app.logger.debug("Invalid data: %s", data.text)
return {}
# Collect the README
data = requests.get(api_content, timeout=REQUEST_TIMEOUT)
readme = ""
if not data.text.find('{') < 0:
if not data.text.find("{") < 0:
readmeurl = None
for repo_file in data.json():
if 'readme' in repo_file['name'].lower():
readmeurl = repo_file['download_url']
if "readme" in repo_file["name"].lower():
readmeurl = repo_file["download_url"]
readmedata = requests.get(readmeurl, timeout=REQUEST_TIMEOUT)
readme = readmedata.text
break
if readmeurl is None:
logging.info("Could not find README: %s", url_q)
issuesurl = ''
if json['has_issues']:
issuesurl = json['html_url'] + '/issues'
current_app.logger.info("Could not find README: %s", url_q)
issuesurl = ""
if json["has_issues"]:
issuesurl = json["html_url"] + "/issues"
return {
'type': 'Gitea',
'name': json['name'],
'summary': json['description'],
'description': readme,
'source_url': json['html_url'],
'image_url': json['avatar_url'] or json['owner']['avatar_url'],
'contact_url': issuesurl,
'commits': fetch_commits_gitea(url_q)
"type": "Gitea",
"name": json["name"],
"summary": json["description"],
"description": readme,
"source_url": json["html_url"],
"image_url": json["avatar_url"] or json["owner"]["avatar_url"],
"contact_url": issuesurl,
"commits": fetch_commits_gitea(url_q),
}
@ -88,29 +93,29 @@ def FetchGitlabProject(project_url):
API_BASE = WEB_BASE + "/api/v4/projects/%s"
url_q = quote_plus(project_url)
# Collect basic data
logging.info("Fetching GitLab: %s", url_q)
current_app.logger.info("Fetching GitLab: %s", url_q)
data = requests.get(API_BASE % url_q, timeout=REQUEST_TIMEOUT)
if data.text.find('{') < 0:
logging.debug("No data: %s", data.text)
if data.text.find("{") < 0:
current_app.logger.debug("No data: %s", data.text)
return {}
json = data.json()
if 'name' not in json:
logging.debug("Invalid data: %s", data.text)
if "name" not in json:
current_app.logger.debug("Invalid data: %s", data.text)
return {}
# Collect the README
readmeurl = json['readme_url'] + '?inline=false'
readmeurl = readmeurl.replace('-/blob/', '-/raw/')
readmeurl = json["readme_url"] + "?inline=false"
readmeurl = readmeurl.replace("-/blob/", "-/raw/")
readmedata = requests.get(readmeurl, timeout=REQUEST_TIMEOUT)
readme = readmedata.text or ""
return {
'type': 'GitLab',
'name': json['name'],
'summary': json['description'],
'description': readme,
'source_url': json['web_url'],
'image_url': json['avatar_url'],
'contact_url': json['web_url'] + '/issues',
'commits': fetch_commits_gitlab(json['id'])
"type": "GitLab",
"name": json["name"],
"summary": json["description"],
"description": readme,
"source_url": json["web_url"],
"image_url": json["avatar_url"],
"contact_url": json["web_url"] + "/issues",
"commits": fetch_commits_gitlab(json["id"]),
}
@ -118,74 +123,74 @@ def FetchGitlabAvatar(email):
"""Download a user avatar from GitLab."""
apiurl = "https://gitlab.com/api/v4/avatar?email=%s&size=80"
data = requests.get(apiurl % email, timeout=REQUEST_TIMEOUT)
if data.text.find('{') < 0:
logging.debug("No data: %s", data.text)
if data.text.find("{") < 0:
current_app.logger.debug("No data: %s", data.text)
return None
json = data.json()
if 'avatar_url' not in json:
if "avatar_url" not in json:
return None
return json['avatar_url']
return json["avatar_url"]
def FetchGithubProject(project_url):
    """Download data from GitHub.

    Fetches the repository record and its README through the GitHub API.

    :param project_url: repository path, interpolated into the API URL.
    :returns: dict describing the project (type, name, summary, description,
        URLs and commits), or an empty dict when the repository response
        contains no JSON object or lacks ``name``/``full_name``.
    """
    API_BASE = "https://api.github.com/repos/%s"
    current_app.logger.info("Fetching GitHub: %s", project_url)
    data = requests.get(API_BASE % project_url, timeout=REQUEST_TIMEOUT)
    if data.text.find("{") < 0:
        current_app.logger.debug("No data: %s", data.text)
        return {}
    json = data.json()
    if "name" not in json or "full_name" not in json:
        current_app.logger.debug("Invalid data: %s", data.text)
        return {}
    repo_full_name = json["full_name"]
    default_branch = json.get("default_branch") or "main"
    readmeurl = "%s/readme" % (API_BASE % project_url)
    readmedata = requests.get(readmeurl, timeout=REQUEST_TIMEOUT)
    readme = ""
    if readmedata.text.find("{") < 0:
        # Report the README response, not the repository response
        current_app.logger.debug("No readme: %s", readmedata.text)
    else:
        readme_json = readmedata.json()
        if "content" in readme_json:
            # The API delivers the file body base64 encoded
            readme = b64decode(readme_json["content"]).decode("utf-8")
            # Fix relative links in text
            imgroot = "https://raw.githubusercontent.com"
            readme = fix_relative_links(
                readme, imgroot, repo_full_name, default_branch
            )
    return {
        "type": "GitHub",
        "name": json["name"],
        "summary": json["description"],
        "description": readme,
        "webpage_url": json["homepage"],
        "source_url": json["html_url"],
        "image_url": json["owner"]["avatar_url"],
        "contact_url": json["html_url"] + "/issues",
        "download_url": json["html_url"] + "/releases",
        "commits": fetch_commits_github(repo_full_name),
    }
def FetchGithubIssue(project_url, issue_id):
    """Download an issue from GitHub.

    The repository data is fetched first and then overlaid with the issue:
    the issue title becomes the name, the repository name the summary.

    :param project_url: repository path, interpolated into the API URL.
    :param issue_id: issue number; formatted with ``%d``.
    :returns: the project dict extended with ``hashtag``, ``summary``,
        ``name`` and ``description``, or an empty dict when either the
        repository or the issue could not be loaded.
    """
    project_data = FetchGithubProject(project_url)
    if not project_data:
        # FetchGithubProject signals failure with an empty dict; there is
        # no repository name to reuse as the summary in that case.
        return {}
    current_app.logger.info("Fetching GitHub Issue: %s", issue_id)
    API_BASE = "https://api.github.com/repos/%s/issues/%d"
    data = requests.get(API_BASE % (project_url, issue_id), timeout=REQUEST_TIMEOUT)
    if data.text.find("{") < 0:
        current_app.logger.debug("No data: %s", data.text)
        return {}
    json = data.json()
    if "title" not in json or "body" not in json:
        current_app.logger.debug("Invalid data: %s", data.text)
        return {}
    project_data["hashtag"] = "#%d" % issue_id
    project_data["summary"] = project_data["name"]
    # Issue titles are cut to 77 characters for use as a project name
    project_data["name"] = json["title"][:77]
    # The key may be present with a null value; store text in any case
    project_data["description"] = json["body"] or ""
    return project_data
@ -193,145 +198,173 @@ def FetchBitbucketProject(project_url):
"""Download data from Bitbucket."""
WEB_BASE = "https://bitbucket.org/%s"
API_BASE = "https://api.bitbucket.org/2.0/repositories/%s"
logging.info("Fetching Bitbucket: %s", project_url)
current_app.logger.info("Fetching Bitbucket: %s", project_url)
data = requests.get(API_BASE % project_url, timeout=REQUEST_TIMEOUT)
if data.text.find('{') < 0:
logging.debug('No data at: %s', project_url)
if data.text.find("{") < 0:
current_app.logger.debug("No data at: %s", project_url)
return {}
json = data.json()
if 'name' not in json:
logging.debug('Invalid format at: %s', project_url)
if "name" not in json:
current_app.logger.debug("Invalid format at: %s", project_url)
return {}
readme = ''
for docext in ['.md', '.rst', '.txt', '']:
readme = ""
for docext in [".md", ".rst", ".txt", ""]:
readmedata = requests.get(
API_BASE % project_url + '/src/HEAD/README.md',
timeout=REQUEST_TIMEOUT)
API_BASE % project_url + "/src/HEAD/README.md", timeout=REQUEST_TIMEOUT
)
if readmedata.text.find('{"type":"error"') != 0:
readme = readmedata.text
break
web_url = WEB_BASE % project_url
contact_url = json['website'] or web_url
if json['has_issues']:
contact_url = json["website"] or web_url
if json["has_issues"]:
contact_url = "%s/issues" % web_url
image_url = ''
if 'project' in json and \
'links' in json['project'] \
and 'avatar' in json['project']['links']:
image_url = json['project']['links']['avatar']['href']
elif 'links' in json and 'avatar' in json['links']:
image_url = json['links']['avatar']['href']
image_url = ""
if (
"project" in json
and "links" in json["project"]
and "avatar" in json["project"]["links"]
):
image_url = json["project"]["links"]["avatar"]["href"]
elif "links" in json and "avatar" in json["links"]:
image_url = json["links"]["avatar"]["href"]
return {
'type': 'Bitbucket',
'name': json['name'],
'summary': json['description'],
'description': readme,
'webpage_url': json['website'],
'source_url': web_url,
'image_url': image_url,
'contact_url': contact_url,
"type": "Bitbucket",
"name": json["name"],
"summary": json["description"],
"description": readme,
"webpage_url": json["website"],
"source_url": web_url,
"image_url": image_url,
"contact_url": contact_url,
}
def FetchDataProject(datapackage_url):
    """Try to load a Data Package formatted JSON file.

    :param datapackage_url: address of the package, with or without the
        trailing ``datapackage.json``.
    :returns: dict describing the package, or an empty dict when the
        response holds no JSON object or lacks ``name``/``title``.
    """
    # TODO: use frictionlessdata library!
    project_url = datapackage_url.replace("datapackage.json", "")
    project_url = sanitize_url(project_url) + "datapackage.json"
    # TODO: treat dribdat events as special
    current_app.logger.info("Fetching Data Package: %s", project_url)
    data = requests.get(project_url, timeout=REQUEST_TIMEOUT)
    if data.text.find("{") < 0:
        current_app.logger.debug("No data at: %s", project_url)
        return {}
    json = data.json()
    if "name" not in json or "title" not in json:
        current_app.logger.debug("Invalid format at: %s", project_url)
        return {}
    try:
        text_content = parse_data_package(json)
    except KeyError:
        text_content = "(Could not parse Data Package contents)"
    # Default to an empty contact so the name is always bound
    contact_url = ""
    if "homepage" in json:
        contact_url = json["homepage"] or ""
    elif (
        "maintainers" in json
        and len(json["maintainers"]) > 0
        and "web" in json["maintainers"][0]
    ):
        contact_url = json["maintainers"][0]["web"]
    return {
        "type": "Data Package",
        "name": json["name"],
        "summary": json["title"],
        "description": text_content,
        "source_url": project_url,
        "logo_icon": "box-open",
        "contact_url": contact_url,
    }
def _data_package_bullet(item, label_key):
    """Render one resource or source as a Markdown list line."""
    label = item[label_key]
    if "path" in item:
        label = "[%s](%s)" % (label, item["path"])
    return "- " + label + "\n"


def parse_data_package(json):
    """Extract contents of a Data Package.

    Builds a Markdown text from the ``description``, ``resources`` and
    ``sources`` entries of the package, in that order.

    :param json: the decoded Data Package mapping.
    :returns: the assembled Markdown text.
    :raises KeyError: when no text could be assembled, or when a resource
        lacks ``name`` or a source lacks ``title``.
    """
    parts = []
    if "description" in json:
        parts.append(json["description"] + "\n\n")
    if "resources" in json:
        parts.append("\n### Resources\n\n")
        parts.extend(_data_package_bullet(r, "name") for r in json["resources"])
    if "sources" in json:
        parts.append("\n### Sources\n\n")
        parts.extend(_data_package_bullet(r, "title") for r in json["sources"])
    text_content = "".join(parts)
    if text_content == "":
        raise KeyError("No content")
    return text_content
def FetchDribdatProject(dribdat_url):
    """Try to load a Dribdat project from a remote page.

    :param dribdat_url: address of a project page; ``/project/`` is swapped
        for ``/api/project/`` to reach the API.
    :returns: the ``project`` mapping of the response with ``type`` and
        ``description`` added, or an empty dict when the response holds no
        JSON object or lacks ``project``/``event``.
    """
    project_url = dribdat_url.replace("/project/", "/api/project/")
    project_url = sanitize_url(project_url) + "?full=1"
    # TODO: treat dribdat events as special
    current_app.logger.info("Fetching Dribdat site: %s", project_url)
    data = requests.get(project_url, timeout=REQUEST_TIMEOUT)
    if data.text.find("{") < 0:
        current_app.logger.debug("No data at: %s", project_url)
        return {}
    json = data.json()
    if "project" not in json or "event" not in json:
        current_app.logger.debug("Invalid format at: %s", project_url)
        return {}
    projectdata = json["project"]
    projectdata["type"] = "Dribdat"
    # The remote long text is stored as the description
    projectdata["description"] = projectdata["longtext"]
    return projectdata
# Basis: https://github.com/mozilla/bleach/blob/master/bleach/sanitizer.py#L16
# Tags that survive sanitizing of remotely fetched HTML.
ALLOWED_HTML_TAGS = [
    "acronym",
    "a",
    "blockquote",
    "li",
    "abbr",
    "strong",
    "b",
    "i",
    "ul",
    "ol",
    "code",
    "em",
    "img",
    "font",
    "center",
    "sub",
    "sup",
    "pre",
    "table",
    "tr",
    "thead",
    "tbody",
    "td",
    "h1",
    "h2",
    "h3",
    "h4",
    "h5",
    "p",
    "u",
]
# Work on a copy: assigning into the imported ALLOWED_ATTRIBUTES itself would
# change the bleach module's own dict for every other user in the process.
ALLOWED_HTML_ATTR = dict(ALLOWED_ATTRIBUTES)
ALLOWED_HTML_ATTR["h1"] = ["id"]
ALLOWED_HTML_ATTR["h2"] = ["id"]
ALLOWED_HTML_ATTR["h3"] = ["id"]
ALLOWED_HTML_ATTR["h4"] = ["id"]
ALLOWED_HTML_ATTR["h5"] = ["id"]
ALLOWED_HTML_ATTR["a"] = ["href", "title", "class", "name"]
ALLOWED_HTML_ATTR["img"] = ["src", "width", "height", "alt", "class"]
ALLOWED_HTML_ATTR["font"] = ["color"]
def RequestRemoteContent(project_url):
@ -340,11 +373,11 @@ def RequestRemoteContent(project_url):
# online resources controlling the domains from which we can
# fetch remote content.
project_url = sanitize_url(project_url)
logging.info("Fetching: %s", project_url)
current_app.logger.info("Fetching: %s", project_url)
data = requests.get(project_url, timeout=REQUEST_TIMEOUT)
return data.text or None
except requests.exceptions.RequestException:
logging.warning("Could not connect to %s" % project_url)
current_app.logger.warning("Could not connect to %s" % project_url)
return None
@ -352,13 +385,14 @@ def FetchWebProject(project_url):
"""Parse a remote Document, wiki or website URL."""
datatext = RequestRemoteContent(project_url)
if datatext is None: return {}
if datatext is None:
return {}
# Google Document
if project_url.startswith('https://docs.google.com/document'):
if project_url.startswith("https://docs.google.com/document"):
return FetchWebGoogleDoc(datatext, project_url)
# Instructables
elif project_url.startswith('https://www.instructables.com/'):
elif project_url.startswith("https://www.instructables.com/"):
return FetchWebInstructables(datatext, project_url)
# Pretalx
elif datatext.find('<meta name="generator" content="pretalx">') > 0:
@ -370,7 +404,7 @@ def FetchWebProject(project_url):
elif datatext.find('<meta name="generator" content="DokuWiki"/>') > 0:
return FetchWebDokuWiki(datatext, project_url)
# Etherpad
elif datatext.find('pad.importExport.exportetherpad') > 0:
elif datatext.find("pad.importExport.exportetherpad") > 0:
return FetchWebEtherpad(datatext, project_url)
@ -384,12 +418,15 @@ def FetchWebGoogleDoc(text, url):
content = doc("div#contents")
if len(content) < 1:
return {}
content = content.html().strip()
content = str(content.html()).strip()
if not content or len(content) < 1:
return {}
html_content = bleach.clean(content, strip=True,
tags=frozenset(ALLOWED_HTML_TAGS),
attributes=ALLOWED_HTML_ATTR)
html_content = bleach.clean(
content,
strip=True,
tags=frozenset(ALLOWED_HTML_TAGS),
attributes=ALLOWED_HTML_ATTR,
)
obj = {}
# {
# 'type': 'Google', ...
@ -399,11 +436,11 @@ def FetchWebGoogleDoc(text, url):
# 'image_url': image_url
# 'source_url': project_url,
# }
obj['type'] = 'Google Docs'
obj['name'] = ptitle.text()
obj['description'] = html_content
obj['source_url'] = url
obj['logo_icon'] = 'paperclip'
obj["type"] = "Google Docs"
obj["name"] = ptitle.text()
obj["description"] = html_content
obj["source_url"] = url
obj["logo_icon"] = "paperclip"
return obj
@ -413,15 +450,15 @@ def FetchWebCodiMD(text, url):
ptitle = doc("title")
if len(ptitle) < 1:
return {}
content = doc("div#doc").html()
content = str(doc("div#doc").html())
if len(content) < 1:
return {}
obj = {}
obj['type'] = 'Markdown'
obj['name'] = ptitle.text()
obj['description'] = markdownit(content)
obj['source_url'] = url
obj['logo_icon'] = 'outdent'
obj["type"] = "Markdown"
obj["name"] = ptitle.text()
obj["description"] = markdownit(content)
obj["source_url"] = url
obj["logo_icon"] = "outdent"
return obj
@ -434,32 +471,33 @@ def FetchWebDokuWiki(text, url):
content = doc("div.dw-content")
if len(content) < 1:
return {}
html_content = bleach.clean(content.html().strip(), strip=True,
tags=ALLOWED_HTML_TAGS,
attributes=ALLOWED_HTML_ATTR)
html_content = bleach.clean(
str(content.html()).strip(),
strip=True,
tags=ALLOWED_HTML_TAGS,
attributes=ALLOWED_HTML_ATTR,
)
obj = {}
obj['type'] = 'DokuWiki'
obj['name'] = ptitle.text().replace('project:', '')
obj['description'] = html_content
obj['source_url'] = url
obj['logo_icon'] = 'list-ul'
obj["type"] = "DokuWiki"
obj["name"] = str(ptitle.text()).replace("project:", "")
obj["description"] = html_content
obj["source_url"] = url
obj["logo_icon"] = "list-ul"
return obj
def FetchWebEtherpad(text, url):
    """Help extract data from Etherpad Lite.

    :param text: the already downloaded page (not used here; the plain text
        export is requested separately).
    :param url: address of the pad; its last path segment names the pad.
    :returns: dict describing the pad, or an empty dict when no pad name
        can be taken from the address.
    """
    # Ignore trailing slashes so the last segment is the pad name and the
    # export address does not contain a doubled slash.
    pad_url = url.rstrip("/")
    ptitle = pad_url.split("/")[-1]
    if len(ptitle) < 1:
        return {}
    text_content = requests.get(
        "%s/export/txt" % pad_url, timeout=REQUEST_TIMEOUT
    ).text
    return {
        "type": "Etherpad",
        "name": ptitle.replace("_", " "),
        "description": text_content,
        "source_url": url,
        "logo_icon": "pen",
    }
@ -472,42 +510,44 @@ def FetchWebInstructables(text, url):
return {}
html_content = ParseInstructablesPage(content)
obj = {}
obj['type'] = 'Instructables'
obj['name'] = ptitle.text()
obj['description'] = html_content
obj['source_url'] = url
obj['logo_icon'] = 'wrench'
obj["type"] = "Instructables"
obj["name"] = ptitle.text()
obj["description"] = html_content
obj["source_url"] = url
obj["logo_icon"] = "wrench"
return obj
def FetchWebGitHub(url):
    """Grab a Markdown source from a GitHub link.

    :param url: address of a ``.md`` file in GitHub's ``/blob/`` view.
    :returns: dict with the raw Markdown as description, or an empty dict
        when the address is not a ``/blob/`` link to a ``.md`` file.
    """
    if not url.endswith(".md") or "/blob/" not in url:
        return {}
    # Drop only the extension; ".md" elsewhere in the file name is kept
    filename = url.split("/")[-1][: -len(".md")]
    rawurl = url.replace("/blob/", "/raw/").replace("https://github.com/", "")
    rawdata = requests.get("https://github.com/" + rawurl, timeout=REQUEST_TIMEOUT)
    text_content = rawdata.text or ""
    return {
        "type": "Markdown",
        "name": filename,
        "description": text_content,
        "source_url": url,
        "logo_icon": "outdent",
    }
def FetchWebGitHubGist(url):
    """Grab a Markdown source from a GitHub Gist link.

    :param url: address of the Gist on ``gist.github.com``.
    :returns: dict with the raw Gist text as description.
    """
    # A trailing slash would otherwise produce a "//raw" path
    gist_path = url.replace("https://gist.github.com/", "").rstrip("/")
    rawdata = requests.get(
        "https://gist.githubusercontent.com/" + gist_path + "/raw",
        timeout=REQUEST_TIMEOUT,
    )
    text_content = rawdata.text or ""
    return {
        "type": "Markdown",
        "name": "Gist",
        "description": text_content,
        "source_url": url,
        "logo_icon": "outdent",
    }
@ -515,56 +555,60 @@ def ParseInstructablesPage(content):
"""Create an HTML summary of content."""
html_content = ""
for step in content.find(".step"):
step_title = pq(step).find('.step-title')
step_title = pq(step).find(".step-title")
if step_title is not None:
html_content += '<h3>' + step_title.text() + '</h3>'
html_content += "<h3>%s</h3>" % step_title.text()
# Grab photos
for img in pq(step).find('noscript'):
if '{{ file' not in pq(img).html():
html_content += pq(img).html()
for img in pq(step).find("noscript"):
img_html = str(pq(img).html())
if "{{ file" not in img_html:
html_content += img_html
# Iterate through body
step_content = pq(step).find('.step-body')
step_content = pq(step).find(".step-body")
if step_content is None:
continue
for elem in pq(step_content).children():
elem_tag, p = ParseInstructablesElement(elem)
if elem_tag is None:
continue
html_content += '<%s>%s</%s>' % (elem_tag, p, elem_tag)
html_content += "<%s>%s</%s>" % (elem_tag, p, elem_tag)
return html_content
def ParseInstructablesElement(elem):
    """Check and return minimal contents.

    :param elem: a parsed element offering ``tag`` and ``text``.
    :returns: a ``(tag, html)`` pair ready to be wrapped in that tag, or
        ``(None, None)`` when the element has nothing to show.
    """
    from html import escape

    if elem.tag == "pre":
        if elem.text is None:
            return None, None
        # The text ends up inside markup, so neutralise "<", ">" and "&"
        return "pre", escape(elem.text, quote=False)
    inner = pq(elem).html()
    if inner is None:
        return None, None
    cleaned = bleach.clean(
        str(inner).strip(),
        strip=True,
        tags=frozenset(ALLOWED_HTML_TAGS),
        attributes=ALLOWED_HTML_ATTR,
    )
    return elem.tag, cleaned
def FetchWebPretalx(text, url):
    """Grab Pretalx data from a talk.

    :param text: HTML of the talk page; its alternate link is followed to
        obtain the JSON representation.
    :param url: address of the page; must contain ``/talk/``.
    :returns: dict describing the talk, or an empty dict when the page is
        not a talk, has no alternate link, or the link returns no JSON
        object.
    """
    if "/talk/" not in url:
        return {}
    doc = pq(text)
    apiurl = doc('link[@rel="alternate"]').attr("href")
    if not apiurl:
        # Without the link there is nothing to request
        return {}
    rawdata = requests.get(str(apiurl) + "?format=json", timeout=REQUEST_TIMEOUT)
    if rawdata.text.find("{") < 0:
        return {}
    jsondata = rawdata.json()
    return {
        "type": "Pretalx",
        "name": jsondata["title"],
        # Tolerate a null abstract before shortening it
        "summary": (jsondata["abstract"] or "")[:2000],
        "description": jsondata["description"],
        "source_url": url,
        "logo_icon": "window-maximize",
    }

View file

@ -1,7 +1,6 @@
# -*- coding: utf-8 -*-
"""Connect to generative A.I. tools."""
import logging
import openai
from flask import current_app
@ -21,11 +20,11 @@ SYSTEM_PROMPT = (
# Default challenge prompts
INITIAL_CHALLENGE_PROMPT = (
"Write a challenge statement for a collaborative hackathon project. " +
"Include a section with a basic introduction and first steps. " +
"Describe in a second section some example datasets and resources. " +
"The last section should explain what kind of skills are involved. " +
"Use dashes (---) to separate the sections."
"Write a challenge statement for a collaborative hackathon project. "
+ "Include a section with a basic introduction and first steps. "
+ "Describe in a second section some example datasets and resources. "
+ "The last section should explain what kind of skills are involved. "
+ "Use dashes (---) to separate the sections."
)
INITIAL_PROJECT_PROMPT = (
"Suggest one clear and concise next step for a hackathon project."
@ -74,7 +73,7 @@ def prompt_ideas(project: Project):
"""Form a prompt that is used to generate posts."""
basep = prompt_initial(project)
# Collect project contents, preferring the pitch
summary = ''
summary = ""
if project.longtext:
summary = project.longtext
if project.autotext:
@ -100,9 +99,7 @@ def prompt_ideas(project: Project):
summary = "Improve upon the following prior results:\n%s" % (summary)
# Generate the prompt
return (
basep + "\n\n%s\n\n%s" % (stage_advice, summary)
)
return basep + "\n\n%s\n\n%s" % (stage_advice, summary)
def gen_project_pitch(project: Project):
@ -122,18 +119,21 @@ def gen_project_post(project: Project, as_boost: bool = False):
else:
# Use the standard recommendation prompt
prompt = DEFAULT_RECOMMENDATION_PROMPT + prompt
#print(prompt)
# print(prompt)
return gen_openai(prompt)
def gen_openai(prompt: str):
"""Request data from a text-completion API."""
logging = current_app.logger
if not current_app.config["LLM_API_KEY"]:
logging.error("Missing ChatGPT configuration (LLM_API_KEY)")
return None
# TODO: persist in app session
if current_app.config["LLM_BASE_URL"]:
logging.info("Using custom LLM provider")
ai_client = openai.OpenAI(
api_key=current_app.config["LLM_API_KEY"],
base_url=current_app.config["LLM_BASE_URL"],
@ -149,6 +149,7 @@ def gen_openai(prompt: str):
# Attempt to get an interaction started
try:
logging.debug("Starting completions")
completion = ai_client.chat.completions.create(
model=current_app.config["LLM_MODEL"],
timeout=REQUEST_TIMEOUT,
@ -176,7 +177,7 @@ def gen_openai(prompt: str):
if len(completion.choices) > 0:
mymodel = current_app.config["LLM_MODEL"].upper()
content = completion.choices[0].message.content or ""
#content = content.replace("\n", "\n> ")
# content = content.replace("\n", "\n> ")
return "🅰️ℹ️ `Generated with %s`\n\n%s" % (mymodel, content)
else:
logging.error("No LLM data in response")

View file

@ -1,14 +1,15 @@
# -*- coding: utf-8 -*-
"""Importing event data from a package"""
import logging
import requests
import json, csv
import json
import csv
import tempfile
from os import path
from copy import deepcopy
from werkzeug.utils import secure_filename
from datetime import datetime
from flask import current_app
from frictionless import Package, Resource
from .user.models import Event, Project, Activity, Category, User, Role
from .utils import format_date
@ -118,10 +119,10 @@ def import_events_data(data, dry_run=False):
name = evt["name"]
event = Event.query.filter_by(name=name).first()
if not event:
logging.info("Creating event: %s" % name)
current_app.logger.info("Creating event: %s" % name)
event = Event()
else:
logging.info("Updating event: %s" % name)
current_app.logger.info("Updating event: %s" % name)
event.set_from_data(evt)
if not dry_run:
event.save()
@ -136,10 +137,10 @@ def import_categories_data(data, dry_run=False):
name = ctg["name"]
category = Category.query.filter_by(name=name).first()
if not category:
logging.info("Creating category: %s" % name)
current_app.logger.info("Creating category: %s" % name)
category = Category()
else:
logging.info("Updating category: %s" % name)
current_app.logger.info("Updating category: %s" % name)
category.set_from_data(ctg)
if not dry_run:
category.save()
@ -160,9 +161,9 @@ def import_users_data(data, dry_run=False):
or User.query.filter_by(email=email).first()
):
# Do not update existing user data
logging.info("Skipping user: %s" % name)
current_app.logger.info("Skipping user: %s" % name)
continue
logging.info("Creating user: %s" % name)
current_app.logger.info("Creating user: %s" % name)
user = User()
user.set_from_data(usr)
import_user_roles(user, usr["roles"], dry_run)
@ -180,6 +181,8 @@ def import_user_roles(user, new_roles, dry_run=False):
if r in my_roles:
continue
# Check that role is a new one
if not Role.name:
continue
role = Role.query.filter(Role.name.ilike(r)).first()
if not role:
role = Role(r)
@ -196,14 +199,14 @@ def import_project_data(data, dry_run=False, event=None):
updates = []
for pjt in data:
# Skip empty rows
if not "name" in pjt:
logging.warning("Skipping empty row")
logging.debug(pjt)
if "name" not in pjt:
current_app.logger.warning("Skipping empty row")
current_app.logger.debug(pjt)
continue
# Get project name and content
name = pjt["name"]
if not "longtext" in pjt and "excerpt" in pjt:
logging.warning("Importing excerpt as longtext")
if "longtext" not in pjt and "excerpt" in pjt:
current_app.logger.warning("Importing excerpt as longtext")
pjt["longtext"] = pjt.pop("excerpt")
# Search for event
event_name = None
@ -212,15 +215,17 @@ def import_project_data(data, dry_run=False, event=None):
if event_name and (not event or event.name != event_name):
event = Event.query.filter_by(name=event_name).first()
if not event:
logging.warning("Skip [%s], event not found: %s" % (name, event_name))
current_app.logger.warning(
"Skip [%s], event not found: %s" % (name, event_name)
)
continue
# Search for project
project = Project.query.filter_by(name=name).first()
if not project:
logging.info("Creating project: %s" % name)
current_app.logger.info("Creating project: %s" % name)
project = Project()
else:
logging.info("Updating project: %s" % name)
current_app.logger.info("Updating project: %s" % name)
project.set_from_data(pjt)
project.update_null_fields()
project.event_id = event.id
@ -240,15 +245,15 @@ def import_activities(data, dry_run=False):
tstamp = datetime.utcfromtimestamp(act["time"])
activity = Activity.query.filter_by(name=aname, timestamp=tstamp).first()
if activity:
logging.info("Skipping activity: %s", tstamp)
current_app.logger.info("Skipping activity: %s", tstamp)
continue
logging.info("Creating activity: %s", tstamp)
current_app.logger.info("Creating activity: %s", tstamp)
if act["project_name"] != pname:
pname = act["project_name"]
# TODO: unreliable; rather use a map of project_id to new id
proj = Project.query.filter_by(name=pname).first()
if not proj:
logging.warning("Error! Project not found: %s" % pname)
current_app.logger.warning("Error! Project not found: %s" % pname)
continue
activity = Activity(aname, proj.id)
activity.set_from_data(act)
@ -290,7 +295,7 @@ def fetch_datapackage(url, dry_run=False, all_data=False):
# For security, can only be used from CLI.
# In the future, we can add a subscription setting on the server side.
if not url.endswith("datapackage.json"):
logging.error("Invalid URL: %s", url)
current_app.logger.error("Invalid URL: %s", url)
return {}
try:
data = requests.get(url, timeout=REQUEST_TIMEOUT).json()
@ -298,7 +303,7 @@ def fetch_datapackage(url, dry_run=False, all_data=False):
except json.decoder.JSONDecodeError:
return {"errors": ["Could not load package due to JSON error"]}
except requests.exceptions.RequestException:
logging.error("Could not connect to %s" % url)
current_app.logger.error("Could not connect to %s" % url)
return {}

View file

@ -23,6 +23,7 @@ from dribdat.settings import ProdConfig # noqa: I005
from dribdat.utils import timesince, markdownit
from dribdat.onebox import make_oembedplus
from pytz import timezone
import logging
def init_app(config_object=ProdConfig):
@ -36,18 +37,18 @@ def init_app(config_object=ProdConfig):
app.config.from_object(config_object)
# Set up cross-site access to the API
if app.config['SERVER_CORS']:
if app.config["SERVER_CORS"]:
CORS(app, resources={r"/api/*": {"origins": "*"}})
app.config['CORS_HEADERS'] = 'Content-Type'
app.config["CORS_HEADERS"] = "Content-Type"
# Set up using an external proxy/static server
if app.config['SERVER_PROXY']:
if app.config["SERVER_PROXY"]:
app.wsgi_app = ProxyFix(app, x_for=1, x_proto=1, x_host=1)
else:
# Internally optimize static file hosting
app.wsgi_app = WhiteNoise(app.wsgi_app, prefix='static/')
for static in ('css', 'img', 'js', 'public'):
app.wsgi_app.add_files('dribdat/static/' + static)
app.wsgi_app = WhiteNoise(app.wsgi_app, prefix="static/")
for static in ("css", "img", "js", "public"):
app.wsgi_app.add_files("dribdat/static/" + static)
register_extensions(app)
register_blueprints(app)
@ -76,9 +77,9 @@ def register_extensions(app):
def init_mailman(app):
    """Initialize mailer support.

    Nothing happens unless ``MAIL_SERVER`` is set to a truthy value in the
    application config. When it is set but ``MAIL_DEFAULT_SENDER`` is empty,
    a warning is logged and the mail extension is left uninitialized.

    Args:
        app: The application object; its ``config`` and ``logger`` are used.
    """
    if not app.config.get("MAIL_SERVER"):
        return
    if not app.config.get("MAIL_DEFAULT_SENDER"):
        # Logger.warn() is a deprecated alias that newer Python releases no
        # longer provide; warning() is the supported spelling.
        app.logger.warning("MAIL_DEFAULT_SENDER is required to send email")
        return
    mail = Mail()
    mail.init_app(app)
@ -86,10 +87,12 @@ def init_mailman(app):
def init_talisman(app):
    """Initialize Talisman support.

    Talisman is attached only when ``SERVER_SSL`` is set to a truthy value in
    the application config. It receives the ``CSP_DIRECTIVES`` setting as the
    content security policy and allows framing from any origin.

    Args:
        app: The application object whose ``config`` is consulted.
    """
    if not app.config.get("SERVER_SSL"):
        return
    Talisman(
        app,
        content_security_policy=app.config["CSP_DIRECTIVES"],
        frame_options_allow_from="*",
    )
def register_blueprints(app):
@ -112,11 +115,13 @@ def register_oauthhandlers(app):
def register_errorhandlers(app):
    """Register error handlers.

    One shared renderer is installed for the 401, 404 and 500 status codes.

    Args:
        app: The application object providing ``errorhandler``.

    Returns:
        None
    """

    def render_error(error):
        """Render error template."""
        # If a HTTPException, pull the `code` attribute; default to 500
        error_code = getattr(error, "code", 500)
        # The template is looked up by status code, e.g. "404.html".
        return render_template("{0}.html".format(error_code)), error_code

    for errcode in (401, 404, 500):
        app.errorhandler(errcode)(render_error)
    return None
@ -124,12 +129,12 @@ def register_errorhandlers(app):
def register_shellcontext(app):
    """Register shell context objects.

    Args:
        app: The application object providing ``shell_context_processor``.
    """

    def shell_context():
        """Shell context objects."""
        # Imported when the shell context is built rather than at module
        # load - presumably to avoid an import cycle; confirm before moving.
        from dribdat.user.models import User

        return {"db": db, "User": User}

    app.shell_context_processor(shell_context)
@ -143,28 +148,27 @@ def register_commands(app):
def register_filters(app):
"""Register filters for templates."""
#
# Conversion of Markdown to HTML
@app.template_filter()
def markdown(value):
return markdownit(value)
#Misaka(app, autolink=True, fenced_code=True, strikethrough=True, tables=True)
# Misaka(app, autolink=True, fenced_code=True, strikethrough=True, tables=True)
# Registration of handlers for micawber
app.oembed_providers = bootstrap_basic()
@app.template_filter()
def onebox(value):
return make_oembedplus(
value, app.oembed_providers, maxwidth=600, maxheight=400
)
return make_oembedplus(value, app.oembed_providers, maxwidth=600, maxheight=400)
# Timezone helper
app.tz = timezone(app.config['TIME_ZONE'] or 'UTC')
app.tz = timezone(app.config["TIME_ZONE"] or "UTC")
# Lambda filters for safe image_url's
app.jinja_env.filters['quote_plus'] = lambda u: quote_plus(u or '', ':/?&=')
app.jinja_env.filters["quote_plus"] = lambda u: quote_plus(u or "", ":/?&=")
# Custom filters
@app.template_filter()
@ -176,35 +180,39 @@ def register_filters(app):
return timesince(value, default="now!", until=True)
@app.template_filter()
def format_date(value, format='%d.%m.%Y'):
if value is None: return ''
def format_date(value, format="%d.%m.%Y"):
if value is None:
return ""
return value.strftime(format)
@app.template_filter()
def format_datetime(value, format='%d.%m.%Y %H:%M'):
if value is None: return ''
def format_datetime(value, format="%d.%m.%Y %H:%M"):
if value is None:
return ""
return value.strftime(format)
def register_loggers(app):
    """Initialize and configure logging.

    A stream handler is added to the application logger. The threshold is
    DEBUG when the ``DEBUG`` config value is truthy and ERROR otherwise.

    Args:
        app: The application object; its ``config`` and ``logger`` are used.
    """
    app.logger.addHandler(logging.StreamHandler())
    if app.config.get("DEBUG"):
        level, label = logging.DEBUG, "DEBUG"
    else:
        level, label = logging.ERROR, "ERROR"
    app.logger.setLevel(level)
    # NOTE: once the threshold is ERROR, a standard Logger filters this
    # info-level record out, so it only shows up in debug mode.
    app.logger.info("Setting logger level to %s", label)
def register_caching(app):
    """Prevent cached responses in debug.

    When the ``DEBUG`` config value is truthy, an ``after_request`` hook is
    installed that sets the Cache-Control, Expires and Pragma headers on
    every response. Otherwise no hook is registered.

    Args:
        app: The application object; ``config`` and ``after_request`` are used.
    """
    if not app.config.get("DEBUG"):
        return

    @app.after_request
    def after_request(response):
        """Set the cache-related headers and hand the response back."""
        response.headers["Cache-Control"] = (
            "no-cache, no-store, must-revalidate, public, max-age=0"
        )
        response.headers["Expires"] = 0
        response.headers["Pragma"] = "no-cache"
        return response

View file

@ -1,9 +1,9 @@
"""Boxout module for Data Packages."""
import re
import logging
import pystache
from datetime import datetime
from flask import current_app
from frictionless import Package
TEMPLATE_PACKAGE = r"""
@ -51,14 +51,14 @@ TEMPLATE_PACKAGE = r"""
</div>
"""
# Captures an http(s) URL ending in "datapackage.json" from a line, ignoring
# any leading text and trailing closing parentheses (Markdown link syntax).
# The optional character is the "s" of "https"; with "http?s" plain http://
# links were never matched.
dpkg_url_re = re.compile(r".*(https?:\/\/.+datapackage\.json)\)*")
def chk_datapackage(line):
    """Check the url matching dataset pattern.

    Args:
        line: A single line of text.

    Returns:
        True when the line ends with ``datapackage.json)`` (the tail of a
        Markdown link), or starts with ``http`` and ends with
        ``datapackage.json``; False otherwise.
    """
    if line.endswith("datapackage.json)"):
        return True
    return line.startswith("http") and line.endswith("datapackage.json")
def box_datapackage(line, cache=None):
@ -70,28 +70,27 @@ def box_datapackage(line, cache=None):
if cache and cache.has(url):
return cache.get(url)
try:
logging.info("Fetching Data Package: <%s>" % url)
current_app.logger.info("Fetching Data Package: <%s>" % url)
package = Package(url)
except Exception: # noqa: B902
logging.warning("Data Package not parsed: <%s>" % url)
current_app.logger.warning("Data Package not parsed: <%s>" % url)
return None
if package.created:
dt = datetime.fromisoformat(package.created).strftime("%d.%m.%Y")
else:
dt = ''
base_url = url.replace('/datapackage.json', '')
dt = ""
base_url = url.replace("/datapackage.json", "")
# Adjust for absolute URLs
for r in range(0, len(package.resources)):
if not 'path' in package.resources[r]:
if not "path" in package.resources[r]:
continue
rp = package.resources[r]['path']
if rp and not rp.startswith('http'):
package.resources[r]['path'] = '/'.join([base_url, rp])
rp = package.resources[r]["path"]
if rp and not rp.startswith("http"):
package.resources[r]["path"] = "/".join([base_url, rp])
# Render to template
box = pystache.render(
TEMPLATE_PACKAGE, {'url': url, 'dp': package, 'date': dt})
box = pystache.render(TEMPLATE_PACKAGE, {"url": url, "dp": package, "date": dt})
if cache:
cache.set(url, box)
if cache and cache.has(url):
logging.debug("Cached Data Package: <%s>" % url)
current_app.logger.debug("Cached Data Package: <%s>" % url)
return box

View file

@ -4,7 +4,6 @@
from flask import url_for, current_app
from flask_mailman import EmailMessage
from dribdat.utils import random_password # noqa: I005
import logging
def user_activation_message(user, act_hash):
@ -28,7 +27,7 @@ def user_activation_message(user, act_hash):
+ "d}}BD{t"
)
# --------------------
logging.debug(act_url)
current_app.logger.debug(act_url)
return msg
@ -40,10 +39,10 @@ def user_activation(user):
msg = user_activation_message(user, act_hash)
# print(msg.body)
if "mailman" not in current_app.extensions:
logging.warning("E-mail extension has not been configured")
current_app.logger.warning("E-mail extension has not been configured")
return act_hash
msg.to = [user.email]
logging.info("Sending activation mail to user %d" % user.id)
current_app.logger.info("Sending activation mail to user %d" % user.id)
msg.send(fail_silently=True)
return act_hash
@ -52,10 +51,10 @@ def user_registration(user_email):
"""Send an invitation by e-mail."""
msg = user_invitation_message()
if "mailman" not in current_app.extensions:
logging.warning("E-mail extension has not been configured")
current_app.logger.warning("E-mail extension has not been configured")
return
msg.to = [user_email]
logging.info("Sending registration mail")
current_app.logger.info("Sending registration mail")
msg.send(fail_silently=True)
@ -89,10 +88,10 @@ def user_invitation_message(project=None):
def user_invitation(user_email, project):
    """Send an invitation by e-mail.

    Args:
        user_email: Address the invitation is sent to.
        project: Passed on to ``user_invitation_message`` to build the message.

    Returns:
        False when the "mailman" extension is not registered on the current
        application, True once the message has been handed to ``send``.
    """
    if "mailman" not in current_app.extensions:
        current_app.logger.warning("E-mail extension has not been configured")
        return False
    msg = user_invitation_message(project)
    msg.to = [user_email]
    # This is the invitation path, so the log line says so; the arguments are
    # passed separately so formatting only happens when the record is emitted.
    current_app.logger.info("Sending invitation mail to %s", user_email)
    # Delivery errors are deliberately not raised (fail_silently).
    msg.send(fail_silently=True)
    return True

View file

@ -2,8 +2,7 @@
"""Jinja formatters for Oneboxes and Embeds."""
import re
import logging
from flask import url_for
from flask import current_app, url_for
from micawber.parsers import standalone_url_re, full_handler
from .boxout.ckan import box_dataset, chk_dataset, ini_dataset
from .boxout.dribdat import box_project
@ -15,29 +14,30 @@ from dribdat.extensions import cache
def format_webembed(url, project_id=None):
    """Create a well-formatted frame for project embeds.

    Args:
        url: The address to embed, or a literal ``<iframe ...>`` snippet.
        project_id: When given, links ending in ``.pdf`` are replaced by the
            ``project.render`` route of that project.

    Returns:
        An empty string for an empty ``url``, the input unchanged when it
        already starts with ``<iframe ``, otherwise an ``<iframe>`` element
        whose ``src`` is the (possibly rewritten) URL, HTML-escaped.
    """
    from html import escape

    if not url:
        return ""
    # Prefix and suffix checks are done on a normalized copy; the rewrites
    # below operate on the URL as it was given.
    urltest = url.lower().strip()
    if urltest.startswith("<iframe "):
        # Allow IFRAMEs
        # TODO: add a setting
        return url
    if urltest.endswith(".pdf") and project_id is not None:
        # Redirect to embed visualizer of this document
        url = url_for("project.render", project_id=project_id)
    elif urltest.startswith("https://query.wikidata.org/"):
        # Fix WikiData queries
        url = url.replace(
            "https://query.wikidata.org/", "https://query.wikidata.org/embed.html"
        )
    elif urltest.startswith("https://youtu.be/"):
        # Fix YouTube mobile link
        url = url.replace("https://youtu.be/", "https://www.youtube.com/embed/")
        url = url.replace("?t=", "?start=")
    elif urltest.startswith("https://www.youtube.com/watch?"):
        # Fix YouTube web link
        url = url.replace(
            "https://www.youtube.com/watch?v=", "https://www.youtube.com/embed/"
        )
        url = url.replace("?t=", "?start=")
    # TODO: add more embeddables here
    # TODO: whitelist
    # Escape the value so a quote inside the URL cannot close the src
    # attribute and inject further attributes or markup.
    return '<iframe src="%s"></iframe>' % escape(url, quote=True)
@ -45,7 +45,7 @@ def format_webembed(url, project_id=None):
def format_webslides(longtext):
"""Format a Reveal.js slide presentation"""
return '''
return """
<div class="reveal">
<div class="slides">
<section data-markdown>
@ -53,7 +53,7 @@ def format_webslides(longtext):
</section>
</div>
</div>
''' % longtext.replace('\n***', '\n---')
""" % longtext.replace("\n***", "\n---")
# Adjusting for Thematic Break format
@ -66,7 +66,7 @@ def repl_onebox(mat=None, li=None):
return
if mat.group(1):
url = mat.group(1).strip()
if '/project/' in url:
if "/project/" in url:
# Try to parse a project link
return box_project(url) or mat.group()
return mat.group()
@ -74,7 +74,7 @@ def repl_onebox(mat=None, li=None):
def make_onebox(raw_html):
    """Create a onebox container.

    Anchors whose target and link text both start with the external home URL
    are handed to ``repl_onebox``; everything else is left untouched.

    Args:
        raw_html: HTML text to scan for links.

    Returns:
        The HTML with each matching anchor replaced by the result of
        ``repl_onebox``.
    """
    # The home URL is escaped because it is spliced into a regular expression.
    url = re.escape(url_for("public.home", _external=True))
    regexp = re.compile('<a href="(%s.+?)">(%s.+?)</a>' % (url, url))
    return regexp.sub(repl_onebox, raw_html)
@ -85,8 +85,8 @@ def make_oembedplus(text, oembed_providers, **params):
parsed = []
has_dataset = False
# Url to projects
home_url = re.escape(url_for('public.home', _external=True) + 'project/')
home_url_re = re.compile('(%s.+)' % home_url)
home_url = re.escape(url_for("public.home", _external=True) + "project/")
home_url_re = re.compile("(%s.+)" % home_url)
# Iterate each line (inefficient!)
for line in lines:
newline = None
@ -113,7 +113,7 @@ def make_oembedplus(text, oembed_providers, **params):
# Add init code
if has_dataset:
parsed = [ini_dataset()] + parsed
return '\n'.join(parsed)
return "\n".join(parsed)
def box_default(line, oembed_providers, **params):
@ -122,7 +122,7 @@ def box_default(line, oembed_providers, **params):
try:
response = oembed_providers.request(url, **params)
except Exception: # noqa: B902
logging.info("OEmbed could not parse: <%s>" % url)
current_app.logger.info("OEmbed could not parse: <%s>" % url)
else:
return full_handler(url, response, **params)
return None