From 49930c6d772993ff52b790c933d5ca48d86f7622 Mon Sep 17 00:00:00 2001 From: Andrew Zaborowski Date: Sat, 20 May 2017 01:39:41 +0200 Subject: [PATCH] autotests: Add a SignalLevelAgent test This test takes about 40 seconds. --- autotests/testRSSIAgent/hw.conf | 6 ++ autotests/testRSSIAgent/open.conf | 4 ++ autotests/testRSSIAgent/test.py | 112 ++++++++++++++++++++++++++++++ 3 files changed, 122 insertions(+) create mode 100644 autotests/testRSSIAgent/hw.conf create mode 100644 autotests/testRSSIAgent/open.conf create mode 100644 autotests/testRSSIAgent/test.py diff --git a/autotests/testRSSIAgent/hw.conf b/autotests/testRSSIAgent/hw.conf new file mode 100644 index 00000000..3abe2bbe --- /dev/null +++ b/autotests/testRSSIAgent/hw.conf @@ -0,0 +1,6 @@ +[SETUP] +num_radios=2 +max_test_exec_interval_sec=60 + +[HOSTAPD] +rad0=open.conf diff --git a/autotests/testRSSIAgent/open.conf b/autotests/testRSSIAgent/open.conf new file mode 100644 index 00000000..7e2c13dd --- /dev/null +++ b/autotests/testRSSIAgent/open.conf @@ -0,0 +1,4 @@ +hw_mode=g +channel=1 +ssid=TestOpen +utf8_ssid=1 diff --git a/autotests/testRSSIAgent/test.py b/autotests/testRSSIAgent/test.py new file mode 100644 index 00000000..c0813dde --- /dev/null +++ b/autotests/testRSSIAgent/test.py @@ -0,0 +1,112 @@ +#! /usr/bin/python3 + +import unittest +import sys +import time + +sys.path.append('../util') +import iwd +from iwd import IWD +from iwd import NetworkType +from hwsim import Hwsim + +class TstAgent(iwd.SignalAgent): + def handle_new_level(self, path, level): + self.device_path = path + self.level = level + self.calls += 1 + +class Test(unittest.TestCase): + def test_rssi_agent(self): + rule = Hwsim().rules.create() + rule.signal = -4000 + + wd = IWD() + + device = wd.list_devices()[0]; + + # Register agent early to catch any unexpected notifications + agent = TstAgent() + agent.calls = 0 + device.register_signal_agent(agent, [-20, -40, -60, -80]) + + condition = 'not obj.scanning' + wd.wait_for_object_condition(device, condition) + + device.scan() + + condition = 'obj.scanning' + wd.wait_for_object_condition(device, condition) + + condition = 'not obj.scanning' + wd.wait_for_object_condition(device, condition) + + ordered_networks = device.get_ordered_networks() + + self.assertEqual(len(ordered_networks), 1) + ordered_network = ordered_networks[0] + self.assertEqual(ordered_network.name, "TestOpen") + self.assertEqual(ordered_network.type, NetworkType.open) + self.assertEqual(ordered_network.signal_strength, -4000) + + condition = 'not obj.connected' + wd.wait_for_object_condition(ordered_network.network_object, condition) + + ordered_network.network_object.connect() + + condition = 'obj.connected' + wd.wait_for_object_condition(ordered_network.network_object, condition) + + # Test with 5 signal strength levels first, then with 3 + + def test_list(rssi_pairs): + # Roughly test big jumps in RSSI value that cross from one + # level range to another, then with small changes that shouldn't + # cause any notification. + # Allow 3 secs for the agent to receive the final signal + # strength level number and allow more than one notification + # until it reaches the target value because the kernel (mac80211 + # at least) uses a moving-window average value over the last RSSI + # measurements received from the driver and it changes gradually. + # Normally 1 second is enough. + self.assertEqual(agent.calls, 0) + for centre, level in rssi_pairs: + rule.signal = centre + agent.level = -1 + condition = 'obj.level == ' + str(level) + wd.wait_for_object_condition(agent, condition, 3) + + self.assertTrue(agent.calls > 0) + self.assertEqual(agent.device_path, device.device_path) + agent.calls = 0 + + for offset in [-900, 500, -100, 900]: + rule.signal = centre - offset + wd.wait(0.5) + + self.assertEqual(agent.calls, 0) + + test_list([(-7000, 3), (-1000, 0), (-3000, 1), (-5000, 2), + (-7000, 3), (-1000, 0), (-5000, 2)]) + device.unregister_signal_agent(agent) + device.register_signal_agent(agent, [-35, -65]) + test_list([(-1500, 0), (-5000, 1), (-7500, 2), (-1500, 0)]) + + device.disconnect() + + condition = 'not obj.connected' + wd.wait_for_object_condition(ordered_network.network_object, condition) + + self.assertEqual(agent.calls, 0) + device.unregister_signal_agent(agent) + + @classmethod + def setUpClass(cls): + pass + + @classmethod + def tearDownClass(cls): + IWD.clear_storage() + +if __name__ == '__main__': + unittest.main(exit=True)