Commit c596c964 authored by hark's avatar hark
Browse files

screenshot + webrtc

parent 322e28e7
......@@ -36,3 +36,10 @@ repo with the other repos as submodules and some more stuff
* c-player: player that outputs to snowmix, tcp2rtsp, s2s
TODO:
- gstreamer webrtc thingie that outputs to obs / or over named pipes (computer -> computer , browser -> computer, computer -> browser)
- webrtc reflector
- figure out this new jack like thingie for video (pipewire)
- turbovnc + virtualgl
#!/usr/bin/env bash
cd ~/sand
ssh user@stdio /home/user/src/streaming-media-stuff/scripts/screenshot.sh
scp user@stdio:screenshot/latest.png ~/sand/latest.png
eog ~/sand/latest.png
## Terminology
### Client
A GStreamer-based application
### Browser
A JS application that runs in the browser and uses built-in browser webrtc APIs
### Peer
Any webrtc-using application that can participate in a call
### Signalling server
Basic websockets server implemented in Python that manages the peers list and shovels data between peers
## Overview
This is a basic protocol for doing 1-1 audio+video calls between a gstreamer app and a JS app in a browser.
## Peer registration
Peers must register with the signalling server before a call can be initiated. The server connection should stay open as long as the peer is available or in a call.
This protocol builds upon https://github.com/shanet/WebRTC-Example/
* Connect to the websocket server
* Send `HELLO <uid>` where `<uid>` is a string which will uniquely identify this peer
* Receive `HELLO`
* Any other message starting with `ERROR` is an error.
### 1-1 calls with a 'session'
* To connect to a single peer, send `SESSION <uid>` where `<uid>` identifies the peer to connect to, and receive `SESSION_OK`
* All further messages will be forwarded to the peer
* The call negotiation with the peer can be started by sending JSON encoded SDP and ICE
* Closure of the server connection means the call has ended; either because the other peer ended it or went away
* To end the call, disconnect from the server. You may reconnect again whenever you wish.
### Multi-party calls with a 'room'
* To create a multi-party call, you must first register (or join) a room. Send `ROOM <room_id>` where `<room_id>` is a unique room name
* Receive `ROOM_OK ` from the server if this is a new room, or `ROOM_OK <peer1_id> <peer2_id> ...` where `<peerN_id>` are unique identifiers for the peers already in the room
* To send messages to a specific peer within the room for call negotiation (or any other purpose, use `ROOM_PEER_MSG <peer_id> <msg>`
* When a new peer joins the room, you will receive a `ROOM_PEER_JOINED <peer_id>` message
- For the purposes of convention and to avoid overwhelming newly-joined peers, offers must only be sent by the newly-joined peer
* When a peer leaves the room, you will receive a `ROOM_PEER_LEFT <peer_id>` message
- You should stop sending/receiving media from/to this peer
* To get a list of all peers currently in the room, send `ROOM_PEER_LIST` and receive `ROOM_PEER_LIST <peer1_id> ...`
- This list will never contain your own `<uid>`
- In theory you should never need to use this since you are guaranteed to receive JOINED and LEFT messages for all peers in a room
* You may stay connected to a room for as long as you like
## Negotiation
Once a call has been setup with the signalling server, the peers must negotiate SDP and ICE candidates with each other.
The calling side must create an SDP offer and send it to the peer as a JSON object:
```json
{
"sdp": {
"sdp": "o=- [....]",
"type": "offer"
}
}
```
The callee must then reply with an answer:
```json
{
"sdp": {
"sdp": "o=- [....]",
"type": "answer"
}
}
```
ICE candidates must be exchanged similarly by exchanging JSON objects:
```json
{
"ice": {
"candidate": ...,
"sdpMLineIndex": ...,
...
}
}
```
Note that the structure of these is the same as that specified by the WebRTC spec.
#! /bin/bash
openssl req -x509 -newkey rsa:4096 -keyout key.pem -out cert.pem -days 365 -nodes
http://blog.nirbheek.in/2018/02/gstreamer-webrtc.html
https://cgit.freedesktop.org/gstreamer/gst-plugins-bad/tree/tests/examples/webrtc
#!/usr/bin/env python3
#
# Test client for simple room-based multi-peer p2p calling
#
# Copyright (C) 2017 Centricular Ltd.
#
# Author: Nirbheek Chauhan <nirbheek@centricular.com>
#
import sys
import ssl
import json
import uuid
import asyncio
import websockets
import argparse
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('--url', default='wss://localhost:8443', help='URL to connect to')
parser.add_argument('--room', default=None, help='the room to join')
options = parser.parse_args(sys.argv[1:])
SERVER_ADDR = options.url
PEER_ID = 'ws-test-client-' + str(uuid.uuid4())[:6]
ROOM_ID = options.room
if ROOM_ID is None:
print('--room argument is required')
sys.exit(1)
sslctx = False
if SERVER_ADDR.startswith(('wss://', 'https://')):
sslctx = ssl.create_default_context()
# FIXME
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
def get_answer_sdp(offer, peer_id):
# Here we'd parse the incoming JSON message for ICE and SDP candidates
print("Got: " + offer)
sdp = json.dumps({'sdp': 'reply sdp'})
answer = 'ROOM_PEER_MSG {} {}'.format(peer_id, sdp)
print("Sent: " + answer)
return answer
def get_offer_sdp(peer_id):
sdp = json.dumps({'sdp': 'initial sdp'})
offer = 'ROOM_PEER_MSG {} {}'.format(peer_id, sdp)
print("Sent: " + offer)
return offer
async def hello():
async with websockets.connect(SERVER_ADDR, ssl=sslctx) as ws:
await ws.send('HELLO ' + PEER_ID)
assert(await ws.recv() == 'HELLO')
await ws.send('ROOM {}'.format(ROOM_ID))
sent_offers = set()
# Receive messages
while True:
msg = await ws.recv()
if msg.startswith('ERROR'):
# On error, we bring down the webrtc pipeline, etc
print('{!r}, exiting'.format(msg))
return
if msg.startswith('ROOM_OK'):
print('Got ROOM_OK for room {!r}'.format(ROOM_ID))
_, *room_peers = msg.split()
for peer_id in room_peers:
print('Sending offer to {!r}'.format(peer_id))
# Create a peer connection for each peer and start
# exchanging SDP and ICE candidates
await ws.send(get_offer_sdp(peer_id))
sent_offers.add(peer_id)
continue
elif msg.startswith('ROOM_PEER'):
if msg.startswith('ROOM_PEER_JOINED'):
_, peer_id = msg.split(maxsplit=1)
print('Peer {!r} joined the room'.format(peer_id))
# Peer will send us an offer
continue
if msg.startswith('ROOM_PEER_LEFT'):
_, peer_id = msg.split(maxsplit=1)
print('Peer {!r} left the room'.format(peer_id))
continue
elif msg.startswith('ROOM_PEER_MSG'):
_, peer_id, msg = msg.split(maxsplit=2)
if peer_id in sent_offers:
print('Got answer from {!r}: {}'.format(peer_id, msg))
continue
print('Got offer from {!r}, replying'.format(peer_id))
await ws.send(get_answer_sdp(msg, peer_id))
continue
print('Unknown msg: {!r}, exiting'.format(msg))
return
print('Our uid is {!r}'.format(PEER_ID))
try:
asyncio.get_event_loop().run_until_complete(hello())
except websockets.exceptions.InvalidHandshake:
print('Invalid handshake: are you sure this is a websockets server?\n')
raise
except ssl.SSLError:
print('SSL Error: are you sure the server is using TLS?\n')
raise
#!/usr/bin/env python3
#
# Example 1-1 call signalling server
#
# Copyright (C) 2017 Centricular Ltd.
#
# Author: Nirbheek Chauhan <nirbheek@centricular.com>
#
import os
import sys
import ssl
import logging
import asyncio
import websockets
import argparse
import http
from concurrent.futures._base import TimeoutError
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
# See: host, port in https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.loop.create_server
parser.add_argument('--addr', default='', help='Address to listen on (default: all interfaces, both ipv4 and ipv6)')
parser.add_argument('--port', default=8443, type=int, help='Port to listen on')
parser.add_argument('--keepalive-timeout', dest='keepalive_timeout', default=30, type=int, help='Timeout for keepalive (in seconds)')
parser.add_argument('--cert-path', default=os.path.dirname(__file__))
parser.add_argument('--disable-ssl', default=False, help='Disable ssl', action='store_true')
parser.add_argument('--health', default='/health', help='Health check route')
options = parser.parse_args(sys.argv[1:])
ADDR_PORT = (options.addr, options.port)
KEEPALIVE_TIMEOUT = options.keepalive_timeout
############### Global data ###############
# Format: {uid: (Peer WebSocketServerProtocol,
# remote_address,
# <'session'|room_id|None>)}
peers = dict()
# Format: {caller_uid: callee_uid,
# callee_uid: caller_uid}
# Bidirectional mapping between the two peers
sessions = dict()
# Format: {room_id: {peer1_id, peer2_id, peer3_id, ...}}
# Room dict with a set of peers in each room
rooms = dict()
############### Helper functions ###############
async def health_check(path, request_headers):
if path == options.health:
return http.HTTPStatus.OK, [], b"OK\n"
async def recv_msg_ping(ws, raddr):
'''
Wait for a message forever, and send a regular ping to prevent bad routers
from closing the connection.
'''
msg = None
while msg is None:
try:
msg = await asyncio.wait_for(ws.recv(), KEEPALIVE_TIMEOUT)
except TimeoutError:
print('Sending keepalive ping to {!r} in recv'.format(raddr))
await ws.ping()
return msg
async def disconnect(ws, peer_id):
'''
Remove @peer_id from the list of sessions and close our connection to it.
This informs the peer that the session and all calls have ended, and it
must reconnect.
'''
global sessions
if peer_id in sessions:
del sessions[peer_id]
# Close connection
if ws and ws.open:
# Don't care about errors
asyncio.ensure_future(ws.close(reason='hangup'))
async def cleanup_session(uid):
if uid in sessions:
other_id = sessions[uid]
del sessions[uid]
print("Cleaned up {} session".format(uid))
if other_id in sessions:
del sessions[other_id]
print("Also cleaned up {} session".format(other_id))
# If there was a session with this peer, also
# close the connection to reset its state.
if other_id in peers:
print("Closing connection to {}".format(other_id))
wso, oaddr, _ = peers[other_id]
del peers[other_id]
await wso.close()
async def cleanup_room(uid, room_id):
room_peers = rooms[room_id]
if uid not in room_peers:
return
room_peers.remove(uid)
for pid in room_peers:
wsp, paddr, _ = peers[pid]
msg = 'ROOM_PEER_LEFT {}'.format(uid)
print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
await wsp.send(msg)
async def remove_peer(uid):
await cleanup_session(uid)
if uid in peers:
ws, raddr, status = peers[uid]
if status and status != 'session':
await cleanup_room(uid, status)
del peers[uid]
await ws.close()
print("Disconnected from peer {!r} at {!r}".format(uid, raddr))
############### Handler functions ###############
async def connection_handler(ws, uid):
global peers, sessions, rooms
raddr = ws.remote_address
peer_status = None
peers[uid] = [ws, raddr, peer_status]
print("Registered peer {!r} at {!r}".format(uid, raddr))
while True:
# Receive command, wait forever if necessary
msg = await recv_msg_ping(ws, raddr)
# Update current status
peer_status = peers[uid][2]
# We are in a session or a room, messages must be relayed
if peer_status is not None:
# We're in a session, route message to connected peer
if peer_status == 'session':
other_id = sessions[uid]
wso, oaddr, status = peers[other_id]
assert(status == 'session')
print("{} -> {}: {}".format(uid, other_id, msg))
await wso.send(msg)
# We're in a room, accept room-specific commands
elif peer_status:
# ROOM_PEER_MSG peer_id MSG
if msg.startswith('ROOM_PEER_MSG'):
_, other_id, msg = msg.split(maxsplit=2)
if other_id not in peers:
await ws.send('ERROR peer {!r} not found'
''.format(other_id))
continue
wso, oaddr, status = peers[other_id]
if status != room_id:
await ws.send('ERROR peer {!r} is not in the room'
''.format(other_id))
continue
msg = 'ROOM_PEER_MSG {} {}'.format(uid, msg)
print('room {}: {} -> {}: {}'.format(room_id, uid, other_id, msg))
await wso.send(msg)
elif msg == 'ROOM_PEER_LIST':
room_id = peers[peer_id][2]
room_peers = ' '.join([pid for pid in rooms[room_id] if pid != peer_id])
msg = 'ROOM_PEER_LIST {}'.format(room_peers)
print('room {}: -> {}: {}'.format(room_id, uid, msg))
await ws.send(msg)
else:
await ws.send('ERROR invalid msg, already in room')
continue
else:
raise AssertionError('Unknown peer status {!r}'.format(peer_status))
# Requested a session with a specific peer
elif msg.startswith('SESSION'):
print("{!r} command {!r}".format(uid, msg))
_, callee_id = msg.split(maxsplit=1)
if callee_id not in peers:
await ws.send('ERROR peer {!r} not found'.format(callee_id))
continue
if peer_status is not None:
await ws.send('ERROR peer {!r} busy'.format(callee_id))
continue
await ws.send('SESSION_OK')
wsc = peers[callee_id][0]
print('Session from {!r} ({!r}) to {!r} ({!r})'
''.format(uid, raddr, callee_id, wsc.remote_address))
# Register session
peers[uid][2] = peer_status = 'session'
sessions[uid] = callee_id
peers[callee_id][2] = 'session'
sessions[callee_id] = uid
# Requested joining or creation of a room
elif msg.startswith('ROOM'):
print('{!r} command {!r}'.format(uid, msg))
_, room_id = msg.split(maxsplit=1)
# Room name cannot be 'session', empty, or contain whitespace
if room_id == 'session' or room_id.split() != [room_id]:
await ws.send('ERROR invalid room id {!r}'.format(room_id))
continue
if room_id in rooms:
if uid in rooms[room_id]:
raise AssertionError('How did we accept a ROOM command '
'despite already being in a room?')
else:
# Create room if required
rooms[room_id] = set()
room_peers = ' '.join([pid for pid in rooms[room_id]])
await ws.send('ROOM_OK {}'.format(room_peers))
# Enter room
peers[uid][2] = peer_status = room_id
rooms[room_id].add(uid)
for pid in rooms[room_id]:
if pid == uid:
continue
wsp, paddr, _ = peers[pid]
msg = 'ROOM_PEER_JOINED {}'.format(uid)
print('room {}: {} -> {}: {}'.format(room_id, uid, pid, msg))
await wsp.send(msg)
else:
print('Ignoring unknown message {!r} from {!r}'.format(msg, uid))
async def hello_peer(ws):
'''
Exchange hello, register peer
'''
raddr = ws.remote_address
hello = await ws.recv()
hello, uid = hello.split(maxsplit=1)
if hello != 'HELLO':
await ws.close(code=1002, reason='invalid protocol')
raise Exception("Invalid hello from {!r}".format(raddr))
if not uid or uid in peers or uid.split() != [uid]: # no whitespace
await ws.close(code=1002, reason='invalid peer uid')
raise Exception("Invalid uid {!r} from {!r}".format(uid, raddr))
# Send back a HELLO
await ws.send('HELLO')
return uid
async def handler(ws, path):
'''
All incoming messages are handled here. @path is unused.
'''
raddr = ws.remote_address
print("Connected to {!r}".format(raddr))
peer_id = await hello_peer(ws)
try:
await connection_handler(ws, peer_id)
except websockets.ConnectionClosed:
print("Connection to peer {!r} closed, exiting handler".format(raddr))
finally:
await remove_peer(peer_id)
sslctx = None
if not options.disable_ssl:
# Create an SSL context to be used by the websocket server
certpath = options.cert_path
print('Using TLS with keys in {!r}'.format(certpath))
if 'letsencrypt' in certpath:
chain_pem = os.path.join(certpath, 'fullchain.pem')
key_pem = os.path.join(certpath, 'privkey.pem')
else:
chain_pem = os.path.join(certpath, 'cert.pem')
key_pem = os.path.join(certpath, 'key.pem')
sslctx = ssl.create_default_context()
try:
sslctx.load_cert_chain(chain_pem, keyfile=key_pem)
except FileNotFoundError:
print("Certificates not found, did you run generate_cert.sh?")
sys.exit(1)
# FIXME
sslctx.check_hostname = False
sslctx.verify_mode = ssl.CERT_NONE
print("Listening on https://{}:{}".format(*ADDR_PORT))
# Websocket server
wsd = websockets.serve(handler, *ADDR_PORT, ssl=sslctx, process_request=health_check,
# Maximum number of messages that websockets will pop
# off the asyncio and OS buffers per connection. See:
# https://websockets.readthedocs.io/en/stable/api.html#websockets.protocol.WebSocketCommonProtocol
max_queue=16)
logger = logging.getLogger('websockets.server')
logger.setLevel(logging.ERROR)
logger.addHandler(logging.StreamHandler())
asyncio.get_event_loop().run_until_complete(wsd)
asyncio.get_event_loop().run_forever()
import random
import ssl
import websockets
import asyncio
import os
import sys
import json
import argparse
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
PIPELINE_DESC = '''
webrtcbin name=sendrecv bundle-policy=max-bundle
videotestsrc is-live=true pattern=ball ! videoconvert ! queue ! vp8enc deadline=1 ! rtpvp8pay !
queue ! application/x-rtp,media=video,encoding-name=VP8,payload=97 ! sendrecv.
audiotestsrc is-live=true wave=red-noise ! audioconvert ! audioresample ! queue ! opusenc ! rtpopuspay !
queue ! application/x-rtp,media=audio,encoding-name=OPUS,payload=96 ! sendrecv.
'''
class WebRTCClient:
def __init__(self, id_, peer_id, server):
self.id_ = id_
self.conn = None
self.pipe = None
self.webrtc = None
self.peer_id = peer_id
self.server = server or 'wss://webrtc.nirbheek.in:8443'
async def connect(self):
sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
self.conn = await websockets.connect(self.server, ssl=sslctx)
await self.conn.send('HELLO %d' % our_id)
async def setup_call(self):
await self.conn.send('SESSION {}'.format(self.peer_id))
def send_sdp_offer(self, offer):
text = offer.sdp.as_text()
print ('Sending offer:\n%s' % text)
msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
loop = asyncio.new_event_loop()
loop.run_until_complete(self.conn.send(msg))
def on_offer_created(self, promise, _, __):
promise.wait()
reply = promise.get_reply()
offer = reply['offer']
promise = Gst.Promise.new()
self.webrtc.emit('set-local-description', offer, promise)
promise.interrupt()
self.send_sdp_offer(offer)
def on_negotiation_needed(self, element):
promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
element.emit('create-offer', None, promise)
def send_ice_candidate_message(self, _, mlineindex, candidate):
icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
loop = asyncio.new_event_loop()
loop.run_until_complete(self.conn.send(icemsg))
def on_incoming_decodebin_stream(self, _, pad):
if not pad.has_current_caps():
print (pad, 'has no caps, ignoring')
return
caps = pad.get_current_caps()
assert (len(caps))
s = caps[0]
name = s.get_name()
if name.startswith('video'):
q = Gst.ElementFactory.make('queue')
conv = Gst.ElementFactory.make('videoconvert')
sink = Gst.ElementFactory.make('autovideosink')
self.pipe.add(q, conv, sink)
self.pipe.sync_children_states()