### # Copyright (c) 2020, Valentin Lorentz # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, # this list of conditions, and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions, and the following disclaimer in the # documentation and/or other materials provided with the distribution. # * Neither the name of the author of this software nor the name of # contributors to this software may be used to endorse or promote products # derived from this software without specific prior written consent. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. ### import os import json import functools import contextlib from multiprocessing import Manager from supybot import commands, conf, utils from supybot.test import ChannelPluginTestCase, network from .test_data import ( PRIVATE_KEY, HOSTMETA_URL, HOSTMETA_DATA, WEBFINGER_URL, WEBFINGER_DATA, ACTOR_URL, ACTOR_DATA, OUTBOX_URL, OUTBOX_DATA, STATUS_URL, STATUS_DATA, OUTBOX_FIRSTPAGE_URL, OUTBOX_FIRSTPAGE_DATA, BOOSTED_URL, BOOSTED_DATA, BOOSTED_ACTOR_URL, BOOSTED_ACTOR_DATA, ) class FediverseTestCase(ChannelPluginTestCase): config = { # Allow snarfing the same URL twice in a row "supybot.snarfThrottle": 0.0 } plugins = ("Fediverse",) def setUp(self): super().setUp() path = conf.supybot.directories.data.dirize( "Fediverse/instance_key.pem" ) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, "wb") as fd: fd.write(PRIVATE_KEY) @contextlib.contextmanager def mockRequests(self, expected_requests): with Manager() as m: expected_requests = m.list(list(expected_requests)) original_getUrlContent = utils.web.getUrlContent @functools.wraps(original_getUrlContent) def newf(url, headers={}, data=None): self.assertIsNone(data, "Unexpected POST to %s" % url) assert expected_requests, url (expected_url, response) = expected_requests.pop(0) self.assertEqual(url, expected_url, "Unexpected URL: %s" % url) if isinstance(response, bytes): return response elif isinstance(response, Exception): raise response else: assert False, response utils.web.getUrlContent = newf try: yield finally: utils.web.getUrlContent = original_getUrlContent self.assertEqual( list(expected_requests), [], "Less requests than expected." ) if network: def testNetworkProfile(self): self.assertRegexp("profile @val@oc.todon.fr", "0E082B40E4376B1E") # TODO: add a test with an instance which only allows fetches # with valid signatures. def testNetworkProfileUnknown(self): self.assertResponse( "profile @nonexistinguser@oc.todon.fr", "Error: Unknown user @nonexistinguser@oc.todon.fr.", ) def testFeaturedNone(self): featured = { "@context": "https://www.w3.org/ns/activitystreams", "id": "https://example.org/users/someuser/collections/featured", "type": "OrderedCollection", "orderedItems": [], } expected_requests = [ (HOSTMETA_URL, HOSTMETA_DATA), (WEBFINGER_URL, WEBFINGER_DATA), (ACTOR_URL, ACTOR_DATA), ( "https://example.org/users/someuser/collections/featured", json.dumps(featured).encode(), ), ] with self.mockRequests(expected_requests): self.assertResponse( "featured @someuser@example.org", "No featured statuses." ) def testFeaturedSome(self): featured = { "@context": [ "https://www.w3.org/ns/activitystreams", { "ostatus": "http://ostatus.org#", "atomUri": "ostatus:atomUri", "inReplyToAtomUri": "ostatus:inReplyToAtomUri", "conversation": "ostatus:conversation", "sensitive": "as:sensitive", "toot": "http://joinmastodon.org/ns#", "votersCount": "toot:votersCount", }, ], "id": "https://example.org/users/someuser/collections/featured", "type": "OrderedCollection", "orderedItems": [ { "id": "https://example.org/users/someuser/statuses/123456789", "type": "Note", "summary": None, "inReplyTo": "https://example.org/users/someuser/statuses/100543712346463856", "published": "2018-08-13T15:49:00Z", "url": "https://example.org/@someuser/123456789", "attributedTo": "https://example.org/users/someuser", "to": ["https://example.org/users/someuser/followers"], "cc": ["https://www.w3.org/ns/activitystreams#Public"], "sensitive": False, "atomUri": "https://example.org/users/someuser/statuses/123456789", "inReplyToAtomUri": "https://example.org/users/someuser/statuses/100543712346463856", "conversation": "tag:example.org,2018-08-13:objectId=3002048:objectType=Conversation", "content": "

This is a pinned toot

", "contentMap": {"en": "

This is a pinned toot

"}, "attachment": [], "tag": [], "replies": { "id": "https://example.org/users/someuser/statuses/123456789/replies", "type": "Collection", "first": { "type": "CollectionPage", "next": "https://example.org/users/someuser/statuses/123456789/replies?min_id=100723569923690076&page=true", "partOf": "https://example.org/users/someuser/statuses/123456789/replies", "items": [ "https://example.org/users/someuser/statuses/100723569923690076" ], }, }, } ], } expected_requests = [ (HOSTMETA_URL, HOSTMETA_DATA), (WEBFINGER_URL, WEBFINGER_DATA), (ACTOR_URL, ACTOR_DATA), ( "https://example.org/users/someuser/collections/featured", json.dumps(featured).encode(), ), ] with self.mockRequests(expected_requests): self.assertRegexp( "featured @someuser@example.org", "This is a pinned toot" ) def testProfile(self): expected_requests = [ (HOSTMETA_URL, HOSTMETA_DATA), (WEBFINGER_URL, WEBFINGER_DATA), (ACTOR_URL, ACTOR_DATA), ] with self.mockRequests(expected_requests): self.assertResponse( "profile @someuser@example.org", "\x02someuser\x02 (@someuser@example.org): My Biography", ) def testProfileSnarfer(self): with self.mockRequests([]): self.assertSnarfNoResponse( "aaa @nonexistinguser@example.org bbb", timeout=1 ) with conf.supybot.plugins.Fediverse.snarfers.username.context(True): expected_requests = [ (HOSTMETA_URL, HOSTMETA_DATA), (WEBFINGER_URL, WEBFINGER_DATA), (ACTOR_URL, ACTOR_DATA), ] with self.mockRequests(expected_requests): self.assertSnarfResponse( "aaa @someuser@example.org bbb", "\x02someuser\x02 (@someuser@example.org): My Biography", ) expected_requests = [ (HOSTMETA_URL, HOSTMETA_DATA), (WEBFINGER_URL, utils.web.Error("blah")), ] with self.mockRequests(expected_requests): self.assertSnarfNoResponse( "aaa @nonexistinguser@example.org bbb", timeout=1 ) def testProfileUrlSnarfer(self): with self.mockRequests([]): self.assertSnarfNoResponse( "aaa https://example.org/users/someuser bbb", timeout=1 ) with conf.supybot.plugins.Fediverse.snarfers.profile.context(True): expected_requests = [(ACTOR_URL, utils.web.Error("blah"))] with self.mockRequests(expected_requests): self.assertSnarfNoResponse( "aaa https://example.org/users/someuser bbb", timeout=1 ) expected_requests = [(ACTOR_URL, ACTOR_DATA)] with self.mockRequests(expected_requests): self.assertSnarfResponse( "aaa https://example.org/users/someuser bbb", "\x02someuser\x02 (@someuser@example.org): My Biography", ) def testProfileUnknown(self): expected_requests = [ (HOSTMETA_URL, HOSTMETA_DATA), (WEBFINGER_URL, utils.web.Error("blah")), ] with self.mockRequests(expected_requests): self.assertResponse( "profile @nonexistinguser@example.org", "Error: Unknown user @nonexistinguser@example.org.", ) def testStatus(self): expected_requests = [ (STATUS_URL, STATUS_DATA), (ACTOR_URL, ACTOR_DATA), ] with self.mockRequests(expected_requests): self.assertResponse( "status https://example.org/users/someuser/statuses/1234", "\x02someuser\x02 (@someuser@example.org): " + "@ FirstAuthor I am replying to you", ) def testStatusError(self): expected_requests = [(STATUS_URL, utils.web.Error("blah"))] with self.mockRequests(expected_requests): self.assertResponse( "status https://example.org/users/someuser/statuses/1234", "Error: Could not get status: blah", ) expected_requests = [ (STATUS_URL, STATUS_DATA), (ACTOR_URL, utils.web.Error("blah")), ] with self.mockRequests(expected_requests): self.assertResponse( "status https://example.org/users/someuser/statuses/1234", ": " + "@ FirstAuthor I am replying to you", ) def testStatuses(self): expected_requests = [ (HOSTMETA_URL, HOSTMETA_DATA), (WEBFINGER_URL, WEBFINGER_DATA), (ACTOR_URL, ACTOR_DATA), (OUTBOX_URL, OUTBOX_DATA), (OUTBOX_FIRSTPAGE_URL, OUTBOX_FIRSTPAGE_DATA), (BOOSTED_URL, BOOSTED_DATA), (BOOSTED_ACTOR_URL, BOOSTED_ACTOR_DATA), ] with self.mockRequests(expected_requests): self.assertResponse( "statuses @someuser@example.org", "\x02someuser\x02 (@someuser@example.org): " + "@ FirstAuthor I am replying to you, " + "\x02someuser\x02 (@someuser@example.org): " + "\x02[CW This is a content warning]\x02 " + "This is a status with a content warning, and " + "\x02Boosted User\x02 (@BoostedUser@example.net): " + "Status Content", ) # The actors are cached from the previous request expected_requests = [ (OUTBOX_URL, OUTBOX_DATA), (OUTBOX_FIRSTPAGE_URL, OUTBOX_FIRSTPAGE_DATA), (BOOSTED_URL, BOOSTED_DATA), ] with self.mockRequests(expected_requests): with conf.supybot.plugins.Fediverse.format.statuses.showContentWithCW.context( False ): self.assertResponse( "statuses @someuser@example.org", "\x02someuser\x02 (@someuser@example.org): " + "@ FirstAuthor I am replying to you, " + "\x02someuser\x02 (@someuser@example.org): " + "CW This is a content warning, and " + "\x02Boosted User\x02 (@BoostedUser@example.net): " + "Status Content", ) def testStatusUrlSnarferDisabled(self): with self.mockRequests([]): self.assertSnarfNoResponse( "aaa https://example.org/users/someuser/statuses/1234 bbb", timeout=1, ) def testStatusUrlSnarfer(self): with conf.supybot.plugins.Fediverse.snarfers.status.context(True): expected_requests = [ (STATUS_URL, STATUS_DATA), (ACTOR_URL, ACTOR_DATA), ] with self.mockRequests(expected_requests): self.assertSnarfResponse( "aaa https://example.org/users/someuser/statuses/1234 bbb", "\x02someuser\x02 (@someuser@example.org): " + "@ FirstAuthor I am replying to you", ) def testStatusUrlSnarferErrors(self): with conf.supybot.plugins.Fediverse.snarfers.status.context(True): expected_requests = [(STATUS_URL, utils.web.Error("blah"))] with self.mockRequests(expected_requests): self.assertSnarfNoResponse( "aaa https://example.org/users/someuser/statuses/1234 bbb", timeout=1, ) expected_requests = [ (STATUS_URL, STATUS_DATA), (ACTOR_URL, utils.web.Error("blah")), ] with self.mockRequests(expected_requests): self.assertSnarfResponse( "aaa https://example.org/users/someuser/statuses/1234 bbb", ": @ FirstAuthor I am replying to you", ) def testSnarferType(self): # Sends a request, notices it's a status, gives up with conf.supybot.plugins.Fediverse.snarfers.profile.context(True): expected_requests = [(STATUS_URL, STATUS_DATA)] with self.mockRequests(expected_requests): self.assertSnarfNoResponse( "aaa https://example.org/users/someuser/statuses/1234 bbb", timeout=1, ) # Sends a request, notices it's a profile, gives up with conf.supybot.plugins.Fediverse.snarfers.profile.context(True): expected_requests = [(ACTOR_URL, ACTOR_DATA)] with self.mockRequests(expected_requests): self.assertSnarfNoResponse( "aaa https://example.org/users/someuser/ bbb", timeout=1 ) # vim:set shiftwidth=4 tabstop=4 expandtab textwidth=79: