mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2025-04-12 11:27:51 +02:00
autotests: updated hlrauc.py to support re-sync
If the peer detects a sync error, it sends back AUTS. The authentication center must then re-synchronize and update the SQN it has saved for the given ISMI.
This commit is contained in:
parent
5f3bf1dbac
commit
263074511d
@ -44,7 +44,7 @@ class AuthCenter:
|
|||||||
continue
|
continue
|
||||||
else:
|
else:
|
||||||
data = line.split(':')
|
data = line.split(':')
|
||||||
self._database[data[0]] = ':'.join(data[1:])
|
self._database[data[0]] = data[1:]
|
||||||
|
|
||||||
def _process_data(self, data):
|
def _process_data(self, data):
|
||||||
if data[:12] == "SIM-REQ-AUTH":
|
if data[:12] == "SIM-REQ-AUTH":
|
||||||
@ -58,7 +58,7 @@ class AuthCenter:
|
|||||||
return "ERROR"
|
return "ERROR"
|
||||||
|
|
||||||
response = "SIM-RESP-AUTH %s" % imsi
|
response = "SIM-RESP-AUTH %s" % imsi
|
||||||
response += (' ' + data)*int(num_chals)
|
response += (' ' + ':'.join(data))*int(num_chals)
|
||||||
|
|
||||||
return response
|
return response
|
||||||
elif data[:12] == "AKA-REQ-AUTH":
|
elif data[:12] == "AKA-REQ-AUTH":
|
||||||
@ -69,16 +69,35 @@ class AuthCenter:
|
|||||||
return "ERROR"
|
return "ERROR"
|
||||||
|
|
||||||
# make sure this is an AKA entry
|
# make sure this is an AKA entry
|
||||||
if len(data.split(':')) < 4:
|
if len(data) < 4:
|
||||||
return "ERROR"
|
return "ERROR"
|
||||||
|
|
||||||
k, opc, amf, sqn = data.split(':')
|
k, opc, amf, sqn = data
|
||||||
|
|
||||||
rand = self._bytetostring(os.urandom(16))
|
rand = self._bytetostring(os.urandom(16))
|
||||||
|
|
||||||
response = "AKA-RESP-AUTH %s " % imsi
|
response = "AKA-RESP-AUTH %s " % imsi
|
||||||
|
|
||||||
return response + self._get_milenage(opc, k, rand, sqn, amf)
|
return response + self._get_milenage(opc, k, rand, sqn, amf)
|
||||||
|
elif data[:8] == "AKA-AUTS":
|
||||||
|
# sync error, parse out SQN and reset in database
|
||||||
|
imsi, auts, rand = data[9:].split(' ')
|
||||||
|
|
||||||
|
entry = self._database.get(imsi, None)
|
||||||
|
if not entry:
|
||||||
|
return "ERROR"
|
||||||
|
|
||||||
|
# make sure this is an AKA entry
|
||||||
|
if len(entry) < 4:
|
||||||
|
return "ERROR"
|
||||||
|
|
||||||
|
k, opc, amf, sqn = entry
|
||||||
|
|
||||||
|
# calculate/set new sequence number
|
||||||
|
entry[3] = self._resync_autn(opc, k, rand, auts)
|
||||||
|
self._database[imsi] = entry
|
||||||
|
|
||||||
|
return None
|
||||||
|
|
||||||
def _bytetostring(self, b):
|
def _bytetostring(self, b):
|
||||||
return ''.join(format(x, '02x') for x in b)
|
return ''.join(format(x, '02x') for x in b)
|
||||||
@ -89,6 +108,37 @@ class AuthCenter:
|
|||||||
ret[i] = a[i] ^ b[i]
|
ret[i] = a[i] ^ b[i]
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
def _resync_autn(self, opc, k, rand, auts):
|
||||||
|
opc = bytearray.fromhex(opc)
|
||||||
|
k = bytearray.fromhex(k)
|
||||||
|
rand = bytearray.fromhex(rand)
|
||||||
|
auts = bytearray.fromhex(auts)
|
||||||
|
new_sqn = bytearray(6)
|
||||||
|
ak_star = bytearray(6)
|
||||||
|
|
||||||
|
temp = self._xor(rand, opc)
|
||||||
|
aes1 = AES.new(bytes(k), AES.MODE_ECB)
|
||||||
|
temp = aes1.encrypt(bytes(temp))
|
||||||
|
temp = bytearray(temp)
|
||||||
|
|
||||||
|
out5 = bytearray(16)
|
||||||
|
for i in range(16):
|
||||||
|
out5[(i + 4) % 16] = temp[i] ^ opc[i];
|
||||||
|
|
||||||
|
out5[15] ^= 8
|
||||||
|
|
||||||
|
aes2 = AES.new(bytes(k), AES.MODE_ECB)
|
||||||
|
out5 = aes2.encrypt(bytes(out5))
|
||||||
|
out5 = bytearray(out5)
|
||||||
|
|
||||||
|
for i in range(6):
|
||||||
|
ak_star[i] = out5[i] ^ opc[i]
|
||||||
|
|
||||||
|
for i in range(6):
|
||||||
|
new_sqn[i] = auts[i] ^ ak_star[i]
|
||||||
|
|
||||||
|
return self._bytetostring(new_sqn)
|
||||||
|
|
||||||
def _get_milenage(self, opc, k, rand, sqn, amf):
|
def _get_milenage(self, opc, k, rand, sqn, amf):
|
||||||
'''
|
'''
|
||||||
Computes milenage values from OPc, K, RAND, SQN and AMF
|
Computes milenage values from OPc, K, RAND, SQN and AMF
|
||||||
@ -126,7 +176,7 @@ class AuthCenter:
|
|||||||
tmp1 = bytearray(tmp1)
|
tmp1 = bytearray(tmp1)
|
||||||
|
|
||||||
tmp1 = self._xor(tmp1, opc)
|
tmp1 = self._xor(tmp1, opc)
|
||||||
maca = self._bytetostring(tmp1)
|
maca = self._bytetostring(tmp1[0:8])
|
||||||
|
|
||||||
tmp1 = self._xor(_rand, opc)
|
tmp1 = self._xor(_rand, opc)
|
||||||
aes3 = AES.new(bytes(k), AES.MODE_ECB)
|
aes3 = AES.new(bytes(k), AES.MODE_ECB)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user