mirror of
https://github.com/Mikaela/Limnoria.git
synced 2024-11-05 10:39:22 +01:00
287 lines
9.0 KiB
Python
287 lines
9.0 KiB
Python
###
|
|
# Copyright (c) 2020-2021, The Limnoria Contributors
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions, and the following disclaimer.
|
|
# * Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions, and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution.
|
|
# * Neither the name of the author of this software nor the name of
|
|
# contributors to this software may be used to endorse or promote products
|
|
# derived from this software without specific prior written consent.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
###
|
|
|
|
import os
import json
import email
import email.utils
import base64
import functools
import contextlib
import urllib.error
import urllib.parse
import urllib.request
import xml.etree.ElementTree as ET

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.asymmetric.rsa import generate_private_key

from supybot import commands, conf
from supybot.utils import gen, web
|
|
|
|
|
|
# XML namespace of XRD documents, written in ElementTree's "{namespace}"
# prefix form; prepended to "Link" when scanning host-meta documents.
XRD_URI = "{http://docs.oasis-open.org/ns/xri/xrd-1.0}"
|
|
# Media type sent in the Accept header when fetching ActivityPub resources,
# and matched against the "type" of Webfinger links to find an actor's URL.
ACTIVITY_MIMETYPE = "application/activity+json"
|
|
|
|
|
|
class ActivityPubError(Exception):
    """Base class of the exceptions defined in this module."""
|
|
|
|
|
|
class ProtocolError(ActivityPubError):
    """Base class of the errors of the discovery steps (host-meta and
    Webfinger)."""
|
|
|
|
|
|
class HostmetaError(ProtocolError):
    """Raised when the host-meta document of a host cannot be fetched or
    parsed."""
|
|
|
|
|
|
class ActivityPubProtocolError(ActivityPubError):
    """Raised when a signed request fails, or when the fetched resource
    is not valid JSON."""
|
|
|
|
|
|
class WebfingerError(ProtocolError):
    """Raised when the response to a Webfinger query is not valid JSON."""
|
|
|
|
|
|
class ActorNotFound(ActivityPubError):
    """Raised when the Webfinger query for an actor fails, or when its
    response has no usable ActivityPub link."""
|
|
|
|
|
|
def sandbox(f):
    """Decorator running ``f`` through ``commands.process``, in a process
    with limited memory, to prevent XML memory bombs
    <https://docs.python.org/3/library/xml.html#xml-vulnerabilities>

    The call is given a 10 seconds timeout and a 300 MiB heap size.
    ``commands.ProcessTimeoutError`` and ``MemoryError`` are converted to
    ``web.Error``.
    """

    @functools.wraps(f)
    def sandboxed(*args, **kwargs):
        # Built on every call, exactly like the keyword arguments they
        # replace; a clash with a caller-provided keyword is still a
        # TypeError.
        limits = {
            "timeout": 10,
            "heap_size": 300 * 1024 * 1024,
            "pn": "Fediverse",
            "cn": f.__name__,
        }
        try:
            result = commands.process(f, *args, **limits, **kwargs)
        except (commands.ProcessTimeoutError, MemoryError):
            raise web.Error(
                "Page is too big or the server took "
                "too much time to answer the request."
            )
        else:
            return result

    return sandboxed
|
|
|
|
|
|
@contextlib.contextmanager
def convert_exceptions(to_class, msg="", from_none=False):
    """Context manager re-raising any ``Exception`` raised in its body as
    ``to_class``.

    The new exception is built from a single string: ``msg`` followed by
    ``str()`` of the original exception. The original exception is set as
    the cause of the new one, unless ``from_none`` is true, in which case
    the new exception is raised ``from None``.
    """
    try:
        yield
    except Exception as original:
        converted = to_class(msg + str(original))
        # `raise ... from None` when the original must be hidden.
        cause = None if from_none else original
        raise converted from cause
|
|
|
|
|
|
@sandbox
def _get_webfinger_url(hostname):
    """Returns the Webfinger URL template of ``hostname``, in which
    ``{uri}`` stands for the queried resource.

    The template is read from the ``lrdd`` link of the host's
    ``/.well-known/host-meta`` document. When the document has no such
    link, the default ``/.well-known/webfinger`` location of the host is
    returned instead.

    Raises ``HostmetaError`` if the host-meta document cannot be fetched
    or parsed.
    """
    with convert_exceptions(HostmetaError):
        doc = ET.fromstring(
            web.getUrlContent("https://%s/.well-known/host-meta" % hostname)
        )

    for link in doc.iter(XRD_URI + "Link"):
        # Link elements without a "rel" or "template" attribute are
        # skipped instead of raising KeyError.
        if link.attrib.get("rel") == "lrdd":
            template = link.attrib.get("template")
            if template:
                return template

    # The hostname must be substituted here: callers only replace "{uri}".
    return "https://%s/.well-known/webfinger?resource={uri}" % hostname
|
|
|
|
|
|
def has_webfinger_support(hostname):
    """Returns whether the hostname probably supports webfinger or not.

    This relies on an edge case of the Webfinger specification,
    so it may not successfully detect some hosts because they don't follow
    the specification.

    A HEAD request is sent to ``/.well-known/webfinger`` without the
    "resource" parameter, and the error response is inspected. Errors
    other than ``urllib.error.HTTPError`` are not caught."""
    request = urllib.request.Request(
        "https://%s/.well-known/webfinger" % hostname, method="HEAD"
    )
    try:
        # Only the status matters; close the response instead of leaking it.
        with urllib.request.urlopen(request):
            pass
    except urllib.error.HTTPError as e:
        # Compare the media type only, so that parameters such as
        # "; charset=utf-8" and case differences do not hide a match.
        content_type = e.headers.get("Content-Type", "")
        media_type = content_type.split(";", 1)[0].strip().lower()

        if e.code == 400:
            # RFC 7033 requires a 400 response when the "resource" parameter
            # is missing: https://tools.ietf.org/html/rfc7033#section-4.2
            #
            # This works for:
            # * Misskey
            # * PeerTube
            # * Pleroma
            return True
        elif media_type == "application/jrd+json":
            # WriteFreely, and possibly others.
            # https://github.com/writeas/writefreely/issues/310
            return True
        elif e.code == 404:
            if e.headers.get("Server", "").lower() == "mastodon":
                # https://github.com/tootsuite/mastodon/issues/13757
                return True

    # Else, the host probably doesn't support Webfinger.

    # Known false negatives:
    # * Nextcloud (returns 404)
    # * Pixelfed (returns 302 to the homepage):
    #   https://github.com/pixelfed/pixelfed/issues/2180
    # * Plume (returns 404):
    #   https://github.com/Plume-org/Plume/issues/770
    return False
|
|
|
|
|
|
def webfinger(hostname, uri):
    """Queries the Webfinger endpoint of ``hostname`` about ``uri`` and
    returns the decoded JSON response.

    Raises ``ActorNotFound`` if the query fails, and ``WebfingerError`` if
    the response is not valid JSON.
    """
    url_template = _get_webfinger_url(hostname)
    assert url_template, "missing webfinger url template for %s" % hostname

    accept_json = {"Accept": "application/json"}
    with convert_exceptions(ActorNotFound):
        raw_response = web.getUrlContent(
            url_template.replace("{uri}", uri), headers=accept_json
        )

    with convert_exceptions(WebfingerError, "Invalid JSON: ", True):
        decoded = raw_response.decode()
        return json.loads(decoded)
|
|
|
|
|
|
def get_instance_actor_url():
    """Returns the URL of this bot's instance actor, built from the
    ``supybot.servers.http.publicUrl`` configuration value, or ``None``
    when that value is empty."""
    public_url = conf.supybot.servers.http.publicUrl()
    if public_url:
        return urllib.parse.urljoin(public_url, "/fediverse/instance_actor")

    return None
|
|
|
|
|
|
def _generate_private_key():
    """Generates and returns a new 2048-bit RSA private key, with public
    exponent 65537."""
    key_parameters = {
        "public_exponent": 65537,
        "key_size": 2048,
        "backend": default_backend(),
    }
    return generate_private_key(**key_parameters)
|
|
|
|
|
|
def _get_private_key():
    """Returns the private key of this instance, loaded from
    ``Fediverse/instance_key.pem`` in the bot's data directory.

    If the file does not exist yet, a new key is generated and written to
    it first (unencrypted PEM), and the file is created readable and
    writable by its owner only.
    """
    path = conf.supybot.directories.data.dirize("Fediverse/instance_key.pem")
    if not os.path.isfile(path):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        key = _generate_private_key()
        pem = key.private_bytes(
            encoding=serialization.Encoding.PEM,
            format=serialization.PrivateFormat.TraditionalOpenSSL,
            encryption_algorithm=serialization.NoEncryption(),
        )
        # The key is stored unencrypted, so do not let the umask decide who
        # can read it: create the file with mode 0o600. O_TRUNC keeps the
        # semantics of the "wb" mode; O_BINARY only exists on Windows.
        flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
        flags |= getattr(os, "O_BINARY", 0)
        with os.fdopen(os.open(path, flags, 0o600), "wb") as fd:
            fd.write(pem)

    with open(path, "rb") as fd:
        return serialization.load_pem_private_key(
            fd.read(), password=None, backend=default_backend()
        )
|
|
|
|
|
|
def get_public_key():
    """Returns the public key matching this instance's private key."""
    private_key = _get_private_key()
    return private_key.public_key()
|
|
|
|
|
|
def get_public_key_pem():
    """Returns this instance's public key, serialized as a PEM-encoded
    SubjectPublicKeyInfo structure."""
    public_key = get_public_key()
    pem_options = {
        "encoding": serialization.Encoding.PEM,
        "format": serialization.PublicFormat.SubjectPublicKeyInfo,
    }
    return public_key.public_bytes(**pem_options)
|
|
|
|
|
|
def signed_request(url, headers=None, data=None):
    """Fetches ``url`` with ``web.getUrlContent`` and returns its content.

    A ``Date`` header is added when ``headers`` has none. When an instance
    actor URL is available, a ``Signature`` header is added as well: it
    covers the request target (``get`` without ``data``, ``post`` with it,
    followed by the path of ``url``), the hostname of ``url``, and every
    header of the request, signed with this instance's private key
    (PKCS#1 v1.5 padding, SHA-256).

    The ``headers`` mapping given by the caller is copied, not modified.

    Raises ``ActivityPubProtocolError`` if the request fails.
    """
    if data is None:
        method = "get"
    else:
        method = "post"
    instance_actor_url = get_instance_actor_url()
    headers = gen.InsensitivePreservingDict(headers or {})

    if "Date" not in headers:
        headers["Date"] = email.utils.formatdate(usegmt=True)

    if instance_actor_url:
        target = urllib.parse.urlparse(url)

        # (name, value) pairs, in the order they are signed and advertised.
        signed_headers = [
            ("(request-target)", method + " " + target.path),
            ("host", target.hostname),
        ]
        signed_headers.extend(
            (name.lower(), value) for (name, value) in headers.items()
        )

        signed_text = "\n".join(
            "%s: %s" % (name, value) for (name, value) in signed_headers
        )
        signature = _get_private_key().sign(
            signed_text.encode(), padding.PKCS1v15(), hashes.SHA256()
        )

        signed_names = " ".join(name for (name, _) in signed_headers)
        encoded_signature = base64.b64encode(signature).decode()
        headers["Signature"] = "".join(
            [
                'keyId="%s#main-key",' % instance_actor_url,
                'headers="%s",' % signed_names,
                'signature="%s"' % encoded_signature,
            ]
        )

    with convert_exceptions(ActivityPubProtocolError):
        return web.getUrlContent(url, headers=headers, data=data)
|
|
|
|
|
|
def actor_url(localuser, hostname):
    """Returns the URL of the ActivityPub actor of ``localuser`` on
    ``hostname``, found with a Webfinger query for
    ``acct:localuser@hostname``.

    The URL is the ``href`` of the first link of the response whose ``rel``
    is ``self`` and whose ``type`` is the ActivityPub media type.

    Raises ``ActorNotFound`` if there is no such link; this includes
    responses with a missing or malformed list of links. Exceptions raised
    by ``webfinger`` are not caught.
    """
    uri = "acct:%s@%s" % (localuser, hostname)
    document = webfinger(hostname, uri)

    # The response comes from a remote server: do not assume its shape.
    links = document.get("links") if isinstance(document, dict) else None
    for link in links or []:
        if not isinstance(link, dict):
            continue
        if (
            link.get("rel") == "self"
            and link.get("type") == ACTIVITY_MIMETYPE
            and "href" in link
        ):
            return link["href"]

    raise ActorNotFound(localuser, hostname)
|
|
|
|
|
|
def get_actor(localuser, hostname):
    """Returns the ActivityPub actor document of ``localuser`` on
    ``hostname``: its URL is looked up with ``actor_url``, then fetched
    with ``get_resource_from_url``."""
    return get_resource_from_url(actor_url(localuser, hostname))
|
|
|
|
|
|
def get_resource_from_url(url):
    """Fetches ``url`` with a signed request asking for the ActivityPub
    media type, and returns the decoded JSON document.

    Raises ``ActivityPubProtocolError`` if the request fails or if the
    response is not valid JSON.
    """
    request_headers = {"Accept": ACTIVITY_MIMETYPE}
    body = signed_request(url, headers=request_headers)

    assert body is not None, "Content from %s is None" % url

    with convert_exceptions(ActivityPubProtocolError, "Invalid JSON: ", True):
        text = body.decode()
        return json.loads(text)
|