# -*- coding: utf-8 -*-
"""Collecting data from third party API repositories."""
import re
import requests
import bleach
import logging
from pyquery import PyQuery as pq # noqa: N813
from base64 import b64decode
from flask_misaka import markdown
from bleach.sanitizer import ALLOWED_ATTRIBUTES
from urllib.parse import quote_plus
from .apievents import (
fetch_commits_github,
fetch_commits_gitlab,
fetch_commits_gitea,
)
from .utils import (
sanitize_url, load_presets, load_yaml_presets, fix_relative_links
)
from future.standard_library import install_aliases
install_aliases()


# In seconds, how long to wait for API response
REQUEST_TIMEOUT = 10


def FetchStageConfig(url, top_element='stages', by_col='name'):
"""Download a remote YAML stages configuration."""
if not url.startswith('http:') and not url.startswith('https:'):
logging.info("Loading stages from file")
return load_yaml_presets(top_element, by_col, url)
logging.info("Loading stages from URL")
data = requests.get(url, timeout=REQUEST_TIMEOUT)
if data.text.find('stages:') < 0:
logging.debug("No stage data", data.text)
return {}
blob = data.text
return load_presets(blob, top_element, by_col)
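
# Usage sketch (hypothetical file name and URL): FetchStageConfig accepts
# either a local YAML preset name or a remote address, and returns a dict
# of stage definitions keyed by name.
#   FetchStageConfig('steps')  # resolved locally via load_yaml_presets
#   FetchStageConfig('https://example.org/stages.yaml')  # fetched over HTTP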


def FetchGiteaProject(project_url):
"""Download data from Codeberg, a large Gitea site."""
# Docs: https://codeberg.org/api/swagger
site_root = "https://codeberg.org"
url_q = quote_plus(project_url, '/')
api_repos = site_root + "/api/v1/repos/%s" % url_q
api_content = api_repos + "/contents"
# Collect basic data
logging.info("Fetching Gitea", url_q)
data = requests.get(api_repos, timeout=REQUEST_TIMEOUT)
    if data.text.find('{') < 0:
        logging.debug("No data: %s", data.text)
        return {}
    json = data.json()
    if 'name' not in json:
        logging.debug("Invalid data: %s", data.text)
        return {}
# Collect the README
data = requests.get(api_content, timeout=REQUEST_TIMEOUT)
readme = ""
    if data.text.find('{') >= 0:
readmeurl = None
for repo_file in data.json():
if 'readme' in repo_file['name'].lower():
readmeurl = repo_file['download_url']
readmedata = requests.get(readmeurl, timeout=REQUEST_TIMEOUT)
readme = readmedata.text
break
if readmeurl is None:
logging.info("Could not find README", url_q)
issuesurl = ''
if json['has_issues']:
issuesurl = json['html_url'] + '/issues'
return {
'type': 'Gitea',
'name': json['name'],
'summary': json['description'],
'description': readme,
'source_url': json['html_url'],
'image_url': json['avatar_url'] or json['owner']['avatar_url'],
'contact_url': issuesurl,
'commits': fetch_commits_gitea(url_q)
}
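
# Usage sketch (hypothetical repository slug): the argument is the Codeberg
# "owner/repo" path rather than a full URL; commit history is attached under
# the 'commits' key via fetch_commits_gitea.
#   project = FetchGiteaProject('myorg/myproject')
#   project.get('source_url')  # e.g. 'https://codeberg.org/myorg/myproject'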


def FetchGitlabProject(project_url):
"""Download data from GitLab."""
WEB_BASE = "https://gitlab.com"
API_BASE = WEB_BASE + "/api/v4/projects/%s"
url_q = quote_plus(project_url)
# Collect basic data
logging.info("Fetching GitLab", url_q)
data = requests.get(API_BASE % url_q, timeout=REQUEST_TIMEOUT)
    if data.text.find('{') < 0:
        logging.debug("No data: %s", data.text)
        return {}
    json = data.json()
    if 'name' not in json:
        logging.debug("Invalid data: %s", data.text)
return {}
    # Collect the README, if the project has one
    readme = ""
    if json.get('readme_url'):
        readmeurl = json['readme_url'] + '?inline=false'
        readmeurl = readmeurl.replace('-/blob/', '-/raw/')
        readmedata = requests.get(readmeurl, timeout=REQUEST_TIMEOUT)
        readme = readmedata.text or ""
return {
'type': 'GitLab',
'name': json['name'],
'summary': json['description'],
'description': readme,
'source_url': json['web_url'],
'image_url': json['avatar_url'],
'contact_url': json['web_url'] + '/issues',
'commits': fetch_commits_gitlab(json['id'])
}
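
# Usage sketch (hypothetical project path): nested namespaces are accepted,
# since quote_plus encodes the path for the API, e.g. 'group/sub/project'
# becomes 'group%2Fsub%2Fproject'.
#   project = FetchGitlabProject('mygroup/myproject')
#   project.get('contact_url')  # e.g. 'https://gitlab.com/mygroup/myproject/issues'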


def FetchGitlabAvatar(email):
"""Download a user avatar from GitLab."""
apiurl = "https://gitlab.com/api/v4/avatar?email=%s&size=80"
data = requests.get(apiurl % email, timeout=REQUEST_TIMEOUT)
if data.text.find('{') < 0:
logging.debug("No data", data.text)
return None
json = data.json()
if 'avatar_url' not in json:
return None
return json['avatar_url']
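
# Usage sketch (hypothetical address): returns the avatar URL registered for
# an e-mail on gitlab.com, or None when nothing usable comes back.
#   FetchGitlabAvatar('someone@example.org')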


def FetchGithubProject(project_url):
    """Download data from GitHub."""
    API_BASE = "https://api.github.com/repos/%s"
    logging.info("Fetching GitHub %s", project_url)
data = requests.get(API_BASE % project_url, timeout=REQUEST_TIMEOUT)
    if data.text.find('{') < 0:
        logging.debug("No data: %s", data.text)
        return {}
    json = data.json()
    if 'name' not in json or 'full_name' not in json:
        logging.debug("Invalid data: %s", data.text)
        return {}
repo_full_name = json['full_name']
default_branch = json['default_branch'] or 'main'
readmeurl = "%s/readme" % (API_BASE % project_url)
readmedata = requests.get(readmeurl, timeout=REQUEST_TIMEOUT)
readme = ''
if readmedata.text.find('{') < 0:
logging.debug("No readme", data.text)
else:
readme = readmedata.json()
if 'content' not in readme:
readme = ''
else:
readme = b64decode(readme['content']).decode('utf-8')
# Fix relative links in text
imgroot = "https://raw.githubusercontent.com"
    readme = fix_relative_links(
        readme, imgroot, repo_full_name, default_branch)
return {
'type': 'GitHub',
'name': json['name'],
'summary': json['description'],
'description': readme,
'webpage_url': json['homepage'],
'source_url': json['html_url'],
'image_url': json['owner']['avatar_url'],
'contact_url': json['html_url'] + '/issues',
'download_url': json['html_url'] + '/releases',
'commits': fetch_commits_github(repo_full_name)
}
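
# Usage sketch: as with the other fetchers, the argument is the "owner/repo"
# slug; the README is read from the /readme endpoint, Base64-decoded, and its
# relative links are rewritten against raw.githubusercontent.com.
#   project = FetchGithubProject('dribdat/dribdat')
#   project.get('download_url')  # e.g. 'https://github.com/dribdat/dribdat/releases'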


def FetchBitbucketProject(project_url):
    """Download data from Bitbucket."""
    WEB_BASE = "https://bitbucket.org/%s"
    API_BASE = "https://api.bitbucket.org/2.0/repositories/%s"
    logging.info("Fetching Bitbucket %s", project_url)
data = requests.get(API_BASE % project_url, timeout=REQUEST_TIMEOUT)
    if data.text.find('{') < 0:
        logging.debug('No data at %s', project_url)
        return {}
    json = data.json()
    if 'name' not in json:
        logging.debug('Invalid format at %s', project_url)
        return {}
readme = ''
for docext in ['.md', '.rst', '.txt', '']:
readmedata = requests.get(
            API_BASE % project_url + '/src/HEAD/README' + docext,
timeout=REQUEST_TIMEOUT)
if readmedata.text.find('{"type":"error"') != 0:
readme = readmedata.text
break
web_url = WEB_BASE % project_url
contact_url = json['website'] or web_url
if json['has_issues']:
contact_url = "%s/issues" % web_url
image_url = ''
if 'project' in json and \
'links' in json['project'] \
and 'avatar' in json['project']['links']:
image_url = json['project']['links']['avatar']['href']
elif 'links' in json and 'avatar' in json['links']:
image_url = json['links']['avatar']['href']
return {
'type': 'Bitbucket',
'name': json['name'],
'summary': json['description'],
'description': readme,
'webpage_url': json['website'],
'source_url': web_url,
'image_url': image_url,
'contact_url': contact_url,
}


def FetchDataProject(datapackage_url):
"""Try to load a Data Package formatted JSON file."""
# TODO: use frictionlessdata library!
project_url = datapackage_url.replace('datapackage.json', '')
project_url = sanitize_url(project_url) + 'datapackage.json'
data = requests.get(project_url, timeout=REQUEST_TIMEOUT)
# TODO: treat dribdat events as special
logging.info("Fetching Data Package", project_url)
if data.text.find('{') < 0:
logging.debug('No data at', project_url)
return {}
json = data.json()
if 'name' not in json or 'title' not in json:
logging.debug('Invalid format at', project_url)
return {}
try:
text_content = parse_data_package(json)
except KeyError:
text_content = '(Could not parse Data Package contents)'
    contact_url = ''
    if 'homepage' in json:
        contact_url = json['homepage'] or ''
    elif 'maintainers' in json and \
            len(json['maintainers']) > 0 and \
            'web' in json['maintainers'][0]:
        contact_url = json['maintainers'][0]['web']
return {
'type': 'Data Package',
'name': json['name'],
'summary': json['title'],
'description': text_content,
'source_url': project_url,
'logo_icon': 'box-open',
'contact_url': contact_url,
}


def parse_data_package(json):
"""Extract contents of a Data Package."""
text_content = ''
if 'description' in json:
text_content = json['description'] + '\n\n'
if 'resources' in json:
text_content = text_content + '\n### Resources\n\n'
for r in json['resources']:
rn = r['name']
if 'path' in r:
rn = "[%s](%s)" % (rn, r['path'])
text_content = text_content + '- ' + rn + '\n'
if 'sources' in json:
text_content = text_content + '\n### Sources\n\n'
for r in json['sources']:
rn = r['title']
if 'path' in r:
rn = "[%s](%s)" % (rn, r['path'])
text_content = text_content + '- ' + rn + '\n'
if text_content == '':
raise KeyError('No content')
return text_content
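
# Example sketch (illustrative values): a minimal Data Package and the
# Markdown summary that parse_data_package builds from it.
#   parse_data_package({
#       'description': 'Open data about trees',
#       'resources': [{'name': 'trees', 'path': 'data/trees.csv'}],
#   })
#   # -> 'Open data about trees\n\n\n### Resources\n\n- [trees](data/trees.csv)\n'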


def FetchDribdatProject(dribdat_url):
"""Try to load a Dribdat project from a remote page."""
project_url = dribdat_url.replace('/project/', '/api/project/')
project_url = sanitize_url(project_url) + '?full=1'
data = requests.get(project_url, timeout=REQUEST_TIMEOUT)
# TODO: treat dribdat events as special
logging.info("Fetching Dribdat site", project_url)
if data.text.find('{') < 0:
logging.debug('No data at', project_url)
return {}
json = data.json()
if 'project' not in json or 'event' not in json:
logging.debug('Invalid format at', project_url)
return {}
projectdata = json['project']
projectdata['type'] = 'Dribdat'
projectdata['description'] = projectdata['longtext']
return projectdata
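
# Usage sketch (hypothetical instance): a public project page is mapped to
# the corresponding API endpoint before fetching, e.g.
#   https://hack.example.org/project/3 -> https://hack.example.org/api/project/3?full=1
#   FetchDribdatProject('https://hack.example.org/project/3')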


# Basis: https://github.com/mozilla/bleach/blob/master/bleach/sanitizer.py#L16
ALLOWED_HTML_TAGS = [
'acronym', 'a', 'blockquote', 'li', 'abbr',
'strong', 'b', 'i', 'ul', 'ol', 'code', 'em',
'img', 'font', 'center', 'sub', 'sup', 'pre',
'table', 'tr', 'thead', 'tbody', 'td',
'h1', 'h2', 'h3', 'h4', 'h5',
'p', 'u'
]
# Copy the defaults so the additions below do not mutate bleach's shared dict
ALLOWED_HTML_ATTR = dict(ALLOWED_ATTRIBUTES)
ALLOWED_HTML_ATTR['h1'] = ['id']
ALLOWED_HTML_ATTR['h2'] = ['id']
ALLOWED_HTML_ATTR['h3'] = ['id']
ALLOWED_HTML_ATTR['h4'] = ['id']
ALLOWED_HTML_ATTR['h5'] = ['id']
ALLOWED_HTML_ATTR['a'] = ['href', 'title', 'class', 'name']
ALLOWED_HTML_ATTR['img'] = ['src', 'width', 'height', 'alt', 'class']
ALLOWED_HTML_ATTR['font'] = ['color']
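
# Illustrative example: with the allow-lists above, bleach keeps basic
# formatting and heading anchors while dropping everything else.
#   bleach.clean('<h2 id="top" style="color:red">Title</h2>',
#                strip=True, tags=frozenset(ALLOWED_HTML_TAGS),
#                attributes=ALLOWED_HTML_ATTR)
#   # -> '<h2 id="top">Title</h2>'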


def RequestRemoteContent(project_url):
    """Fetch the raw text content of a remote URL."""
try:
# TODO: the admin should be able to whitelist a range of allowed
# online resources controlling the domains from which we can
# fetch remote content.
project_url = sanitize_url(project_url)
logging.info("Fetching", project_url)
data = requests.get(project_url, timeout=REQUEST_TIMEOUT)
return data.text or None
except requests.exceptions.RequestException:
logging.warning("Could not connect to %s" % project_url)
return None


def FetchWebProject(project_url):
    """Parse a remote Document, wiki or website URL."""
    datatext = RequestRemoteContent(project_url)
    if datatext is None:
        return {}
# Google Document
if project_url.startswith('https://docs.google.com/document'):
return FetchWebGoogleDoc(datatext, project_url)
# Instructables
elif project_url.startswith('https://www.instructables.com/'):
return FetchWebInstructables(datatext, project_url)
# CodiMD / HackMD
elif datatext.find('<div id="doc" ') > 0:
return FetchWebCodiMD(datatext, project_url)
# DokuWiki
elif datatext.find('<meta name="generator" content="DokuWiki"/>') > 0:
return FetchWebDokuWiki(datatext, project_url)
# Etherpad
elif datatext.find('pad.importExport.exportetherpad') > 0:
return FetchWebEtherpad(datatext, project_url)
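
# Dispatch sketch (hypothetical URLs): the handler is chosen from the URL
# prefix or from markers found in the downloaded page.
#   FetchWebProject('https://docs.google.com/document/d/abc123')     # Google Doc
#   FetchWebProject('https://www.instructables.com/Example-Build/')  # Instructables
#   FetchWebProject('https://md.example.org/s/note')  # CodiMD, DokuWiki or
#                                                     # Etherpad, by page content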


def FetchWebGoogleDoc(text, url):
"""Help extract data from a Google doc."""
doc = pq(text)
doc("style").remove()
ptitle = doc("div#title") or doc("div#header")
if len(ptitle) < 1:
return {}
content = doc("div#contents")
if len(content) < 1:
return {}
content = content.html().strip()
if not content or len(content) < 1:
return {}
html_content = bleach.clean(content, strip=True,
tags=frozenset(ALLOWED_HTML_TAGS),
attributes=ALLOWED_HTML_ATTR)
obj = {}
# {
# 'type': 'Google', ...
# 'name': name,
# 'summary': summary,
# 'description': html_content,
# 'image_url': image_url
# 'source_url': project_url,
# }
obj['type'] = 'Google Docs'
obj['name'] = ptitle.text()
obj['description'] = html_content
obj['source_url'] = url
obj['logo_icon'] = 'paperclip'
return obj


def FetchWebCodiMD(text, url):
"""Help extract data from CodiMD."""
doc = pq(text)
ptitle = doc("title")
if len(ptitle) < 1:
return {}
content = doc("div#doc").html()
if len(content) < 1:
return {}
obj = {}
obj['type'] = 'Markdown'
obj['name'] = ptitle.text()
obj['description'] = markdown(content)
obj['source_url'] = url
obj['logo_icon'] = 'outdent'
return obj


def FetchWebDokuWiki(text, url):
"""Help extract data from DokuWiki."""
doc = pq(text)
ptitle = doc("span.pageId")
if len(ptitle) < 1:
return {}
content = doc("div.dw-content")
if len(content) < 1:
return {}
    html_content = bleach.clean(content.html().strip(), strip=True,
                                tags=frozenset(ALLOWED_HTML_TAGS),
                                attributes=ALLOWED_HTML_ATTR)
obj = {}
obj['type'] = 'DokuWiki'
obj['name'] = ptitle.text().replace('project:', '')
obj['description'] = html_content
obj['source_url'] = url
obj['logo_icon'] = 'list-ul'
return obj


def FetchWebEtherpad(text, url):
"""Help extract data from Etherpad Lite."""
ptitle = url.split('/')[-1]
if len(ptitle) < 1:
return {}
text_content = requests.get(
"%s/export/txt" % url,
timeout=REQUEST_TIMEOUT).text
obj = {}
obj['type'] = 'Etherpad'
obj['name'] = ptitle.replace('_', ' ')
obj['description'] = text_content
obj['source_url'] = url
obj['logo_icon'] = 'pen'
return obj


def FetchWebInstructables(text, url):
"""Help extract data from Instructables."""
doc = pq(text)
ptitle = doc(".header-title")
content = doc(".main-content")
if len(content) < 1 or len(ptitle) < 1:
return {}
html_content = ParseInstructablesPage(content)
obj = {}
obj['type'] = 'Instructables'
obj['name'] = ptitle.text()
obj['description'] = html_content
obj['source_url'] = url
obj['logo_icon'] = 'wrench'
return obj


def FetchWebGitHub(url):
    """Grab a Markdown source from a GitHub link."""
    if not url.endswith('.md') or '/blob/' not in url:
return {}
filename = url.split('/')[-1].replace('.md', '')
rawurl = url.replace('/blob/', '/raw/').replace("https://github.com/", '')
rawdata = requests.get("https://github.com/" + rawurl, timeout=REQUEST_TIMEOUT)
text_content = rawdata.text or ""
return {
'type': 'Markdown',
'name': filename,
'description': text_content,
'source_url': url,
'logo_icon': 'outdent',
}
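
# Example sketch (hypothetical file): a Markdown blob link is rewritten to
# its raw counterpart before fetching, e.g.
#   https://github.com/owner/repo/blob/main/docs/PITCH.md
#     -> https://github.com/owner/repo/raw/main/docs/PITCH.md
#   doc = FetchWebGitHub('https://github.com/owner/repo/blob/main/docs/PITCH.md')
#   doc.get('name')  # -> 'PITCH'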


def FetchWebGitHubGist(url):
    """Grab a Markdown source from a GitHub Gist link."""
    rawurl = url.replace("https://gist.github.com/", '') + '/raw'
    rawdata = requests.get(
        "https://gist.githubusercontent.com/" + rawurl,
        timeout=REQUEST_TIMEOUT)
text_content = rawdata.text or ""
return {
'type': 'Markdown',
'name': 'Gist',
'description': text_content,
'source_url': url,
'logo_icon': 'outdent',
}


def ParseInstructablesPage(content):
"""Create an HTML summary of content."""
html_content = ""
for step in content.find(".step"):
        step_title = pq(step).find('.step-title')
        if len(step_title) > 0:
html_content += '<h3>' + step_title.text() + '</h3>'
# Grab photos
for img in pq(step).find('noscript'):
if '{{ file' not in pq(img).html():
html_content += pq(img).html()
# Iterate through body
        step_content = pq(step).find('.step-body')
        if len(step_content) < 1:
            continue
for elem in pq(step_content).children():
elem_tag, p = ParseInstructablesElement(elem)
if elem_tag is None:
continue
html_content += '<%s>%s</%s>' % (elem_tag, p, elem_tag)
return html_content


def ParseInstructablesElement(elem):
"""Check and return minimal contents."""
if elem.tag == 'pre':
if elem.text is None:
return None, None
return 'pre', elem.text
else:
p = pq(elem).html()
if p is None:
return None, None
        p = bleach.clean(p.strip(), strip=True,
                         tags=frozenset(ALLOWED_HTML_TAGS),
                         attributes=ALLOWED_HTML_ATTR)
return elem.tag, p