From e87ad5b5a3801eecb0c34b19e972e94b564600c6 Mon Sep 17 00:00:00 2001 From: Valentin Lorentz Date: Sat, 10 Sep 2016 20:16:28 +0200 Subject: [PATCH] Untested implementation of SASL SCRAM auth. --- src/irclib.py | 78 ++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 65 insertions(+), 13 deletions(-) diff --git a/src/irclib.py b/src/irclib.py index 25bc73715..30dd0c29f 100644 --- a/src/irclib.py +++ b/src/irclib.py @@ -35,10 +35,14 @@ import base64 import collections try: - from ecdsa import SigningKey, BadDigestError - ecdsa = True + import ecdsa except ImportError: - ecdsa = False + ecdsa = None + +try: + import pyxmpp2_scram as scram +except ImportError: + scram = None from . import conf, ircdb, ircmsgs, ircutils, log, utils, world from .utils.str import rsplit @@ -994,6 +998,7 @@ class Irc(IrcCommandDispatcher, log.Firewalled): self.sasl_username = network_config.sasl.username() self.sasl_password = network_config.sasl.password() self.sasl_ecdsa_key = network_config.sasl.ecdsa_key() + self.sasl_scram_state = {'step': 'uninitialized'} self.authenticate_decoder = None self.sasl_next_mechanisms = [] self.sasl_current_mechanism = None @@ -1006,6 +1011,9 @@ class Irc(IrcCommandDispatcher, log.Firewalled): network_config.certfile() or conf.supybot.protocols.irc.certfile()): self.sasl_next_mechanisms.append(mechanism) + elif mechanism.startswith('scram-') and scram and \ + self.sasl_username and self.sasl_password: + self.sasl_next_mechanisms.append(mechanism) elif mechanism == 'plain' and \ self.sasl_username and self.sasl_password: self.sasl_next_mechanisms.append(mechanism) @@ -1101,20 +1109,24 @@ class Irc(IrcCommandDispatcher, log.Firewalled): mechanism = self.sasl_current_mechanism if mechanism == 'ecdsa-nist256p-challenge': - if string == b'': - self.sendSaslString(self.sasl_username.encode('utf-8')) - return + self.doAuthenticateEcdsa(string) + elif mechanism == 'external': + self.sendSaslString(b'') + elif mechanism.startswith('scram-'): + step = self.sasl_scram_state['step'] try: - with open(self.sasl_ecdsa_key) as fd: - private_key = SigningKey.from_pem(fd.read()) - authstring = private_key.sign(base64.b64decode(msg.args[0].encode())) - self.sendSaslString(authstring) - except (BadDigestError, OSError, ValueError): + if step == 'uninitialized': + self.doAuthenticateScramFirst() + elif step == 'first-sent': + self.doAuthenticateScramChallenge(string) + elif step == 'final-sent': + self.doAuthenticateScramFinish(string) + else: + assert False + except scram.ScramException: self.sendMsg(ircmsgs.IrcMsg(command='AUTHENTICATE', args=('*',))) self.tryNextSaslMechanism() - elif mechanism == 'external': - self.sendSaslString(b'') elif mechanism == 'plain': authstring = b'\0'.join([ self.sasl_username.encode('utf-8'), @@ -1123,6 +1135,46 @@ class Irc(IrcCommandDispatcher, log.Firewalled): ]) self.sendSaslString(authstring) + def doAuthenticateEcdsa(self, string): + if string == b'': + self.sendSaslString(self.sasl_username.encode('utf-8')) + return + try: + with open(self.sasl_ecdsa_key) as fd: + private_key = ecdsa.SigningKey.from_pem(fd.read()) + authstring = private_key.sign(base64.b64decode(msg.args[0].encode())) + self.sendSaslString(authstring) + except (ecdsa.BadDigestError, OSError, ValueError): + self.sendMsg(ircmsgs.IrcMsg(command='AUTHENTICATE', + args=('*',))) + self.tryNextSaslMechanism() + + def doAuthenticateScramFirst(self): + """Handle sending the client-first message of SCRAM auth.""" + hash_name = mechanism[len('scram-'):] + if hash_name.endswith('-plus'): + hash_name = hash_name[:-len('-plus')] + authenticator = scram.SCRAMClientAuthenticator(hash_name, + channel_binding=False) + self.sasl_scram_state['authenticator'] = authenticator + client_first = authenticator.start({ + 'username': self.sasl_username, + 'password': self.sasl_password, + }) + self.sendSaslString(client_first) + self.sasl_scram_state['step'] = 'first-sent' + + def doAuthenticateScramChallenge(self, challenge): + client_final = self.sasl_scram_state['authenticator'] \ + .challenge(challenge) + self.sasl_scram_state['step'] = 'final-sent' + + def doAuthenticateScramFinish(self, data): + # TODO: do something with BadSuccessException + res = self.sasl_scram_state['authenticator'] \ + .finish(data) + self.sasl_scram_state['step'] = 'authenticated' + def do903(self, msg): log.info('%s: SASL authentication successful', self.network) self.sasl_authenticated = True