mirror of
https://git.kernel.org/pub/scm/network/wireless/iwd.git
synced 2025-01-05 12:52:37 +01:00
e44ccf3daa
Use scapy library which allows one to easily construct and fudge various network packets. This makes constructing spoofed packets much easier and more readable compared to hex-encoded, hand-crafted frames.
487 lines
15 KiB
Python
Executable File
487 lines
15 KiB
Python
Executable File
#!/usr/bin/python3
|
|
import dbus
|
|
import sys
|
|
import collections
|
|
|
|
from weakref import WeakValueDictionary
|
|
from abc import ABCMeta, abstractmethod
|
|
from enum import Enum
|
|
from scapy.all import *
|
|
from scapy.contrib.wpa_eapol import WPA_key
|
|
|
|
import iwd
|
|
from config import ctx
|
|
|
|
HWSIM_SERVICE = 'net.connman.hwsim'
|
|
HWSIM_RULE_MANAGER_INTERFACE = 'net.connman.hwsim.RuleManager'
|
|
HWSIM_RULE_INTERFACE = 'net.connman.hwsim.Rule'
|
|
HWSIM_RADIO_MANAGER_INTERFACE = 'net.connman.hwsim.RadioManager'
|
|
HWSIM_RADIO_INTERFACE = 'net.connman.hwsim.Radio'
|
|
HWSIM_INTERFACE_INTERFACE = 'net.connman.hwsim.Interface'
|
|
|
|
HWSIM_AGENT_MANAGER_PATH = '/'
|
|
|
|
class HwsimDBusAbstract(iwd.AsyncOpAbstract):
|
|
__metaclass__ = ABCMeta
|
|
|
|
def __init__(self, object_path, properties = None, namespace=ctx):
|
|
self._bus = namespace.get_bus()
|
|
self._object_path = object_path
|
|
proxy = self._bus.get_object(HWSIM_SERVICE, self._object_path)
|
|
self._iface = dbus.Interface(proxy, self._iface_name)
|
|
self._prop_proxy = dbus.Interface(proxy, iwd.DBUS_PROPERTIES)
|
|
|
|
if properties is None:
|
|
self._properties = self._prop_proxy.GetAll(self._iface_name)
|
|
else:
|
|
self._properties = properties
|
|
|
|
self._prop_proxy.connect_to_signal("PropertiesChanged",
|
|
self._property_changed_handler, path_keyword="path")
|
|
|
|
def _property_changed_handler(self, interface, changed, invalidated, path):
|
|
if interface == self._iface_name and path == self._object_path:
|
|
for name, value in changed.items():
|
|
self._properties[name] = value
|
|
|
|
@abstractmethod
|
|
def __str__(self):
|
|
pass
|
|
|
|
@property
|
|
def path(self):
|
|
return self._object_path
|
|
|
|
class Rule(HwsimDBusAbstract):
|
|
_iface_name = HWSIM_RULE_INTERFACE
|
|
|
|
@property
|
|
def source(self):
|
|
return self._properties['Source']
|
|
|
|
@source.setter
|
|
def source(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'Source', value, reply_handler=self._success,
|
|
error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def destination(self):
|
|
return self._properties['Destination']
|
|
|
|
@destination.setter
|
|
def destination(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'Destination', value, reply_handler=self._success,
|
|
error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def bidirectional(self):
|
|
return bool(self._properties['Bidirectional'])
|
|
|
|
@bidirectional.setter
|
|
def bidirectional(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'Bidirectional',
|
|
dbus.Boolean(value), reply_handler=self._success, error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def frequency(self):
|
|
return int(self._properties['Frequency'])
|
|
|
|
@frequency.setter
|
|
def frequency(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'Frequency',
|
|
dbus.UInt32(value), reply_handler=self._success, error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def priority(self):
|
|
return int(self._properties['Priority'])
|
|
|
|
@priority.setter
|
|
def priority(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'Priority',
|
|
dbus.Int16(value), reply_handler=self._success, error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def signal(self):
|
|
return int(self._properties['SignalStrength'])
|
|
|
|
@signal.setter
|
|
def signal(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'SignalStrength',
|
|
dbus.Int16(value), reply_handler=self._success, error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def drop(self):
|
|
return bool(self._properties['Drop'])
|
|
|
|
@drop.setter
|
|
def drop(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'Drop', dbus.Boolean(value),
|
|
reply_handler=self._success, error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def delay(self):
|
|
return int(self._properties['Delay'])
|
|
|
|
@delay.setter
|
|
def delay(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'Delay', dbus.UInt32(value),
|
|
reply_handler=self._success, error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def prefix(self):
|
|
return self._properties['Prefix']
|
|
|
|
@prefix.setter
|
|
def prefix(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'Prefix', dbus.ByteArray.fromhex(value),
|
|
reply_handler=self._success, error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def enabled(self):
|
|
return self._properties['Enabled']
|
|
|
|
@enabled.setter
|
|
def enabled(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'Enabled', value,
|
|
reply_handler=self._success, error_handler=self._failure)
|
|
self._wait_for_async_op()
|
|
|
|
@property
|
|
def match_times(self):
|
|
return self._properties['MatchTimes']
|
|
|
|
@match_times.setter
|
|
def match_times(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'MatchTimes', dbus.UInt16(value))
|
|
|
|
@property
|
|
def drop_ack(self):
|
|
return self._properties(['DropAck'])
|
|
|
|
@drop_ack.setter
|
|
def drop_ack(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'DropAck', value)
|
|
|
|
@property
|
|
def match(self):
|
|
return self._properties['MatchBytes']
|
|
|
|
@match.setter
|
|
def match(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'MatchBytes', dbus.ByteArray.fromhex(value))
|
|
|
|
@property
|
|
def match_offset(self):
|
|
return self._properties(['MatchBytesOffset'])
|
|
|
|
@match_offset.setter
|
|
def match_offset(self, value):
|
|
self._prop_proxy.Set(self._iface_name, 'MatchBytesOffset', dbus.UInt16(value))
|
|
|
|
def remove(self):
|
|
self._iface.Remove(reply_handler=self._success,
|
|
error_handler=self._failure)
|
|
|
|
self._wait_for_async_op()
|
|
|
|
def __del__(self):
|
|
self.remove()
|
|
|
|
def __str__(self, prefix = ''):
|
|
return prefix + 'Rule: ' + self.path + '\n' + \
|
|
prefix + '\tSource:\t\t' + self.source + '\n' + \
|
|
prefix + '\tDestination:\t' + self.destination + '\n' + \
|
|
prefix + '\tBidirectional:\t' + \
|
|
str(self.bidirectional) + '\n' + \
|
|
prefix + '\tPriority:\t' + str(self.priority) + '\n' +\
|
|
prefix + '\tFrequency:\t' + str(self.frequency) + '\n' + \
|
|
prefix + '\tApply rssi:\t' + str(self.signal) + '\n' + \
|
|
prefix + '\tApply drop:\t' + str(self.drop) + '\n' + \
|
|
prefix + '\tPrefix:\t' + str([hex(b) for b in self.prefix]) + '\n' + \
|
|
prefix + '\tDelay:\t' + str(self.delay) + '\n' + \
|
|
prefix + '\tEnabled:\t' + str(self.enabled) + '\n'
|
|
|
|
class RuleSet(collections.Mapping):
|
|
def __init__(self, hwsim, objects):
|
|
self._dict = {}
|
|
self._rule_manager = hwsim.rule_manager
|
|
|
|
hwsim.object_manager.connect_to_signal("InterfacesAdded",
|
|
self._interfaces_added_handler, HWSIM_RULE_INTERFACE)
|
|
hwsim.object_manager.connect_to_signal("InterfacesRemoved",
|
|
self._interfaces_removed_handler, HWSIM_RULE_INTERFACE)
|
|
|
|
for path in objects:
|
|
for interface in objects[path]:
|
|
if interface == HWSIM_RULE_INTERFACE:
|
|
self._dict[path] = Rule(path, objects[path][interface])
|
|
|
|
def __getitem__(self, key):
|
|
return self._dict.__getitem__(key)
|
|
|
|
def __iter__(self):
|
|
return self._dict.__iter__()
|
|
|
|
def __len__(self):
|
|
return self._dict.__len__()
|
|
|
|
def __delitem__(self, key):
|
|
self._dict.pop(key).remove()
|
|
|
|
def _interfaces_added_handler(self, path, interfaces):
|
|
self._dict[path] = Rule(interfaces[HWSIM_RULE_INTERFACE])
|
|
|
|
def _interfaces_removed_handler(self, path, interfaces):
|
|
del _dict[path]
|
|
|
|
def create(self):
|
|
path = self._rule_manager.AddRule()
|
|
obj = Rule(path)
|
|
self._dict[path] = obj
|
|
return obj
|
|
|
|
def remove_all(self):
|
|
for rule in self._dict.values():
|
|
rule.remove()
|
|
|
|
class Radio(HwsimDBusAbstract):
|
|
_iface_name = HWSIM_RADIO_INTERFACE
|
|
|
|
@property
|
|
def name(self):
|
|
return self._properties['Name']
|
|
|
|
@property
|
|
def addresses(self):
|
|
return [str(addr) for addr in self._properties['Addresses']]
|
|
|
|
def remove(self):
|
|
self._iface.Destroy(reply_handler=self._success,
|
|
error_handler=self._failure)
|
|
|
|
self._wait_for_async_op()
|
|
|
|
def __str__(self, prefix = ''):
|
|
return prefix + 'Radio: ' + self.path + '\n' + \
|
|
prefix + '\tName:\t\t' + self.name + '\n' + \
|
|
prefix + '\tAddresses:\t' + repr(self.destination) + '\n'
|
|
|
|
class RadioList(collections.Mapping):
|
|
def __init__(self, hwsim, objects):
|
|
self._dict = {}
|
|
self._radio_manager = hwsim.radio_manager
|
|
|
|
hwsim.object_manager.connect_to_signal("InterfacesAdded",
|
|
self._interfaces_added_handler, HWSIM_RADIO_INTERFACE)
|
|
hwsim.object_manager.connect_to_signal("InterfacesRemoved",
|
|
self._interfaces_removed_handler, HWSIM_RADIO_INTERFACE)
|
|
|
|
for path in objects:
|
|
for interface in objects[path]:
|
|
if interface == HWSIM_RADIO_INTERFACE:
|
|
self._dict[path] = Radio(path, objects[path][interface])
|
|
|
|
def __getitem__(self, key):
|
|
return self._dict.__getitem__(key)
|
|
|
|
def __iter__(self):
|
|
return self._dict.__iter__()
|
|
|
|
def __len__(self):
|
|
return self._dict.__len__()
|
|
|
|
def __delitem__(self, key):
|
|
self._dict.pop(key).remove()
|
|
|
|
def values(self):
|
|
return self._dict.values()
|
|
|
|
def _interfaces_added_handler(self, path, interfaces):
|
|
self._dict[path] = Radio(interfaces[HWSIM_RADIO_INTERFACE])
|
|
|
|
def _interfaces_removed_handler(self, path, interfaces):
|
|
del _dict[path]
|
|
|
|
def create(self, name, p2p_device=False, iftype_disable=None,
|
|
cipher_disable=None, wait=True):
|
|
args = dbus.Dictionary({
|
|
'Name': name,
|
|
'P2P': p2p_device,
|
|
'NoVirtualInterface': True,
|
|
}, signature='sv')
|
|
|
|
if iftype_disable:
|
|
args['InterfaceTypeDisable'] = iftype_disable
|
|
|
|
if cipher_disable:
|
|
args['CipherTypeDisable'] = cipher_disable
|
|
|
|
if not wait:
|
|
self._radio_manager.CreateRadio(args, reply_handler=self._success,
|
|
error_handler=self._failure)
|
|
return None
|
|
|
|
path = self._radio_manager.CreateRadio(args)
|
|
obj = Radio(path)
|
|
self._dict[path] = obj
|
|
return obj
|
|
|
|
def _success(self, bla):
|
|
pass
|
|
|
|
def _failure(self, ex):
|
|
pass
|
|
|
|
class Hwsim(iwd.AsyncOpAbstract):
|
|
_instances = WeakValueDictionary()
|
|
|
|
def __new__(cls, namespace=ctx):
|
|
key = id(namespace)
|
|
|
|
if key not in cls._instances.keys():
|
|
obj = object.__new__(cls)
|
|
obj._initialized = False
|
|
|
|
cls._instances[key] = obj
|
|
|
|
return cls._instances[key]
|
|
|
|
def __init__(self, namespace=ctx):
|
|
if self._initialized:
|
|
return
|
|
|
|
self._initialized = True
|
|
|
|
self._bus = namespace.get_bus()
|
|
|
|
self._rule_manager_if = dbus.Interface(
|
|
self._bus.get_object(HWSIM_SERVICE, '/'),
|
|
HWSIM_RULE_MANAGER_INTERFACE)
|
|
self._radio_manager_if = dbus.Interface(
|
|
self._bus.get_object(HWSIM_SERVICE, '/'),
|
|
HWSIM_RADIO_MANAGER_INTERFACE)
|
|
self._object_manager_if = dbus.Interface(
|
|
self._bus.get_object(HWSIM_SERVICE, '/'),
|
|
iwd.DBUS_OBJECT_MANAGER)
|
|
|
|
objects = self.object_manager.GetManagedObjects()
|
|
|
|
self._rules = RuleSet(self, objects)
|
|
self._radios = RadioList(self, objects)
|
|
|
|
@property
|
|
def rules(self):
|
|
return self._rules
|
|
|
|
@property
|
|
def rule_manager(self):
|
|
return self._rule_manager_if
|
|
|
|
@property
|
|
def radios(self):
|
|
return self._radios
|
|
|
|
@property
|
|
def radio_manager(self):
|
|
return self._radio_manager_if
|
|
|
|
@property
|
|
def object_manager(self):
|
|
return self._object_manager_if
|
|
|
|
def spoof_disassociate(self, radio, freq, station):
|
|
'''
|
|
Send a spoofed disassociate frame to a station
|
|
'''
|
|
frame = Dot11()/Dot11Disas(reason=7)
|
|
frame[Dot11].addr1 = station
|
|
frame[Dot11].addr2 = radio.addresses[0]
|
|
frame[Dot11].addr3 = radio.addresses[0]
|
|
|
|
self.spoof_frame(radio, freq, station, raw(frame))
|
|
|
|
def spoof_deauthenticate(self, radio, freq, station):
|
|
'''
|
|
Send a spoofed deauthenticate frame to a station
|
|
'''
|
|
frame = Dot11()/Dot11Deauth(reason=6)
|
|
frame[Dot11].addr1 = station
|
|
frame[Dot11].addr2 = radio.addresses[0]
|
|
frame[Dot11].addr3 = radio.addresses[0]
|
|
|
|
self.spoof_frame(radio, freq, station, raw(frame))
|
|
|
|
def spoof_eap_fail(self, radio, freq, station):
|
|
'''
|
|
Send a spoofed EAP-Failure frame to a station
|
|
'''
|
|
frame = Dot11(type="Data", subtype=0)
|
|
frame[Dot11].addr1 = station
|
|
frame[Dot11].addr2 = radio.addresses[0]
|
|
frame[Dot11].addr3 = radio.addresses[0]
|
|
frame /= LLC()/SNAP()/EAPOL( version="802.1X-2001" )
|
|
frame /= EAP( code="Failure" )
|
|
|
|
self.spoof_frame(radio, freq, station, raw(frame))
|
|
|
|
def spoof_invalid_ptk_1_of_4(self, radio, freq, station):
|
|
'''
|
|
Send a spoofed PTK 1/4 frame to a station
|
|
'''
|
|
frame = Dot11(type="Data", subtype=0)
|
|
frame[Dot11].addr1 = station
|
|
frame[Dot11].addr2 = radio.addresses[0]
|
|
frame[Dot11].addr3 = radio.addresses[0]
|
|
|
|
# NOTE: Expected key_info is 0x008a, with the install flag
|
|
# this becomes 0x00ca.
|
|
eapol = WPA_key( descriptor_type = 2,
|
|
key_info = 0x00ca, # Includes an invalid install flag!
|
|
replay_counter = struct.pack(">Q", 100))
|
|
frame /= LLC()/SNAP()/EAPOL(version="802.1X-2004", type="EAPOL-Key")
|
|
frame /= eapol
|
|
|
|
self.spoof_frame(radio, freq, station, raw(frame))
|
|
|
|
def spoof_frame(self, radio, freq, station, frame):
|
|
'''
|
|
Send a spoofed arbitrary frame to a station
|
|
'''
|
|
radio_path = None
|
|
objects = self.object_manager.GetManagedObjects()
|
|
|
|
for path in objects:
|
|
obj = objects[path]
|
|
for interface in obj:
|
|
if interface == HWSIM_INTERFACE_INTERFACE:
|
|
if obj[interface]['Address'] == radio.addresses[0] or \
|
|
obj[interface]['Address'] == radio.addresses[1]:
|
|
radio_path = path
|
|
break
|
|
|
|
if not radio_path:
|
|
raise Exception("Could not find radio %s" % radio.path)
|
|
|
|
iface = dbus.Interface(self._bus.get_object(HWSIM_SERVICE, radio_path),
|
|
HWSIM_INTERFACE_INTERFACE)
|
|
|
|
iface.SendFrame(dbus.ByteArray.fromhex(station.replace(':', '')),
|
|
freq, -30, frame)
|
|
|
|
def get_radio(self, name):
|
|
for path in self.radios:
|
|
radio = self.radios[path]
|
|
if radio.name == name:
|
|
return radio
|
|
|
|
return None
|