3
0
mirror of https://git.kernel.org/pub/scm/network/wireless/iwd.git synced 2025-01-24 22:04:16 +01:00
iwd/autotests/util/hlrauc.py
James Prestwood 14dcda4d59 autotest: EAP-AKA autotest
Implemented milenage algorithm in hlrauc.py. Unlike EAP-SIM, the
authentication center must compute several values to give back
to the server (hostapd). This was already done by IWD as the peer
in EAP-AKA, but was also needed on the server side (HLR AuC).
2017-08-21 18:22:28 -05:00

175 lines
5.3 KiB
Python

import socket
import os
import threading
from Crypto.Cipher import AES
class AuthCenter:
'''
Home Location Register (HLR) and Authentication Center (AuC) used for
for retrieving SIM/AKA values. This provides a UDP server that
hostapd can communicate with to obtain SIM values.
'''
def __init__(self, sock_path, config_file):
self._read_config(config_file)
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self._socket.setblocking(0)
if os.path.isfile(sock_path):
os.unlink(sock_path)
self._socket.bind(sock_path)
self._rxhandle = threading.Thread(target=self._rx_thread)
self._rxhandle.shutdown = False
self._rxhandle.start()
def _rx_thread(self):
while (True):
if self._rxhandle.shutdown == True:
break
try:
data, addr = self._socket.recvfrom(1000)
data = data.decode('ascii')
resp = self._process_data(data)
except:
continue
if resp:
self._socket.sendto(bytearray(resp, 'UTF-8'), addr)
def _read_config(self, file):
self._database = {}
with open(file) as f:
for line in f:
if line[0] == '#':
continue
else:
data = line.split(':')
self._database[data[0]] = ':'.join(data[1:])
def _process_data(self, data):
if data[:12] == "SIM-REQ-AUTH":
# SIM requests just return the stored values for the IMSI
imsi, num_chals = data[13:].split(' ')
data = self._database[imsi]
response = "SIM-RESP-AUTH %s" % imsi
response += (' ' + data)*int(num_chals)
return response
elif data[:12] == "AKA-REQ-AUTH":
# AKA requests must compute the milenage parameters for the IMSI
imsi = data.split(' ')[1]
data = self._database[imsi]
k, opc, amf, sqn = data.split(':')
rand = self._bytetostring(os.urandom(16))
response = "AKA-RESP-AUTH %s " % imsi
return response + self._get_milenage(opc, k, rand, sqn, amf)
def _bytetostring(self, b):
return ''.join(format(x, '02x') for x in b)
def _xor(self, a, b):
ret = bytearray(16)
for i in range(len(a)):
ret[i] = a[i] ^ b[i]
return ret
def _get_milenage(self, opc, k, rand, sqn, amf):
'''
Computes milenage values from OPc, K, RAND, SQN and AMF
Returns a concatenated list (RAND + AUTN + IK + CK + RES) that
will be sent back as the response to the client (hostapd). This
is a python re-write of the function eap_aka_get_milenage() from
src/simutil.c
'''
opc = bytearray.fromhex(opc)
k = bytearray.fromhex(k)
# rand gets returned, so it should be left as a hex string
_rand = bytearray.fromhex(rand)
sqn = bytearray.fromhex(sqn)
amf = bytearray.fromhex(amf)
aes1 = AES.new(bytes(k), AES.MODE_ECB)
tmp1 = self._xor(_rand, opc)
tmp1 = aes1.encrypt(bytes(tmp1))
tmp1 = bytearray(tmp1)
tmp2 = bytearray()
tmp2[0:6] = sqn
tmp2[6:2] = amf
tmp2[9:6] = sqn
tmp2[15:2] = amf
tmp3 = bytearray(16)
for i in range(len(tmp1)):
tmp3[(i + 8) % 16] = tmp2[i] ^ opc[i]
tmp3 = self._xor(tmp3, tmp1)
aes2 = AES.new(bytes(k), AES.MODE_ECB)
tmp1 = aes2.encrypt(bytes(tmp3))
tmp1 = bytearray(tmp1)
tmp1 = self._xor(tmp1, opc)
maca = self._bytetostring(tmp1)
tmp1 = self._xor(_rand, opc)
aes3 = AES.new(bytes(k), AES.MODE_ECB)
tmp2 = aes3.encrypt(bytes(tmp1))
tmp2 = bytearray(tmp2)
tmp1 = self._xor(tmp2, opc)
tmp1[15] ^= 1
aes4 = AES.new(bytes(k), AES.MODE_ECB)
tmp3 = aes4.encrypt(bytes(tmp1))
tmp3 = bytearray(tmp3)
tmp3 = self._xor(tmp3, opc)
res = self._bytetostring(tmp3[8:16])
ak = self._bytetostring(tmp3[0:6])
for i in range(len(tmp1)):
tmp1[(i + 12) % 16] = tmp2[i] ^ opc[i]
tmp1[15] ^= 1 << 1
aes5 = AES.new(bytes(k), AES.MODE_ECB)
tmp1 = aes5.encrypt(bytes(tmp1))
tmp1 = bytearray(tmp1)
tmp1 = self._xor(tmp1, opc)
ck = self._bytetostring(tmp1)
for i in range(len(tmp1)):
tmp1[(i + 8) % 16] = tmp2[i] ^ opc[i]
tmp1[15] ^= 1 << 2
aes6 = AES.new(bytes(k), AES.MODE_ECB)
tmp1 = aes6.encrypt(bytes(tmp1))
tmp1 = bytearray(tmp1)
tmp1 = self._xor(tmp1, opc)
ik = self._bytetostring(tmp1)
tmp1 = bytearray.fromhex(ak)
autn = bytearray(6)
for i in range(0, 6):
autn[i] = sqn[i] ^ tmp1[i]
autn[6:2] = amf
autn[8:8] = bytearray.fromhex(maca)[0:8]
autn = self._bytetostring(autn)
return rand + ' ' + autn + ' ' + ik + ' ' + ck + ' ' + res
def stop(self):
'''
Stop the Authentication server and close the socket
'''
self._rxhandle.shutdown = True
self._rxhandle.join()
self._socket.close()