3
0
mirror of https://git.kernel.org/pub/scm/network/wireless/iwd.git synced 2024-12-22 13:02:44 +01:00
iwd/test/wfd-source
Andrew Zaborowski fb4b7e7a0b wfd-source: Fix some races on iwd name owner change
Subscribe to the InterfacesAdded/InterfacesRemoved/PropertiesChanged signals
before calling GetManagedObjects.  For some reason, when iwd starts after the
client, we consistently get the managed objects list from before the Adapter
interfaces are added but miss the subsequent InterfacesAdded signals.  This
probably has to do with the GetManagedObjects and the AddMatch calls all
being synchronous.

Secondly call self.populate_devices() on init as it won't be called if
IWD is not on the bus.
2020-08-04 10:30:00 -05:00

1671 lines
71 KiB
Python
Executable File

#! /usr/bin/python3
#
# Copyright (C) 2020 Intel Corporation
#
# A simplified WFD source that streams the X11 screen using gstreamer
# A more complete solution would create a virtual screen visible through the normal system calls, xrandr, etc.,
# with its pixel aspect ratio, EDID data and what not. This would allow the user to configure it like a real
# display in mirror mode or side-by-side mode.
import sys
import dbus
import dbus.mainloop.glib
import socket
import collections
import collections.abc
import random
import dataclasses
import traceback
import codecs
try:
import netifaces
except:
pass
import gi
gi.require_version('GLib', '2.0')
gi.require_version('Gst', '1.0')
gi.require_version('Gtk', '3.0')
from gi.repository import GLib, Gst, Gtk, Gdk, Pango, GObject
class WFDRTSPServer:
class RTSPException(Exception):
    # Error type raised by error() and passed to error_handler when the RTSP
    # session fails.
    pass
# Descriptor of one displayed setting or value.  'getter' is a callable
# returning the current value, 'setter' a callable taking the new value (None
# for read-only entries) and 'type' the Python type of the value.  'vals' is
# None, a 2-tuple (presumably the allowed min/max for ints) or a list of the
# allowed string values.
Prop = collections.namedtuple('Prop', ['name', 'desc', 'getter', 'setter', 'type', 'vals'])
def __init__(self, port, state_handler, error_handler, init_values, prop_handler):
    """Open the RTSP listening socket and reset the session state.

    port is the TCP port to listen on.  state_handler, error_handler and
    prop_handler are stored callbacks: state_handler() is called by
    enter_state(), error_handler(exception) by the I/O and timer callbacks
    when they fail, and prop_handler() after the exposed properties may have
    changed.  init_values is handed to sm_init().
    """
    # Should start the TCP server only on the P2P connection's local IP but we won't
    # know the IP or interface name until after the connection is established. At that
    # time the sink may try to make the TCP connection at any time so our listen
    # socket should be up before this.
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.server = listener
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    listener.bind(('0.0.0.0', port))
    listener.listen(1)
    GLib.io_add_watch(listener, GLib.IO_IN, self.handle_connection)

    # No sink connected yet, nothing queued in either direction
    self.conn = None
    self.tx_queue = []
    self.rx_queue = b''

    self.state_handler = state_handler
    self.error_handler = error_handler
    self.prop_handler = prop_handler

    self.sm_init(init_values)
def handle_data_out(self, conn, *args):
    """IO_OUT watch callback: send the oldest queued message.

    Returns True while data remains queued so that the watch stays
    installed, False once the queue is empty or after a failure (which is
    reported through error_handler).
    """
    try:
        pending = self.tx_queue.pop(0)
        written = self.conn.send(pending)
        remainder = pending[written:]
        if remainder:
            # Partial write, keep the unsent tail at the head of the queue
            self.tx_queue.insert(0, remainder)
        return bool(self.tx_queue)
    except Exception as exc:
        self.error_handler(exc)
        return False
def tx_queue_append(self, cmd):
    """Queue the string cmd for sending, UTF-8 encoded.

    An IO_OUT watch is installed only when the queue was empty; otherwise
    the watch added for the earlier entries is relied on to drain it.
    """
    queue_was_empty = len(self.tx_queue) == 0
    if queue_was_empty:
        GLib.io_add_watch(self.conn.fileno(), GLib.IO_OUT, self.handle_data_out)

    encoded = cmd.encode('utf-8')
    self.tx_queue.append(encoded)
    self.debug('queued cmd: ' + cmd)
def handle_data_hup(self, conn, *args):
    """IO_HUP watch callback: treat a hang-up as a session failure.

    Always returns False so the watch is removed.
    """
    try:
        self.debug('HUP')
        # error() raises, which hands the failure to error_handler below
        self.error('Disconnected')
    except Exception as exc:
        self.error_handler(exc)
    return False
def handle_data_in(self, conn, *args):
    """IO_IN watch callback: read from the sink and dispatch complete RTSP messages.

    Received bytes are appended to self.rx_queue.  Every complete message
    (header block plus Content-Length bytes of body) is removed from the
    buffer and passed to source_handle_message() either as a request
    (method/target) or as a response (status/reason).

    Returns True to keep the watch, False after a disconnect or a failure;
    failures are reported through error_handler.
    """
    try:
        newdata = self.conn.recv(4096)
        if len(newdata) == 0:
            self.debug('recv returned 0 bytes')
            # Disconnect from P2P
            self.error('Disconnected')
            return False

        self.debug('received data: ' + str(newdata))
        self.rx_queue += newdata

        while b'\r\n\r\n' in self.rx_queue:
            msg, content = self.rx_queue.split(b'\r\n\r\n', 1)
            lines = msg.split(b'\r\n')
            headers = {}
            for line in lines[1:]:
                if b':' not in line:
                    # Bad syntax, drop the buffered data so that the broken
                    # message is not parsed again on the next read
                    self.rx_queue = b''
                    return True

                name, value = line.decode('utf8').split(':', 1)
                name = name.lower()
                value = value.lstrip(' ')

                if name in headers:
                    # Duplicate
                    self.rx_queue = b''
                    return True

                headers[name] = value

            cl = 0
            if 'content-length' in headers:
                try:
                    cl = int(headers['content-length'])
                except ValueError:
                    cl = -1

                if cl < 1 or cl > 1000:
                    # Bad syntax
                    self.rx_queue = b''
                    return True

            if len(content) < cl:
                # Wait for more data
                return True

            top_line = lines[0].decode('utf8').split(None, 2)
            # Pad to three fields so that a short start line (e.g. a response
            # without a reason phrase) doesn't raise IndexError below
            top_line += [''] * (3 - len(top_line))
            self.rx_queue = self.rx_queue[len(msg) + 4 + cl:]
            content = content[:cl]

            if top_line[2] == 'RTSP/1.0':
                self.source_handle_message(method=top_line[0], target=top_line[1], headers=headers, content=content)
            elif top_line[0] == 'RTSP/1.0':
                try:
                    status = int(top_line[1])
                except ValueError:
                    status = 0

                if status < 1 or status > 999:
                    self.error("Couldn't parse response status")

                self.source_handle_message(status=status, reason=top_line[2], headers=headers, content=content)
            else:
                # Bad protocol
                self.error('Unknown protocol in ' + str(top_line))

        return True
    except Exception as e:
        self.error_handler(e)
        return False
def handle_connection(self, sock, *args):
    """IO_IN watch callback on the listening socket: accept the sink's connection.

    A connection from an address other than expected_remote_ip (when one is
    set) is closed and the listener is kept (returns True).  Otherwise the
    listening socket is closed, the data watches are installed and the state
    machine is started; False is returned to drop the listener watch.
    Failures are reported through error_handler.
    """
    try:
        if self.conn:
            return False

        client, peer_addr = sock.accept()
        self.conn = client
        self.debug('RTSP connection from: ' + str(peer_addr))
        self.remote_ip = peer_addr[0]

        wrong_peer = self.expected_remote_ip and self.remote_ip != self.expected_remote_ip
        if wrong_peer:
            self.conn.close()
            self.conn = None
            self.debug('Connection refused, bad source address')
            return True

        # Only one connection is served, stop listening
        sock.close()
        self.server = None

        fd = self.conn.fileno()
        GLib.io_add_watch(fd, GLib.IO_IN, self.handle_data_in)
        GLib.io_add_watch(fd, GLib.IO_HUP, self.handle_data_hup)

        self._state = 'init'
        self.source_handle_message()
        return False
    except Exception as exc:
        self.error_handler(exc)
        return False
def error(self, msg):
    """Enter the 'failed' state and raise an RTSPException describing msg.

    The exception text names the state the session was in when the failure
    happened, so it has to be captured before the switch to 'failed'.
    """
    failed_in = self._state
    self.enter_state('failed')
    e = WFDRTSPServer.RTSPException('State ' + failed_in + ': ' + msg)
    self.debug('error: ' + msg)
    raise e
def warning(self, msg):
    """Report a non-fatal problem through debug() and on stdout."""
    self.debug('warning: ' + msg)
    # Same output as printing the text with one extra trailing newline
    print('Warning: ' + msg, end='\n\n')
def debug(self, msg):
    """Debug output hook; this implementation discards msg."""
    pass
@property
def state(self):
    """Name of the current state machine state, as last set by enter_state()."""
    return self._state
def enter_state(self, new_state):
    """Switch the state machine to new_state and call state_handler()."""
    previous = self._state
    transition = previous + ' -> ' + new_state
    self.debug('state change: ' + transition)
    self._state = new_state
    self.state_handler()
@property
def ready(self):
    """True once the session is set up, i.e. in the 'streaming' or 'paused' state."""
    return self._state in ('streaming', 'paused')
@property
def props(self):
    """List of Prop entries collected so far for this session."""
    return self._props
def sm_init(self, init_values):
    """Set every state machine attribute to its value before an RTSP connection.

    init_values must provide the 'width', 'height' and 'rtcp_enabled' keys.
    """
    self._state = 'waiting-rtsp'
    self._props = []

    # Values taken from the caller's initial settings
    self.init_width = init_values['width']
    self.init_height = init_values['height']
    self.rtcp_enabled = init_values['rtcp_enabled']

    # What we advertise to the sink
    self.local_params = {
        'wfd_video_formats': '00 00 01 08 00000000 00000000 00000040 00 0000 0000 00 none none'
    }
    self.local_methods = [ 'org.wfa.wfd1.0', 'SET_PARAMETER', 'GET_PARAMETER', 'PLAY', 'SETUP', 'TEARDOWN' ]
    self.presentation_url = [ 'rtsp://127.0.0.1/wfd1.0/streamid=0', 'none' ] # Table 88
    self.alternative_urls = [ 'rtsp://localhost/wfd1.0/streamid=0' ]

    # What the sink told us
    self.remote_params = {}
    self.expected_remote_ip = None
    self.remote_ip = None

    # RTSP session and request bookkeeping
    self.session_stream_url = None
    self.session_id = None
    self.session_timeout = 60
    self.local_cseq = 0
    self.remote_cseq = None
    self.last_method = None
    self.last_require = []
    self.last_params = []
    self.rtsp_keepalive = None
    self.rtsp_keepalive_timeout = None

    # RTP/RTCP transport, filled in during setup
    self.remote_rtp_port = None
    self.remote_rtcp_port = None
    self.local_rtp_port = None
    self.local_rtcp_port = None
    self.use_tcp = None
    self.rtp_pipeline = None
@staticmethod
def get_init_props():
    """Build the settings that can be edited before a session exists.

    Returns (props, values): values is a dict with the 'width', 'height' and
    'rtcp_enabled' defaults, and props is a list of Prop entries whose
    getters and setters read and write that same dict.
    """
    values = {
        'width': 800,
        'height': 600,
        'rtcp_enabled': True
    }

    def make_prop(key, name, desc, value_type, vals):
        # Each call binds its own key so the closures stay independent
        return WFDRTSPServer.Prop(name, desc,
            lambda: values[key], lambda x: values.__setitem__(key, x), value_type, vals)

    props = [
        make_prop('width', 'Output width', 'Scale the video stream to this X resolution for sending',
            int, (640, 1920)),
        make_prop('height', 'Output height', 'Scale the video stream to this Y resolution for sending',
            int, (480, 1080)),
        make_prop('rtcp_enabled', 'Enable RTCP', 'Use RTCP if the Sink requests it during setup',
            bool, None),
    ]
    # TODO: Enable Audio
    # TODO: Audio source
    return props, values
def close(self):
    """Stop the keep-alive timers and the pipeline and close both sockets.

    Every resource is cleared to None once released, so calling close()
    again is harmless.
    """
    # Avoid passing self to io watches so that the refcount can ever reach 0 and
    # all this can be done in __del__
    for timer_attr in ('rtsp_keepalive', 'rtsp_keepalive_timeout'):
        source_id = getattr(self, timer_attr)
        if source_id:
            GLib.source_remove(source_id)
            setattr(self, timer_attr, None)

    if self.rtp_pipeline:
        self.rtp_pipeline.set_state(Gst.State.NULL)
        self.rtp_pipeline = None

    for sock_attr in ('server', 'conn'):
        sock = getattr(self, sock_attr)
        if sock:
            sock.close()
            setattr(self, sock_attr, None)
def set_local_interface(self, new_value):
    """Accept RTSP URLs that use the IPv4 address of interface new_value.

    Best effort: netifaces is an optional import and the interface may have
    no IPv4 address, in which case the list of alternative URLs is left
    unchanged and the reason is only passed to debug().
    """
    try:
        local_ip = netifaces.ifaddresses(new_value)[netifaces.AF_INET][0]['addr']
    except Exception as e:
        # Not fatal, but don't swallow KeyboardInterrupt/SystemExit as well
        self.debug('Can\'t get the local IP of ' + str(new_value) + ': ' + repr(e))
        return

    self.alternative_urls.append('rtsp://' + local_ip + '/wfd1.0/streamid=0')
def set_remote_ip(self, new_value):
    """Record the only address the sink may connect from.

    If a connection already exists and came from a different address, that is
    reported through error_handler.
    """
    self.expected_remote_ip = new_value
    if not self.conn:
        return
    if self.remote_ip == self.expected_remote_ip:
        return
    self.error_handler(WFDRTSPServer.RTSPException('Connection was from a wrong IP')) # TODO: do this in an idle cb
def validate_msg(self, method, expected_method, status, reason, headers, target, content):
    """Check a received RTSP message against what the state machine expects.

    With expected_method None the message must be a 2xx response to our last
    request: its CSeq must equal local_cseq, an OPTIONS response must list
    the methods we required, and the parameters of a GET_PARAMETER response
    are merged into remote_params.

    Otherwise the message must be an expected_method request with a CSeq
    higher than the previous one, which is then stored in remote_cseq.  The
    per-method headers and target are checked, and for SET_PARAMETER the
    parsed (name, value) pairs are stored in last_params.

    Every violation is reported through self.error(), which raises.
    """
    body = content if content is not None else b''

    if expected_method is None:
        # Expected a response, not a request
        if method is not None:
            self.error('Received a "' + method + '" request where a response was expected')
        if status < 200 or status > 299:
            self.error('Response status ' + str(status) + ' and reason: ' + reason)
        if status != 200:
            self.warning('Response status was ' + str(status) + ' ("' + reason + '") in state ' + self._state)

        # Parse first and compare afterwards so that the parse handler can't
        # catch and replace the more specific mismatch error
        try:
            cseq = int(headers['cseq'])
        except (KeyError, TypeError, ValueError):
            self.error('Missing or unparsable CSeq in a response')
        if cseq != self.local_cseq:
            self.error('Response CSeq doesn\'t match')

        if self.last_method == 'OPTIONS':
            if 'public' not in headers:
                self.error('Missing "Public" header in OPTIONS response')
            public = [ m.strip() for m in headers['public'].split(',') ]
            missing = [ m for m in self.last_require if m not in public ]
            if missing:
                self.error('Missing required method(s) "' + '", "'.join(missing) + '" in OPTIONS response')

        if self.last_method == 'GET_PARAMETER':
            params = {}
            for line in body.split(b'\r\n'):
                if b':' not in line:
                    continue
                k, v = line.decode('utf8').split(':', 1)
                key = k.strip()
                if key in params:
                    self.error('Duplicate key "' + k + '" in GET_PARAMETER response')
                params[key] = v.strip()
            missing = [ p for p in self.last_params if p not in params ]
            if missing: # Not an error
                self.warning('Missing key(s) "' + '", "'.join(missing) + '" in GET_PARAMETER response')
            self.remote_params.update(params)

        return

    if method is None:
        self.error('Received an RTSP response where a ' + expected_method + ' was expected')
    if method != expected_method:
        self.error('Received a "' + method + '" request where a ' + expected_method + ' was expected')

    try:
        cseq = int(headers['cseq'])
    except (KeyError, TypeError, ValueError):
        self.error('Missing or unparsable CSeq in a new request')
    if self.remote_cseq is not None and cseq <= self.remote_cseq:
        self.error('Unchanged CSeq in a new request')
    self.remote_cseq = cseq

    if method == 'OPTIONS' and 'require' not in headers:
        self.error('Missing "Require" header in OPTIONS request')
    elif method == 'SETUP' and 'transport' not in headers:
        self.error('Missing "Transport" header in SETUP request')
    elif method == 'SETUP' and (target not in self.presentation_url + self.alternative_urls or target == 'none'):
        self.error('Unknown target "' + target + '" in SETUP request')
    elif method == 'PLAY' and ('session' not in headers or headers['session'] != self.session_id):
        self.error('Missing or invalid "Session" header in PLAY request')
    elif method == 'PLAY' and target != self.session_stream_url:
        self.error('Unknown target "' + target + '" in PLAY request')
    elif method == 'PAUSE' and 'session' not in headers:
        self.error('Missing "Session" header in PAUSE request')
    elif method == 'PAUSE' and target != self.session_stream_url:
        self.error('Unknown target "' + target + '" in PAUSE request')
    elif method == 'TEARDOWN' and 'session' not in headers:
        self.error('Missing "Session" header in TEARDOWN request')
    elif method == 'TEARDOWN' and target != self.session_stream_url:
        self.error('Unknown target "' + target + '" in TEARDOWN request')
    elif method == 'SET_PARAMETER':
        params = []
        names = []
        for line in body.split(b'\r\n'):
            # A line without a colon is a bare parameter name with no value
            param = (line.decode('utf8').strip(), None)
            if not param[0]:
                continue
            if ':' in param[0]:
                k, v = param[0].split(':', 1)
                param = (k.strip(), v.strip())
            if param[0] in names:
                self.error('Duplicate key "' + param[0] + '" in SET_PARAMETER request')
            names.append(param[0])
            params.append(param)
        self.last_params = params
def request(self, method, target, require=None, params=None):
    """Queue an RTSP request and remember it for validating the response.

    require is an optional list of option tags for the Require header.
    params is either a mapping of parameter names to values (sent as
    "name: value" lines) or a sequence of parameter names (one per line);
    it becomes a text/parameters body.  local_cseq is incremented and
    last_method, last_require and last_params are updated.
    """
    # None instead of shared mutable defaults, these end up stored on self
    require = [] if require is None else require
    params = [] if params is None else params

    content = ''
    if params:
        if isinstance(params, collections.abc.Mapping):
            content = ''.join(k + ': ' + params[k] + '\r\n' for k in params)
        else:
            content = ''.join(k + '\r\n' for k in params)

    self.local_cseq += 1
    header_lines = [
        method + ' ' + target + ' RTSP/1.0',
        'CSeq: ' + str(self.local_cseq),
    ]
    if require:
        header_lines.append('Require: ' + ', '.join(require))
    if content:
        header_lines.append('Content-Type: text/parameters')
        # The length is in bytes as sent on the wire, tx_queue_append()
        # encodes the message as UTF-8
        header_lines.append('Content-Length: ' + str(len(content.encode('utf-8'))))

    self.tx_queue_append('\r\n'.join(header_lines) + '\r\n\r\n' + content)
    self.last_method = method
    self.last_require = require
    self.last_params = params
def response(self, public=[], session=None, transport=None):
    """Queue a "200 OK" response to the request numbered remote_cseq.

    The Public, Session and Transport headers are added only when the
    matching argument is given.
    """
    header_lines = ['RTSP/1.0 200 OK', 'CSeq: ' + str(self.remote_cseq)]
    if public:
        header_lines.append('Public: ' + ', '.join(public))
    if session is not None:
        header_lines.append('Session: ' + session)
    if transport is not None:
        header_lines.append('Transport: ' + transport)

    # Every header line ends in CRLF and an empty line ends the message
    self.tx_queue_append('\r\n'.join(header_lines) + '\r\n\r\n')
def parse_video_formats(self, value):
    """Placeholder for parsing the sink's wfd_video_formats value; does nothing yet."""
    # TODO
    pass
def parse_client_rtp_ports(self, value):
    """Parse the sink's wfd_client_rtp_ports parameter.

    value must consist of four whitespace-separated fields: the transport
    profile, rtp-port0, rtp-port1 and the mode.  On success remote_rtp_port
    and use_tcp are set; anything invalid is reported through self.error(),
    which raises.
    """
    fields = value.split()
    if len(fields) != 4:
        # Report a malformed value like any other protocol error instead of
        # letting the unpacking below raise a bare ValueError
        self.error('Can\'t parse wfd-client-rtp-ports: ' + value)
    profile, rtp_p0_str, rtp_p1_str, mode = fields

    try:
        rtp_p0 = int(rtp_p0_str)
        rtp_p1 = int(rtp_p1_str)
    except ValueError:
        self.error('Can\'t parse rtp-port in wfd-client-rtp-ports: ' + value)

    if rtp_p0 < 1 or rtp_p0 > 65535:
        self.error('rtp-port0 not valid for Primary Sink: ' + rtp_p0_str)
    if rtp_p1 != 0: # Table 90
        self.error('rtp-port1 not valid for Primary Sink: ' + rtp_p1_str)
    if profile not in ['RTP/AVP/UDP;unicast', 'RTP/AVP/TCP;unicast']:
        self.error('Unknown RTP transport in wfd-client-rtp-ports: ' + profile)
    if mode != 'mode=play':
        self.error('Unknown mode in wfd-client-rtp-ports: ' + mode)

    self.remote_rtp_port = rtp_p0
    self.use_tcp = (profile == 'RTP/AVP/TCP;unicast')
def parse_transport(self, value):
    """Validate the Transport header of the sink's SETUP request.

    The profile and the RTP port must match what was negotiated earlier
    (use_tcp and remote_rtp_port).  An optional RTCP port is stored in
    remote_rtcp_port.  Three read-only Prop entries describing the transport
    are appended to _props.  Problems are reported through self.error(),
    which raises.
    """
    known_profiles = ['RTP/AVP/UDP;unicast', 'RTP/AVP/TCP;unicast']

    params = value.split(';')
    if len(params) < 3:
        self.error('Can\'t split SETUP Transport header into profile and port numbers: ' + value)

    profile = ';'.join(params[0:2])
    if profile not in known_profiles:
        self.error('Unknown RTP transport in SETUP Transport header: ' + profile)

    profile_is_tcp = profile == 'RTP/AVP/TCP;unicast'
    if self.use_tcp != profile_is_tcp:
        self.error('RTP transport in SETUP Transport header different from what we sent in M4: ' + profile)

    client_port_strs = [p for p in params[2:] if p.startswith('client_port=')]
    if len(client_port_strs) != 1:
        self.error('Can\'t find client-port in SETUP Transport header: ' + value)

    # "client_port=<rtp>" or "client_port=<rtp>-<rtcp>"
    client_ports = client_port_strs[0].split('=', 1)[1].split('-')
    has_rtcp = len(client_ports) > 1
    try:
        rtp_port = int(client_ports[0])
        if has_rtcp:
            rtcp_port = int(client_ports[1])
    except:
        self.error('Can\'t parse client-port in SETUP Transport header: ' + client_port_strs[0])

    if rtp_port != self.remote_rtp_port:
        self.error('client-port in SETUP Transport header doesn\'t match what we sent in M4: ' + str(rtp_port))

    if has_rtcp:
        if rtcp_port < 1 or rtcp_port > 65535 or rtcp_port == rtp_port: # Actually must be rtp_port + 1...
            self.error('Optional RTCP port not valid in SETUP Transport header: ' + str(rtcp_port))
        self.remote_rtcp_port = rtcp_port

    self._props.extend([
        WFDRTSPServer.Prop('RTP transport', '', lambda: 'TCP' if self.use_tcp else 'UDP', None, str, None),
        WFDRTSPServer.Prop('Remote RTP port', '', lambda: self.remote_rtp_port, None, int, None),
        WFDRTSPServer.Prop('Remote RTCP port', '', lambda: self.remote_rtcp_port, None, int, None),
    ])
def parse_display_edid(self):
    """Decode the sink's wfd_display_edid parameter into an 'EDID info' Prop.

    The value is expected to be a 4-hex-digit block count followed by the
    hex-encoded EDID blocks of 128 bytes each.  If the parameter is missing
    or malformed in any way, the property's value is None.
    """
    edid = None
    try:
        len_str, hex_str = self.remote_params['wfd_display_edid'].split(' ', 1)
        if len(len_str.strip()) != 4:
            raise Exception('edid-block-count length is not 4 hex digits')

        blocks = int(len_str, 16)
        decoded = codecs.decode(hex_str.strip(), 'hex')
        count_ok = 1 <= blocks <= 256 and blocks * 128 == len(decoded)
        if not count_ok:
            raise Exception('edid-block-count value wrong')

        edid = decoded
    except:
        edid = None

    self._props.append(WFDRTSPServer.Prop('EDID info', 'Remote display\'s EDID data', lambda: edid, None, bytes, None))
def create_running_props(self):
    """Return the Prop entries for tuning the running GStreamer pipeline.

    The getters and setters act directly on the pipeline elements named
    'src', 'fps', 'videnc', 'res' and 'sink'.  As a side effect the capture
    window of 'src' is set to end at the width and height reported by its
    source pad caps.
    """
    pipeline = self.rtp_pipeline
    src = pipeline.get_by_name('src')
    fps = pipeline.get_by_name('fps')
    enc = pipeline.get_by_name('videnc')
    res = pipeline.get_by_name('res')
    sink = pipeline.get_by_name('sink')
    self.pipeline_props = []

    srcpadcaps = src.srcpads[0].get_allowed_caps()
    width = srcpadcaps[0]['width']
    height = srcpadcaps[0]['height']

    src.props.endx = width
    src.props.endy = height

    Prop = WFDRTSPServer.Prop

    def set_use_damage(val):
        src.props.use_damage = val

    # The window setters keep the window at least one pixel wide and high
    def set_startx(val):
        src.set_property('startx', min(val, src.props.endx - 1))
    def set_starty(val):
        src.set_property('starty', min(val, src.props.endy - 1))
    def set_endx(val):
        src.set_property('endx', max(val, src.props.startx + 1))
    def set_endy(val):
        src.set_property('endy', max(val, src.props.starty + 1))

    def set_framerate(val):
        fps.props.caps[0]['framerate'] = Gst.Fraction(val)
    def set_width(val):
        res.props.caps[0]['width'] = val
    def set_height(val):
        res.props.caps[0]['height'] = val

    preset_values = ['veryslow', 'slower', 'slow', 'medium', 'fast', 'faster', 'veryfast', 'superfast', 'ultrafast', 'placebo']
    preset_map = {'veryslow': 9, 'slower': 8, 'slow': 7, 'medium': 6, 'fast': 5, 'faster': 4, 'veryfast': 3, 'superfast': 2, 'ultrafast': 1, 'placebo': 10}

    def set_speed_preset(val):
        enc.props.speed_preset = preset_map[val]

    def set_max_lateness(val):
        if val <= 0:
            sink.props.max_lateness = -1
        else:
            sink.props.max_lateness = val * 1000000 # milliseconds to nanoseconds

    return [
        Prop('Local width', 'Local screen X resolution', lambda: width, None, int, None),
        Prop('Local height', 'Local screen Y resolution', lambda: height, None, int, None),
        Prop('Use XDamage', 'Try to use XDamage to reduce bandwidth usage',
            lambda: src.props.use_damage, set_use_damage, bool, None),
        Prop('Window min X', 'Skip this many pixels on the left side of the local screen',
            lambda: src.props.startx, set_startx, int, (0, width - 1)),
        Prop('Window min Y', 'Skip this many pixels on the top of the local screen',
            lambda: src.props.starty, set_starty, int, (0, height - 1)),
        Prop('Window max X', 'Send screen contents only up to this X coordinate',
            lambda: src.props.endx, set_endx, int, (1, width)),
        Prop('Window max Y', 'Send screen contents only up to this Y coordinate',
            lambda: src.props.endy, set_endy, int, (1, height)),
        Prop('Framerate', 'Try to output this many frames per second',
            lambda: int(fps.props.caps[0]['framerate'].num), set_framerate, int, (1, 30)),
        Prop('Output width', 'Scale the video stream to this X resolution for sending',
            lambda: res.props.caps[0]['width'], set_width, int, (640, 1920)),
        Prop('Output height', 'Scale the video stream to this Y resolution for sending',
            lambda: res.props.caps[0]['height'], set_height, int, (480, 1080)),
        Prop('H.264 speed preset', 'Speed/quality setting of the H.264 encoder to optimise bandwidth/latency',
            lambda: enc.props.speed_preset.value_nick, set_speed_preset, str, preset_values),
        Prop('Max lateness', 'Maximum number of milliseconds that a buffer can be late before it is dropped, or 0 for unlimited',
            lambda: 0 if sink.props.max_lateness == -1 else sink.props.max_lateness / 1000000, set_max_lateness, int, (-1, 3000)),
    ]
def on_gst_message(self, bus, message):
    """GStreamer bus 'message' callback.

    End-of-stream and error messages fail the session, a state change of
    the pipeline itself triggers prop_handler(), everything else is only
    passed to debug().  Like the socket and timer callbacks, failures are
    handed to error_handler rather than being raised into the main loop.
    Always returns True.
    """
    try:
        t = message.type
        if t == Gst.MessageType.EOS:
            self.error('Gstreamer end-of-stream')
        elif t == Gst.MessageType.STATE_CHANGED:
            old, new, pending = message.parse_state_changed()
            self.debug('Gstreamer state change for ' + message.src.name + ' from ' + str(old) + ' to ' + str(new) + ', pending=' + str(pending))
            if message.src == self.rtp_pipeline:
                self.prop_handler()
        elif t == Gst.MessageType.INFO:
            err, debug = message.parse_info()
            self.debug('Gstreamer info for ' + message.src.name + ': ' + str(err) + '\nDebug: ' + str(debug))
        elif t == Gst.MessageType.WARNING:
            err, debug = message.parse_warning()
            self.debug('Gstreamer warning for ' + message.src.name + ': ' + str(err) + '\nDebug: ' + str(debug))
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            self.error('Gstreamer error for ' + message.src.name + ': ' + str(err) + '\nDebug: ' + str(debug))
        else:
            self.debug('Gstreamer message of type ' + str(t) + ' for ' + message.src.name + ': ' + str(message))
    except Exception as e:
        # error() raises; without this the exception would only be printed by
        # the main loop and the owner would never hear about the failure
        self.error_handler(e)
    return True
def force_keyframe(self):
    """Send a GstForceKeyUnit event, with all-headers set, to the 'videnc' element's sink pad."""
    encoder = self.rtp_pipeline.get_by_name('videnc')
    encoder_sink_pad = encoder.get_static_pad('sink')
    no_time = Gst.CLOCK_TIME_NONE # can/should we use sink.query_position?

    key_unit = Gst.Structure('GstForceKeyUnit')
    fields = (
        ('timestamp', GObject.TYPE_UINT64, no_time),
        ('stream-time', GObject.TYPE_UINT64, no_time),
        ('all-headers', GObject.TYPE_BOOLEAN, True),
    )
    for field_name, gtype, field_value in fields:
        key_unit.set_value(field_name, GObject.Value(gtype, field_value))

    # TODO: can we also send this event directly to the element instead of the pad?
    event = Gst.Event.new_custom(Gst.EventType.CUSTOM_DOWNSTREAM, key_unit)
    encoder_sink_pad.send_event(event)
def reset_stream(self):
    """Restart a playing pipeline by switching it to PAUSED and back to PLAYING."""
    # NOTE(review): both state changes are assumed to be conditional on the
    # pipeline currently playing -- confirm against the original indentation
    if self.rtp_pipeline.get_state(timeout=0)[1] == Gst.State.PLAYING:
        self.rtp_pipeline.set_state(Gst.State.PAUSED)
        self.rtp_pipeline.set_state(Gst.State.PLAYING)
def rtsp_keepalive_timeout_cb(self):
    """Timer callback: the sink didn't answer the keep-alive request in time.

    Fails the session through error()/error_handler and returns False so the
    timer is not re-armed.
    """
    # This source is going away, forget its id first
    self.rtsp_keepalive_timeout = None
    try:
        self.error('Keep-alive response timed out')
    except Exception as exc:
        self.error_handler(exc)
    return False
def rtsp_keepalive_cb(self):
    """Periodic timer callback: send the keep-alive request (M16).

    A 5 second timer is started for the response.  Returns True to keep the
    periodic timer running, or False after reporting a failure through
    error_handler.
    """
    try:
        # Send M16
        # May need to start being careful with other requests that may be running...
        self.request('GET_PARAMETER', 'rtsp://localhost/wfd1.0')
        self.rtsp_keepalive_timeout = GLib.timeout_add_seconds(5, self.rtsp_keepalive_timeout_cb)
    except Exception as exc:
        self.error_handler(exc)
        return False
    return True
def source_handle_message(self, method=None, target=None, status=None, reason=None, headers=None, content=None):
    """Advance the source-side RTSP state machine by one received message.

    Called without arguments right after the connection is accepted to send
    M1.  Afterwards it is called for every parsed message: with method and
    target for a request, or with status and reason for a response, plus
    the lower-cased headers dict and the body bytes.  Depending on the
    current state it validates the message, sends the response and/or the
    next request and enters the next state.  Protocol violations are
    reported through self.error(), which raises.
    """
    headers = {} if headers is None else headers

    # TODO: check the 6s timeouts as per Section 6.5
    # Source side M1-M8 simplified state machine
    if self._state == 'init':
        # Send M1
        self.request('OPTIONS', '*', require=['org.wfa.wfd1.0'])
        self.enter_state('M1')
    elif self._state == 'M1':
        # Validate M1 response
        self.validate_msg(method, None, status, reason, headers, None, content)

        methods = [ m.strip() for m in headers['public'].split(',') ]
        required = [ 'org.wfa.wfd1.0', 'SET_PARAMETER', 'GET_PARAMETER' ]
        missing = [ m for m in required if m not in methods ]
        if missing:
            self.error('Missing required method(s) "' + '", "'.join(missing) + '" in OPTIONS response')

        self.enter_state('M2')
    elif self._state == 'M2':
        # Validate M2
        self.validate_msg(method, 'OPTIONS', status, reason, headers, target, content)

        if target not in [ '*' ] + self.presentation_url:
            self.error('Unknown OPTIONS target "' + target + '"')

        required = [ m.strip() for m in headers['require'].split(',') ]
        missing = [ m for m in required if m not in self.local_methods ]
        if missing:
            self.error('Required methods in OPTIONS request that we don\'t support: ' + ','.join(missing))

        # Send M2 response
        self.response(public=self.local_methods)
        # Send M3
        params = ['wfd_audio_codecs', 'wfd_video_formats', 'wfd_client_rtp_ports', 'wfd_display_edid', 'wfd_uibc_capability']
        self.request('GET_PARAMETER', 'rtsp://localhost/wfd1.0', params=params)
        self.enter_state('M3')
    elif self._state == 'M3':
        # Validate M3 response
        self.validate_msg(method, None, status, reason, headers, None, content)

        if 'wfd_video_formats' not in self.remote_params or 'wfd_client_rtp_ports' not in self.remote_params:
            self.error('Required parameters missing from GET_PARAMETER response')

        self.parse_video_formats(self.remote_params['wfd_video_formats'])
        self.parse_client_rtp_ports(self.remote_params['wfd_client_rtp_ports'])
        self.parse_display_edid()
        self.prop_handler()

        # Send M4
        params = {
            'wfd_video_formats': self.local_params['wfd_video_formats'],
            'wfd_client_rtp_ports': self.remote_params['wfd_client_rtp_ports'],
            'wfd_presentation_URL': self.presentation_url[0] + ' ' + self.presentation_url[1],
            # TODO: include wfd_audio_codecs if audio present, make video optional, too
            # TODO: support wfd2_video_formats and wfd2_audio_codecs
        }
        self.request('SET_PARAMETER', 'rtsp://localhost/wfd1.0', params=params)
        self.enter_state('M4')
    elif self._state == 'M4':
        # Validate M4 response
        self.validate_msg(method, None, status, reason, headers, None, content)
        # Send M5
        self.request('SET_PARAMETER', 'rtsp://localhost/wfd1.0', params={'wfd_trigger_method': 'SETUP'})
        self.enter_state('M5')
    elif self._state == 'M5':
        # Validate M5 response
        self.validate_msg(method, None, status, reason, headers, None, content)
        self.enter_state('M6')
    elif self._state == 'M6':
        # Validate M6
        self.validate_msg(method, 'SETUP', status, reason, headers, target, content)
        self.parse_transport(headers['transport'])
        self.session_stream_url = target
        self.session_id = str(random.randint(a=1, b=999999))
        self.local_rtp_port = random.randint(a=20000, b=30000)
        if self.remote_rtcp_port is not None and self.rtcp_enabled:
            self.local_rtcp_port = self.local_rtp_port + 1

        profile = 'RTP/AVP/TCP;unicast' if self.use_tcp else 'RTP/AVP/UDP;unicast'
        client_port = str(self.remote_rtp_port) + (('-' + str(self.remote_rtcp_port)) if self.remote_rtcp_port is not None else '')
        server_port = str(self.local_rtp_port) + (('-' + str(self.local_rtcp_port)) if self.local_rtcp_port is not None else '')
        # Both port parameters take the "name=value" form, the same form
        # parse_transport() requires from the sink ("client_port=")
        transport = profile + ';client_port=' + client_port + ';server_port=' + server_port

        # Section B.1
        pipeline = ('ximagesrc name=src use-damage=false do-timestamp=true ! capsfilter name=fps caps=video/x-raw,framerate=10/1' +
            ' ! videoscale method=0 ! capsfilter name=res caps=video/x-raw,width=' + str(self.init_width) + ',height=' + str(self.init_height) +
            ' ! videoconvert ! video/x-raw,format=I420 ! x264enc tune=zerolatency speed-preset=ultrafast name=videnc' +
            ' ! queue' + # TODO: add leaky=downstream
            ' ! mpegtsmux name=mux' +
            ' ! rtpmp2tpay pt=33 mtu=1472 ! .send_rtp_sink rtpsession name=session .send_rtp_src' +
            ' ! udpsink name=sink host=' + self.remote_ip + ' port=' + str(self.remote_rtp_port) + ' bind-port=' + str(self.local_rtp_port)) # TODO: bind-address

        if self.local_rtcp_port is not None:
            pipeline += ' session.send_rtcp_src ! udpsink name=rtcp_sink host=' + self.remote_ip + \
                ' port=' + str(self.remote_rtcp_port) + ' bind-port=' + str(self.local_rtcp_port) # TODO: bind-address

        self._props.append(WFDRTSPServer.Prop('RTCP enabled', 'Whether we\'re currently sending RTCP data',
            lambda: self.local_rtcp_port is not None, None, bool, None))

        self.rtp_pipeline = Gst.parse_launch(pipeline)
        bus = self.rtp_pipeline.get_bus()
        bus.enable_sync_message_emission()
        bus.add_signal_watch()
        bus.connect('message', self.on_gst_message)

        self._props += self.create_running_props()
        self.prop_handler()

        # Send M6 response
        self.response(session=self.session_id + ';timeout=' + str(self.session_timeout), transport=transport)
        self.enter_state('M7')
    elif self._state in ['M7', 'paused']:
        # Validate M7
        self.validate_msg(method, 'PLAY', status, reason, headers, target, content)
        # Send M7 response
        self.response()
        self.rtp_pipeline.set_state(Gst.State.PLAYING)
        # Set up the keep-alive timer, interval must be less than timeout minus 5 seconds.
        # When resuming from 'paused' the timer from the first PLAY is still running,
        # adding another one would duplicate the keep-alives and lose the first id.
        if self.rtsp_keepalive is None:
            self.rtsp_keepalive = GLib.timeout_add_seconds(self.session_timeout - 10, self.rtsp_keepalive_cb)
        self.enter_state('streaming')
    elif self._state == 'streaming':
        if method is None:
            if self.rtsp_keepalive_timeout:
                # The M16 response is not to be validated (Section 6.4.16)
                GLib.source_remove(self.rtsp_keepalive_timeout)
                self.rtsp_keepalive_timeout = None
                return

            self.error('Received an RTSP response where a request was expected')

        if method == 'PAUSE':
            self.validate_msg(method, 'PAUSE', status, reason, headers, target, content)
            self.rtp_pipeline.set_state(Gst.State.PAUSED)
            self.enter_state('paused')
            self.response()
            return

        if method == 'SET_PARAMETER':
            self.validate_msg(method, 'SET_PARAMETER', status, reason, headers, target, content)
            for k, v in self.last_params:
                if k == 'wfd_idr_request' and v is None:
                    self.force_keyframe()
                    self.response()
                else:
                    self.error('Unknown request "' + k + '" with value ' + repr(v))
            return

        if method == 'TEARDOWN':
            # The spec suggests a more graceful teardown but we just close the connection
            self.error('Teardown requested')

        self.error('Unsupported method "' + method + '"')
# Names of the iwd D-Bus interfaces that are looked up in the managed objects
WIPHY_IF = 'net.connman.iwd.Adapter'
DEVICE_IF = 'net.connman.iwd.p2p.Device'
PEER_IF = 'net.connman.iwd.p2p.Peer'
WSC_IF = 'net.connman.iwd.SimpleConfiguration'
WFD_IF = 'net.connman.iwd.p2p.Display'
SVC_MGR_IF = 'net.connman.iwd.p2p.ServiceManager'
class WFDSource(Gtk.Window):
# State kept for every P2P device object found on the bus
@dataclasses.dataclass
class Device:
    # DEVICE_IF property dict taken from the managed objects
    props: dict
    # Proxies for DEVICE_IF and org.freedesktop.DBus.Properties on the object
    dev_proxy: dbus.Interface
    props_proxy: dbus.Interface
    # Peer entries keyed by the peer's object path
    peers: dict
    # NOTE(review): presumably the peers in display order -- confirm with the UI code
    sorted_peers: list
    widget: Gtk.Widget
    # Defaults to True only for the first device found
    expanded: bool
    scan_request: bool
    selected_peer: object
    connecting_peer: object
    disconnecting_peer: object
    # Peer entries whose 'Connected' property is currently true
    connected: list
    # Pending call that is cancelled if iwd leaves the bus while connecting
    # or disconnecting
    dbus_call: dbus.lowlevel.PendingCall
# State kept for every P2P peer of a device; all fields start out as None
@dataclasses.dataclass
class Peer:
    # NOTE(review): presumably proxies for PEER_IF, WFD_IF and WSC_IF on the
    # peer object, created elsewhere -- confirm
    peer_proxy: dbus.Interface
    wfd_proxy: dbus.Interface
    wsc_proxy: dbus.Interface
    widget: Gtk.Widget
    # RTSP server serving this peer, closed when iwd leaves the bus
    rtsp: WFDRTSPServer
# U+00BB (right-pointing double angle quotation mark) followed by a space
# NOTE(review): presumably prefixed to nested entries in the UI -- confirm
indent = '\xbb '
def __init__(self):
    """Build the main window and start tracking iwd on the system bus."""
    Gtk.Window.__init__(self, type=Gtk.WindowType.TOPLEVEL, title='WFD Source')
    self.set_decorated(True)
    self.set_resizable(False)
    self.connect('destroy', self.on_destroy, "WM destroy")
    self.set_size_request(900, 300)

    # Left pane: vertically scrolling box that holds the device widgets
    self.device_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
    leftscroll = Gtk.ScrolledWindow(hscrollbar_policy=Gtk.PolicyType.NEVER)
    leftscroll.add(self.device_box)
    # Right pane: horizontally scrolling flow box used as the info pane
    self.infopane = Gtk.FlowBox(orientation=Gtk.Orientation.VERTICAL)
    self.infopane.set_selection_mode(Gtk.SelectionMode.NONE)
    self.infopane.set_max_children_per_line(20)
    self.infopane.set_min_children_per_line(3)
    self.infopane.set_column_spacing(20)
    self.infopane.set_row_spacing(5)
    self.infopane.set_valign(Gtk.Align.START)
    self.infopane.set_halign(Gtk.Align.START)
    rightscroll = Gtk.ScrolledWindow(vscrollbar_policy=Gtk.PolicyType.NEVER)
    rightscroll.add(self.infopane)
    paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL)
    paned.pack1(leftscroll, True, True)
    paned.pack2(rightscroll, False, False)
    paned.set_wide_handle(True)
    paned.props.position = 400
    paned.props.position_set = True
    self.add(paned)
    self.show_all()
    self.connect('notify::is-active', self.on_notify_is_active)

    self.rtsp_props = None
    self.rtsp_init_values = {}
    self.rtsp_port = 7236
    # devices stays None for as long as iwd is not on the bus
    self.devices = None
    self.objects = {}
    # on_name_owner_change() returns early while devices is None, so the
    # device list has to be drawn once here
    self.populate_devices()

    self.dbus = dbus.SystemBus()
    self.dbus.watch_name_owner('net.connman.iwd', self.on_name_owner_change)
    # Run the handler once for the current state; only the emptiness of the
    # name matters to it
    self.on_name_owner_change('dummy' if self.dbus.name_has_owner('net.connman.iwd') else '')
def on_name_owner_change(self, new_name):
    """React to iwd appearing on or disappearing from the bus.

    new_name is the new owner of the 'net.connman.iwd' name, empty when
    there is none.  On disappearance pending calls are cancelled, the peers'
    RTSP servers are closed, the object caches are dropped and the signal
    receivers are removed.  On appearance the receivers are installed, the
    managed objects are loaded into self.objects / self.devices and the
    display service is registered.  Always returns True.
    """
    # (handler, D-Bus interface, signal name, extra add_signal_receiver keywords)
    receivers = (
        (self.on_properties_changed, 'org.freedesktop.DBus.Properties', 'PropertiesChanged',
            {'path_keyword': 'path'}),
        (self.on_interfaces_added, 'org.freedesktop.DBus.ObjectManager', 'InterfacesAdded', {}),
        (self.on_interfaces_removed, 'org.freedesktop.DBus.ObjectManager', 'InterfacesRemoved', {}),
    )

    if not new_name:
        if self.devices is None:
            return True

        for device in self.devices.values():
            if (device.connecting_peer or device.disconnecting_peer) and device.dbus_call is not None:
                device.dbus_call.cancel()

            for peer in device.peers.values():
                if peer.rtsp:
                    peer.rtsp.close()

        self.devices = None
        self.objects = {}
        self.populate_devices()

        # The match has to be described with the same signal name, interface
        # and sender it was added with, otherwise nothing is removed and the
        # receivers pile up every time iwd restarts
        for handler, interface, signal, _ in receivers:
            self.dbus.remove_signal_receiver(handler,
                                             signal_name=signal,
                                             dbus_interface=interface,
                                             bus_name='net.connman.iwd')
        return True

    if self.devices is not None:
        return True

    manager = dbus.Interface(self.dbus.get_object('net.connman.iwd', '/'), 'org.freedesktop.DBus.ObjectManager')

    # Subscribe before GetManagedObjects so that objects added in between are not missed
    for handler, interface, signal, extra in receivers:
        self.dbus.add_signal_receiver(handler,
                                      bus_name='net.connman.iwd',
                                      dbus_interface=interface,
                                      signal_name=signal,
                                      **extra)

    self.objects = manager.GetManagedObjects()
    self.devices = {}

    # Devices first, add_peer() ignores peers of unknown devices
    for path in self.objects:
        if DEVICE_IF in self.objects[path]:
            self.add_dev(path)
    for path in self.objects:
        if PEER_IF in self.objects[path]:
            self.add_peer(path)

    self.populate_devices()

    svc_mgr = dbus.Interface(self.dbus.get_object('net.connman.iwd', '/net/connman/iwd'), SVC_MGR_IF)
    svc_mgr.RegisterDisplayService({
        'Source': True,
        'Port': dbus.UInt16(self.rtsp_port)
    })
    return True
def add_dev(self, path):
    """Record a new P2P device for the IWD object at *path*.

    Builds the D-Bus proxies for the object and stores a fresh
    WFDSource.Device in self.devices[path]; its widget is created later.
    """
    proxy = self.dbus.get_object('net.connman.iwd', path)
    # Only the very first device found starts out expanded
    first_device = not self.devices
    record = WFDSource.Device(
        props=self.objects[path][DEVICE_IF],
        dev_proxy=dbus.Interface(proxy, DEVICE_IF),
        props_proxy=dbus.Interface(proxy, 'org.freedesktop.DBus.Properties'),
        peers={},
        sorted_peers=[],
        widget=None,
        expanded=first_device,
        scan_request=False,
        selected_peer=None,
        connecting_peer=None,
        disconnecting_peer=None,
        connected=[],
        dbus_call=None)
    self.devices[path] = record
def add_peer(self, path):
    """Attach the peer object at *path* to the device that owns it.

    Returns True when a new, still widget-less WFDSource.Peer was stored,
    False when the owning device is unknown or the peer is already there.
    """
    dev_path = self.objects[path][PEER_IF]['Device']
    if dev_path in self.devices:
        known_peers = self.devices[dev_path].peers
        if path not in known_peers:
            # Proxies and the list row are filled in by update_peer_props()
            known_peers[path] = WFDSource.Peer(
                peer_proxy=None,
                wfd_proxy=None,
                wsc_proxy=None,
                widget=None,
                rtsp=None)
            return True
    return False
# Handler for org.freedesktop.DBus.Properties.PropertiesChanged signals from IWD
# (registered in on_name_owner_change with path_keyword="path").
#   interface   -- name of the interface whose properties changed
#   changed     -- mapping of property name to new value
#   invalidated -- names of properties whose cached value must be dropped
#   path        -- object path that emitted the signal
# Merges the change into the self.objects cache, then refreshes the device and/or
# peer widgets that depend on it.  Returns True.
# NOTE(review): the membership tests on self.devices assume it is a dict; it is None
# while IWD is off the bus -- confirm no signal can be delivered in that state.
def on_properties_changed(self, interface, changed, invalidated, path):
# Bring the local property cache up to date first
if path not in self.objects:
self.objects[path] = {}
if interface not in self.objects[path]:
self.objects[path][interface] = {}
self.objects[path][interface].update(changed)
for prop in invalidated:
if prop in self.objects[path][interface]:
del self.objects[path][interface][prop]
# The object is one of our devices
if path in self.devices:
self.update_dev_props(path)
if interface == DEVICE_IF and 'AvailableConnections' in changed:
self.update_selected_peer(path)
# The object is a peer: look up the device it belongs to
if PEER_IF in self.objects[path]:
dev_path = self.objects[path][PEER_IF]['Device']
if dev_path in self.devices:
device = self.devices[dev_path]
if path in device.peers:
peer = device.peers[path]
# Keep device.connected in step with the peer's 'Connected' property
if interface == PEER_IF and 'Connected' in changed:
if changed['Connected'] and peer not in device.connected:
device.connected.append(peer)
elif not changed['Connected'] and peer in device.connected:
device.connected.remove(peer)
self.update_dev_props(dev_path)
self.update_peer_props(dev_path, path)
if peer != device.selected_peer:
self.update_selected_peer(dev_path)
# Hand the connection's local interface and remote IP to the RTSP server
if interface == PEER_IF and peer.rtsp:
if 'ConnectedInterface' in changed:
peer.rtsp.set_local_interface(changed['ConnectedInterface'])
if 'ConnectedIp' in changed:
peer.rtsp.set_remote_ip(changed['ConnectedIp'])
self.update_peer_props(dev_path, path)
return True
def on_interfaces_added(self, path, interfaces):
    """Handler for ObjectManager.InterfacesAdded signals from IWD.

    path       -- object path that gained interfaces
    interfaces -- mapping of interface name to its property dictionary

    Caches the new interfaces in self.objects and creates or refreshes the
    device and peer entries they describe.
    """
    if self.devices is None:
        # Not attached to IWD (yet): there are no device tables to update,
        # and on_name_owner_change() rebuilds everything, including
        # self.objects, from GetManagedObjects() once it runs.  Without
        # this guard the code below raises TypeError on self.devices.
        return

    if path not in self.objects:
        self.objects[path] = {}
    self.objects[path].update(interfaces)

    if DEVICE_IF in interfaces:
        self.add_dev(path)
        # This should happen rarely enough that we can repopulate the whole list
        self.populate_devices()

    update_dev_props = False
    if PEER_IF in interfaces:
        update_dev_props = self.add_peer(path)

    if PEER_IF in self.objects[path]:
        dev_path = self.objects[path][PEER_IF]['Device']
        if dev_path in self.devices:
            if update_dev_props:
                # Update device's peer count
                self.update_dev_props(dev_path)
            self.update_peer_props(dev_path, path)
# Handler for org.freedesktop.DBus.ObjectManager.InterfacesRemoved signals from IWD.
#   path       -- object path that lost interfaces
#   interfaces -- names of the interfaces that were removed
# Drops the interfaces from the self.objects cache and removes the matching device
# or peer entries together with their widgets.  Paths that are not cached are ignored.
def on_interfaces_removed(self, path, interfaces):
if path not in self.objects:
return
# Remember the owning device before the peer's cached properties are deleted below
dev_path = None
if PEER_IF in interfaces or WFD_IF in interfaces or WSC_IF in interfaces:
if PEER_IF in self.objects[path]:
dev_path = self.objects[path][PEER_IF]['Device']
for i in interfaces:
if i in self.objects[path]:
del self.objects[path][i]
# Forget the object entirely once it has no interfaces left
if len(self.objects[path]) == 0:
del self.objects[path]
if DEVICE_IF in interfaces and path in self.devices:
device = self.devices[path]
if device.connecting_peer or device.disconnecting_peer:
device.dbus_call.cancel()
# TODO: check if connected
del self.devices[path]
# This should happen rarely enough that we can repopulate the whole list
self.populate_devices()
if dev_path is not None and dev_path in self.devices:
device = self.devices[dev_path]
if path in device.peers:
# Make sure the widget is removed
self.update_peer_props(dev_path, path)
if PEER_IF in interfaces:
del device.peers[path]
# Update device's peer count
self.update_dev_props(dev_path)
# Rebuild the contents of self.device_box from scratch.
# Shows a placeholder label when self.devices is None ("Not connected to IWD") or
# empty ("No P2P-capable adapters :-("); otherwise creates, per device, an expander
# whose label holds the device description plus an enable switch and whose body
# holds the peer list, then fills in the device and peer rows.
def populate_devices(self):
# Start from an empty container
self.device_box.foreach(lambda x, y: self.device_box.remove(x), None)
if self.devices is None:
label = Gtk.Label(label="Not connected to IWD")
self.device_box.pack_start(label, expand=True, fill=True, padding=0)
self.device_box.show_all()
return
if len(self.devices) == 0:
label = Gtk.Label(label="No P2P-capable adapters :-(")
self.device_box.pack_start(label, expand=True, fill=True, padding=0)
self.device_box.show_all()
return
for path in self.devices:
# Expander label: description text on the left, enable switch on the right
label = Gtk.Label()
label.set_halign(Gtk.Align.START)
label.set_line_wrap(False)
label.set_single_line_mode(False)
label.set_ellipsize(Pango.EllipsizeMode.END)
switch = Gtk.Switch()
switch.connect('state-set', self.on_dev_enabled, path)
switch.set_halign(Gtk.Align.END)
switch.set_valign(Gtk.Align.START)
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
box.pack_start(label, expand=True, fill=True, padding=0)
box.pack_end(switch, expand=False, fill=False, padding=0)
# Expander body: a framed list of the peers discovered on this device
peer_list = Gtk.ListBox() # can also use an IconView.. or make it switchable
peer_list.set_size_request(150, 120)
peer_list.set_selection_mode(Gtk.SelectionMode.SINGLE)
peer_list.set_placeholder(Gtk.Label(label='No Wi-Fi Displays discovered yet...'))
peer_list.connect('row-selected', self.on_peer_selected, path)
frame = Gtk.Frame()
frame.props.margin = 10
frame.add(peer_list)
expander = Gtk.Expander()
expander.set_label_fill(True)
expander.set_expanded(self.devices[path].expanded)
expander.set_label_widget(box)
expander.add(frame)
expander.connect('notify::expanded', self.on_dev_expanded, path)
expander.show_all()
self.device_box.add(expander)
# update_dev_props() and update_peer_props() reach the widgets through device.widget
self.devices[path].widget = expander
self.update_dev_props(path)
GLib.idle_add(self.expander_workaround, expander)
for peer_path in self.devices[path].peers:
self.update_peer_props(path, peer_path)
# Basically implement Gtk.Expander's set_label_fill which for some reason
# doesn't do anything. Use size-allocate because configure-event doesn't work either...
self.margin_left = None
# NOTE(review): on_exp_resize reads `box` and `expander` from the enclosing scope
# when it runs; if it is defined once per loop iteration, every handler would see
# the last device's widgets -- confirm against the original indentation.
def on_exp_resize(widget, event):
if self.margin_left is None:
self.margin_left = box.get_allocation().x
posx, posy = expander.translate_coordinates(self, 0, 0)
# Add posx to force the label widget (box) to be aligned to the left side of the
# window even if GTK already decided to push the expander off the left side with a
# negative allocation.x. This way it won't push it any further left as the available
# space shrinks.
box.set_size_request(max(posx + expander.get_allocated_width() - self.margin_left - 1, 0), -1)
return False
expander.connect('size-allocate', on_exp_resize)
def expander_workaround(self, widget):
    """Detach the expander's label widget and immediately attach it again.

    Scheduled through GLib.idle_add() by populate_devices(); returns False.
    """
    label_widget = widget.get_label_widget()
    for replacement in (None, label_widget):
        widget.set_label_widget(replacement)
    return False
def update_dev_props(self, path):
    """Refresh the expander label and the enable switch of one device.

    path -- object path of the device; its widget must already exist.

    The label shows the adapter description, the local name when it
    differs from the description, and a one-word connection state.
    """
    device = self.devices[path]

    if not device.props['Enabled']:
        state = 'disabled'
    elif device.disconnecting_peer is not None:
        state = 'disconnecting...'
    elif device.connecting_peer is not None:
        state = 'connecting...'
    elif len(device.connected) > 0:
        # "connected" only once every RTSP session has finished negotiating
        if all(not peer.rtsp or peer.rtsp.ready for peer in device.connected):
            state = 'connected'
        else:
            state = 'negotiating...'
    elif device.scan_request:
        state = 'discovering... (' + str(len(device.peers)) + ')'
    else:
        state = 'idle'

    label, switch = device.widget.get_label_widget().get_children()
    dev_str = self.get_dev_string(path)
    name = str(device.props['Name'])

    # The strings come from the adapter and its configuration and may contain
    # '&' or '<', which would make Pango reject the whole markup string.
    markup = GLib.markup_escape_text(str(dev_str), -1) + '\n<small>'
    if dev_str != name:
        markup += 'Local name: ' + GLib.markup_escape_text(name, -1) + '\n'
    markup += 'State: ' + state + '</small>'
    label.set_markup(markup)
    switch.set_active(device.props['Enabled'])
def update_peer_props(self, dev_path, path):
    """Create, refresh or remove the list row that represents one peer.

    dev_path -- object path of the device the peer belongs to
    path     -- object path of the peer

    A peer gets a row only while the cache holds its PEER_IF, WFD_IF and
    WSC_IF interfaces and its WFD 'Sink' property is true.  When that stops
    being the case the row is removed and every piece of state that refers
    to the peer (selection, pending call, connection, RTSP server) is reset.
    """
    device = self.devices[dev_path]
    peer = device.peers[path]
    props = self.objects[path] if path in self.objects else {}
    peer_list = device.widget.get_child().get_child()
    listable = (PEER_IF in props and WFD_IF in props and WSC_IF in props
                and props[WFD_IF]['Sink'])

    if peer.widget is None:
        if not listable:
            return

        # Rows are kept in name order; sorted_peers mirrors the row order
        name = str(props[PEER_IF]['Name'])
        device.sorted_peers.append(name)
        device.sorted_peers.sort()
        index = device.sorted_peers.index(name)

        obj_proxy = self.dbus.get_object('net.connman.iwd', path)
        peer.peer_proxy = dbus.Interface(obj_proxy, PEER_IF)
        peer.wfd_proxy = dbus.Interface(obj_proxy, WFD_IF)
        peer.wsc_proxy = dbus.Interface(obj_proxy, WSC_IF)

        label = Gtk.Label()
        label.set_halign(Gtk.Align.START)
        label.set_single_line_mode(True)
        label.set_ellipsize(Pango.EllipsizeMode.END)
        event_box = Gtk.EventBox()
        event_box.add(label)
        event_box.connect('button-press-event', self.on_peer_click, (dev_path, path))
        button = Gtk.Button()
        button.set_use_stock(True)
        button.connect('clicked', self.on_peer_button, (dev_path, path))
        box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
        box.props.margin = 5
        box.pack_start(event_box, expand=True, fill=True, padding=0)
        box.pack_end(button, expand=False, fill=False, padding=0)
        peer.widget = Gtk.ListBoxRow()
        peer.widget.add(box)
        peer_list.insert(peer.widget, index)
        peer.widget.show_all()
    elif not listable:
        row = peer.widget
        peer.widget = None
        del device.sorted_peers[row.get_index()]
        peer_list.remove(row)

        if peer == device.selected_peer:
            device.selected_peer = None
            self.update_info_pane(dev_path, None)
        if peer == device.connecting_peer:
            device.dbus_call.cancel()
            device.connecting_peer = None
            self.update_selected_peer(dev_path)
        if peer == device.disconnecting_peer:
            device.dbus_call.cancel()
            device.disconnecting_peer = None
            self.update_selected_peer(dev_path)
        if peer in device.connected:
            device.connected.remove(peer)
            self.update_selected_peer(dev_path)

        peer.peer_proxy = None
        peer.wfd_proxy = None
        peer.wsc_proxy = None
        if peer.rtsp:
            peer.rtsp.close()
            peer.rtsp = None
        return

    subcat = 'unknown type'
    if 'DeviceSubcategory' in props[PEER_IF]:
        subcat = props[PEER_IF]['DeviceSubcategory']

    weight = 'heavy' if peer in device.connected else 'normal'
    box = peer.widget.get_child()
    event_box, button = box.get_children()
    label, = event_box.get_children()
    # The name is chosen by the remote device: escape it so that characters
    # such as '&' or '<' cannot break (or inject) Pango markup.
    safe_name = GLib.markup_escape_text(str(props[PEER_IF]['Name']), -1)
    safe_subcat = GLib.markup_escape_text(str(subcat), -1)
    label.set_markup('<span weight="' + weight + '">' + safe_name + '</span> <span foreground="grey" size="small">' + safe_subcat + '</span>')

    if device.disconnecting_peer or (device.connecting_peer and peer != device.connecting_peer):
        # This peer's row should not have any buttons
        button.hide()
    elif peer == device.connecting_peer:
        button.set_label('Cancel')
        button.show()
    elif peer in device.connected:
        if not peer.rtsp or peer.rtsp.ready:
            button.set_label('Disconnect')
        else:
            button.set_label('Cancel')
        button.show()
    elif peer == device.selected_peer and device.props['AvailableConnections'] > 0:
        button.set_label('Connect')
        button.show()
    else:
        button.hide()

    if peer == device.selected_peer:
        self.update_info_pane(dev_path, path)
def update_selected_peer(self, dev_path):
    """Refresh the row of the device's currently selected peer, if any."""
    device = self.devices[dev_path]
    if not device.selected_peer:
        return
    selected_path = self.get_peer_path(device, device.selected_peer)
    self.update_peer_props(dev_path, selected_path)
def add_info(self, name, desc, valuewidget):
    """Append one "name: value" row to the info pane.

    name        -- caption text; a colon is appended
    desc        -- optional tooltip text for the row
    valuewidget -- optional widget packed at the end of the row
    """
    caption = Gtk.Label(label=name + ':', xalign=0)
    row = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
    row.pack_start(caption, expand=False, fill=False, padding=3)
    if valuewidget:
        row.pack_end(valuewidget, expand=False, fill=False, padding=3)
    if desc:
        row.set_tooltip_text(desc)
    self.infopane.add(row)
def add_info_str(self, name, value):
    """Append an info-pane row whose value is *value* rendered in bold."""
    bold = Gtk.Label(xalign=0)
    markup = '<span weight="bold">' + value + '</span>'
    bold.set_markup(markup)
    self.add_info(name, None, bold)
def add_info_prop(self, prop):
    """Append the info-pane row (or rows) for one RTSP property.

    prop -- a WFDRTSPServer.Prop tuple (name, desc, getter, setter, type, vals).

    Nothing is shown when the getter returns None.  A property without a
    setter is rendered as bold text; the 'EDID info' property expands to one
    row per decoded field.  A property with a setter gets an editable widget
    chosen by prop.type: a switch (bool), a spin button (int), or a combo
    box / text entry (str).
    """
    def bold_label(text):
        widget = Gtk.Label(xalign=0)
        widget.set_markup('<span weight="bold">' + text + '</span>')
        return widget

    val = prop.getter()
    if val is None:
        return

    if prop.setter is None:
        if prop.type == bool:
            vals = prop.vals if prop.vals is not None else ['no', 'yes']
            text = vals[val]
        elif prop.name == 'EDID info':
            text = WFDSource.edid_to_text(val)
            # edid_to_text() returns a plain string for unusable EDID data and
            # a list of (name, value) pairs otherwise.  A str is a Sequence
            # too, so it has to be ruled out explicitly or the message would
            # be unpacked character by character and raise ValueError.
            if not isinstance(text, str):
                self.add_info(prop.name, prop.desc, None)
                for field_name, field_val in text:
                    v = bold_label(str(field_val)) if field_val else None
                    self.add_info(self.indent + field_name, prop.desc, v)
                return
        else:
            text = str(val)
        v = bold_label(text)
    elif prop.type == bool:
        v = Gtk.Switch()
        v.set_active(val)
        v.connect('state-set', lambda switch, state: prop.setter(state))
    elif prop.type == int:
        v = Gtk.SpinButton.new_with_range(min=prop.vals[0], max=prop.vals[1], step=prop.vals[2] if len(prop.vals) > 2 else 1)
        v.set_value(val)
        v.connect('value-changed', lambda sb: prop.setter(int(sb.get_value())))
    elif prop.type == str:
        if prop.vals:
            v = Gtk.ComboBoxText()
            for option in prop.vals:
                v.append(option, option)
            v.set_active_id(val)
            v.connect('changed', lambda entry: prop.setter(entry.get_active_text()))
        else:
            v = Gtk.Entry(text=val)
            v.connect('changed', lambda entry: prop.setter(entry.get_text()))

    self.add_info(prop.name, prop.desc, v)
# Rebuild the info pane for the peer at `path` on device `dev_path`.
# The pane is always emptied first; with path=None it is left empty.  Otherwise it
# shows the connection state, the peer's category and WFD capabilities taken from
# the self.objects cache, stream-control buttons while an RTSP session is ready,
# and finally one row per RTSP property.
def update_info_pane(self, dev_path, path):
self.infopane.foreach(lambda x, y: self.infopane.remove(x), None)
if path is None:
return
device = self.devices[dev_path]
peer = device.peers[path]
# Work out the human-readable connection state
if peer == device.connecting_peer:
state = 'IWD connecting'
elif peer == device.disconnecting_peer:
state = 'disconnecting'
elif peer in device.connected:
if peer.rtsp is not None:
if peer.rtsp.ready:
state = peer.rtsp.state
else:
state = 'RTSP negotiation: ' + peer.rtsp.state
else:
state = 'connected'
else:
state = 'not connected'
self.add_info_str('Connection state', state)
subcat = 'unknown'
if 'DeviceSubcategory' in self.objects[path][PEER_IF]:
subcat = self.objects[path][PEER_IF]['DeviceSubcategory']
self.add_info_str('Peer category', self.objects[path][PEER_IF]['DeviceCategory'])
self.add_info_str('Peer subcategory', subcat)
if WFD_IF in self.objects[path]:
# Source and Sink both set means the peer is dual-role
if self.objects[path][WFD_IF]['Source']:
if self.objects[path][WFD_IF]['Sink']:
t = 'dual-role'
else:
t = 'source'
else:
t = 'sink'
self.add_info_str('Peer WFD type', t)
if self.objects[path][WFD_IF]['Sink']:
self.add_info_str('Peer audio support', 'yes' if self.objects[path][WFD_IF]['HasAudio'] else 'no')
self.add_info_str('Peer UIBC support', 'yes' if self.objects[path][WFD_IF]['HasUIBC'] else 'no')
self.add_info_str('Peer content protection', 'yes' if self.objects[path][WFD_IF]['HasContentProtection'] else 'no')
if peer.rtsp is not None and peer.rtsp.ready:
def force_keyframe(widget):
peer.rtsp.force_keyframe()
return True
def reset_stream(widget):
peer.rtsp.reset_stream()
return True
# The idea for these buttons is to make sure any parameter changes get fully applied
button1 = Gtk.Button()
button1.set_label('Force keyframe')
button1.connect('clicked', force_keyframe)
button2 = Gtk.Button()
button2.set_label('Reset stream')
button2.connect('clicked', reset_stream)
box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL)
box.pack_start(button1, expand=False, fill=False, padding=3)
box.pack_start(button2, expand=False, fill=False, padding=3)
self.infopane.add(box)
# Fetch the initial property set from WFDRTSPServer the first time it is needed
if self.rtsp_props is None:
self.rtsp_props, self.rtsp_init_values = WFDRTSPServer.get_init_props()
# Use the peer's own RTSP server properties when it has one, the initial set otherwise
if peer.rtsp is not None:
props = peer.rtsp.props
else:
props = self.rtsp_props
for prop in props:
self.add_info_prop(prop)
self.infopane.show_all()
# Method calls made directly on a dbus.Interface don't return a
# dbus.lowlevel.PendingCall object, so cancellable asynchronous calls have to go
# through bus.call_async instead
def async_call(self, proxy, method, signature='', *args, **kwargs):
    """Start *method* on *proxy* through the bus's call_async().

    Positional *args* are passed on as the method arguments and **kwargs
    (e.g. reply_handler, error_handler, timeout) as keyword arguments;
    whatever call_async() returns is returned unchanged.
    """
    target = (proxy.bus_name, proxy.object_path, proxy.dbus_interface)
    return self.dbus.call_async(*target, method, signature, args, **kwargs)
def connect_peer(self, dev_path, path):
    """Start connecting to the peer at *path* on device *dev_path*.

    Issues an asynchronous WSC PushButton call (kept in device.dbus_call so
    it can be cancelled), marks the peer as the device's connecting peer and
    creates its RTSP server right away so the server is listening before the
    sink tries to reach it.  Failures are reported in a message dialog.
    """
    device = self.devices[dev_path]
    peer = device.peers[path]

    def refresh_widgets():
        self.update_dev_props(dev_path)
        self.update_peer_props(dev_path, path)
        if peer != device.selected_peer:
            self.update_selected_peer(dev_path)

    def show_error(title, details):
        dialog = Gtk.MessageDialog(parent=self, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.OK, text=title)
        dialog.format_secondary_text(details)
        dialog.show()

        def on_ok(response, *args):
            dialog.destroy()
        dialog.connect('response', on_ok)

    def on_reply():
        # The PropertiesChanged handler may already have listed the peer when
        # its 'Connected' property flipped; don't add it a second time or a
        # stale entry survives the next disconnect.
        if peer not in device.connected:
            device.connected.append(peer)
        device.connecting_peer = None
        # Local interface and remote IP get set in the PropertiesChanged handler
        refresh_widgets()

    def on_error(excp):
        device.connecting_peer = None
        if peer.rtsp:
            peer.rtsp.close()
            peer.rtsp = None
        refresh_widgets()
        show_error('Connection failed',
                   'Connection to ' + self.objects[path][PEER_IF]['Name'] + ' failed: ' + repr(excp))

    def on_rtsp_state():
        refresh_widgets()

    def on_rtsp_error(excp):
        self.disconnect_peer(dev_path, path)
        tb = ''
        try:
            tb = '\n\nDebug info: ' + traceback.format_exc()
        except Exception:
            # The traceback is a debugging aid only; report the error without it
            pass
        show_error('Negotiation failed',
                   'RTSP error when talking to ' + self.objects[path][PEER_IF]['Name'] + ': ' + repr(excp) + tb)

    def on_rtsp_props_changed():
        # Should also check if the infopane is currently showing a selected peer on another device...
        if peer == device.selected_peer:
            self.update_info_pane(dev_path, path)

    # Cannot use peer.wsc_proxy.PushButton()
    device.dbus_call = self.async_call(peer.wsc_proxy, 'PushButton', reply_handler=on_reply, error_handler=on_error, timeout=120)
    device.connecting_peer = peer
    # Create the RTSP server now so it's ready as soon as the P2P connection succeeds even if
    # we haven't received the DBus reply yet
    peer.rtsp = WFDRTSPServer(self.rtsp_port, on_rtsp_state, on_rtsp_error, self.rtsp_init_values, on_rtsp_props_changed)
    refresh_widgets()
def disconnect_peer(self, dev_path, path):
    """Disconnect from the peer at *path*, or abort a connection attempt to it.

    Cancels a pending connect call for this peer, drops the peer from the
    connected list, closes its RTSP server and issues an asynchronous
    Disconnect call.  A 'net.connman.iwd.NotConnected' error is ignored;
    any other failure is shown in a message dialog.
    """
    device = self.devices[dev_path]
    peer = device.peers[path]

    def refresh_widgets():
        self.update_dev_props(dev_path)
        self.update_peer_props(dev_path, path)
        if peer != device.selected_peer:
            self.update_selected_peer(dev_path)

    def on_reply():
        device.disconnecting_peer = None
        refresh_widgets()

    def on_error(excp):
        device.disconnecting_peer = None
        refresh_widgets()

        already_gone = (isinstance(excp, dbus.exceptions.DBusException)
                        and excp.get_dbus_name() == 'net.connman.iwd.NotConnected')
        if already_gone:
            return

        dialog = Gtk.MessageDialog(parent=self, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.OK, text='Disconnecting failed')
        dialog.format_secondary_text('Disconnecting from ' + self.objects[path][PEER_IF]['Name'] + ' failed: ' + repr(excp))
        dialog.show()

        def on_ok(response, *args):
            dialog.destroy()
        dialog.connect('response', on_ok)

    # Tear down whatever local state still refers to the peer
    if peer == device.connecting_peer:
        device.dbus_call.cancel()
        device.connecting_peer = None
    if peer in device.connected:
        device.connected.remove(peer)
    if peer.rtsp:
        peer.rtsp.close()
        peer.rtsp = None

    device.dbus_call = self.async_call(peer.peer_proxy, 'Disconnect', reply_handler=on_reply, error_handler=on_error)
    device.disconnecting_peer = peer
    refresh_widgets()
def on_peer_click(self, widget, event, data):
    """'button-press-event' handler of a peer row: connect on double click.

    data -- the (dev_path, path) tuple given when the handler was connected.

    Returns False for anything but a double click with button 1.  Otherwise
    returns True, after starting a connection unless the device is busy
    connecting or disconnecting or has no available connections.
    """
    double_click = event.button == 1 and event.type == Gdk.EventType._2BUTTON_PRESS
    if not double_click:
        return False

    dev_path, path = data
    device = self.devices[dev_path]
    # Should we auto-disconnect from the connected peer? Show an "Are you sure?" dialog?
    busy = (device.disconnecting_peer
            or device.connecting_peer
            or not device.props['AvailableConnections'])
    if not busy:
        self.connect_peer(dev_path, path)
    return True
def on_peer_button(self, widget, data):
    """'clicked' handler of a peer row's button.

    The button's current label selects the action: 'Connect' starts a
    connection, 'Disconnect' and 'Cancel' both disconnect.  Nothing happens
    while the device is disconnecting from a peer.  Always returns True.
    """
    dev_path, path = data
    label = widget.get_label()
    if not self.devices[dev_path].disconnecting_peer:
        if label == 'Connect':
            self.connect_peer(dev_path, path)
        elif label == 'Disconnect' or label == 'Cancel':
            self.disconnect_peer(dev_path, path)
    return True
def get_peer_path(self, device, peer):
    """Return the object path under which *peer* is stored in device.peers.

    Returns None when the peer is not one of the device's peers.
    """
    # Look for the peer that was asked for; the lookup used to ignore the
    # argument and always searched for device.selected_peer.
    for path, candidate in device.peers.items():
        if candidate == peer:
            return path
    return None
def on_peer_selected(self, widget, row, dev_path):
    """'row-selected' handler of a device's peer list.

    row -- the newly selected row, or None when the selection was cleared.

    Deselects the previously selected peer (refreshing its row and emptying
    the info pane) and selects the peer whose widget is *row*.  Always
    returns True.
    """
    device = self.devices[dev_path]
    previous = device.selected_peer

    if previous is not None:
        if previous.widget == row:
            # Same row again: nothing to do
            return True
        old_path = self.get_peer_path(device, previous)
        device.selected_peer = None
        self.update_peer_props(dev_path, old_path)
        self.update_info_pane(dev_path, None)

    if row is not None:
        for peer_path, candidate in device.peers.items():
            if candidate.widget == row:
                device.selected_peer = candidate
                self.update_peer_props(dev_path, peer_path)

    return True
def update_dev_scan_request(self, path):
    """Request or release discovery on a device to match the UI state.

    Discovery is wanted while the device's expander is open and the window
    is active.  When that differs from device.scan_request the flag is
    updated, RequestDiscovery() or ReleaseDiscovery() is called on the
    device proxy and the device label is refreshed.
    """
    device = self.devices[path]
    wanted = device.expanded and self.is_active()
    if device.scan_request != wanted:
        device.scan_request = wanted
        proxy = device.dev_proxy
        toggle = proxy.RequestDiscovery if wanted else proxy.ReleaseDiscovery
        toggle()
        self.update_dev_props(path)
def on_notify_is_active(self, window, value):
    """Re-evaluate every device's discovery request; always returns True.

    Does nothing while self.devices is None.
    """
    known = self.devices if self.devices is not None else ()
    for path in known:
        self.update_dev_scan_request(path)
    return True
def on_dev_enabled(self, switch, state, path):
    """'state-set' handler of a device's enable switch.

    When *state* differs from the cached 'Enabled' property the cache is
    updated, the property is set over D-Bus and True is returned; when it
    already matches nothing is done and None is returned.
    """
    device = self.devices[path]
    if device.props['Enabled'] != state:
        device.props['Enabled'] = state
        device.props_proxy.Set(DEVICE_IF, 'Enabled', state)
        return True
    # Nothing to change
    return None
def on_dev_expanded(self, expander, value, path):
    """'notify::expanded' handler of a device's expander.

    Stores the expander's state in the device record and re-evaluates the
    device's discovery request.  Always returns True.
    """
    record = self.devices[path]
    is_open = expander.get_expanded()
    record.expanded = is_open
    self.update_dev_scan_request(path)
    return True
def get_dev_string(self, path):
    """Return a description of the adapter at *path*.

    Uses the cached WIPHY_IF properties: 'Model' if present, else 'Vendor',
    else 'Name'.
    """
    wiphy = self.objects[path][WIPHY_IF]
    for key in ('Model', 'Vendor'):
        if key in wiphy:
            return wiphy[key]
    return wiphy['Name']
def on_destroy(self, widget, data):
    """Shut down: unregister from IWD, drop all state, quit the main loop.

    Presumably connected to the window's 'destroy' signal -- the connect
    call is not in this part of the file.  Returns False.
    """
    global mainloop
    try:
        if self.devices is not None:
            try:
                svc_mgr = dbus.Interface(self.dbus.get_object('net.connman.iwd', '/net/connman/iwd'), SVC_MGR_IF)
                svc_mgr.UnregisterDisplayService()
            except dbus.exceptions.DBusException:
                # Best effort only: IWD may have gone away in the meantime and
                # that must not keep the application from exiting.
                traceback.print_exc()
        # Same cleanup as when IWD leaves the bus
        self.on_name_owner_change('')
    finally:
        # Always stop the main loop, otherwise the process would keep
        # running without a window.
        mainloop.quit()
    return False
@staticmethod
def edid_to_text(edid):
    """Decode the base block of an EDID blob into displayable text.

    edid -- the raw EDID bytes, or None.

    Returns a short message string ('unavailable', 'invalid (...)') when the
    data cannot be used.  Otherwise returns a list of (label, value) tuples:
    section headings carry an empty value and the fields below a heading
    have WFDSource.indent prepended to their label.  Only the header and the
    basic display parameters are decoded.
    """
    if edid is None:
        return 'unavailable'
    if len(edid) < 128:
        return 'invalid (too short)'
    if edid[0:8] != b'\0\xff\xff\xff\xff\xff\xff\0':
        return 'invalid (bad magic)'
    # All 128 bytes of the block must add up to 0 modulo 256
    if sum(edid[0:128]) & 255 != 0:
        return 'invalid (bad checksum)'

    indent = WFDSource.indent
    text = []

    def section(title):
        text.append((title, ''))

    def field(label, value):
        text.append((indent + label, value))

    def supported(flag):
        return 'supported' if flag else 'unsupported'

    header = edid[0:20]
    manf_id = (header[8] << 8) + header[9]
    section('Header')
    field('Version', str(header[18]) + '.' + str(header[19]))
    # Three letters packed as 5-bit values, 1 = 'A'
    field('Manufacturer ID', chr(64 + ((manf_id >> 10) & 31)) + chr(64 + ((manf_id >> 5) & 31)) + chr(64 + ((manf_id >> 0) & 31)))
    field('Product code', hex((header[11] << 8) + header[10]))
    field('Serial', hex((header[15] << 24) + (header[14] << 16) + (header[13] << 8) + header[12]))
    field('Manufactured', str(1990 + header[17]) + ' week ' + str(header[16]))

    basic_params = edid[20:25]
    section('Basic parameters')
    if basic_params[0] & 0x80:
        intf_table = {
            2: 'HDMIa',
            3: 'HDMIb',
            4: 'MDDI',
            5: 'DisplayPort'
        }
        dt_table = {
            0: 'RGB 4:4:4',
            1: 'RGB 4:4:4 + YCrCb 4:4:4',
            2: 'RGB 4:4:4 + YCrCb 4:2:2',
            3: 'RGB 4:4:4 + YCrCb 4:4:4 + YCrCb 4:2:2'
        }
        bpp = (basic_params[0] >> 4) & 7
        intf = (basic_params[0] >> 0) & 7
        field('Video input type', 'digital')
        field('Bit depth', 'undefined' if bpp in [0, 7] else str(4 + bpp * 2))
        field('Interface', 'undefined' if intf not in intf_table else intf_table[intf])
    else:
        level_table = {
            0: '+0.7 / -0.3 V',
            1: '+0.714 / -0.286 V',
            2: '+1.0 / -0.4 V',
            3: '+0.7 / 0 V'
        }
        dt_table = {
            0: 'monochrome/grayscale',
            1: 'RGB color',
            2: 'non-RGB color',
            3: 'undefined'
        }
        field('Video input type', 'analog')
        # This used to read the misspelt name 'basic_parmas' and raised
        # NameError for every analog display.
        field('Video white/sync level', level_table[(basic_params[0] >> 5) & 3])

    if basic_params[1] and basic_params[2]:
        field('Screen width', str(basic_params[1]) + ' cm')
        field('Screen height', str(basic_params[2]) + ' cm')
    elif basic_params[2] == 0:
        field('Landscape aspect ratio', str((basic_params[1] + 99) * 0.01))
    else:
        field('Portrait aspect ratio', str(100.0 / (basic_params[2] + 99)))

    field('Gamma', str((basic_params[3] + 100) * 0.01))
    field('DPMS Standby', supported((basic_params[4] >> 7) & 1))
    field('DPMS Suspend', supported((basic_params[4] >> 6) & 1))
    field('DPMS Active-off', supported((basic_params[4] >> 5) & 1))
    field('Color type', dt_table[(basic_params[4] >> 3) & 3])
    field('sRGB color space', 'yes' if (basic_params[4] >> 2) & 1 else 'no')
    field('Continuous timings', 'yes' if (basic_params[4] >> 0) & 1 else 'no')

    # TODO: timing information and extensions
    return text
# Script entry point.
# Make the GLib main loop dbus-python's default main loop before any bus connection is made
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
Gst.init(None)
# Instantiate the application object; the instance is not kept in a variable here
WFDSource()
# `mainloop` must stay a module-level name: WFDSource.on_destroy() calls mainloop.quit()
mainloop = GLib.MainLoop()
mainloop.run()