autotest: EAP-SIM autotest

Included an HLR AuC python implementation that is required by
hostapd. This is what hostapd uses to retrieve SIM card values
over a UNIX socket.
This commit is contained in:
James Prestwood 2017-08-21 14:09:05 -07:00 committed by Denis Kenzior
parent b2fe7fe230
commit 7c61d0365e
7 changed files with 157 additions and 0 deletions

View File

@ -0,0 +1,62 @@
#!/usr/bin/python3
import unittest
import sys
sys.path.append('../util')
import iwd
from iwd import IWD
from iwd import PSKAgent
from iwd import NetworkType
from hlrauc import AuthCenter
class Test(unittest.TestCase):
def test_connection_success(self):
auth = AuthCenter('/tmp/hlrauc.sock', '/tmp/sim.db')
wd = IWD()
devices = wd.list_devices();
self.assertIsNotNone(devices)
device = devices[0]
condition = 'not obj.scanning'
wd.wait_for_object_condition(device, condition)
device.scan()
condition = 'not obj.scanning'
wd.wait_for_object_condition(device, condition)
ordered_networks = device.get_ordered_networks()
ordered_network = ordered_networks[0]
self.assertEqual(ordered_network.name, "ssidEAP-SIM")
self.assertEqual(ordered_network.type, NetworkType.eap)
condition = 'not obj.connected'
wd.wait_for_object_condition(ordered_network.network_object, condition)
ordered_network.network_object.connect()
condition = 'obj.connected'
wd.wait_for_object_condition(ordered_network.network_object, condition)
device.disconnect()
condition = 'not obj.connected'
wd.wait_for_object_condition(ordered_network.network_object, condition)
auth.stop()
@classmethod
def setUpClass(cls):
IWD.copy_to_storage('ssidEAP-SIM.8021x')
@classmethod
def tearDownClass(cls):
IWD.clear_storage()
if __name__ == '__main__':
unittest.main(exit=True)

View File

@ -0,0 +1,7 @@
[SETUP]
num_radios=2
max_test_exec_interval_sec=40
tmpfs_extra_stuff=sim.eap_user:sim.db
[HOSTAPD]
rad0=ssidEAP-SIM.conf

View File

@ -0,0 +1 @@
32010000000000:D0D1D2D3D4D5D6D7:E1E2E3E4:DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD

View File

@ -0,0 +1 @@
"abc@example.com" SIM

View File

@ -0,0 +1,6 @@
[Security]
EAP-Method=SIM
EAP-Identity=abc@example.com
EAP-SIM-Kc=d0d1d2d3d4d5d6d7d0d1d2d3d4d5d6d7d0d1d2d3d4d5d6d7
EAP-SIM-IMSI=132010000000000@example.com
EAP-SIM-SRES=e1e2e3e4e1e2e3e4e1e2e3e4

View File

@ -0,0 +1,16 @@
hw_mode=g
channel=1
driver=nl80211
ieee8021x=1
eap_server=1
ssid=ssidEAP-SIM
eap_user_file=/tmp/sim.eap_user
eap_sim_db=unix:/tmp/hlrauc.sock
wpa=2
wpa_key_mgmt=WPA-EAP
wpa_pairwise=TKIP CCMP
rsn_pairwise=CCMP TKIP
wpa_passphrase=secret123
channel=1
eap_sim_aka_result_ind=1

64
autotests/util/hlrauc.py Normal file
View File

@ -0,0 +1,64 @@
import socket
import os
import threading
from Crypto.Cipher import AES
class AuthCenter:
'''
Home Location Register (HLR) and Authentication Center (AuC) used for
for retrieving SIM/AKA values. This provides a UDP server that
hostapd can communicate with to obtain SIM values.
'''
def __init__(self, sock_path, config_file):
self._read_config(config_file)
self._socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
self._socket.setblocking(0)
if os.path.isfile(sock_path):
os.unlink(sock_path)
self._socket.bind(sock_path)
self._rxhandle = threading.Thread(target=self._rx_thread)
self._rxhandle.shutdown = False
self._rxhandle.start()
def _rx_thread(self):
while (True):
if self._rxhandle.shutdown == True:
break
try:
data, addr = self._socket.recvfrom(1000)
data = data.decode('ascii')
resp = self._process_data(data)
except:
continue
if resp:
self._socket.sendto(bytearray(resp, 'UTF-8'), addr)
def _read_config(self, file):
self._database = {}
with open(file) as f:
for line in f:
if line[0] == '#':
continue
else:
data = line.split(':')
self._database[data[0]] = ':'.join(data[1:])
def _process_data(self, data):
if data[:12] == "SIM-REQ-AUTH":
# SIM requests just return the stored values for the IMSI
imsi, num_chals = data[13:].split(' ')
data = self._database[imsi]
response = "SIM-RESP-AUTH %s" % imsi
response += (' ' + data)*int(num_chals)
return response
def stop(self):
'''
Stop the Authentication server and close the socket
'''
self._rxhandle.shutdown = True
self._rxhandle.join()
self._socket.close()