Limnoria/plugins/Fediverse/test.py
Valentin Lorentz eaf7222509 Fediverse: Cache host support for webfinger before snarfing.
This is much cheaper both for us and the host if the host
doesn't support activitypub at all (which is what happens
most of the time).
2020-05-14 21:33:34 +02:00

485 lines
18 KiB
Python

###
# Copyright (c) 2020, Valentin Lorentz
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions, and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions, and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of the author of this software nor the name of
# contributors to this software may be used to endorse or promote products
# derived from this software without specific prior written consent.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
###
import os
import json
import functools
import contextlib
from multiprocessing import Manager
from supybot import conf, log, utils
from supybot.test import ChannelPluginTestCase, network
from . import activitypub as ap
from .test_data import (
PRIVATE_KEY,
HOSTMETA_URL,
HOSTMETA_DATA,
WEBFINGER_URL,
WEBFINGER_DATA,
ACTOR_URL,
ACTOR_DATA,
OUTBOX_URL,
OUTBOX_DATA,
STATUS_URL,
STATUS_DATA,
OUTBOX_FIRSTPAGE_URL,
OUTBOX_FIRSTPAGE_DATA,
BOOSTED_URL,
BOOSTED_DATA,
BOOSTED_ACTOR_URL,
BOOSTED_ACTOR_DATA,
)
class BaseFediverseTestCase(ChannelPluginTestCase):
config = {
# Allow snarfing the same URL twice in a row
"supybot.snarfThrottle": 0.0
}
plugins = ("Fediverse",)
def setUp(self):
super().setUp()
path = conf.supybot.directories.data.dirize(
"Fediverse/instance_key.pem"
)
os.makedirs(os.path.dirname(path), exist_ok=True)
with open(path, "wb") as fd:
fd.write(PRIVATE_KEY)
class NetworkedFediverseTestCase(BaseFediverseTestCase):
if network:
def testNetworkProfile(self):
self.assertRegexp("profile @val@oc.todon.fr", "0E082B40E4376B1E")
# TODO: add a test with an instance which only allows fetches
# with valid signatures.
def testNetworkProfileUnknown(self):
self.assertResponse(
"profile @nonexistinguser@oc.todon.fr",
"Error: Unknown user @nonexistinguser@oc.todon.fr.",
)
def testHasWebfingerSupport(self):
self.assertTrue(ap.has_webfinger_support("oc.todon.fr"))
self.assertFalse(ap.has_webfinger_support("example.org"))
class NetworklessFediverseTestCase(BaseFediverseTestCase):
timeout = 0.1
@contextlib.contextmanager
def mockWebfingerSupport(self, value):
original_has_webfinger_support = ap.has_webfinger_support
@functools.wraps(original_has_webfinger_support)
def newf(hostname):
if value == "not called":
assert False
assert type(value) is bool
return value
ap.has_webfinger_support = newf
yield
ap.has_webfinger_support = original_has_webfinger_support
@contextlib.contextmanager
def mockRequests(self, expected_requests):
with Manager() as m:
expected_requests = m.list(list(expected_requests))
original_getUrlContent = utils.web.getUrlContent
@functools.wraps(original_getUrlContent)
def newf(url, headers={}, data=None):
self.assertIsNone(data, "Unexpected POST to %s" % url)
assert expected_requests, url
(expected_url, response) = expected_requests.pop(0)
self.assertEqual(url, expected_url, "Unexpected URL: %s" % url)
log.debug("Got request to %s", url)
if isinstance(response, bytes):
return response
elif isinstance(response, Exception):
raise response
else:
assert False, response
utils.web.getUrlContent = newf
try:
yield
finally:
utils.web.getUrlContent = original_getUrlContent
self.assertEqual(
list(expected_requests), [], "Less requests than expected."
)
def testFeaturedNone(self):
featured = {
"@context": "https://www.w3.org/ns/activitystreams",
"id": "https://example.org/users/someuser/collections/featured",
"type": "OrderedCollection",
"orderedItems": [],
}
expected_requests = [
(HOSTMETA_URL, HOSTMETA_DATA),
(WEBFINGER_URL, WEBFINGER_DATA),
(ACTOR_URL, ACTOR_DATA),
(
"https://example.org/users/someuser/collections/featured",
json.dumps(featured).encode(),
),
]
with self.mockRequests(expected_requests):
self.assertResponse(
"featured @someuser@example.org", "No featured statuses."
)
def testFeaturedSome(self):
featured = {
"@context": [
"https://www.w3.org/ns/activitystreams",
{
"ostatus": "http://ostatus.org#",
"atomUri": "ostatus:atomUri",
"inReplyToAtomUri": "ostatus:inReplyToAtomUri",
"conversation": "ostatus:conversation",
"sensitive": "as:sensitive",
"toot": "http://joinmastodon.org/ns#",
"votersCount": "toot:votersCount",
},
],
"id": "https://example.org/users/someuser/collections/featured",
"type": "OrderedCollection",
"orderedItems": [
{
"id": "https://example.org/users/someuser/statuses/123456789",
"type": "Note",
"summary": None,
"inReplyTo": "https://example.org/users/someuser/statuses/100543712346463856",
"published": "2018-08-13T15:49:00Z",
"url": "https://example.org/@someuser/123456789",
"attributedTo": "https://example.org/users/someuser",
"to": ["https://example.org/users/someuser/followers"],
"cc": ["https://www.w3.org/ns/activitystreams#Public"],
"sensitive": False,
"atomUri": "https://example.org/users/someuser/statuses/123456789",
"inReplyToAtomUri": "https://example.org/users/someuser/statuses/100543712346463856",
"conversation": "tag:example.org,2018-08-13:objectId=3002048:objectType=Conversation",
"content": "<p>This is a pinned toot</p>",
"contentMap": {"en": "<p>This is a pinned toot</p>"},
"attachment": [],
"tag": [],
"replies": {
"id": "https://example.org/users/someuser/statuses/123456789/replies",
"type": "Collection",
"first": {
"type": "CollectionPage",
"next": "https://example.org/users/someuser/statuses/123456789/replies?min_id=100723569923690076&page=true",
"partOf": "https://example.org/users/someuser/statuses/123456789/replies",
"items": [
"https://example.org/users/someuser/statuses/100723569923690076"
],
},
},
}
],
}
expected_requests = [
(HOSTMETA_URL, HOSTMETA_DATA),
(WEBFINGER_URL, WEBFINGER_DATA),
(ACTOR_URL, ACTOR_DATA),
(
"https://example.org/users/someuser/collections/featured",
json.dumps(featured).encode(),
),
]
with self.mockRequests(expected_requests):
self.assertRegexp(
"featured @someuser@example.org", "This is a pinned toot"
)
def testProfile(self):
expected_requests = [
(HOSTMETA_URL, HOSTMETA_DATA),
(WEBFINGER_URL, WEBFINGER_DATA),
(ACTOR_URL, ACTOR_DATA),
]
with self.mockRequests(expected_requests):
self.assertResponse(
"profile @someuser@example.org",
"\x02someuser\x02 (@someuser@example.org): My Biography",
)
def testProfileSnarfer(self):
with self.mockWebfingerSupport("not called"), self.mockRequests([]):
self.assertSnarfNoResponse("aaa @nonexistinguser@example.org bbb")
with conf.supybot.plugins.Fediverse.snarfers.username.context(True):
expected_requests = [
(HOSTMETA_URL, HOSTMETA_DATA),
(WEBFINGER_URL, WEBFINGER_DATA),
(ACTOR_URL, ACTOR_DATA),
]
# First request, should work
with self.mockWebfingerSupport(True), self.mockRequests(
expected_requests
):
self.assertSnarfResponse(
"aaa @someuser@example.org bbb",
"\x02someuser\x02 (@someuser@example.org): My Biography",
)
# Same request; it is all cached
with self.mockWebfingerSupport("not called"), self.mockRequests(
[]
):
self.assertSnarfResponse(
"aaa @someuser@example.org bbb",
"\x02someuser\x02 (@someuser@example.org): My Biography",
)
# Nonexisting user
expected_requests = [
(HOSTMETA_URL, HOSTMETA_DATA),
(WEBFINGER_URL, utils.web.Error("blah")),
]
with self.mockWebfingerSupport("not called"), self.mockRequests(
expected_requests
):
self.assertSnarfNoResponse(
"aaa @nonexistinguser@example.org bbb"
)
def testProfileSnarferNoWebfinger(self):
with conf.supybot.plugins.Fediverse.snarfers.username.context(False):
# No webfinger support, shouldn't make requests
with self.mockWebfingerSupport(False), self.mockRequests([]):
self.assertSnarfNoResponse("aaa @someuser@example.org bbb")
def testProfileUrlSnarfer(self):
with self.mockWebfingerSupport("not called"), self.mockRequests([]):
self.assertSnarfNoResponse(
"aaa https://example.org/users/someuser bbb"
)
with conf.supybot.plugins.Fediverse.snarfers.profile.context(True):
expected_requests = [(ACTOR_URL, utils.web.Error("blah"))]
with self.mockWebfingerSupport(True), self.mockRequests(
expected_requests
):
self.assertSnarfNoResponse(
"aaa https://example.org/users/someuser bbb"
)
expected_requests = [(ACTOR_URL, ACTOR_DATA)]
with self.mockWebfingerSupport("not called"), self.mockRequests(
expected_requests
):
self.assertSnarfResponse(
"aaa https://example.org/users/someuser bbb",
"\x02someuser\x02 (@someuser@example.org): My Biography",
)
def testProfileUnknown(self):
expected_requests = [
(HOSTMETA_URL, HOSTMETA_DATA),
(WEBFINGER_URL, utils.web.Error("blah")),
]
with self.mockRequests(expected_requests):
self.assertResponse(
"profile @nonexistinguser@example.org",
"Error: Unknown user @nonexistinguser@example.org.",
)
def testStatus(self):
expected_requests = [
(STATUS_URL, STATUS_DATA),
(ACTOR_URL, ACTOR_DATA),
]
with self.mockRequests(expected_requests):
self.assertResponse(
"status https://example.org/users/someuser/statuses/1234",
"\x02someuser\x02 (@someuser@example.org): "
+ "@ FirstAuthor I am replying to you",
)
def testStatusError(self):
expected_requests = [(STATUS_URL, utils.web.Error("blah"))]
with self.mockRequests(expected_requests):
self.assertResponse(
"status https://example.org/users/someuser/statuses/1234",
"Error: Could not get status: blah",
)
expected_requests = [
(STATUS_URL, STATUS_DATA),
(ACTOR_URL, utils.web.Error("blah")),
]
with self.mockRequests(expected_requests):
self.assertResponse(
"status https://example.org/users/someuser/statuses/1234",
"<error: blah>: " + "@ FirstAuthor I am replying to you",
)
def testStatuses(self):
expected_requests = [
(HOSTMETA_URL, HOSTMETA_DATA),
(WEBFINGER_URL, WEBFINGER_DATA),
(ACTOR_URL, ACTOR_DATA),
(OUTBOX_URL, OUTBOX_DATA),
(OUTBOX_FIRSTPAGE_URL, OUTBOX_FIRSTPAGE_DATA),
(BOOSTED_URL, BOOSTED_DATA),
(BOOSTED_ACTOR_URL, BOOSTED_ACTOR_DATA),
]
with self.mockRequests(expected_requests):
self.assertResponse(
"statuses @someuser@example.org",
"\x02someuser\x02 (@someuser@example.org): "
+ "@ FirstAuthor I am replying to you, "
+ "\x02someuser\x02 (@someuser@example.org): "
+ "\x02[CW This is a content warning]\x02 "
+ "This is a status with a content warning, and "
+ "\x02Boosted User\x02 (@BoostedUser@example.net): "
+ "Status Content",
)
# The actors are cached from the previous request
expected_requests = [
(OUTBOX_URL, OUTBOX_DATA),
(OUTBOX_FIRSTPAGE_URL, OUTBOX_FIRSTPAGE_DATA),
(BOOSTED_URL, BOOSTED_DATA),
]
with self.mockRequests(expected_requests):
with conf.supybot.plugins.Fediverse.format.statuses.showContentWithCW.context(
False
):
self.assertResponse(
"statuses @someuser@example.org",
"\x02someuser\x02 (@someuser@example.org): "
+ "@ FirstAuthor I am replying to you, "
+ "\x02someuser\x02 (@someuser@example.org): "
+ "CW This is a content warning, and "
+ "\x02Boosted User\x02 (@BoostedUser@example.net): "
+ "Status Content",
)
def testStatusUrlSnarferDisabled(self):
with self.mockWebfingerSupport("not called"), self.mockRequests([]):
self.assertSnarfNoResponse(
"aaa https://example.org/users/someuser/statuses/1234 bbb"
)
def testStatusUrlSnarfer(self):
with conf.supybot.plugins.Fediverse.snarfers.status.context(True):
expected_requests = [
(STATUS_URL, STATUS_DATA),
(ACTOR_URL, ACTOR_DATA),
]
with self.mockWebfingerSupport(True), self.mockRequests(
expected_requests
):
self.assertSnarfResponse(
"aaa https://example.org/users/someuser/statuses/1234 bbb",
"\x02someuser\x02 (@someuser@example.org): "
+ "@ FirstAuthor I am replying to you",
)
def testStatusUrlSnarferErrors(self):
with conf.supybot.plugins.Fediverse.snarfers.status.context(True):
expected_requests = [(STATUS_URL, utils.web.Error("blah"))]
with self.mockWebfingerSupport(True), self.mockRequests(
expected_requests
):
self.assertSnarfNoResponse(
"aaa https://example.org/users/someuser/statuses/1234 bbb"
)
expected_requests = [
(STATUS_URL, STATUS_DATA),
(ACTOR_URL, utils.web.Error("blah")),
]
with self.mockWebfingerSupport("not called"), self.mockRequests(
expected_requests
):
self.assertSnarfResponse(
"aaa https://example.org/users/someuser/statuses/1234 bbb",
"<error: blah>: @ FirstAuthor I am replying to you",
)
def testSnarferType(self):
# Sends a request, notices it's a status, gives up
with conf.supybot.plugins.Fediverse.snarfers.profile.context(True):
expected_requests = [(STATUS_URL, STATUS_DATA)]
with self.mockWebfingerSupport(True), self.mockRequests(
expected_requests
):
self.assertSnarfNoResponse(
"aaa https://example.org/users/someuser/statuses/1234 bbb"
)
# Sends a request, notices it's a profile, gives up
with conf.supybot.plugins.Fediverse.snarfers.profile.context(True):
expected_requests = [(ACTOR_URL, ACTOR_DATA)]
with self.mockWebfingerSupport("not called"), self.mockRequests(
expected_requests
):
self.assertSnarfNoResponse(
"aaa https://example.org/users/someuser/ bbb"
)
# vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79: