mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2024-12-04 19:09:24 +01:00
6a9de526a8
The AuthCenter will now wait for the RX thread to start before continuing with the test. Also removed the non blocking option and fixed the loop to handle a blocking recvfrom call.
261 lines
7.5 KiB
Python
261 lines
7.5 KiB
Python
import socket
|
|
import os
|
|
import threading
|
|
import sys
|
|
import signal
|
|
from Crypto.Cipher import AES
|
|
|
|
class AuthCenter:
|
|
'''
|
|
Home Location Register (HLR) and Authentication Center (AuC) used for
|
|
for retrieving SIM/AKA values. This provides a UDP server that
|
|
hostapd can communicate with to obtain SIM values.
|
|
'''
|
|
def __init__(self, sock_path, config_file):
|
|
self._read_config(config_file)
|
|
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
|
|
if os.path.exists(sock_path):
|
|
os.unlink(sock_path)
|
|
self._socket.bind(sock_path)
|
|
|
|
self._rxhandle = threading.Thread(target=self._rx_thread)
|
|
self._rxhandle.ready = threading.Event()
|
|
self._rxhandle.start()
|
|
|
|
# wait for rx thread to start
|
|
self._rxhandle.ready.wait()
|
|
|
|
def _rx_thread(self):
|
|
self._rxhandle.ready.set()
|
|
while (True):
|
|
try:
|
|
data, addr = self._socket.recvfrom(1000)
|
|
data = data.decode('ascii')
|
|
resp = self._process_data(data)
|
|
except OSError:
|
|
break
|
|
except:
|
|
print("Exception:", sys.exc_info()[0])
|
|
break
|
|
if resp:
|
|
self._socket.sendto(bytearray(resp, 'UTF-8'), addr)
|
|
|
|
def _read_config(self, file):
|
|
self._database = {}
|
|
with open(file) as f:
|
|
for line in f:
|
|
if line[0] == '#':
|
|
continue
|
|
else:
|
|
data = line.strip('\n').split(':')
|
|
self._database[data[0]] = data[1:]
|
|
|
|
def _process_data(self, data):
|
|
if data[:12] == "SIM-REQ-AUTH":
|
|
# SIM requests just return the stored values for the IMSI
|
|
imsi, num_chals = data[13:].split(' ')
|
|
if not imsi or not num_chals:
|
|
return "ERROR"
|
|
|
|
data = self._database.get(imsi, None)
|
|
if not data:
|
|
return "ERROR"
|
|
|
|
response = "SIM-RESP-AUTH %s" % imsi
|
|
response += (' ' + ':'.join(data))*int(num_chals)
|
|
|
|
return response
|
|
elif data[:12] == "AKA-REQ-AUTH":
|
|
# AKA requests must compute the milenage parameters for the IMSI
|
|
imsi = data.split(' ')[1]
|
|
data = self._database.get(imsi, None)
|
|
if not data:
|
|
return "ERROR"
|
|
|
|
# make sure this is an AKA entry
|
|
if len(data) < 4:
|
|
return "ERROR"
|
|
|
|
k, opc, amf, sqn = data
|
|
|
|
rand = self._bytetostring(os.urandom(16))
|
|
|
|
response = "AKA-RESP-AUTH %s " % imsi
|
|
|
|
return response + self._get_milenage(opc, k, rand, sqn, amf)
|
|
elif data[:8] == "AKA-AUTS":
|
|
# sync error, parse out SQN and reset in database
|
|
imsi, auts, rand = data[9:].split(' ')
|
|
|
|
entry = self._database.get(imsi, None)
|
|
if not entry:
|
|
return "ERROR"
|
|
|
|
# make sure this is an AKA entry
|
|
if len(entry) < 4:
|
|
return "ERROR"
|
|
|
|
k, opc, amf, sqn = entry
|
|
|
|
# calculate/set new sequence number
|
|
entry[3] = self._resync_autn(opc, k, rand, auts)
|
|
self._database[imsi] = entry
|
|
|
|
return None
|
|
|
|
def _bytetostring(self, b):
|
|
return ''.join(format(x, '02x') for x in b)
|
|
|
|
def _xor(self, a, b):
|
|
ret = bytearray(16)
|
|
for i in range(len(a)):
|
|
ret[i] = a[i] ^ b[i]
|
|
return ret
|
|
|
|
def _resync_autn(self, opc, k, rand, auts):
|
|
opc = bytearray.fromhex(opc)
|
|
k = bytearray.fromhex(k)
|
|
rand = bytearray.fromhex(rand)
|
|
auts = bytearray.fromhex(auts)
|
|
new_sqn = bytearray(6)
|
|
ak_star = bytearray(6)
|
|
|
|
temp = self._xor(rand, opc)
|
|
aes1 = AES.new(bytes(k), AES.MODE_ECB)
|
|
temp = aes1.encrypt(bytes(temp))
|
|
temp = bytearray(temp)
|
|
|
|
out5 = bytearray(16)
|
|
for i in range(16):
|
|
out5[(i + 4) % 16] = temp[i] ^ opc[i];
|
|
|
|
out5[15] ^= 8
|
|
|
|
aes2 = AES.new(bytes(k), AES.MODE_ECB)
|
|
out5 = aes2.encrypt(bytes(out5))
|
|
out5 = bytearray(out5)
|
|
|
|
for i in range(6):
|
|
ak_star[i] = out5[i] ^ opc[i]
|
|
|
|
for i in range(6):
|
|
new_sqn[i] = auts[i] ^ ak_star[i]
|
|
|
|
return self._bytetostring(new_sqn)
|
|
|
|
def _get_milenage(self, opc, k, rand, sqn, amf):
|
|
'''
|
|
Computes milenage values from OPc, K, RAND, SQN and AMF
|
|
Returns a concatenated list (RAND + AUTN + IK + CK + RES) that
|
|
will be sent back as the response to the client (hostapd). This
|
|
is a python re-write of the function eap_aka_get_milenage() from
|
|
src/simutil.c
|
|
'''
|
|
opc = bytearray.fromhex(opc)
|
|
k = bytearray.fromhex(k)
|
|
# rand gets returned, so it should be left as a hex string
|
|
_rand = bytearray.fromhex(rand)
|
|
sqn = bytearray.fromhex(sqn)
|
|
amf = bytearray.fromhex(amf)
|
|
|
|
aes1 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp1 = self._xor(_rand, opc)
|
|
tmp1 = aes1.encrypt(bytes(tmp1))
|
|
tmp1 = bytearray(tmp1)
|
|
|
|
tmp2 = bytearray()
|
|
tmp2[0:6] = sqn
|
|
tmp2[6:2] = amf
|
|
tmp2[9:6] = sqn
|
|
tmp2[15:2] = amf
|
|
|
|
tmp3 = bytearray(16)
|
|
for i in range(len(tmp1)):
|
|
tmp3[(i + 8) % 16] = tmp2[i] ^ opc[i]
|
|
|
|
tmp3 = self._xor(tmp3, tmp1)
|
|
|
|
aes2 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp1 = aes2.encrypt(bytes(tmp3))
|
|
tmp1 = bytearray(tmp1)
|
|
|
|
tmp1 = self._xor(tmp1, opc)
|
|
maca = self._bytetostring(tmp1[0:8])
|
|
|
|
tmp1 = self._xor(_rand, opc)
|
|
aes3 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp2 = aes3.encrypt(bytes(tmp1))
|
|
tmp2 = bytearray(tmp2)
|
|
|
|
tmp1 = self._xor(tmp2, opc)
|
|
tmp1[15] ^= 1
|
|
|
|
aes4 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp3 = aes4.encrypt(bytes(tmp1))
|
|
tmp3 = bytearray(tmp3)
|
|
|
|
tmp3 = self._xor(tmp3, opc)
|
|
|
|
res = self._bytetostring(tmp3[8:16])
|
|
ak = self._bytetostring(tmp3[0:6])
|
|
|
|
for i in range(len(tmp1)):
|
|
tmp1[(i + 12) % 16] = tmp2[i] ^ opc[i]
|
|
|
|
tmp1[15] ^= 1 << 1
|
|
aes5 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp1 = aes5.encrypt(bytes(tmp1))
|
|
tmp1 = bytearray(tmp1)
|
|
|
|
tmp1 = self._xor(tmp1, opc)
|
|
ck = self._bytetostring(tmp1)
|
|
|
|
for i in range(len(tmp1)):
|
|
tmp1[(i + 8) % 16] = tmp2[i] ^ opc[i]
|
|
|
|
tmp1[15] ^= 1 << 2
|
|
aes6 = AES.new(bytes(k), AES.MODE_ECB)
|
|
tmp1 = aes6.encrypt(bytes(tmp1))
|
|
tmp1 = bytearray(tmp1)
|
|
tmp1 = self._xor(tmp1, opc)
|
|
ik = self._bytetostring(tmp1)
|
|
|
|
tmp1 = bytearray.fromhex(ak)
|
|
autn = bytearray(6)
|
|
for i in range(0, 6):
|
|
autn[i] = sqn[i] ^ tmp1[i]
|
|
|
|
autn[6:2] = amf
|
|
autn[8:8] = bytearray.fromhex(maca)[0:8]
|
|
|
|
autn = self._bytetostring(autn)
|
|
|
|
return rand + ' ' + autn + ' ' + ik + ' ' + ck + ' ' + res
|
|
|
|
def stop(self):
|
|
'''
|
|
Stop the Authentication server and close the socket
|
|
'''
|
|
self._socket.shutdown(socket.SHUT_RDWR)
|
|
self._socket.close()
|
|
self._rxhandle.join()
|
|
|
|
if __name__ == '__main__':
|
|
'''
|
|
This will run in a stand-alone mode for testing
|
|
'''
|
|
if len(sys.argv) < 3:
|
|
print('Usage: ./hlrauc.py <sock_path> <config>')
|
|
sys.exit()
|
|
|
|
hlrauc = AuthCenter(sys.argv[1], sys.argv[2])
|
|
|
|
def signal_handler(signal, frame):
|
|
print('Exiting...')
|
|
hlrauc.stop()
|
|
sys.exit()
|
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
|
|
signal.pause()
|