3
0
mirror of https://git.kernel.org/pub/scm/network/wireless/iwd.git synced 2024-12-23 05:52:52 +01:00
iwd/autotests/util/hwsim.py
James Prestwood e2aca6e917 auto-t: correctly import Mapping from collections
The current way this was being done was to import collections and
use collections.Mapping. This has been deprecated since python 3.3
but has worked up until python 3.10. After python 3.10 this will
no longer work, and Mapping must be imported from collections.abc.
2022-06-01 11:01:50 -05:00

487 lines
15 KiB
Python
Executable File

#!/usr/bin/python3
import dbus
import sys
from collections.abc import Mapping
from weakref import WeakValueDictionary
from abc import ABCMeta, abstractmethod
from enum import Enum
from scapy.all import *
from scapy.contrib.wpa_eapol import WPA_key
import iwd
from config import ctx
HWSIM_SERVICE = 'net.connman.hwsim'
HWSIM_RULE_MANAGER_INTERFACE = 'net.connman.hwsim.RuleManager'
HWSIM_RULE_INTERFACE = 'net.connman.hwsim.Rule'
HWSIM_RADIO_MANAGER_INTERFACE = 'net.connman.hwsim.RadioManager'
HWSIM_RADIO_INTERFACE = 'net.connman.hwsim.Radio'
HWSIM_INTERFACE_INTERFACE = 'net.connman.hwsim.Interface'
HWSIM_AGENT_MANAGER_PATH = '/'
class HwsimDBusAbstract(iwd.AsyncOpAbstract):
__metaclass__ = ABCMeta
def __init__(self, object_path, properties = None, namespace=ctx):
self._bus = namespace.get_bus()
self._object_path = object_path
proxy = self._bus.get_object(HWSIM_SERVICE, self._object_path)
self._iface = dbus.Interface(proxy, self._iface_name)
self._prop_proxy = dbus.Interface(proxy, iwd.DBUS_PROPERTIES)
if properties is None:
self._properties = self._prop_proxy.GetAll(self._iface_name)
else:
self._properties = properties
self._prop_proxy.connect_to_signal("PropertiesChanged",
self._property_changed_handler, path_keyword="path")
def _property_changed_handler(self, interface, changed, invalidated, path):
if interface == self._iface_name and path == self._object_path:
for name, value in changed.items():
self._properties[name] = value
@abstractmethod
def __str__(self):
pass
@property
def path(self):
return self._object_path
class Rule(HwsimDBusAbstract):
_iface_name = HWSIM_RULE_INTERFACE
@property
def source(self):
return self._properties['Source']
@source.setter
def source(self, value):
self._prop_proxy.Set(self._iface_name, 'Source', value, reply_handler=self._success,
error_handler=self._failure)
self._wait_for_async_op()
@property
def destination(self):
return self._properties['Destination']
@destination.setter
def destination(self, value):
self._prop_proxy.Set(self._iface_name, 'Destination', value, reply_handler=self._success,
error_handler=self._failure)
self._wait_for_async_op()
@property
def bidirectional(self):
return bool(self._properties['Bidirectional'])
@bidirectional.setter
def bidirectional(self, value):
self._prop_proxy.Set(self._iface_name, 'Bidirectional',
dbus.Boolean(value), reply_handler=self._success, error_handler=self._failure)
self._wait_for_async_op()
@property
def frequency(self):
return int(self._properties['Frequency'])
@frequency.setter
def frequency(self, value):
self._prop_proxy.Set(self._iface_name, 'Frequency',
dbus.UInt32(value), reply_handler=self._success, error_handler=self._failure)
self._wait_for_async_op()
@property
def priority(self):
return int(self._properties['Priority'])
@priority.setter
def priority(self, value):
self._prop_proxy.Set(self._iface_name, 'Priority',
dbus.Int16(value), reply_handler=self._success, error_handler=self._failure)
self._wait_for_async_op()
@property
def signal(self):
return int(self._properties['SignalStrength'])
@signal.setter
def signal(self, value):
self._prop_proxy.Set(self._iface_name, 'SignalStrength',
dbus.Int16(value), reply_handler=self._success, error_handler=self._failure)
self._wait_for_async_op()
@property
def drop(self):
return bool(self._properties['Drop'])
@drop.setter
def drop(self, value):
self._prop_proxy.Set(self._iface_name, 'Drop', dbus.Boolean(value),
reply_handler=self._success, error_handler=self._failure)
self._wait_for_async_op()
@property
def delay(self):
return int(self._properties['Delay'])
@delay.setter
def delay(self, value):
self._prop_proxy.Set(self._iface_name, 'Delay', dbus.UInt32(value),
reply_handler=self._success, error_handler=self._failure)
self._wait_for_async_op()
@property
def prefix(self):
return self._properties['Prefix']
@prefix.setter
def prefix(self, value):
self._prop_proxy.Set(self._iface_name, 'Prefix', dbus.ByteArray.fromhex(value),
reply_handler=self._success, error_handler=self._failure)
self._wait_for_async_op()
@property
def enabled(self):
return self._properties['Enabled']
@enabled.setter
def enabled(self, value):
self._prop_proxy.Set(self._iface_name, 'Enabled', value,
reply_handler=self._success, error_handler=self._failure)
self._wait_for_async_op()
@property
def match_times(self):
return self._properties['MatchTimes']
@match_times.setter
def match_times(self, value):
self._prop_proxy.Set(self._iface_name, 'MatchTimes', dbus.UInt16(value))
@property
def drop_ack(self):
return self._properties(['DropAck'])
@drop_ack.setter
def drop_ack(self, value):
self._prop_proxy.Set(self._iface_name, 'DropAck', value)
@property
def match(self):
return self._properties['MatchBytes']
@match.setter
def match(self, value):
self._prop_proxy.Set(self._iface_name, 'MatchBytes', dbus.ByteArray.fromhex(value))
@property
def match_offset(self):
return self._properties(['MatchBytesOffset'])
@match_offset.setter
def match_offset(self, value):
self._prop_proxy.Set(self._iface_name, 'MatchBytesOffset', dbus.UInt16(value))
def remove(self):
self._iface.Remove(reply_handler=self._success,
error_handler=self._failure)
self._wait_for_async_op()
def __del__(self):
self.remove()
def __str__(self, prefix = ''):
return prefix + 'Rule: ' + self.path + '\n' + \
prefix + '\tSource:\t\t' + self.source + '\n' + \
prefix + '\tDestination:\t' + self.destination + '\n' + \
prefix + '\tBidirectional:\t' + \
str(self.bidirectional) + '\n' + \
prefix + '\tPriority:\t' + str(self.priority) + '\n' +\
prefix + '\tFrequency:\t' + str(self.frequency) + '\n' + \
prefix + '\tApply rssi:\t' + str(self.signal) + '\n' + \
prefix + '\tApply drop:\t' + str(self.drop) + '\n' + \
prefix + '\tPrefix:\t' + str([hex(b) for b in self.prefix]) + '\n' + \
prefix + '\tDelay:\t' + str(self.delay) + '\n' + \
prefix + '\tEnabled:\t' + str(self.enabled) + '\n'
class RuleSet(Mapping):
def __init__(self, hwsim, objects):
self._dict = {}
self._rule_manager = hwsim.rule_manager
hwsim.object_manager.connect_to_signal("InterfacesAdded",
self._interfaces_added_handler, HWSIM_RULE_INTERFACE)
hwsim.object_manager.connect_to_signal("InterfacesRemoved",
self._interfaces_removed_handler, HWSIM_RULE_INTERFACE)
for path in objects:
for interface in objects[path]:
if interface == HWSIM_RULE_INTERFACE:
self._dict[path] = Rule(path, objects[path][interface])
def __getitem__(self, key):
return self._dict.__getitem__(key)
def __iter__(self):
return self._dict.__iter__()
def __len__(self):
return self._dict.__len__()
def __delitem__(self, key):
self._dict.pop(key).remove()
def _interfaces_added_handler(self, path, interfaces):
self._dict[path] = Rule(interfaces[HWSIM_RULE_INTERFACE])
def _interfaces_removed_handler(self, path, interfaces):
del _dict[path]
def create(self):
path = self._rule_manager.AddRule()
obj = Rule(path)
self._dict[path] = obj
return obj
def remove_all(self):
for rule in self._dict.values():
rule.remove()
class Radio(HwsimDBusAbstract):
_iface_name = HWSIM_RADIO_INTERFACE
@property
def name(self):
return self._properties['Name']
@property
def addresses(self):
return [str(addr) for addr in self._properties['Addresses']]
def remove(self):
self._iface.Destroy(reply_handler=self._success,
error_handler=self._failure)
self._wait_for_async_op()
def __str__(self, prefix = ''):
return prefix + 'Radio: ' + self.path + '\n' + \
prefix + '\tName:\t\t' + self.name + '\n' + \
prefix + '\tAddresses:\t' + repr(self.destination) + '\n'
class RadioList(Mapping):
def __init__(self, hwsim, objects):
self._dict = {}
self._radio_manager = hwsim.radio_manager
hwsim.object_manager.connect_to_signal("InterfacesAdded",
self._interfaces_added_handler, HWSIM_RADIO_INTERFACE)
hwsim.object_manager.connect_to_signal("InterfacesRemoved",
self._interfaces_removed_handler, HWSIM_RADIO_INTERFACE)
for path in objects:
for interface in objects[path]:
if interface == HWSIM_RADIO_INTERFACE:
self._dict[path] = Radio(path, objects[path][interface])
def __getitem__(self, key):
return self._dict.__getitem__(key)
def __iter__(self):
return self._dict.__iter__()
def __len__(self):
return self._dict.__len__()
def __delitem__(self, key):
self._dict.pop(key).remove()
def values(self):
return self._dict.values()
def _interfaces_added_handler(self, path, interfaces):
self._dict[path] = Radio(interfaces[HWSIM_RADIO_INTERFACE])
def _interfaces_removed_handler(self, path, interfaces):
del _dict[path]
def create(self, name, p2p_device=False, iftype_disable=None,
cipher_disable=None, wait=True):
args = dbus.Dictionary({
'Name': name,
'P2P': p2p_device,
'NoVirtualInterface': True,
}, signature='sv')
if iftype_disable:
args['InterfaceTypeDisable'] = iftype_disable
if cipher_disable:
args['CipherTypeDisable'] = cipher_disable
if not wait:
self._radio_manager.CreateRadio(args, reply_handler=self._success,
error_handler=self._failure)
return None
path = self._radio_manager.CreateRadio(args)
obj = Radio(path)
self._dict[path] = obj
return obj
def _success(self, bla):
pass
def _failure(self, ex):
pass
class Hwsim(iwd.AsyncOpAbstract):
_instances = WeakValueDictionary()
def __new__(cls, namespace=ctx):
key = id(namespace)
if key not in cls._instances.keys():
obj = object.__new__(cls)
obj._initialized = False
cls._instances[key] = obj
return cls._instances[key]
def __init__(self, namespace=ctx):
if self._initialized:
return
self._initialized = True
self._bus = namespace.get_bus()
self._rule_manager_if = dbus.Interface(
self._bus.get_object(HWSIM_SERVICE, '/'),
HWSIM_RULE_MANAGER_INTERFACE)
self._radio_manager_if = dbus.Interface(
self._bus.get_object(HWSIM_SERVICE, '/'),
HWSIM_RADIO_MANAGER_INTERFACE)
self._object_manager_if = dbus.Interface(
self._bus.get_object(HWSIM_SERVICE, '/'),
iwd.DBUS_OBJECT_MANAGER)
objects = self.object_manager.GetManagedObjects()
self._rules = RuleSet(self, objects)
self._radios = RadioList(self, objects)
@property
def rules(self):
return self._rules
@property
def rule_manager(self):
return self._rule_manager_if
@property
def radios(self):
return self._radios
@property
def radio_manager(self):
return self._radio_manager_if
@property
def object_manager(self):
return self._object_manager_if
def spoof_disassociate(self, radio, freq, station):
'''
Send a spoofed disassociate frame to a station
'''
frame = Dot11()/Dot11Disas(reason=7)
frame[Dot11].addr1 = station
frame[Dot11].addr2 = radio.addresses[0]
frame[Dot11].addr3 = radio.addresses[0]
self.spoof_frame(radio, freq, station, raw(frame))
def spoof_deauthenticate(self, radio, freq, station):
'''
Send a spoofed deauthenticate frame to a station
'''
frame = Dot11()/Dot11Deauth(reason=6)
frame[Dot11].addr1 = station
frame[Dot11].addr2 = radio.addresses[0]
frame[Dot11].addr3 = radio.addresses[0]
self.spoof_frame(radio, freq, station, raw(frame))
def spoof_eap_fail(self, radio, freq, station):
'''
Send a spoofed EAP-Failure frame to a station
'''
frame = Dot11(type="Data", subtype=0)
frame[Dot11].addr1 = station
frame[Dot11].addr2 = radio.addresses[0]
frame[Dot11].addr3 = radio.addresses[0]
frame /= LLC()/SNAP()/EAPOL( version="802.1X-2001" )
frame /= EAP( code="Failure" )
self.spoof_frame(radio, freq, station, raw(frame))
def spoof_invalid_ptk_1_of_4(self, radio, freq, station):
'''
Send a spoofed PTK 1/4 frame to a station
'''
frame = Dot11(type="Data", subtype=0)
frame[Dot11].addr1 = station
frame[Dot11].addr2 = radio.addresses[0]
frame[Dot11].addr3 = radio.addresses[0]
# NOTE: Expected key_info is 0x008a, with the install flag
# this becomes 0x00ca.
eapol = WPA_key( descriptor_type = 2,
key_info = 0x00ca, # Includes an invalid install flag!
replay_counter = struct.pack(">Q", 100))
frame /= LLC()/SNAP()/EAPOL(version="802.1X-2004", type="EAPOL-Key")
frame /= eapol
self.spoof_frame(radio, freq, station, raw(frame))
def spoof_frame(self, radio, freq, station, frame):
'''
Send a spoofed arbitrary frame to a station
'''
radio_path = None
objects = self.object_manager.GetManagedObjects()
for path in objects:
obj = objects[path]
for interface in obj:
if interface == HWSIM_INTERFACE_INTERFACE:
if obj[interface]['Address'] == radio.addresses[0] or \
obj[interface]['Address'] == radio.addresses[1]:
radio_path = path
break
if not radio_path:
raise Exception("Could not find radio %s" % radio.path)
iface = dbus.Interface(self._bus.get_object(HWSIM_SERVICE, radio_path),
HWSIM_INTERFACE_INTERFACE)
iface.SendFrame(dbus.ByteArray.fromhex(station.replace(':', '')),
freq, -30, frame)
def get_radio(self, name):
for path in self.radios:
radio = self.radios[path]
if radio.name == name:
return radio
return None