### # Copyright (c) 2020, Valentin Lorentz # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, # this list of conditions, and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions, and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of the author of this software nor the name of # contributors to this software may be used to endorse or promote products # derived from this software without specific prior written consent. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. 
###

import json
import base64
import functools
import contextlib
import urllib.parse
import xml.etree.ElementTree as ET

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding

from supybot import commands, conf
from supybot.utils import web

# Clark-notation namespace prefix of XRD elements (used for host-meta).
XRD_URI = "{http://docs.oasis-open.org/ns/xri/xrd-1.0}"
# Media type requested for, and expected of, ActivityPub actor documents.
ACTIVITY_MIMETYPE = "application/activity+json"


class ActivityPubError(Exception):
    """Base class of every error raised by this module."""

    pass


class ProtocolError(ActivityPubError):
    """Base class of the discovery errors (host-meta and Webfinger)."""

    pass


class HostmetaError(ProtocolError):
    """Raised when fetching or parsing a host-meta document fails."""

    pass


class ActivityPubProtocolError(ActivityPubError):
    """Raised when an ActivityPub request fails or returns invalid JSON."""

    pass


class WebfingerError(ProtocolError):
    """Raised when a Webfinger response is not valid JSON."""

    pass


class ActorNotFound(ActivityPubError):
    """Raised when an actor cannot be resolved through Webfinger."""

    pass


def sandbox(f):
    """Decorator running ``f`` through ``commands.process`` with limits.

    The wrapped function is executed with ``timeout=10`` and
    ``heap_size=100 * 1024 * 1024`` (presumably seconds and bytes; confirm
    against ``supybot.commands.process``), to protect against XML memory
    bombs.

    Raises:
        web.Error: if the call hits the timeout or runs out of memory.
    """

    @functools.wraps(f)
    def newf(*args, **kwargs):
        try:
            return commands.process(
                f,
                *args,
                **kwargs,
                timeout=10,
                heap_size=100 * 1024 * 1024,
                pn="Fediverse",
                cn=f.__name__
            )
        except (commands.ProcessTimeoutError, MemoryError):
            raise web.Error(
                "Page is too big or the server took "
                "too much time to answer the request."
            )

    return newf


@contextlib.contextmanager
def convert_exceptions(to_class, msg="", from_none=False):
    """Context manager re-raising any ``Exception`` as ``to_class``.

    Args:
        to_class: exception class to raise instead of the original one.
        msg: prefix prepended to ``str()`` of the original exception.
        from_none: if true, the original exception is detached from the new
            one (``raise ... from None``); otherwise it is kept as its cause.
    """
    try:
        yield
    except Exception as e:
        arg = msg + str(e)
        if from_none:
            raise to_class(arg) from None
        else:
            raise to_class(arg) from e


@sandbox
def _get_webfinger_url(hostname):
    """Return the Webfinger URL template advertised by ``hostname``.

    The host-meta document of ``hostname`` is fetched and searched for a
    ``Link`` element whose ``rel`` is ``lrdd``; its ``template`` attribute
    is returned. If there is no such link, the default well-known Webfinger
    URL of ``hostname`` is returned. Either way the result contains a
    ``{uri}`` placeholder for the caller to substitute.

    Raises:
        HostmetaError: if the document cannot be fetched or parsed, or if
            the ``lrdd`` link has no ``template`` attribute.
    """
    with convert_exceptions(HostmetaError):
        doc = ET.fromstring(
            web.getUrlContent("https://%s/.well-known/host-meta" % hostname)
        )

        for link in doc.iter(XRD_URI + "Link"):
            # .get(): a Link without a "rel" attribute is simply not the one
            # we are looking for; it must not abort the whole lookup.
            if link.attrib.get("rel") == "lrdd":
                return link.attrib["template"]

    # The hostname has to be interpolated here; otherwise callers get a URL
    # with a literal "%s" as its host.
    return "https://%s/.well-known/webfinger?resource={uri}" % hostname


def webfinger(hostname, uri):
    """Query the Webfinger endpoint of ``hostname`` about ``uri``.

    Args:
        hostname: host whose Webfinger endpoint is queried.
        uri: resource to look up (eg. ``acct:user@example.org``).

    Returns:
        The decoded JSON document returned by the server.

    Raises:
        HostmetaError: if the endpoint cannot be discovered.
        ActorNotFound: if the Webfinger request itself fails.
        WebfingerError: if the response is not valid JSON.
    """
    template = _get_webfinger_url(hostname)
    assert template

    with convert_exceptions(ActorNotFound):
        # The value substituted for {uri} must be percent-encoded
        # (RFC 6415 / RFC 7033), so characters such as "&", "#" or "@"
        # in it cannot alter the structure of the request URL.
        content = web.getUrlContent(
            template.replace("{uri}", urllib.parse.quote(uri, safe="")),
            headers={"Accept": "application/json"},
        )

    with convert_exceptions(WebfingerError, "Invalid JSON: ", True):
        return json.loads(content)


def get_instance_actor_url():
    """Return the URL of the bot's instance actor.

    Returns ``None`` when ``supybot.servers.http.publicUrl`` is not set.
    """
    root_url = conf.supybot.servers.http.publicUrl()
    if not root_url:
        return None

    return urllib.parse.urljoin(root_url, "/fediverse/instance_actor")


def _get_private_key():
    """Load the unencrypted PEM private key of the instance actor.

    The key is read from ``Fediverse/instance_key.pem``, resolved through
    ``conf.supybot.directories.data.dirize``.
    """
    path = conf.supybot.directories.data.dirize("Fediverse/instance_key.pem")
    with open(path, "rb") as fd:
        return serialization.load_pem_private_key(
            fd.read(), password=None, backend=default_backend()
        )


def get_public_key():
    """Return the public key matching the instance actor's private key."""
    return _get_private_key().public_key()


def get_public_key_pem():
    """Return the instance actor's public key, PEM-encoded, as bytes."""
    return get_public_key().public_bytes(
        encoding=serialization.Encoding.PEM,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )


def signed_request(url, headers=None, data=None):
    """Fetch ``url``, adding an HTTP ``Signature`` header when possible.

    The request is signed (PKCS#1 v1.5 with SHA-256, using the instance
    key) only if an instance actor URL is configured; otherwise it is sent
    unsigned. The signature covers the ``(request-target)`` pseudo-header
    and every header given in ``headers``.

    Args:
        url: URL to request.
        headers: optional dict of request headers; it is not modified.
        data: optional request body. The signed method is ``post`` when it
            is given, ``get`` otherwise.

    Returns:
        Whatever ``web.getUrlContent`` returns for the request.

    Raises:
        ActivityPubProtocolError: if the request fails.
    """
    method = "get" if data is None else "post"
    instance_actor_url = get_instance_actor_url()

    # Work on a copy, so adding "Signature" below never leaks into the
    # caller's dict.
    headers = dict(headers) if headers else {}

    if instance_actor_url:
        # A URL without a path is requested as "/", so sign that.
        request_path = urllib.parse.urlparse(url).path or "/"
        signed_headers = [
            ("(request-target)", method + " " + request_path)
        ]
        signed_headers.extend(
            (header_name.lower(), header_value)
            for (header_name, header_value) in headers.items()
        )
        signed_text = "\n".join("%s: %s" % header for header in signed_headers)

        private_key = _get_private_key()
        signature = private_key.sign(
            signed_text.encode(), padding.PKCS1v15(), hashes.SHA256()
        )

        headers["Signature"] = (
            'keyId="%s#main-key",' % instance_actor_url
            + 'headers="%s",' % " ".join(k for (k, v) in signed_headers)
            + 'signature="%s"' % base64.b64encode(signature).decode()
        )

    with convert_exceptions(ActivityPubProtocolError):
        return web.getUrlContent(url, headers=headers, data=data)


def actor_url(localuser, hostname):
    """Resolve ``localuser@hostname`` to the URL of its ActivityPub actor.

    Returns the ``href`` of the first Webfinger link whose ``rel`` is
    ``self`` and whose ``type`` is the ActivityPub media type.

    Raises:
        ActorNotFound: if the Webfinger document has no such link.
        HostmetaError, WebfingerError: propagated from the lookup.
    """
    uri = "acct:%s@%s" % (localuser, hostname)
    document = webfinger(hostname, uri)

    links = document.get("links") if isinstance(document, dict) else None
    if not isinstance(links, list):
        # No usable link list: same outcome as a list without any match.
        links = []

    for link in links:
        if not isinstance(link, dict):
            continue
        # Links may legitimately omit "type" (or other members); those are
        # skipped instead of failing the lookup with a KeyError.
        if (
            link.get("rel") == "self"
            and link.get("type") == ACTIVITY_MIMETYPE
            and "href" in link
        ):
            return link["href"]

    raise ActorNotFound(localuser, hostname)


def get_actor(localuser, hostname):
    """Fetch and decode the actor document of ``localuser@hostname``.

    Raises:
        ActorNotFound: if the actor cannot be resolved.
        ActivityPubProtocolError: if the request fails or the response is
            not valid JSON.
        HostmetaError, WebfingerError: propagated from the lookup.
    """
    url = actor_url(localuser, hostname)

    content = signed_request(url, headers={"Accept": ACTIVITY_MIMETYPE})

    assert content is not None

    with convert_exceptions(ActivityPubProtocolError, "Invalid JSON: ", True):
        return json.loads(content)