mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2025-01-24 05:14:05 +01:00
32907c10df
For testing purposes, it is useful to run hlrauc.py by itself not including it from another python script like autotests do. Better error checking was also added as testing can result in badly formatted data.
207 lines
6.0 KiB
Python
207 lines
6.0 KiB
Python
import socket
|
|
import os
|
|
import threading
|
|
import sys
|
|
import signal
|
|
from Crypto.Cipher import AES
|
|
|
|
class AuthCenter:
|
|
'''
|
|
Home Location Register (HLR) and Authentication Center (AuC) used for
|
|
for retrieving SIM/AKA values. This provides a UDP server that
|
|
hostapd can communicate with to obtain SIM values.
|
|
'''
|
|
def __init__(self, sock_path, config_file):
|
|
self._read_config(config_file)
|
|
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
|
self._socket.setblocking(0)
|
|
if os.path.exists(sock_path):
|
|
os.unlink(sock_path)
|
|
self._socket.bind(sock_path)
|
|
|
|
self._rxhandle = threading.Thread(target=self._rx_thread)
|
|
self._rxhandle.shutdown = False
|
|
self._rxhandle.start()
|
|
|
|
def _rx_thread(self):
|
|
while (True):
|
|
if self._rxhandle.shutdown == True:
|
|
break
|
|
try:
|
|
data, addr = self._socket.recvfrom(1000)
|
|
data = data.decode('ascii')
|
|
resp = self._process_data(data)
|
|
except:
|
|
continue
|
|
if resp:
|
|
self._socket.sendto(bytearray(resp, 'UTF-8'), addr)
|
|
|
|
def _read_config(self, file):
|
|
self._database = {}
|
|
with open(file) as f:
|
|
for line in f:
|
|
if line[0] == '#':
|
|
continue
|
|
else:
|
|
data = line.split(':')
|
|
self._database[data[0]] = ':'.join(data[1:])
|
|
|
|
def _process_data(self, data):
|
|
if data[:12] == "SIM-REQ-AUTH":
|
|
# SIM requests just return the stored values for the IMSI
|
|
imsi, num_chals = data[13:].split(' ')
|
|
if not imsi or not num_chals:
|
|
return "ERROR"
|
|
|
|
data = self._database.get(imsi, None)
|
|
if not data:
|
|
return "ERROR"
|
|
|
|
response = "SIM-RESP-AUTH %s" % imsi
|
|
response += (' ' + data)*int(num_chals)
|
|
|
|
return response
|
|
elif data[:12] == "AKA-REQ-AUTH":
|
|
# AKA requests must compute the milenage parameters for the IMSI
|
|
imsi = data.split(' ')[1]
|
|
data = self._database.get(imsi, None)
|
|
if not data:
|
|
return "ERROR"
|
|
|
|
# make sure this is an AKA entry
|
|
if len(data.split(':')) < 4:
|
|
return "ERROR"
|
|
|
|
k, opc, amf, sqn = data.split(':')
|
|
|
|
rand = self._bytetostring(os.urandom(16))
|
|
|
|
response = "AKA-RESP-AUTH %s " % imsi
|
|
|
|
return response + self._get_milenage(opc, k, rand, sqn, amf)
|
|
|
|
def _bytetostring(self, b):
|
|
return ''.join(format(x, '02x') for x in b)
|
|
|
|
def _xor(self, a, b):
|
|
ret = bytearray(16)
|
|
for i in range(len(a)):
|
|
ret[i] = a[i] ^ b[i]
|
|
return ret
|
|
|
|
def _get_milenage(self, opc, k, rand, sqn, amf):
|
|
'''
|
|
Computes milenage values from OPc, K, RAND, SQN and AMF
|
|
Returns a concatenated list (RAND + AUTN + IK + CK + RES) that
|
|
will be sent back as the response to the client (hostapd). This
|
|
is a python re-write of the function eap_aka_get_milenage() from
|
|
src/simutil.c
|
|
'''
|
|
opc = bytearray.fromhex(opc)
|
|
k = bytearray.fromhex(k)
|
|
# rand gets returned, so it should be left as a hex string
|
|
_rand = bytearray.fromhex(rand)
|
|
sqn = bytearray.fromhex(sqn)
|
|
amf = bytearray.fromhex(amf)
|
|
|
|
aes1 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp1 = self._xor(_rand, opc)
|
|
tmp1 = aes1.encrypt(bytes(tmp1))
|
|
tmp1 = bytearray(tmp1)
|
|
|
|
tmp2 = bytearray()
|
|
tmp2[0:6] = sqn
|
|
tmp2[6:2] = amf
|
|
tmp2[9:6] = sqn
|
|
tmp2[15:2] = amf
|
|
|
|
tmp3 = bytearray(16)
|
|
for i in range(len(tmp1)):
|
|
tmp3[(i + 8) % 16] = tmp2[i] ^ opc[i]
|
|
|
|
tmp3 = self._xor(tmp3, tmp1)
|
|
|
|
aes2 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp1 = aes2.encrypt(bytes(tmp3))
|
|
tmp1 = bytearray(tmp1)
|
|
|
|
tmp1 = self._xor(tmp1, opc)
|
|
maca = self._bytetostring(tmp1)
|
|
|
|
tmp1 = self._xor(_rand, opc)
|
|
aes3 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp2 = aes3.encrypt(bytes(tmp1))
|
|
tmp2 = bytearray(tmp2)
|
|
|
|
tmp1 = self._xor(tmp2, opc)
|
|
tmp1[15] ^= 1
|
|
|
|
aes4 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp3 = aes4.encrypt(bytes(tmp1))
|
|
tmp3 = bytearray(tmp3)
|
|
|
|
tmp3 = self._xor(tmp3, opc)
|
|
|
|
res = self._bytetostring(tmp3[8:16])
|
|
ak = self._bytetostring(tmp3[0:6])
|
|
|
|
for i in range(len(tmp1)):
|
|
tmp1[(i + 12) % 16] = tmp2[i] ^ opc[i]
|
|
|
|
tmp1[15] ^= 1 << 1
|
|
aes5 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp1 = aes5.encrypt(bytes(tmp1))
|
|
tmp1 = bytearray(tmp1)
|
|
|
|
tmp1 = self._xor(tmp1, opc)
|
|
ck = self._bytetostring(tmp1)
|
|
|
|
for i in range(len(tmp1)):
|
|
tmp1[(i + 8) % 16] = tmp2[i] ^ opc[i]
|
|
|
|
tmp1[15] ^= 1 << 2
|
|
aes6 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp1 = aes6.encrypt(bytes(tmp1))
|
|
tmp1 = bytearray(tmp1)
|
|
tmp1 = self._xor(tmp1, opc)
|
|
ik = self._bytetostring(tmp1)
|
|
|
|
tmp1 = bytearray.fromhex(ak)
|
|
autn = bytearray(6)
|
|
for i in range(0, 6):
|
|
autn[i] = sqn[i] ^ tmp1[i]
|
|
|
|
autn[6:2] = amf
|
|
autn[8:8] = bytearray.fromhex(maca)[0:8]
|
|
|
|
autn = self._bytetostring(autn)
|
|
|
|
return rand + ' ' + autn + ' ' + ik + ' ' + ck + ' ' + res
|
|
|
|
def stop(self):
|
|
'''
|
|
Stop the Authentication server and close the socket
|
|
'''
|
|
self._rxhandle.shutdown = True
|
|
self._rxhandle.join()
|
|
self._socket.close()
|
|
|
|
if __name__ == '__main__':
|
|
'''
|
|
This will run in a stand-alone mode for testing
|
|
'''
|
|
if len(sys.argv) < 3:
|
|
print('Usage: ./hlrauc.py <sock_path> <config>')
|
|
sys.exit()
|
|
|
|
hlrauc = AuthCenter(sys.argv[1], sys.argv[2])
|
|
|
|
def signal_handler(signal, frame):
|
|
print('Exiting...')
|
|
hlrauc.stop()
|
|
sys.exit()
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
signal.pause()
|