mirror of
https://github.com/Mikaela/Limnoria.git
synced 2024-11-22 10:29:25 +01:00
Fix STS parsing and handling of unchecked-TLS connections (#1524)
* ircutils: Fix incorrect log message on invalid STS policy * STS: fix confusion over what a secure connection is irclib computed 'secure_connection' when TLS is enabled and TLS certs are checked; but ircutils used the value to parse STS policies, which should only care about being TLS or not. This commit fixes the incorrect parsing on unchecked-TLS, and triggers a reconnect when a STS policy is encountered in this case, to force TLS certs to be checked before storing the policy. * Accept STS policies when reconnecting after getting it over cleartext ircutils.parseStsPolicy() was passed self.driver.ssl which is the configured value, even though the connection was forced to be TLS temporarily * ci: Lower timeout * Fix typo in test name Co-authored-by: James Lu <james@overdrivenetworks.com> --------- Co-authored-by: James Lu <james@overdrivenetworks.com>
This commit is contained in:
parent
97d67777d6
commit
2902a85dbd
1
.github/workflows/test.yml
vendored
1
.github/workflows/test.yml
vendored
@ -11,6 +11,7 @@ jobs:
|
||||
build:
|
||||
|
||||
runs-on: ${{ matrix.runs-on }}
|
||||
timeout-minutes: 10
|
||||
strategy:
|
||||
matrix:
|
||||
include:
|
||||
|
@ -108,7 +108,7 @@ class ServersMixin(object):
|
||||
|
||||
# The policy was stored, which means it was received on a secure
|
||||
# connection.
|
||||
policy = ircutils.parseStsPolicy(log, policy, secure_connection=True)
|
||||
policy = ircutils.parseStsPolicy(log, policy, tls_connection=True)
|
||||
|
||||
if lastDisconnect + policy['duration'] < time.time():
|
||||
log.info('STS policy expired, removing.')
|
||||
|
@ -2079,11 +2079,13 @@ class Irc(IrcCommandDispatcher, log.Firewalled):
|
||||
self.capUpkeep(msg)
|
||||
|
||||
def _onCapSts(self, policy, msg):
|
||||
tls_connection = self.driver.currentServer.force_tls_verification \
|
||||
or self.driver.ssl
|
||||
secure_connection = self.driver.currentServer.force_tls_verification \
|
||||
or (self.driver.ssl and self.driver.anyCertValidationEnabled())
|
||||
|
||||
parsed_policy = ircutils.parseStsPolicy(
|
||||
log, policy, secure_connection=secure_connection)
|
||||
log, policy, tls_connection=tls_connection)
|
||||
if parsed_policy is None:
|
||||
# There was an error (and it was logged). Ignore it and proceed
|
||||
# with the connection.
|
||||
@ -2106,11 +2108,28 @@ class Irc(IrcCommandDispatcher, log.Firewalled):
|
||||
self.driver.currentServer.hostname,
|
||||
self.driver.currentServer.port,
|
||||
policy)
|
||||
elif self.driver.ssl:
|
||||
# SSL enabled, but certificates are not checked -> reconnect on the
|
||||
# same port and check certificates, before storing the STS policy.
|
||||
hostname = self.driver.currentServer.hostname
|
||||
port = self.driver.currentServer.port
|
||||
attempt = self.driver.currentServer.attempt
|
||||
|
||||
log.info('Got STS policy over insecure TLS connection; '
|
||||
'reconnecting to check certificates. %r',
|
||||
self.driver.currentServer)
|
||||
# Reconnect to the server, but with TLS *and* certificate
|
||||
# validation this time.
|
||||
self.state.fsm.on_shutdown(self, msg)
|
||||
|
||||
self.driver.reconnect(
|
||||
server=Server(hostname, port, attempt, True),
|
||||
wait=True)
|
||||
else:
|
||||
hostname = self.driver.currentServer.hostname
|
||||
attempt = self.driver.currentServer.attempt
|
||||
|
||||
log.info('Got STS policy over insecure connection; '
|
||||
log.info('Got STS policy over insecure (cleartext) connection; '
|
||||
'reconnecting to secure port. %r',
|
||||
self.driver.currentServer)
|
||||
# Reconnect to the server, but with TLS *and* certificate
|
||||
|
@ -1072,28 +1072,27 @@ def parseCapabilityKeyValue(s):
|
||||
|
||||
return d
|
||||
|
||||
|
||||
def parseStsPolicy(logger, policy, secure_connection):
|
||||
def parseStsPolicy(logger, policy, tls_connection):
|
||||
parsed_policy = parseCapabilityKeyValue(policy)
|
||||
|
||||
for key in ('port', 'duration'):
|
||||
if key == 'duration' and not secure_connection:
|
||||
if key == 'duration' and not tls_connection:
|
||||
if key in parsed_policy:
|
||||
del parsed_policy[key]
|
||||
continue
|
||||
elif key == 'port' and secure_connection:
|
||||
elif key == 'port' and tls_connection:
|
||||
if key in parsed_policy:
|
||||
del parsed_policy[key]
|
||||
continue
|
||||
if parsed_policy.get(key) is None:
|
||||
logger.error('Missing or empty "%s" key in STS policy. '
|
||||
'Aborting connection.', key)
|
||||
'Ignoring policy.', key)
|
||||
return None
|
||||
try:
|
||||
parsed_policy[key] = int(parsed_policy[key])
|
||||
except ValueError:
|
||||
logger.error('Expected integer as value for key "%s" in STS '
|
||||
'policy, got %r instead. Aborting connection.',
|
||||
'policy, got %r instead. Ignoring policy.',
|
||||
key, parsed_policy[key])
|
||||
return None
|
||||
|
||||
|
@ -840,79 +840,85 @@ class StsTestCase(SupyTestCase):
|
||||
def tearDown(self):
|
||||
ircdb.networks.networks = {}
|
||||
|
||||
def testStsInSecureConnection(self):
|
||||
def _testStsInSecureConnection(self, cap_value):
|
||||
self.irc.driver.anyCertValidationEnabled.return_value = True
|
||||
self.irc.driver.ssl = True
|
||||
self.irc.driver.currentServer = drivers.Server('irc.test', 6697, None, False)
|
||||
self.irc.feedMsg(ircmsgs.IrcMsg(command='CAP',
|
||||
args=('*', 'LS', 'sts=duration=42,port=12345')))
|
||||
args=('*', 'LS', 'sts=' + cap_value)))
|
||||
|
||||
self.assertEqual(ircdb.networks.getNetwork('test').stsPolicies, {
|
||||
'irc.test': (6697, 'duration=42,port=12345')})
|
||||
'irc.test': (6697, cap_value)})
|
||||
self.irc.driver.reconnect.assert_not_called()
|
||||
|
||||
def testStsInSecureConnectionNoPort(self):
|
||||
def testStsInSecureConnectionWithPort(self):
|
||||
self._testStsInSecureConnection('duration=42,port=12345')
|
||||
|
||||
def testStsInSecureConnectionWithoutPort(self):
|
||||
self._testStsInSecureConnection('duration=42')
|
||||
|
||||
def testStsInSecureConnectionMissingDuration(self):
|
||||
# "A persistence policy, expressed via the duration key. REQUIRED on a
|
||||
# secure connection"
|
||||
self.irc.driver.anyCertValidationEnabled.return_value = True
|
||||
self.irc.driver.ssl = True
|
||||
self.irc.driver.currentServer = drivers.Server('irc.test', 6697, None, False)
|
||||
self.irc.feedMsg(ircmsgs.IrcMsg(command='CAP',
|
||||
args=('*', 'LS', 'sts=duration=42')))
|
||||
args=('*', 'LS', 'sts=port=12345')))
|
||||
|
||||
self.assertEqual(ircdb.networks.getNetwork('test').stsPolicies, {
|
||||
'irc.test': (6697, 'duration=42')})
|
||||
self.assertEqual(ircdb.networks.getNetwork('test').stsPolicies, {})
|
||||
self.irc.driver.reconnect.assert_not_called()
|
||||
|
||||
def testStsInInsecureTlsConnection(self):
|
||||
def _testStsInInsecureTlsConnection(self, cap_value):
|
||||
self.irc.driver.anyCertValidationEnabled.return_value = False
|
||||
self.irc.driver.ssl = True
|
||||
self.irc.driver.currentServer = drivers.Server('irc.test', 6667, None, False)
|
||||
self.irc.driver.currentServer = drivers.Server('irc.test', 6697, None, False)
|
||||
self.irc.feedMsg(ircmsgs.IrcMsg(command='CAP',
|
||||
args=('*', 'LS', 'sts=duration=42,port=6697')))
|
||||
args=('*', 'LS', 'sts=' + cap_value)))
|
||||
|
||||
self.assertEqual(ircdb.networks.getNetwork('test').stsPolicies, {})
|
||||
self.irc.driver.reconnect.assert_called_once_with(
|
||||
server=drivers.Server('irc.test', 6697, None, True),
|
||||
wait=True)
|
||||
|
||||
def testStsInCleartextConnection(self):
|
||||
def testStsInInsecureTlsConnectionWithPort(self):
|
||||
self._testStsInInsecureTlsConnection('duration=42,port=6697')
|
||||
|
||||
def testStsInInsecureTlsConnectionWithoutPort(self):
|
||||
self._testStsInInsecureTlsConnection('duration=42')
|
||||
|
||||
def _testStsInCleartextConnection(self, cap_value):
|
||||
self.irc.driver.anyCertValidationEnabled.return_value = False
|
||||
self.irc.driver.ssl = True
|
||||
self.irc.driver.ssl = False
|
||||
self.irc.driver.currentServer = drivers.Server('irc.test', 6667, None, False)
|
||||
self.irc.feedMsg(ircmsgs.IrcMsg(command='CAP',
|
||||
args=('*', 'LS', 'sts=duration=42,port=6697')))
|
||||
args=('*', 'LS', 'sts=' + cap_value)))
|
||||
|
||||
self.assertEqual(ircdb.networks.getNetwork('test').stsPolicies, {})
|
||||
self.irc.driver.reconnect.assert_called_once_with(
|
||||
server=drivers.Server('irc.test', 6697, None, True),
|
||||
wait=True)
|
||||
|
||||
def testStsInCleartextConnectionWithDuration(self):
|
||||
self._testStsInCleartextConnection('duration=42,port=6697')
|
||||
|
||||
def testStsInCleartextConnectionWithoutDuration(self):
|
||||
self._testStsInCleartextConnection('port=6697')
|
||||
|
||||
def testStsInCleartextConnectionInvalidDuration(self):
|
||||
# "Servers MAY send this key to all clients, but insecurely
|
||||
# connected clients MUST ignore it."
|
||||
self._testStsInCleartextConnection('duration=foo,port=6697')
|
||||
|
||||
def testStsInCleartextConnectionMissingPort(self):
|
||||
self.irc.driver.anyCertValidationEnabled.return_value = False
|
||||
self.irc.driver.ssl = True
|
||||
self.irc.driver.ssl = False
|
||||
self.irc.driver.currentServer = drivers.Server('irc.test', 6667, None, False)
|
||||
self.irc.feedMsg(ircmsgs.IrcMsg(command='CAP',
|
||||
args=('*', 'LS', 'sts=duration=foo,port=6697')))
|
||||
args=('*', 'LS', 'sts=duration=42')))
|
||||
|
||||
self.assertEqual(ircdb.networks.getNetwork('test').stsPolicies, {})
|
||||
self.irc.driver.reconnect.assert_called_once_with(
|
||||
server=drivers.Server('irc.test', 6697, None, True),
|
||||
wait=True)
|
||||
|
||||
def testStsInCleartextConnectionNoDuration(self):
|
||||
# "Servers MAY send this key to all clients, but insecurely
|
||||
# connected clients MUST ignore it."
|
||||
self.irc.driver.anyCertValidationEnabled.return_value = False
|
||||
self.irc.driver.ssl = True
|
||||
self.irc.driver.currentServer = drivers.Server('irc.test', 6667, None, False)
|
||||
self.irc.feedMsg(ircmsgs.IrcMsg(command='CAP',
|
||||
args=('*', 'LS', 'sts=port=6697')))
|
||||
|
||||
self.assertEqual(ircdb.networks.getNetwork('test').stsPolicies, {})
|
||||
self.irc.driver.reconnect.assert_called_once_with(
|
||||
server=drivers.Server('irc.test', 6697, None, True),
|
||||
wait=True)
|
||||
self.irc.driver.reconnect.assert_not_called()
|
||||
|
||||
class IrcTestCase(SupyTestCase):
|
||||
def setUp(self):
|
||||
|
Loading…
Reference in New Issue
Block a user