mirror of
https://github.com/Mikaela/Limnoria.git
synced 2025-02-05 00:54:07 +01:00
eaf7222509
This is much cheaper both for us and the host if the host doesn't support activitypub at all (which is what happens most of the time).
485 lines
18 KiB
Python
485 lines
18 KiB
Python
###
|
|
# Copyright (c) 2020, Valentin Lorentz
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions, and the following disclaimer.
|
|
# * Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions, and the following disclaimer in the
|
|
# documentation and/or other materials provided with the distribution.
|
|
# * Neither the name of the author of this software nor the name of
|
|
# contributors to this software may be used to endorse or promote products
|
|
# derived from this software without specific prior written consent.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
|
|
###
|
|
|
|
import os
|
|
import json
|
|
import functools
|
|
import contextlib
|
|
from multiprocessing import Manager
|
|
|
|
from supybot import conf, log, utils
|
|
from supybot.test import ChannelPluginTestCase, network
|
|
|
|
from . import activitypub as ap
|
|
from .test_data import (
|
|
PRIVATE_KEY,
|
|
HOSTMETA_URL,
|
|
HOSTMETA_DATA,
|
|
WEBFINGER_URL,
|
|
WEBFINGER_DATA,
|
|
ACTOR_URL,
|
|
ACTOR_DATA,
|
|
OUTBOX_URL,
|
|
OUTBOX_DATA,
|
|
STATUS_URL,
|
|
STATUS_DATA,
|
|
OUTBOX_FIRSTPAGE_URL,
|
|
OUTBOX_FIRSTPAGE_DATA,
|
|
BOOSTED_URL,
|
|
BOOSTED_DATA,
|
|
BOOSTED_ACTOR_URL,
|
|
BOOSTED_ACTOR_DATA,
|
|
)
|
|
|
|
|
|
class BaseFediverseTestCase(ChannelPluginTestCase):
    """Shared fixture for the Fediverse plugin test cases.

    Declares the plugin under test and, before every test, writes the
    test private key into the bot's data directory.
    """

    plugins = ("Fediverse",)
    config = {
        # Allow snarfing the same URL twice in a row
        "supybot.snarfThrottle": 0.0
    }

    def setUp(self):
        """Run the parent set-up, then write PRIVATE_KEY to the key file."""
        super().setUp()
        key_path = conf.supybot.directories.data.dirize(
            "Fediverse/instance_key.pem"
        )
        # The "Fediverse" sub-directory may not exist yet.
        key_dir = os.path.dirname(key_path)
        os.makedirs(key_dir, exist_ok=True)
        with open(key_path, "wb") as key_file:
            key_file.write(PRIVATE_KEY)
|
|
|
|
|
|
class NetworkedFediverseTestCase(BaseFediverseTestCase):
    """Tests that query real hosts; they are only defined if ``network``."""

    if network:

        def testNetworkProfile(self):
            # The reply is matched against this hexadecimal identifier.
            command = "profile @val@oc.todon.fr"
            expected_pattern = "0E082B40E4376B1E"
            self.assertRegexp(command, expected_pattern)
            # TODO: add a test with an instance which only allows fetches
            # with valid signatures.

        def testNetworkProfileUnknown(self):
            # A user that does not exist must produce an explicit error.
            command = "profile @nonexistinguser@oc.todon.fr"
            expected_reply = "Error: Unknown user @nonexistinguser@oc.todon.fr."
            self.assertResponse(command, expected_reply)

        def testHasWebfingerSupport(self):
            # One host expected to report webfinger support, one expected
            # not to.
            supported = ap.has_webfinger_support("oc.todon.fr")
            self.assertTrue(supported)
            unsupported = ap.has_webfinger_support("example.org")
            self.assertFalse(unsupported)
|
|
|
|
|
|
class NetworklessFediverseTestCase(BaseFediverseTestCase):
    """Offline tests: HTTP fetches are replaced by scripted responses.

    ``mockRequests`` swaps ``utils.web.getUrlContent`` for a stub that serves
    an ordered list of (URL, response) pairs, and ``mockWebfingerSupport``
    swaps ``ap.has_webfinger_support`` for a stub returning a fixed answer.
    """

    timeout = 0.1

    @contextlib.contextmanager
    def mockWebfingerSupport(self, value):
        """Temporarily replace ``ap.has_webfinger_support`` with a stub.

        ``value`` is either a bool, which the stub returns for any hostname,
        or the string ``"not called"``, in which case any call to the stub
        raises ``AssertionError``.

        The original function is restored when the ``with`` block exits,
        including when the block raises, so a failing test cannot leave the
        stub installed for the tests that follow.
        """
        original_has_webfinger_support = ap.has_webfinger_support

        @functools.wraps(original_has_webfinger_support)
        def newf(hostname):
            if value == "not called":
                # Raised explicitly so the check survives ``python -O``.
                raise AssertionError(
                    "has_webfinger_support(%r) was not expected to be called"
                    % (hostname,)
                )
            assert type(value) is bool
            return value

        ap.has_webfinger_support = newf

        try:
            yield
        finally:
            ap.has_webfinger_support = original_has_webfinger_support

    @contextlib.contextmanager
    def mockRequests(self, expected_requests):
        """Temporarily replace ``utils.web.getUrlContent`` with a stub.

        ``expected_requests`` is an ordered iterable of ``(url, response)``
        pairs. Each call to the stub consumes the next pair, checks that the
        requested URL matches, and then returns ``response`` if it is
        ``bytes`` or raises it if it is an exception instance.

        The original function is always restored on exit. If the ``with``
        block finished without raising, the test fails when some expected
        requests were never made.
        """
        with Manager() as m:
            # The pending requests live in a manager-backed list proxy.
            expected_requests = m.list(list(expected_requests))
            original_getUrlContent = utils.web.getUrlContent

            @functools.wraps(original_getUrlContent)
            def newf(url, headers=None, data=None):
                # ``headers`` is accepted for signature compatibility only.
                self.assertIsNone(data, "Unexpected POST to %s" % url)
                assert expected_requests, url
                (expected_url, response) = expected_requests.pop(0)
                self.assertEqual(url, expected_url, "Unexpected URL: %s" % url)
                log.debug("Got request to %s", url)

                if isinstance(response, bytes):
                    return response
                elif isinstance(response, Exception):
                    raise response
                else:
                    assert False, response

            utils.web.getUrlContent = newf

            try:
                yield
            finally:
                utils.web.getUrlContent = original_getUrlContent

            # Only reached when the managed block did not raise.
            self.assertEqual(
                list(expected_requests), [], "Less requests than expected."
            )

    def testFeaturedNone(self):
        # An empty "featured" collection yields a dedicated message.
        featured = {
            "@context": "https://www.w3.org/ns/activitystreams",
            "id": "https://example.org/users/someuser/collections/featured",
            "type": "OrderedCollection",
            "orderedItems": [],
        }
        expected_requests = [
            (HOSTMETA_URL, HOSTMETA_DATA),
            (WEBFINGER_URL, WEBFINGER_DATA),
            (ACTOR_URL, ACTOR_DATA),
            (
                "https://example.org/users/someuser/collections/featured",
                json.dumps(featured).encode(),
            ),
        ]
        with self.mockRequests(expected_requests):
            self.assertResponse(
                "featured @someuser@example.org", "No featured statuses."
            )

    def testFeaturedSome(self):
        # A "featured" collection with one note: its content is shown.
        featured = {
            "@context": [
                "https://www.w3.org/ns/activitystreams",
                {
                    "ostatus": "http://ostatus.org#",
                    "atomUri": "ostatus:atomUri",
                    "inReplyToAtomUri": "ostatus:inReplyToAtomUri",
                    "conversation": "ostatus:conversation",
                    "sensitive": "as:sensitive",
                    "toot": "http://joinmastodon.org/ns#",
                    "votersCount": "toot:votersCount",
                },
            ],
            "id": "https://example.org/users/someuser/collections/featured",
            "type": "OrderedCollection",
            "orderedItems": [
                {
                    "id": "https://example.org/users/someuser/statuses/123456789",
                    "type": "Note",
                    "summary": None,
                    "inReplyTo": "https://example.org/users/someuser/statuses/100543712346463856",
                    "published": "2018-08-13T15:49:00Z",
                    "url": "https://example.org/@someuser/123456789",
                    "attributedTo": "https://example.org/users/someuser",
                    "to": ["https://example.org/users/someuser/followers"],
                    "cc": ["https://www.w3.org/ns/activitystreams#Public"],
                    "sensitive": False,
                    "atomUri": "https://example.org/users/someuser/statuses/123456789",
                    "inReplyToAtomUri": "https://example.org/users/someuser/statuses/100543712346463856",
                    "conversation": "tag:example.org,2018-08-13:objectId=3002048:objectType=Conversation",
                    "content": "<p>This is a pinned toot</p>",
                    "contentMap": {"en": "<p>This is a pinned toot</p>"},
                    "attachment": [],
                    "tag": [],
                    "replies": {
                        "id": "https://example.org/users/someuser/statuses/123456789/replies",
                        "type": "Collection",
                        "first": {
                            "type": "CollectionPage",
                            "next": "https://example.org/users/someuser/statuses/123456789/replies?min_id=100723569923690076&page=true",
                            "partOf": "https://example.org/users/someuser/statuses/123456789/replies",
                            "items": [
                                "https://example.org/users/someuser/statuses/100723569923690076"
                            ],
                        },
                    },
                }
            ],
        }

        expected_requests = [
            (HOSTMETA_URL, HOSTMETA_DATA),
            (WEBFINGER_URL, WEBFINGER_DATA),
            (ACTOR_URL, ACTOR_DATA),
            (
                "https://example.org/users/someuser/collections/featured",
                json.dumps(featured).encode(),
            ),
        ]

        with self.mockRequests(expected_requests):
            self.assertRegexp(
                "featured @someuser@example.org", "This is a pinned toot"
            )

    def testProfile(self):
        # Host-meta, webfinger and actor are fetched, in that order.
        expected_requests = [
            (HOSTMETA_URL, HOSTMETA_DATA),
            (WEBFINGER_URL, WEBFINGER_DATA),
            (ACTOR_URL, ACTOR_DATA),
        ]

        with self.mockRequests(expected_requests):
            self.assertResponse(
                "profile @someuser@example.org",
                "\x02someuser\x02 (@someuser@example.org): My Biography",
            )

    def testProfileSnarfer(self):
        # Without enabling the username snarfer: no lookup, no request.
        with self.mockWebfingerSupport("not called"), self.mockRequests([]):
            self.assertSnarfNoResponse("aaa @nonexistinguser@example.org bbb")

        with conf.supybot.plugins.Fediverse.snarfers.username.context(True):
            expected_requests = [
                (HOSTMETA_URL, HOSTMETA_DATA),
                (WEBFINGER_URL, WEBFINGER_DATA),
                (ACTOR_URL, ACTOR_DATA),
            ]

            # First request, should work
            with self.mockWebfingerSupport(True), self.mockRequests(
                expected_requests
            ):
                self.assertSnarfResponse(
                    "aaa @someuser@example.org bbb",
                    "\x02someuser\x02 (@someuser@example.org): My Biography",
                )

            # Same request; it is all cached
            with self.mockWebfingerSupport("not called"), self.mockRequests(
                []
            ):
                self.assertSnarfResponse(
                    "aaa @someuser@example.org bbb",
                    "\x02someuser\x02 (@someuser@example.org): My Biography",
                )

            # Nonexisting user

            expected_requests = [
                (HOSTMETA_URL, HOSTMETA_DATA),
                (WEBFINGER_URL, utils.web.Error("blah")),
            ]

            with self.mockWebfingerSupport("not called"), self.mockRequests(
                expected_requests
            ):
                self.assertSnarfNoResponse(
                    "aaa @nonexistinguser@example.org bbb"
                )

    def testProfileSnarferNoWebfinger(self):
        # NOTE(review): the username snarfer is set to False here, so the
        # webfinger stub below may never be consulted; confirm whether
        # context(True) was intended.
        with conf.supybot.plugins.Fediverse.snarfers.username.context(False):
            # No webfinger support, shouldn't make requests
            with self.mockWebfingerSupport(False), self.mockRequests([]):
                self.assertSnarfNoResponse("aaa @someuser@example.org bbb")

    def testProfileUrlSnarfer(self):
        # Without enabling the profile snarfer: no lookup, no request.
        with self.mockWebfingerSupport("not called"), self.mockRequests([]):
            self.assertSnarfNoResponse(
                "aaa https://example.org/users/someuser bbb"
            )

        with conf.supybot.plugins.Fediverse.snarfers.profile.context(True):
            # A failing actor fetch produces no reply.
            expected_requests = [(ACTOR_URL, utils.web.Error("blah"))]

            with self.mockWebfingerSupport(True), self.mockRequests(
                expected_requests
            ):
                self.assertSnarfNoResponse(
                    "aaa https://example.org/users/someuser bbb"
                )

            expected_requests = [(ACTOR_URL, ACTOR_DATA)]

            with self.mockWebfingerSupport("not called"), self.mockRequests(
                expected_requests
            ):
                self.assertSnarfResponse(
                    "aaa https://example.org/users/someuser bbb",
                    "\x02someuser\x02 (@someuser@example.org): My Biography",
                )

    def testProfileUnknown(self):
        # A webfinger error is reported as an unknown user.
        expected_requests = [
            (HOSTMETA_URL, HOSTMETA_DATA),
            (WEBFINGER_URL, utils.web.Error("blah")),
        ]

        with self.mockRequests(expected_requests):
            self.assertResponse(
                "profile @nonexistinguser@example.org",
                "Error: Unknown user @nonexistinguser@example.org.",
            )

    def testStatus(self):
        # The status is fetched first, then its author.
        expected_requests = [
            (STATUS_URL, STATUS_DATA),
            (ACTOR_URL, ACTOR_DATA),
        ]

        with self.mockRequests(expected_requests):
            self.assertResponse(
                "status https://example.org/users/someuser/statuses/1234",
                "\x02someuser\x02 (@someuser@example.org): "
                + "@ FirstAuthor I am replying to you",
            )

    def testStatusError(self):
        # Failure while fetching the status itself.
        expected_requests = [(STATUS_URL, utils.web.Error("blah"))]

        with self.mockRequests(expected_requests):
            self.assertResponse(
                "status https://example.org/users/someuser/statuses/1234",
                "Error: Could not get status: blah",
            )

        # Failure while fetching the author: the status is still shown.
        expected_requests = [
            (STATUS_URL, STATUS_DATA),
            (ACTOR_URL, utils.web.Error("blah")),
        ]

        with self.mockRequests(expected_requests):
            self.assertResponse(
                "status https://example.org/users/someuser/statuses/1234",
                "<error: blah>: " + "@ FirstAuthor I am replying to you",
            )

    def testStatuses(self):
        expected_requests = [
            (HOSTMETA_URL, HOSTMETA_DATA),
            (WEBFINGER_URL, WEBFINGER_DATA),
            (ACTOR_URL, ACTOR_DATA),
            (OUTBOX_URL, OUTBOX_DATA),
            (OUTBOX_FIRSTPAGE_URL, OUTBOX_FIRSTPAGE_DATA),
            (BOOSTED_URL, BOOSTED_DATA),
            (BOOSTED_ACTOR_URL, BOOSTED_ACTOR_DATA),
        ]

        with self.mockRequests(expected_requests):
            self.assertResponse(
                "statuses @someuser@example.org",
                "\x02someuser\x02 (@someuser@example.org): "
                + "@ FirstAuthor I am replying to you, "
                + "\x02someuser\x02 (@someuser@example.org): "
                + "\x02[CW This is a content warning]\x02 "
                + "This is a status with a content warning, and "
                + "\x02Boosted User\x02 (@BoostedUser@example.net): "
                + "Status Content",
            )

        # The actors are cached from the previous request
        expected_requests = [
            (OUTBOX_URL, OUTBOX_DATA),
            (OUTBOX_FIRSTPAGE_URL, OUTBOX_FIRSTPAGE_DATA),
            (BOOSTED_URL, BOOSTED_DATA),
        ]

        with self.mockRequests(expected_requests):
            with conf.supybot.plugins.Fediverse.format.statuses.showContentWithCW.context(
                False
            ):
                self.assertResponse(
                    "statuses @someuser@example.org",
                    "\x02someuser\x02 (@someuser@example.org): "
                    + "@ FirstAuthor I am replying to you, "
                    + "\x02someuser\x02 (@someuser@example.org): "
                    + "CW This is a content warning, and "
                    + "\x02Boosted User\x02 (@BoostedUser@example.net): "
                    + "Status Content",
                )

    def testStatusUrlSnarferDisabled(self):
        # Without enabling the status snarfer: no lookup, no request.
        with self.mockWebfingerSupport("not called"), self.mockRequests([]):
            self.assertSnarfNoResponse(
                "aaa https://example.org/users/someuser/statuses/1234 bbb"
            )

    def testStatusUrlSnarfer(self):
        with conf.supybot.plugins.Fediverse.snarfers.status.context(True):
            expected_requests = [
                (STATUS_URL, STATUS_DATA),
                (ACTOR_URL, ACTOR_DATA),
            ]

            with self.mockWebfingerSupport(True), self.mockRequests(
                expected_requests
            ):
                self.assertSnarfResponse(
                    "aaa https://example.org/users/someuser/statuses/1234 bbb",
                    "\x02someuser\x02 (@someuser@example.org): "
                    + "@ FirstAuthor I am replying to you",
                )

    def testStatusUrlSnarferErrors(self):
        with conf.supybot.plugins.Fediverse.snarfers.status.context(True):
            # Failure while fetching the status: stay silent.
            expected_requests = [(STATUS_URL, utils.web.Error("blah"))]

            with self.mockWebfingerSupport(True), self.mockRequests(
                expected_requests
            ):
                self.assertSnarfNoResponse(
                    "aaa https://example.org/users/someuser/statuses/1234 bbb"
                )

            # Failure while fetching the author: the status is still shown.
            expected_requests = [
                (STATUS_URL, STATUS_DATA),
                (ACTOR_URL, utils.web.Error("blah")),
            ]

            with self.mockWebfingerSupport("not called"), self.mockRequests(
                expected_requests
            ):
                self.assertSnarfResponse(
                    "aaa https://example.org/users/someuser/statuses/1234 bbb",
                    "<error: blah>: @ FirstAuthor I am replying to you",
                )

    def testSnarferType(self):
        # Sends a request, notices it's a status, gives up
        with conf.supybot.plugins.Fediverse.snarfers.profile.context(True):
            expected_requests = [(STATUS_URL, STATUS_DATA)]

            with self.mockWebfingerSupport(True), self.mockRequests(
                expected_requests
            ):
                self.assertSnarfNoResponse(
                    "aaa https://example.org/users/someuser/statuses/1234 bbb"
                )

        # Sends a request, notices it's a profile, gives up
        # NOTE(review): this enables the profile snarfer again, although the
        # comment above describes giving up on a profile; confirm whether
        # the status snarfer was meant here.
        with conf.supybot.plugins.Fediverse.snarfers.profile.context(True):
            expected_requests = [(ACTOR_URL, ACTOR_DATA)]

            with self.mockWebfingerSupport("not called"), self.mockRequests(
                expected_requests
            ):
                self.assertSnarfNoResponse(
                    "aaa https://example.org/users/someuser/ bbb"
                )
|
|
|
|
|
|
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79:
|