From e64de776a7653890466150876cf7ba601999d4a9 Mon Sep 17 00:00:00 2001 From: Andrew Zaborowski Date: Thu, 16 Jul 2020 02:12:27 +0200 Subject: [PATCH] test: Add a sample Wi-Fi Display source app --- test/wfd-source | 1318 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1318 insertions(+) create mode 100755 test/wfd-source diff --git a/test/wfd-source b/test/wfd-source new file mode 100755 index 00000000..557093a9 --- /dev/null +++ b/test/wfd-source @@ -0,0 +1,1318 @@ +#! /usr/bin/python3 +# +# Copyright (C) 2020 Intel Corporation +# +# A simplified WFD source that streams the X11 screen using gstreamer +# A more complete solution would create a virtual screen visible through the normal system calls, xrandr, etc., +# with its pixel aspect ratio, EDID data and what not. This would allow the user to configure it like a real +# display in mirror mode or side-by-side mode. + +import sys +import dbus +import dbus.mainloop.glib +import socket +import collections +import collections.abc +import random +import dataclasses + +import gi +gi.require_version('GLib', '2.0') +gi.require_version('Gst', '1.0') +gi.require_version('Gtk', '3.0') +from gi.repository import GLib, Gst, Gtk, Gdk, Pango + +class WFDRTSPServer: + class RTSPException(Exception): + pass + + def __init__(self, port, state_handler, error_handler): + # Should start the TCP server only on the P2P connection's local IP but we won't + # know the IP or interface name until after the connection is established. At that + # time the sink may try to make the TCP connection at any time so our listen + # socket should be up before this. + server_address = ('0.0.0.0', port) + self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) + self.server.bind(server_address) + self.server.listen(1) + GLib.io_add_watch(self.server, GLib.IO_IN, self.handle_connection) + self.conn = None + self.tx_queue = [] + self.rx_queue = b'' + + self.state_handler = state_handler + self.error_handler = error_handler + self.sm_init() + + def handle_data_out(self, conn, *args): + try: + cmd = self.tx_queue.pop(0) + sent = self.conn.send(cmd) + + if sent < len(cmd): + self.tx_queue.insert(0, cmd[sent:]) + + return len(self.tx_queue) > 0 + except Exception as e: + self.error_handler(e) + return False + + def tx_queue_append(self, cmd): + if not self.tx_queue: + GLib.io_add_watch(self.conn.fileno(), GLib.IO_OUT, self.handle_data_out) + + self.tx_queue.append(cmd.encode('utf-8')) + self.debug('queued cmd: ' + cmd) + + def handle_data_hup(self, conn, *args): + try: + self.debug('HUP') + self.error('Disconnected') + except Exception as e: + self.error_handler(e) + return False + + def handle_data_in(self, conn, *args): + try: + newdata = self.conn.recv(4096) + if len(newdata) == 0: + self.debug('recv returned 0 bytes') + # Disconnect from P2P + self.error('Disconnected') + return False + + self.debug('received data: ' + str(newdata)) + self.rx_queue += newdata + + while b'\r\n\r\n' in self.rx_queue: + msg, content = self.rx_queue.split(b'\r\n\r\n', 1) + lines = msg.split(b'\r\n') + + headers = {} + for line in lines[1:]: + if b':' not in line: + # Bad syntax + rxbuf = b'' + return True + + name, value = line.decode('utf8').split(':', 1) + name = name.lower() + while len(value) and value[0] == ' ': + value = value[1:] + + if name in headers: + # Duplicate + rxbuf = b'' + return True + + headers[name] = value + + cl = 0 + if 'content-length' in headers: + try: + cl = int(headers['content-length']) + if cl < 1 or cl > 1000: + raise Exception('') + except: + # Bad syntax + rxbuf = b'' + return True + + if len(content) < cl: + # Wait for more data + return True + + top_line = lines[0].decode('utf8').split(None, 2) + self.rx_queue = self.rx_queue[len(msg) + 4 + cl:] + content = content[:cl] + + if top_line[2] == 'RTSP/1.0': + self.source_handle_message(method=top_line[0], target=top_line[1], headers=headers, content=content) + elif top_line[0] == 'RTSP/1.0': + try: + status = int(top_line[1]) + if status < 1 or status > 999: + raise Exception('Status out of range') + except: + self.error('Couldn\'t parse response status') + + self.source_handle_message(status=status, reason=top_line[2], headers=headers, content=content) + else: + # Bad protocol + self.error('Unknown protocol in ' + str(top_line)) + + return True + except Exception as e: + self.error_handler(e) + return False + + def handle_connection(self, sock, *args): + try: + if self.conn: + return False + self.conn, addr = sock.accept() + self.debug('RTSP connection from: ' + str(addr)) + self.remote_ip = addr[0] + + if self.expected_remote_ip and self.remote_ip != self.expected_remote_ip: + self.conn.close() + self.conn = None + self.debug('Connection refused, bad source address') + return True + + sock.close() + self.server = None + GLib.io_add_watch(self.conn.fileno(), GLib.IO_IN, self.handle_data_in) + GLib.io_add_watch(self.conn.fileno(), GLib.IO_HUP, self.handle_data_hup) + + self._state = 'init' + self.source_handle_message() + return False + except Exception as e: + self.error_handler(e) + return False + + def error(self, msg): + self.enter_state('failed') + e = WFDRTSPServer.RTSPException('State ' + self._state + ': ' + msg) + self.debug('error: ' + msg) + raise e + + def warning(self, msg): + self.debug('warning: ' + msg) + print('Warning: ' + msg + '\n') + + def debug(self, msg): + pass + + @property + def state(self): + return self._state + + def enter_state(self, new_state): + self.debug('state change: ' + self._state + ' -> ' + new_state) + self._state = new_state + self.state_handler() + + @property + def ready(self): + return self._state in ['streaming', 'paused'] + + def sm_init(self): + self._state = 'waiting-rtsp' + self.local_params = { + 'wfd_video_formats': '00 00 01 08 00000000 00000000 00000040 00 0000 0000 00 none none' + } + self.remote_params = {} + self.local_methods = [ 'org.wfa.wfd1.0', 'SET_PARAMETER', 'GET_PARAMETER', 'PLAY', 'SETUP', 'TEARDOWN' ] + self.presentation_url = [ 'rtsp://127.0.0.1/wfd1.0/streamid=0', 'none' ] # Table 88 + self.session_stream_url = None + self.session_id = None + self.session_timeout = 60 + self.local_cseq = 0 + self.remote_cseq = None + self.last_method = None + self.last_require = [] + self.last_params = [] + self.remote_rtp_port = None + self.remote_rtcp_port = None + self.local_rtp_port = None + self.local_rtcp_port = None + self.use_tcp = None + self.rtp_pipeline = None + self.rtsp_keepalive = None + self.rtsp_keepalive_timeout = None + self.expected_remote_ip = None + self.remote_ip = None + + def close(self): + # Avoid passing self to io watches so that the refcount can ever reach 0 and + # all this can be done in __del__ + if self.rtsp_keepalive: + GLib.source_remove(self.rtsp_keepalive) + self.rtsp_keepalive = None + if self.rtsp_keepalive_timeout: + GLib.source_remove(self.rtsp_keepalive_timeout) + self.rtsp_keepalive_timeout = None + if self.rtp_pipeline: + self.rtp_pipeline.set_state(Gst.State.NULL) + self.rtp_pipeline = None + if self.server: + self.server.close() + self.server = None + if self.conn: + self.conn.close() + self.conn = None + + def set_local_interface(self, new_value): + pass + + def set_remote_ip(self, new_value): + self.expected_remote_ip = new_value + + if self.conn and self.remote_ip != self.expected_remote_ip: + self.error_handler(WFDRTSPServer.RTSPException('Connection was from a wrong IP')) # TODO: do this in an idle cb + + def validate_msg(self, method, expected_method, status, reason, headers, target, content): + if expected_method is None: + # Expected a response, not a request + if method is not None: + self.error('Received a "' + method + '" request where a response was expected') + if status < 200 or status > 299: + self.error('Response status ' + str(status) + ' and reason: ' + reason) + if status != 200: + self.warning('Response status was ' + str(status) + ' ("' + reason + '") in state ' + self._state) + + try: + if int(headers['cseq']) != self.local_cseq: + self.error('Response CSeq doesn\'t match') + except: + self.error('Missing or unparsable CSeq in a response') + + if self.last_method == 'OPTIONS': + if 'public' not in headers: + self.error('Missing "Public" header in OPTIONS response') + public = [ m.strip() for m in headers['public'].split(',') ] + missing = [ m for m in self.last_require if m not in public ] + if missing: + self.error('Missing required method(s) "' + '", "'.join(missing) + '" in OPTIONS response') + + if self.last_method == 'GET_PARAMETER': + params = {} + for line in content.split(b'\r\n'): + if b':' not in line: + continue + k, v = line.decode('utf8').split(':', 1) + if k.strip() in params: + self.error('Duplicate key "' + k + '" in GET_PARAMETER response') + params[k.strip()] = v.strip() + missing = [ p for p in self.last_params if p not in params ] + if missing: # Not an error + self.warning('Missing key(s) "' + '", "'.join(missing) + '" in GET_PARAMETER response') + self.remote_params.update(params) + + return + + if method is None: + self.error('Received an RTSP response where a ' + expected_method + ' was expected') + + if method != expected_method: + self.error('Received a "' + method + '" request where a ' + expected_method + ' was expected') + try: + if self.remote_cseq is not None and int(headers['cseq']) <= self.remote_cseq: + self.error('Unchanged CSeq in a new request') + self.remote_cseq = int(headers['cseq']) + except: + self.error('Missing or unparsable CSeq in a new request') + if method == 'OPTIONS' and 'require' not in headers: + self.error('Missing "Require" header in OPTIONS request') + elif method == 'SETUP' and 'transport' not in headers: + self.error('Missing "Transport" header in SETUP request') + elif method == 'SETUP' and (target not in self.presentation_url or target == 'none'): + self.error('Unknown target "' + target + '" in SETUP request') + elif method == 'PLAY' and ('session' not in headers or headers['session'] != self.session_id): + self.error('Missing or invalid "Session" header in PLAY request') + elif method == 'PLAY' and target != self.session_stream_url: + self.error('Unknown target "' + target + '" in PLAY request') + elif method == 'PAUSE' and 'session' not in headers: + self.error('Missing "Session" header in PAUSE request') + elif method == 'PAUSE' and target != self.session_stream_url: + self.error('Unknown target "' + target + '" in PAUSE request') + elif method == 'TEARDOWN' and 'session' not in headers: + self.error('Missing "Session" header in TEARDOWN request') + elif method == 'TEARDOWN' and target != self.session_stream_url: + self.error('Unknown target "' + target + '" in TEARDOWN request') + elif method == 'SET_PARAMETER': + pass + + def request(self, method, target, require=[], params=[]): + content = '' + cmd = method + ' ' + target + ' RTSP/1.0\r\n' + + self.local_cseq += 1 + cmd += 'CSeq: ' + str(self.local_cseq) + '\r\n' + + if require: + cmd += 'Require: ' + ', '.join(require) + '\r\n' + + if params: + if isinstance(params, collections.abc.Mapping): + content = ''.join([ k + ': ' + params[k] + '\r\n' for k in params ]) + else: + content = ''.join([ k + '\r\n' for k in params ]) + content_type = 'text/parameters' + + if content: + cmd += 'Content-Type: ' + content_type + '\r\n' + cmd += 'Content-Length: ' + str(len(content)) + '\r\n' + + cmd += '\r\n' + self.tx_queue_append(cmd + content) + self.last_method = method + self.last_require = require + self.last_params = params + + def response(self, public=[], session=None, transport=None): + cmd = 'RTSP/1.0 200 OK\r\n' + + cmd += 'CSeq: ' + str(self.remote_cseq) + '\r\n' + + if public: + cmd += 'Public: ' + ', '.join(public) + '\r\n' + if session is not None: + cmd += 'Session: ' + session + '\r\n' + if transport is not None: + cmd += 'Transport: ' + transport + '\r\n' + + cmd += '\r\n' + self.tx_queue_append(cmd) + + def parse_video_formats(self, value): + # TODO + pass + + def parse_client_rtp_ports(self, value): + profile, rtp_p0_str, rtp_p1_str, mode = value.split() + try: + rtp_p0 = int(rtp_p0_str) + rtp_p1 = int(rtp_p1_str) + except: + self.error('Can\'t parse rtp-port in wfd-client-rtp-ports: ' + value) + if rtp_p0 < 1 or rtp_p0 > 65535: + self.error('rtp-port0 not valid for Primary Sink: ' + rtp_p0_str) + if rtp_p1 != 0: # Table 90 + self.error('rtp-port1 not valid for Primary Sink: ' + rtp_p1_str) + if profile not in ['RTP/AVP/UDP;unicast', 'RTP/AVP/TCP;unicast']: + self.error('Unknown RTP transport in wfd-client-rtp-ports: ' + profile) + if mode != 'mode=play': + self.error('Unknown mode in wfd-client-rtp-ports: ' + mode) + self.remote_rtp_port = rtp_p0 + self.use_tcp = (profile == 'RTP/AVP/TCP;unicast') + + def parse_transport(self, value): + params = value.split(';') + if len(params) < 3: + self.error('Can\'t split SETUP Transport header into profile and port numbers: ' + value) + profile = ';'.join(params[0:2]) + if profile not in ['RTP/AVP/UDP;unicast', 'RTP/AVP/TCP;unicast']: + self.error('Unknown RTP transport in SETUP Transport header: ' + profile) + if self.use_tcp != (profile == 'RTP/AVP/TCP;unicast'): + self.error('RTP transport in SETUP Transport header different from what we sent in M4: ' + profile) + client_port_strs = [p for p in params[2:] if p.startswith('client_port=')] + if len(client_port_strs) != 1: + self.error('Can\'t find client-port in SETUP Transport header: ' + value) + client_ports = client_port_strs[0].split('=', 1)[1].split('-') + try: + rtp_port = int(client_ports[0]) + if len(client_ports) > 1: + rtcp_port = int(client_ports[1]) + except: + self.error('Can\'t parse client-port in SETUP Transport header: ' + client_port_strs[0]) + if rtp_port != self.remote_rtp_port: + self.error('client-port in SETUP Transport header doesn\'t match what we sent in M4: ' + str(rtp_port)) + if len(client_ports) > 1: + if rtcp_port < 1 or rtcp_port > 65535 or rtcp_port == rtp_port: # Actually must be rtp_port + 1... + self.error('Optional RTCP port not valid in SETUP Transport header: ' + str(rtcp_port)) + self.remote_rtcp_port = rtcp_port + + def on_gst_message(self, bus, message): + t = message.type + if t == Gst.MessageType.EOS: + self.error('Gstreamer end-of-stream') + elif t == Gst.MessageType.STATE_CHANGED: + old, new, pending = message.parse_state_changed() + self.debug('Gstreamer state change for ' + message.src.name + ' from ' + str(old) + ' to ' + str(new) + ', pending=' + str(pending)) + elif t == Gst.MessageType.INFO: + err, debug = message.parse_info() + self.debug('Gstreamer info for ' + message.src.name + ': ' + str(err) + '\nDebug: ' + str(debug)) + elif t == Gst.MessageType.WARNING: + err, debug = message.parse_warning() + self.debug('Gstreamer warning for ' + message.src.name + ': ' + str(err) + '\nDebug: ' + str(debug)) + elif t == Gst.MessageType.ERROR: + err, debug = message.parse_error() + self.error('Gstreamer error for ' + message.src.name + ': ' + str(err) + '\nDebug: ' + str(debug)) + else: + self.debug('Gstreamer message of type ' + str(t) + ' for ' + message.src.name + ': ' + str(message)) + return True + + def gst_force_keyframe(self): + enc = self.rtp_pipeline.get_by_name('videnc') + sink = enc.get_static_pad('sink') + timestamp = Gst.CLOCK_TIME_NONE # can/should we use sink.query_position? + + s = Gst.Structure('GstForceKeyUnit') + s.set_value('timestamp', timestamp, 'uint64') + s.set_value('stream-time', timestamp, 'uint64') + s.set_value('all-headers', True) + # TODO: can we also send this event directly to the element instead of the pad? + sink.send_event(Gst.event_new_custom(Gst.EVENT_CUSTOM_DOWNSTREAM, s)) + + def rtsp_keepalive_timeout_cb(self): + try: + self.rtsp_keepalive_timeout = None + self.error('Keep-alive response timed out') + except Exception as e: + self.error_handler(e) + return False + + def rtsp_keepalive_cb(self): + try: + # Send M16 + # May need to start being careful with other requests that may be running... + self.request('GET_PARAMETER', 'rtsp://localhost/wfd1.0') + self.rtsp_keepalive_timeout = GLib.timeout_add_seconds(5, self.rtsp_keepalive_timeout_cb) + return True + except Exception as e: + self.error_handler(e) + return False + + def source_handle_message(self, method=None, target=None, status=None, reason=None, headers={}, content=None): + # TODO: check the 6s timeouts as per Section 6.5 + # Source side M1-M8 simplified state machine + if self._state == 'init': + # Send M1 + self.request('OPTIONS', '*', require=['org.wfa.wfd1.0']) + self.enter_state('M1') + elif self._state == 'M1': + # Validate M1 response + self.validate_msg(method, None, status, reason, headers, None, content) + methods = [ m.strip() for m in headers['public'].split(',') ] + required = [ 'org.wfa.wfd1.0', 'SET_PARAMETER', 'GET_PARAMETER' ] + missing = [ m for m in required if m not in methods ] + if missing: + self.error('Missing required method(s) "' + '", "'.join(missing) + '" in OPTIONS response') + self.enter_state('M2') + elif self._state == 'M2': + # Validate M2 + self.validate_msg(method, 'OPTIONS', status, reason, headers, target, content) + if target not in [ '*' ] + self.presentation_url: + self.error('Unknown OPTIONS target "' + target + '"') + required = [ m.strip() for m in headers['require'].split(',') ] + missing = [ m for m in required if m not in self.local_methods ] + if missing: + self.error('Required methods in OPTIONS request that we don\'t support: ' + ','.join(missing)) + + # Send M2 response + self.response(public=self.local_methods) + # Send M3 + self.request('GET_PARAMETER', 'rtsp://localhost/wfd1.0', params=['wfd_audio_codecs', 'wfd_video_formats', 'wfd_client_rtp_ports', 'wfd_display_edid', 'wfd_uibc_capability']) + self.enter_state('M3') + elif self._state == 'M3': + # Validate M3 response + self.validate_msg(method, None, status, reason, headers, None, content) + if 'wfd_video_formats' not in self.remote_params or 'wfd_client_rtp_ports' not in self.remote_params: + self.error('Required parameters missing from GET_PARAMETER response') + self.parse_video_formats(self.remote_params['wfd_video_formats']) + self.parse_client_rtp_ports(self.remote_params['wfd_client_rtp_ports']) + # Send M4 + params = { + 'wfd_video_formats': self.local_params['wfd_video_formats'], + 'wfd_client_rtp_ports': self.remote_params['wfd_client_rtp_ports'], + 'wfd_presentation_URL': self.presentation_url[0] + ' ' + self.presentation_url[1], + # TODO: include wfd_audio_codecs if audio present, make video optional, too + # TODO: support wfd2_video_formats and wfd2_audio_codecs + } + self.request('SET_PARAMETER', 'rtsp://localhost/wfd1.0', params=params) + self.enter_state('M4') + elif self._state == 'M4': + # Validate M4 response + self.validate_msg(method, None, status, reason, headers, None, content) + # Send M5 + self.request('SET_PARAMETER', 'rtsp://localhost/wfd1.0', params={'wfd_trigger_method': 'SETUP'}) + self.enter_state('M5') + elif self._state == 'M5': + # Validate M5 response + self.validate_msg(method, None, status, reason, headers, None, content) + self.enter_state('M6') + elif self._state == 'M6': + # Validate M6 + self.validate_msg(method, 'SETUP', status, reason, headers, target, content) + self.parse_transport(headers['transport']) + self.session_stream_url = target + self.session_id = str(random.randint(a=1, b=999999)) + self.local_rtp_port = random.randint(a=20000, b=30000) + if self.remote_rtcp_port is not None: + self.local_rtcp_port = self.local_rtp_port + 1 + profile ='RTP/AVP/TCP;unicast' if self.use_tcp else 'RTP/AVP/UDP;unicast' + client_port = str(self.remote_rtp_port) + (('-' + str(self.remote_rtcp_port)) if self.remote_rtcp_port is not None else '') + server_port = str(self.local_rtp_port) + (('-' + str(self.local_rtcp_port)) if self.local_rtcp_port is not None else '') + transport = profile + ';client_port' + client_port + ';server_port=' + server_port + # Section B.1 + pipeline = ('ximagesrc name=src use-damage=false do-timestamp=true ! capsfilter name=fps caps=video/x-raw,framerate=10/1' + + ' ! videoscale method=0 ! capsfilter name=res caps=video/x-raw,width=800,height=600' + + ' ! videoconvert ! video/x-raw,format=I420 ! x264enc tune=zerolatency speed-preset=ultrafast name=videnc' + + ' ! queue' + # TODO: add leaky=downstream + ' ! mpegtsmux name=mux' + + ' ! rtpmp2tpay pt=33 mtu=1472 ! .send_rtp_sink rtpsession name=session .send_rtp_src' + + ' ! udpsink host=' + self.remote_ip + ' port=' + str(self.remote_rtp_port) + ' bind-port=' + str(self.local_rtp_port)) # TODO: bind-address + + if self.local_rtcp_port is not None: + pipeline += ' session.send_rtcp_src ! udpsink name=rtcp_sink host=' + self.remote_ip + \ + ' port=' + str(self.remote_rtcp_port) + ' bind-port=' + str(self.local_rtcp_port) # TODO: bind-address + + self.rtp_pipeline = Gst.parse_launch(pipeline) + bus = self.rtp_pipeline.get_bus() + bus.enable_sync_message_emission() + bus.add_signal_watch() + bus.connect('sync-message', self.on_gst_message) + + # Send M6 response + self.response(session=self.session_id + ';timeout=' + str(self.session_timeout), transport=transport) + self.enter_state('M7') + elif self._state in ['M7', 'paused']: + # Validate M7 + self.validate_msg(method, 'PLAY', status, reason, headers, target, content) + # Send M7 response + self.response() + self.rtp_pipeline.set_state(Gst.State.PLAYING) + # Set up the keep-alive timer, interval must be less than timeout minus 5 seconds + self.rtsp_keepalive = GLib.timeout_add_seconds(self.session_timeout - 10, self.rtsp_keepalive_cb) + self.enter_state('streaming') + elif self._state == 'streaming': + if method is None: + if self.rtsp_keepalive_timeout: + # The M16 response is not to be validated (Section 6.4.16) + GLib.source_remove(self.rtsp_keepalive_timeout) + self.rtsp_keepalive_timeout = None + return + self.error('Received an RTSP response where a request was expected') + if method == 'PAUSE': + self.validate_msg(method, 'PAUSE', status, reason, headers, target, content) + self.rtp_pipeline.set_state(Gst.State.PAUSED) + self.enter_state('paused') + self.response() + return + if method == 'SET_PARAMETER': + # TODO: parse the stuff, on 'wfd-idr-request\r\n' (no semicolon) call the following: + self.gst_force_keyframe() + self.response() + return + if method == 'TEARDOWN': + # The spec suggests a more graceful teardown but we just close the connection + self.error('Teardown requested') + self.error('Unsupported method "' + method + '"') + +WIPHY_IF = 'net.connman.iwd.Adapter' +DEVICE_IF = 'net.connman.iwd.p2p.Device' +PEER_IF = 'net.connman.iwd.p2p.Peer' +WSC_IF = 'net.connman.iwd.SimpleConfiguration' +WFD_IF = 'net.connman.iwd.p2p.Display' +SVC_MGR_IF = 'net.connman.iwd.p2p.ServiceManager' + +class WFDSource(Gtk.Window): + @dataclasses.dataclass + class Device: + props: dict + dev_proxy: dbus.Interface + props_proxy: dbus.Interface + peers: dict + sorted_peers: list + widget: Gtk.Widget + expanded: bool + scan_request: bool + selected_peer: object + connecting_peer: object + disconnecting_peer: object + connected: list + dbus_call: dbus.lowlevel.PendingCall + + @dataclasses.dataclass + class Peer: + peer_proxy: dbus.Interface + wfd_proxy: dbus.Interface + wsc_proxy: dbus.Interface + widget: Gtk.Widget + rtsp: WFDRTSPServer + + def __init__(self): + Gtk.Window.__init__(self, type=Gtk.WindowType.TOPLEVEL, title='WFD Source') + self.set_decorated(True) + self.set_resizable(False) + self.connect('destroy', self.on_destroy, "WM destroy") + self.set_size_request(900, 300) + self.device_box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL) + leftscroll = Gtk.ScrolledWindow(hscrollbar_policy=Gtk.PolicyType.NEVER) + leftscroll.add(self.device_box) + self.infolabel1 = Gtk.Label() + self.infolabel1.set_ellipsize(Pango.EllipsizeMode.START) + infopane = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL) + infopane.pack_start(self.infolabel1, False, False, padding=10) + rightscroll = Gtk.ScrolledWindow(hscrollbar_policy=Gtk.PolicyType.NEVER, vscrollbar_policy=Gtk.PolicyType.NEVER) + rightscroll.add(infopane) + paned = Gtk.Paned(orientation=Gtk.Orientation.HORIZONTAL) + paned.pack1(leftscroll, True, True) + paned.pack2(rightscroll, False, False) + paned.set_wide_handle(True) + paned.props.position = 400 + paned.props.position_set = True + self.add(paned) + self.show_all() + self.connect('notify::is-active', self.on_notify_is_active) + + self.rtsp_port = 7236 + self.devices = None + self.objects = {} + self.dbus = dbus.SystemBus() + self.dbus.watch_name_owner('net.connman.iwd', self.on_name_owner_change) + self.on_name_owner_change('dummy' if self.dbus.name_has_owner('net.connman.iwd') else '') + + def on_name_owner_change(self, new_name): + if not new_name: + if self.devices is None: + return True + + for dev_path in self.devices: + device = self.devices[dev_path] + if device.connecting_peer or device.disconnecting_peer: + device.dbus_call.cancel() + + for peer_path in device.peers: + peer = device.peers[peer_path] + if peer.rtsp: + peer.rtsp.close() + + self.devices = None + self.objects = {} + self.populate_devices() + self.dbus.remove_signal_receiver(self.on_properties_changed) + self.dbus.remove_signal_receiver(self.on_interfaces_added) + self.dbus.remove_signal_receiver(self.on_interfaces_removed) + return True + + if self.devices is not None: + return True + + manager = dbus.Interface(self.dbus.get_object('net.connman.iwd', '/'), 'org.freedesktop.DBus.ObjectManager') + self.devices = {} + self.objects = manager.GetManagedObjects() + + for path in self.objects: + if DEVICE_IF in self.objects[path]: + self.add_dev(path) + for path in self.objects: + if PEER_IF in self.objects[path]: + self.add_peer(path) + + self.populate_devices() + + self.dbus.add_signal_receiver(self.on_properties_changed, + bus_name="net.connman.iwd", + dbus_interface="org.freedesktop.DBus.Properties", + signal_name="PropertiesChanged", + path_keyword="path") + self.dbus.add_signal_receiver(self.on_interfaces_added, + bus_name="net.connman.iwd", + dbus_interface="org.freedesktop.DBus.ObjectManager", + signal_name="InterfacesAdded") + self.dbus.add_signal_receiver(self.on_interfaces_removed, + bus_name="net.connman.iwd", + dbus_interface="org.freedesktop.DBus.ObjectManager", + signal_name="InterfacesRemoved") + + svc_mgr = dbus.Interface(self.dbus.get_object('net.connman.iwd', '/net/connman/iwd'), SVC_MGR_IF) + svc_mgr.RegisterDisplayService({ + 'Source': True, + 'Port': dbus.UInt16(self.rtsp_port) + }) + + return True + + def add_dev(self, path): + obj_proxy = self.dbus.get_object('net.connman.iwd', path) + # Default to expanded for first device found + expanded = len(self.devices) == 0 + self.devices[path] = WFDSource.Device( + props=self.objects[path][DEVICE_IF], + dev_proxy=dbus.Interface(obj_proxy, DEVICE_IF), + props_proxy=dbus.Interface(obj_proxy, 'org.freedesktop.DBus.Properties'), + peers={}, + sorted_peers=[], + widget=None, + expanded=expanded, + scan_request=False, + selected_peer=None, + connecting_peer=None, + disconnecting_peer=None, + connected=[], + dbus_call=None) + + def add_peer(self, path): + dev_path = self.objects[path][PEER_IF]['Device'] + if dev_path not in self.devices or path in self.devices[dev_path].peers: + return False + + self.devices[dev_path].peers[path] = WFDSource.Peer( + peer_proxy=None, + wfd_proxy=None, + wsc_proxy=None, + widget=None, + rtsp=None) + return True + + def on_properties_changed(self, interface, changed, invalidated, path): + if path not in self.objects: + self.objects[path] = {} + if interface not in self.objects[path]: + self.objects[path][interface] = {} + + self.objects[path][interface].update(changed) + for prop in invalidated: + if prop in self.objects[path][interface]: + del self.objects[path][interface][prop] + + if path in self.devices: + self.update_dev_props(path) + if interface == DEVICE_IF and 'AvailableConnections' in changed: + self.update_selected_peer(path) + + if PEER_IF in self.objects[path]: + dev_path = self.objects[path][PEER_IF]['Device'] + if dev_path in self.devices: + device = self.devices[dev_path] + if path in device.peers: + peer = device.peers[path] + if interface == PEER_IF and 'Connected' in changed: + if changed['Connected'] and peer not in device.connected: + device.connected.append(peer) + elif not changed['Connected'] and peer in device.connected: + device.connected.remove(peer) + self.update_dev_props(dev_path) + self.update_peer_props(dev_path, path) + if peer != device.selected_peer: + self.update_selected_peer(dev_path) + if interface == PEER_IF and peer.rtsp: + if 'ConnectedInterface' in changed: + peer.rtsp.set_local_interface(changed['ConnectedInterface']) + if 'ConnectedIp' in changed: + peer.rtsp.set_remote_ip(changed['ConnectedIp']) + + self.update_peer_props(dev_path, path) + + return True + + def on_interfaces_added(self, path, interfaces): + if path not in self.objects: + self.objects[path] = {} + self.objects[path].update(interfaces) + + if DEVICE_IF in interfaces: + self.add_dev(path) + # This should happen rarely enough that we can repopulate the whole list + self.populate_devices() + + update_dev_props = False + if PEER_IF in interfaces: + update_dev_props = self.add_peer(path) + + if PEER_IF in self.objects[path]: + dev_path = self.objects[path][PEER_IF]['Device'] + if dev_path in self.devices: + if update_dev_props: + # Update device's peer count + self.update_dev_props(dev_path) + self.update_peer_props(dev_path, path) + + def on_interfaces_removed(self, path, interfaces): + if path not in self.objects: + return + + dev_path = None + if PEER_IF in interfaces or WFD_IF in interfaces or WSC_IF in interfaces: + if PEER_IF in self.objects[path]: + dev_path = self.objects[path][PEER_IF]['Device'] + + for i in interfaces: + if i in self.objects[path]: + del self.objects[path][i] + if len(self.objects[path]) == 0: + del self.objects[path] + + if DEVICE_IF in interfaces and path in self.devices: + device = self.devices[path] + if device.connecting_peer or device.disconnecting_peer: + device.dbus_call.cancel() + # TODO: check if connected + del self.devices[path] + # This should happen rarely enough that we can repopulate the whole list + self.populate_devices() + + if dev_path is not None and dev_path in self.devices: + device = self.devices[dev_path] + if path in device.peers: + # Make sure the widget is removed + self.update_peer_props(dev_path, path) + if PEER_IF in interfaces: + del device.peers[path] + # Update device's peer count + self.update_dev_props(dev_path) + + def populate_devices(self): + self.device_box.foreach(lambda x, y: self.device_box.remove(x), None) + + if self.devices is None: + label = Gtk.Label(label="Not connected to IWD") + self.device_box.pack_start(label, expand=True, fill=True, padding=0) + self.device_box.show_all() + return + + if len(self.devices) == 0: + label = Gtk.Label(label="No P2P-capable adapters :-(") + self.device_box.pack_start(label, expand=True, fill=True, padding=0) + self.device_box.show_all() + return + + for path in self.devices: + label = Gtk.Label() + label.set_halign(Gtk.Align.START) + label.set_line_wrap(False) + label.set_single_line_mode(False) + label.set_ellipsize(Pango.EllipsizeMode.END) + switch = Gtk.Switch() + switch.connect('state-set', self.on_dev_enabled, path) + switch.set_halign(Gtk.Align.END) + switch.set_valign(Gtk.Align.START) + box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL) + box.pack_start(label, expand=True, fill=True, padding=0) + box.pack_end(switch, expand=False, fill=False, padding=0) + peer_list = Gtk.ListBox() # can also use an IconView.. or make it switchable + peer_list.set_size_request(150, 120) + peer_list.set_selection_mode(Gtk.SelectionMode.SINGLE) + peer_list.set_placeholder(Gtk.Label(label='No Wi-Fi Displays discovered yet...')) + peer_list.connect('row-selected', self.on_peer_selected, path) + frame = Gtk.Frame() + frame.props.margin = 10 + frame.add(peer_list) + expander = Gtk.Expander() + expander.set_label_fill(True) + expander.set_expanded(self.devices[path].expanded) + expander.set_label_widget(box) + expander.add(frame) + expander.connect('notify::expanded', self.on_dev_expanded, path) + expander.show_all() + self.device_box.add(expander) + self.devices[path].widget = expander + self.update_dev_props(path) + GLib.idle_add(self.expander_workaround, expander) + + for peer_path in self.devices[path].peers: + self.update_peer_props(path, peer_path) + + # Basically implement Gtk.Expander's set_label_fill which for some reason + # doesn't do anything. Use size-allocate because configure-event doesn't work either... + self.margin_left = None + def on_exp_resize(widget, event): + if self.margin_left is None: + self.margin_left = box.get_allocation().x + posx, posy = expander.translate_coordinates(self, 0, 0) + # Add posx to force the label widget (box) to be aligned to the left side of the + # window even if GTK already decided to push the expander off the left side with a + # negative allocation.x. This way it won't push it any further left as the available + # space shrinks. + box.set_size_request(max(posx + expander.get_allocated_width() - self.margin_left - 1, 0), -1) + return False + expander.connect('size-allocate', on_exp_resize) + + def expander_workaround(self, widget): + box = widget.get_label_widget() + widget.set_label_widget(None) + widget.set_label_widget(box) + return False + + def update_dev_props(self, path): + device = self.devices[path] + if not device.props['Enabled']: + state = 'disabled' + elif device.disconnecting_peer is not None: + state = 'disconnecting...' + elif device.connecting_peer is not None: + state = 'connecting...' + elif len(device.connected) > 0: + if all([not peer.rtsp or peer.rtsp.ready for peer in device.connected]): + state = 'connected' + else: + state = 'negotiating...' + elif device.scan_request: + state = 'discovering... (' + str(len(device.peers)) + ')' + else: + state = 'idle' + + label, switch = device.widget.get_label_widget().get_children() + dev_str = self.get_dev_string(path) + name = str(device.props['Name']) + label.set_markup(dev_str + '\n' + ('Local name: ' + name + '\n' if dev_str != name else '') + 'State: ' + state + '') + switch.set_active(device.props['Enabled']) + + def update_peer_props(self, dev_path, path): + device = self.devices[dev_path] + peer = device.peers[path] + props = self.objects[path] if path in self.objects else {} + peer_list = device.widget.get_child().get_child() + if peer.widget is None: + if PEER_IF not in props or WFD_IF not in props or WSC_IF not in props: + return + if not props[WFD_IF]['Sink']: + return + + name = str(props[PEER_IF]['Name']) + device.sorted_peers.append(name) + device.sorted_peers.sort() + index = device.sorted_peers.index(name) + + obj_proxy = self.dbus.get_object('net.connman.iwd', path) + peer.peer_proxy=dbus.Interface(obj_proxy, PEER_IF) + peer.wfd_proxy=dbus.Interface(obj_proxy, WFD_IF) + peer.wsc_proxy=dbus.Interface(obj_proxy, WSC_IF) + label = Gtk.Label() + label.set_halign(Gtk.Align.START) + label.set_single_line_mode(True) + label.set_ellipsize(Pango.EllipsizeMode.END) + event_box = Gtk.EventBox() + event_box.add(label) + event_box.connect('button-press-event', self.on_peer_click, (dev_path, path)) + button = Gtk.Button() + button.set_use_stock(True) + button.connect('clicked', self.on_peer_button, (dev_path, path)) + box = Gtk.Box(orientation=Gtk.Orientation.HORIZONTAL) + box.props.margin = 5; + box.pack_start(event_box, expand=True, fill=True, padding=0) + box.pack_end(button, expand=False, fill=False, padding=0) + peer.widget = Gtk.ListBoxRow() + peer.widget.add(box) + peer_list.insert(peer.widget, index) + peer.widget.show_all() + elif (PEER_IF not in props or WFD_IF not in props or WSC_IF not in props or not props[WFD_IF]['Sink']) and peer.widget: + del device.sorted_peers[peer.widget.get_index()] + peer_list.remove(peer.widget) + if peer == device.selected_peer: + device.selected_peer = None + self.update_info(dev_path, None) + if peer == device.connecting_peer: + device.dbus_call.cancel() + device.connecting_peer = None + self.update_selected_peer(dev_path) + if peer == device.disconnecting_peer: + device.dbus_call.cancel() + device.disconnecting_peer = None + self.update_selected_peer(dev_path) + if peer in device.connected: + device.connected.remove(peer) + self.update_selected_peer(dev_path) + peer.peer_proxy = None + peer.wfd_proxy = None + peer.wsc_proxy = None + peer.widget = None + if peer.rtsp: + peer.rtsp.close() + peer.rtsp = None + return + + subcat = 'unknown type' + if 'DeviceSubcategory' in props[PEER_IF]: + subcat = props[PEER_IF]['DeviceSubcategory'] + + weight = 'heavy' if peer in device.connected else 'normal' + box = peer.widget.get_child() + event_box, button = box.get_children() + label, = event_box.get_children() + label.set_markup('' + props[PEER_IF]['Name'] + ' ' + subcat + '') + + if device.disconnecting_peer or (device.connecting_peer and peer != device.connecting_peer): + # This peer's row should not have any buttons + button.hide() + elif peer == device.connecting_peer: + button.set_label('Cancel') + button.show() + elif peer in device.connected: + if not peer.rtsp or peer.rtsp.ready: + button.set_label('Disconnect') + else: + button.set_label('Cancel') + button.show() + elif peer == device.selected_peer and device.props['AvailableConnections'] > 0: + button.set_label('Connect') + button.show() + else: + button.hide() + + if peer == device.selected_peer: + self.update_info(dev_path, path) + + def update_selected_peer(self, dev_path): + device = self.devices[dev_path] + if device.selected_peer: + sel_path = self.get_peer_path(device, device.selected_peer) + self.update_peer_props(dev_path, sel_path) + + def update_info(self, dev_path, path): + device = self.devices[dev_path] + if path is None: + self.infolabel1.set_text('') + return + + peer = device.peers[path] + + if peer == device.connecting_peer: + state = 'IWD connecting' + elif peer == device.disconnecting_peer: + state = 'disconnecting' + elif peer in device.connected: + if peer.rtsp is not None: + if peer.rtsp.ready: + state = peer.rtsp.state + else: + state = 'RTSP negotiation: ' + peer.rtsp.state + else: + state = 'connected' + else: + state = 'not connected' + + subcat = 'unknown' + if 'DeviceSubcategory' in self.objects[path][PEER_IF]: + subcat = self.objects[path][PEER_IF]['DeviceSubcategory'] + + text = ('Connection state: ' + state + '\n' + + 'Device category: ' + self.objects[path][PEER_IF]['DeviceCategory'] + '\n' + 'Device subcategory: ' + subcat + '\n') + + if WFD_IF in self.objects[path]: + if self.objects[path][WFD_IF]['Source']: + if self.objects[path][WFD_IF]['Sink']: + t = 'dual-role' + else: + t = 'source' + else: + t = 'sink' + text += 'WFD device type: ' + t + '\n' + + if self.objects[path][WFD_IF]['Sink']: + text += 'Audio: ' + ('yes' if self.objects[path][WFD_IF]['HasAudio'] else 'no') + '\n' + + text += 'UIBC: ' + ('yes' if self.objects[path][WFD_IF]['HasUIBC'] else 'no') + '\n' + + text += 'Content protection: ' + ('yes' if self.objects[path][WFD_IF]['HasContentProtection'] else 'no') + '\n' + + self.infolabel1.set_text(text) + # TODO: more info in labels 2 and so on + + # Direct method calls on dbus.Interface's don't return dbus.lowlevel.PendingCall objects so + # we have to use bus.call_async to make cancellable async calls + def async_call(self, proxy, method, signature='', *args, **kwargs): + return self.dbus.call_async(proxy.bus_name, proxy.object_path, proxy.dbus_interface, method, signature, args, **kwargs) + + def connect_peer(self, dev_path, path): + device = self.devices[dev_path] + peer = device.peers[path] + + def on_reply(): + device.connected.append(peer) + device.connecting_peer = None + # Local interface and remote IP get set in the PropertiesChanged handler + self.update_dev_props(dev_path) + self.update_peer_props(dev_path, path) + if peer != device.selected_peer: + self.update_selected_peer(dev_path) + + def on_error(excp): + device.connecting_peer = None + if peer.rtsp: + peer.rtsp.close() + peer.rtsp = None + self.update_dev_props(dev_path) + self.update_peer_props(dev_path, path) + if peer != device.selected_peer: + self.update_selected_peer(dev_path) + dialog = Gtk.MessageDialog(parent=self, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.OK, text='Connection failed') + dialog.format_secondary_text('Connection to ' + self.objects[path][PEER_IF]['Name'] + ' failed: ' + repr(excp)) + dialog.show() + + def on_ok(response, *args): + dialog.destroy() + + dialog.connect('response', on_ok) + + def on_rtsp_state(): + self.update_dev_props(dev_path) + self.update_peer_props(dev_path, path) + if peer != device.selected_peer: + self.update_selected_peer(dev_path) + + def on_rtsp_error(excp): + self.disconnect_peer(dev_path, path) + dialog = Gtk.MessageDialog(parent=self, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.OK, text='Negotiation failed') + dialog.format_secondary_text('RTSP error when talking to ' + self.objects[path][PEER_IF]['Name'] + ': ' + repr(excp)) + dialog.show() + + def on_ok(response, *args): + dialog.destroy() + + dialog.connect('response', on_ok) + + # Cannot use peer.wsc_proxy.PushButton() + device.dbus_call = self.async_call(peer.wsc_proxy, 'PushButton', reply_handler=on_reply, error_handler=on_error, timeout=120) + device.connecting_peer = peer + # Create the RTSP server now so it's ready as soon as the P2P connection succeeds even if + # we haven't received the DBus reply yet + peer.rtsp = WFDRTSPServer(self.rtsp_port, on_rtsp_state, on_rtsp_error) + self.update_dev_props(dev_path) + self.update_peer_props(dev_path, path) + if peer != device.selected_peer: + self.update_selected_peer(dev_path) + + def disconnect_peer(self, dev_path, path): + device = self.devices[dev_path] + peer = device.peers[path] + + def on_reply(): + device.disconnecting_peer = None + self.update_dev_props(dev_path) + self.update_peer_props(dev_path, path) + if peer != device.selected_peer: + self.update_selected_peer(dev_path) + + def on_error(excp): + device.disconnecting_peer = None + self.update_dev_props(dev_path) + self.update_peer_props(dev_path, path) + if peer != device.selected_peer: + self.update_selected_peer(dev_path) + + if isinstance(excp, dbus.exceptions.DBusException) and excp.get_dbus_name() == 'net.connman.iwd.NotConnected': + return + + dialog = Gtk.MessageDialog(parent=self, message_type=Gtk.MessageType.ERROR, buttons=Gtk.ButtonsType.OK, text='Disconnecting failed') + dialog.format_secondary_text('Disconnecting from ' + self.objects[path][PEER_IF]['Name'] + ' failed: ' + repr(excp)) + dialog.show() + + def on_ok(response, *args): + dialog.destroy() + + dialog.connect('response', on_ok) + + if peer == device.connecting_peer: + device.dbus_call.cancel() + device.connecting_peer = None + + if peer in device.connected: + device.connected.remove(peer) + + if peer.rtsp: + peer.rtsp.close() + peer.rtsp = None + + device.dbus_call = self.async_call(peer.peer_proxy, 'Disconnect', reply_handler=on_reply, error_handler=on_error) + device.disconnecting_peer = peer + self.update_dev_props(dev_path) + self.update_peer_props(dev_path, path) + if peer != device.selected_peer: + self.update_selected_peer(dev_path) + + def on_peer_click(self, widget, event, data): + if event.button != 1 or event.type != Gdk.EventType._2BUTTON_PRESS: + return False + dev_path, path = data + device = self.devices[dev_path] + if device.disconnecting_peer: + return True + if device.connecting_peer or not device.props['AvailableConnections']: + # Should we auto-disconnect from the connected peer? Show an "Are you sure?" dialog? + return True + self.connect_peer(dev_path, path) + return True + + def on_peer_button(self, widget, data): + dev_path, path = data + action = widget.get_label() + device = self.devices[dev_path] + if device.disconnecting_peer: + return True + if action == 'Connect': + self.connect_peer(dev_path, path) + elif action in ['Disconnect', 'Cancel']: + self.disconnect_peer(dev_path, path) + return True + + def get_peer_path(self, device, peer): + for path in device.peers: + if device.peers[path] == device.selected_peer: + return path + return None + + def on_peer_selected(self, widget, row, dev_path): + device = self.devices[dev_path] + + if device.selected_peer is not None: + if device.selected_peer.widget == row: + return True + + path = self.get_peer_path(device, device.selected_peer) + device.selected_peer = None + self.update_peer_props(dev_path, path) + self.update_info(dev_path, None) + + if row is None: + return True + + for path in device.peers: + if device.peers[path].widget == row: + device.selected_peer = device.peers[path] + self.update_peer_props(dev_path, path) + return True + + def update_dev_scan_request(self, path): + device = self.devices[path] + should_request = device.expanded and self.is_active() + if device.scan_request == should_request: + return + + device.scan_request = should_request + if device.scan_request: + device.dev_proxy.RequestDiscovery() + else: + device.dev_proxy.ReleaseDiscovery() + self.update_dev_props(path) + + def on_notify_is_active(self, window, value): + if self.devices is None: + return True + + for path in self.devices: + self.update_dev_scan_request(path) + return True + + def on_dev_enabled(self, switch, state, path): + device = self.devices[path] + if device.props['Enabled'] == state: + return + device.props['Enabled'] = state + device.props_proxy.Set(DEVICE_IF, 'Enabled', state) + return True + + def on_dev_expanded(self, expander, value, path): + device = self.devices[path] + device.expanded = expander.get_expanded() + self.update_dev_scan_request(path) + return True + + def get_dev_string(self, path): + wiphy = self.objects[path][WIPHY_IF] + if 'Model' in wiphy: + return wiphy['Model'] + if 'Vendor' in wiphy: + return wiphy['Vendor'] + return wiphy['Name'] + + def on_destroy(self, widget, data): + global mainloop + if self.devices is not None: + svc_mgr = dbus.Interface(self.dbus.get_object('net.connman.iwd', '/net/connman/iwd'), SVC_MGR_IF) + svc_mgr.UnregisterDisplayService() + self.on_name_owner_change('') + mainloop.quit() + return False + +dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) +Gst.init(None) +WFDSource() +mainloop = GLib.MainLoop() +mainloop.run()