diff --git a/Cargo.lock b/Cargo.lock index 78c8b77ec1631e51345b3963779a8138be2f7ee7..e81bc81ed77d86af1ae06314474c2aac1e4af8ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2517,8 +2517,10 @@ dependencies = [ "futures", "glib", "gstreamer", + "gstreamer-sdp", "gstreamer-webrtc", "log", + "openssl", "serde", "serde_json", "structopt", diff --git a/publisher/Cargo.toml b/publisher/Cargo.toml index 26299856bd12b52babfe1921159c866f6e08775f..28ebc96ede59545e7e9d3e5341e9d2a2bc51e60b 100644 --- a/publisher/Cargo.toml +++ b/publisher/Cargo.toml @@ -9,6 +9,7 @@ license = "MIT" anyhow = "1" gst = { version = "0.16", package = "gstreamer" } gst-webrtc = { version = "0.16", package = "gstreamer-webrtc" } +gst-sdp = { version = "0.16", package = "gstreamer-sdp" } glib = "0.10" futures = "0.3" log = "0.4" @@ -16,7 +17,8 @@ env_logger = "0.7" serde = "1" serde_json = "1" structopt = "0.3" -tokio = "0.2" +tokio = { version = "0.2", features = ["signal"] } async-tungstenite = { version = "0.8", features = ["tokio-runtime", "tokio-openssl"] } +openssl = "0.10" uuid = "0.8" webrtc-audio-publishing = { path = "../common" } diff --git a/publisher/src/config.rs b/publisher/src/config.rs index f72400f2e4d0e7b39d0265a419fa0fb332987896..c97ff7301895d52e582da8c3a2494039e699deb9 100644 --- a/publisher/src/config.rs +++ b/publisher/src/config.rs @@ -6,6 +6,8 @@ use anyhow::{bail, Error}; use structopt::StructOpt; +use std::path::PathBuf; + #[derive(Debug, Copy, Clone, PartialEq, Eq)] pub enum ChannelConfiguration { Mono, @@ -47,6 +49,10 @@ pub struct Config { #[structopt(long)] pub server_room_description: Option, + /// TLS Certificate chain file. + #[structopt(long)] + pub certificate_file: Option, + /// TURN server to use, e.g. turn://user:password@foo.bar.com:3478. #[structopt(long)] pub turn_server: Option, diff --git a/publisher/src/main.rs b/publisher/src/main.rs index 1fbc0704961b897fedb8a38529b2d24a27a4bc85..f77bbaf9b484505a7cac2e4f6925f628a1b523cc 100644 --- a/publisher/src/main.rs +++ b/publisher/src/main.rs @@ -5,12 +5,12 @@ use anyhow::Error; use structopt::StructOpt; +use log::info; + mod config; use config::Config; - -async fn run(_cfg: &Config) -> Result<(), Error> { - Ok(()) -} +mod publisher; +use publisher::Publisher; fn main() -> Result<(), Error> { let cfg = Config::from_args(); @@ -27,5 +27,17 @@ fn main() -> Result<(), Error> { .enable_all() .build()?; - runtime.block_on(run(&cfg)) + runtime.block_on(async move { + let (publisher, join_handle) = Publisher::run(cfg).await?; + + // Stop cleanly on ctrl+c + let mut publisher_clone = publisher.clone(); + tokio::spawn(async move { + let _ = tokio::signal::ctrl_c().await; + info!("Received ctrl+c"); + let _ = publisher_clone.stop(); + }); + + join_handle.await + }) } diff --git a/publisher/src/publisher.rs b/publisher/src/publisher.rs new file mode 100644 index 0000000000000000000000000000000000000000..a65b37068a0ac0061b9da7017b6b570fd769fdd1 --- /dev/null +++ b/publisher/src/publisher.rs @@ -0,0 +1,524 @@ +// Copyright (C) 2020 Sebastian Dröge +// +// Licensed under the MIT license, see the LICENSE file or + +use anyhow::{bail, format_err, Context, Error}; +use std::sync::{atomic, Arc}; + +use log::{debug, error, info, trace, warn}; + +use async_tungstenite::tungstenite; +use tungstenite::Message as WsMessage; + +use futures::channel::mpsc; +use futures::prelude::*; + +use gst::prelude::*; + +use webrtc_audio_publishing::publisher::{PublisherMessage, ServerMessage}; + +use crate::config::Config; + +/// Publisher control handle. +#[derive(Debug, Clone)] +pub struct Publisher { + event_sender: mpsc::UnboundedSender, + stopped: Arc, +} + +/// Future that can be awaited on to wait for the publisher to stop or error out. +#[derive(Debug)] +pub struct PublisherJoinHandle { + handle: tokio::task::JoinHandle>, +} + +/// Simply wrapping around the tokio `JoinHandle` +impl std::future::Future for PublisherJoinHandle { + type Output = Result<(), Error>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + ctx: &mut std::task::Context, + ) -> std::task::Poll> { + match self.as_mut().handle.poll_unpin(ctx) { + std::task::Poll::Pending => std::task::Poll::Pending, + std::task::Poll::Ready(Err(err)) => { + std::task::Poll::Ready(Err(Error::from(err).context("Joining Publisher"))) + } + std::task::Poll::Ready(Ok(ok)) => std::task::Poll::Ready(ok), + } + } +} + +/// Events for the Publisher event loop. +#[derive(Debug)] +enum PublisherEvent { + /// Sent from the websocket receiver. + WebSocket(ServerMessage), + /// Sent from the GStreamer bus. + GStreamer(gst::Message), + /// Sent from anywhere if an error happens to report back. + Error(anyhow::Error), + /// Sent from stop() and other places. + Close, +} + +/// Wrapper around a `gst::Pipeline` to shut it down when dropped. +#[derive(Debug)] +struct PipelineWrapper(gst::Pipeline); + +impl std::ops::Deref for PipelineWrapper { + type Target = gst::Pipeline; + + fn deref(&self) -> &gst::Pipeline { + &self.0 + } +} + +impl Drop for PipelineWrapper { + fn drop(&mut self) { + debug!("Setting pipeline to Null state"); + let _ = self.0.set_state(gst::State::Null); + } +} + +impl Publisher { + /// Run a new publisher in the background. + /// + /// This tries to connect to the configured server and tries to + /// create the configured room, and otherwise fails directly. + /// + /// If the room can be created successfully it runs the publisher + /// in the background until an error happens or it is stopped. + pub async fn run(cfg: Config) -> Result<(Publisher, PublisherJoinHandle), Error> { + let ws = Self::connect(&cfg).await.context("Connecting to server")?; + + // Channel for the publishers event loop + let (event_sender, mut event_receiver) = mpsc::unbounded::(); + + // Channel for asynchronously sending out websocket message + let (mut ws_sink, mut ws_stream) = ws.split(); + let (websocket_sender, mut websocket_receiver) = mpsc::unbounded::(); + let websocket_send_task = tokio::spawn(async move { + while let Some(msg) = websocket_receiver.next().await { + trace!("Sending websocket message {:?}", msg); + ws_sink + .send(WsMessage::Text( + serde_json::to_string(&msg).expect("Failed to serialize message"), + )) + .await?; + } + + debug!("Closing websocket"); + ws_sink.send(WsMessage::Close(None)).await?; + debug!("Closed websocket"); + ws_sink.close().await?; + + Ok::<(), Error>(()) + }); + + // Read websocket messages and pass them as events to the publisher + let mut event_sender_clone = event_sender.clone(); + tokio::spawn(async move { + while let Some(msg) = ws_stream.next().await { + match msg { + Ok(WsMessage::Text(msg)) => { + let msg = match serde_json::from_str::(&msg) { + Ok(msg) => msg, + Err(err) => { + warn!("Failed to deserialize server message: {:?}", err); + continue; + } + }; + + trace!("Received server message {:?}", msg); + if event_sender_clone + .send(PublisherEvent::WebSocket(msg)) + .await + .is_err() + { + break; + } + } + Ok(WsMessage::Close(reason)) => { + debug!("Websocket closed, reason: {:?}", reason); + let _ = event_sender_clone.send(PublisherEvent::Close).await; + break; + } + Ok(_) => { + warn!("Unsupported websocket message {:?}", msg); + } + Err(err) => { + let _ = event_sender_clone + .send(PublisherEvent::Error( + Error::from(err).context("Receiving websocket message"), + )) + .await; + break; + } + } + } + + debug!("Stopped websocket receiving"); + }); + + // To remember if we already stopped before + let stopped = Arc::new(atomic::AtomicBool::new(false)); + + // Create pipeline and set up notifications with channels. + let pipeline = PipelineWrapper( + Publisher::new(&cfg, event_sender.clone(), websocket_sender.clone()) + .context("Creating publisher")?, + ); + + // Spawn our event loop + let loop_join_handle = tokio::spawn(async move { + // Start up pipeline asynchronously + debug!("Setting publisher pipeline to Playing"); + + pipeline + .call_async_future(|pipeline| pipeline.set_state(gst::State::Playing)) + .await + .context("Starting publisher pipeline")?; + + info!("Publisher running"); + + // Handle all the events + while let Some(event) = event_receiver.next().await { + match event { + PublisherEvent::GStreamer(msg) => { + use gst::message::MessageView; + + match msg.view() { + MessageView::Error(err) => { + let src = err.get_src().map(|src| src.get_path_string()); + let src = src.as_deref().unwrap_or("UNKNOWN"); + let dbg = err.get_debug(); + let err = err.get_error(); + + if let Some(ref dbg) = dbg { + error!("Got error from element {}: {} ({})", src, err, dbg); + } else { + error!("Got error from element {}: {}", src, err); + } + + bail!( + "Error from element {}: {} ({})", + src, + err, + dbg.unwrap_or_else(|| String::from("None")), + ); + } + MessageView::Warning(warn) => { + let src = warn.get_src().map(|src| src.get_path_string()); + let src = src.as_deref().unwrap_or("UNKNOWN"); + let dbg = warn.get_debug(); + let err = warn.get_error(); + + if let Some(ref dbg) = dbg { + warn!("Got warning from element {}: {} ({})", src, err, dbg); + } else { + warn!("Got warning from element {}: {}", src, err); + } + } + _ => (), + } + } + PublisherEvent::WebSocket(msg) => match msg { + ServerMessage::Sdp { type_, sdp } => match type_.as_str() { + "answer" => { + let sdp = gst_sdp::SDPMessage::parse_buffer(sdp.as_bytes()) + .map_err(|_| format_err!("Failed to parse SDP answer"))?; + + debug!("Received SDP answer: {:#?}", sdp); + + let answer = gst_webrtc::WebRTCSessionDescription::new( + gst_webrtc::WebRTCSDPType::Answer, + sdp, + ); + + let webrtcbin = pipeline + .get_by_name("webrtcbin") + .expect("Can't find webrtcbin"); + webrtcbin + .emit( + "set-remote-description", + &[&answer, &None::], + ) + .expect("Failed to emit set-remote-description signal"); + } + _ => { + error!("Unsupported SDP type {}", type_); + break; + } + }, + ServerMessage::Ice { + sdp_mline_index, + candidate, + } => { + debug!( + "Adding remote ICE candidate: {} at SDP mline index {}", + candidate, sdp_mline_index + ); + + let webrtcbin = pipeline + .get_by_name("webrtcbin") + .expect("Can't find webrtcbin"); + webrtcbin + .emit("add-ice-candidate", &[&sdp_mline_index, &candidate]) + .expect("Failed to emit add-ice-candidate signal"); + } + ServerMessage::Error { message } => { + error!("Got server error: {}", message); + bail!("Server error: {}", message); + } + ServerMessage::RoomCreated { .. } => { + error!("Unexpected RoomCreated server message"); + continue; + } + }, + PublisherEvent::Error(err) => { + error!("Received error {:?}, stopping", err); + return Err(err); + } + PublisherEvent::Close => { + info!("Shutting down"); + websocket_sender.close_channel(); + event_receiver.close(); + websocket_send_task.await.context("Closing websocket")??; + break; + } + } + } + + // Stop pipeline asynchronously + debug!("Setting publisher pipeline to Null"); + let _ = pipeline + .call_async_future(|pipeline| pipeline.set_state(gst::State::Null)) + .await; + + Ok(()) + }); + + Ok(( + Publisher { + event_sender, + stopped, + }, + PublisherJoinHandle { + handle: loop_join_handle, + }, + )) + } + + /// Connect to the WebSocket server and create a room. + async fn connect( + cfg: &Config, + ) -> Result, Error> { + debug!("Connecting to {}", cfg.server); + + // Connect to the configured server and create a room + let (mut ws, _) = if let Some(ref certificate_file) = cfg.certificate_file { + use openssl::ssl::{SslConnector, SslMethod}; + + let mut builder = SslConnector::builder(SslMethod::tls())?; + builder.set_ca_file(certificate_file)?; + + let connector = builder.build().configure()?; + + async_tungstenite::tokio::connect_async_with_tls_connector(&cfg.server, Some(connector)) + .await? + } else { + async_tungstenite::tokio::connect_async(&cfg.server).await? + }; + + debug!( + "Connected to {}, creating room {} (description: {:?})", + cfg.server, cfg.server_room, cfg.server_room_description + ); + + ws.send(WsMessage::Text( + serde_json::to_string(&PublisherMessage::CreateRoom { + name: cfg.server_room.clone(), + description: cfg.server_room_description.clone(), + }) + .expect("Failed to serialize create room message"), + )) + .await?; + + let msg = ws + .next() + .await + .ok_or_else(|| format_err!("didn't receive anything"))??; + + let msg = match msg { + WsMessage::Text(msg) => serde_json::from_str::(&msg)?, + _ => bail!("Unexpected websocket message: {:?}", msg), + }; + + match msg { + ServerMessage::RoomCreated { id } => { + info!("Created room {}", id); + } + ServerMessage::Error { message } => { + bail!("Error creating room: {}", message); + } + _ => bail!("Unexpected server message: {:?}", msg), + } + + Ok(ws) + } + + /// Create the pipeline and set up notifications. + fn new( + cfg: &Config, + event_sender: mpsc::UnboundedSender, + websocket_sender: mpsc::UnboundedSender, + ) -> Result { + // Create the GStreamer pipeline + let pipeline = gst::parse_launch( + "audiotestsrc is-live=true ! opusenc ! rtpopuspay pt=96 ! webrtcbin name=webrtcbin", + )? + .downcast::() + .expect("Not a pipeline"); + + let webrtcbin = pipeline + .get_by_name("webrtcbin") + .expect("Can't find webrtcbin"); + + if let Some(ref stun_server) = &cfg.stun_server { + webrtcbin + .set_property("stun-server", stun_server) + .expect("Failed to set stun-server property"); + } + if let Some(ref turn_server) = &cfg.turn_server { + webrtcbin + .set_property("turn-server", turn_server) + .expect("Failed to set turn-server property"); + } + webrtcbin.set_property_from_str("bundle-policy", "max-bundle"); + + // Spawn task for forwarding all GStreamer messages to our event loop + let bus = pipeline.get_bus().expect("Pipeline without bus"); + let mut bus_stream = bus.stream(); + let mut event_sender_clone = event_sender.clone(); + tokio::spawn(async move { + while let Some(msg) = bus_stream.next().await { + if event_sender_clone + .send(PublisherEvent::GStreamer(msg)) + .await + .is_err() + { + break; + } + } + }); + + // Connect to on-negotiation-needed to handle sending an offer SDP + let handle = tokio::runtime::Handle::current(); + let websocket_sender_clone = websocket_sender.clone(); + let event_sender_clone = event_sender.clone(); + webrtcbin + .connect("on-negotiation-needed", false, move |values| { + let webrtc = values[0] + .get::() + .expect("Invalid argument") + .unwrap(); + + let mut event_sender_clone = event_sender_clone.clone(); + let websocket_sender_clone = websocket_sender_clone.clone(); + handle.spawn(async move { + if let Err(err) = Self::start_negotiation(webrtc, websocket_sender_clone).await + { + let _ = event_sender_clone.send(PublisherEvent::Error(err)).await; + } + }); + + None + }) + .unwrap(); + + let websocket_sender_clone = websocket_sender.clone(); + webrtcbin + .connect("on-ice-candidate", false, move |values| { + let _webrtc = values[0].get::().expect("Invalid argument"); + let sdp_mline_index = values[1].get_some::().expect("Invalid argument"); + let candidate = values[2] + .get::() + .expect("Invalid argument") + .unwrap(); + + let _ = websocket_sender_clone.unbounded_send(PublisherMessage::Ice { + candidate, + sdp_mline_index, + }); + + None + }) + .unwrap(); + + Ok(pipeline) + } + + /// Stops the publisher. + pub fn stop(&mut self) -> Result<(), Error> { + if !self + .stopped + .compare_and_swap(false, true, atomic::Ordering::SeqCst) + { + info!("Stopping publisher"); + self.event_sender + .unbounded_send(PublisherEvent::Close) + .context("Stopping publisher") + } else { + Ok(()) + } + } + + /// Starts negotiation, creates an offer and sends it to the peer. + async fn start_negotiation( + webrtcbin: gst::Element, + mut websocket_sender: mpsc::UnboundedSender, + ) -> Result<(), Error> { + // Create an offer + debug!("Creating offer"); + let (promise, fut) = gst::Promise::new_future(); + webrtcbin + .emit("create-offer", &[&None::, &promise]) + .expect("Failed to emit create-offer signal"); + let reply = fut.await; + + // Chech if we got a valid offer + let reply = match reply { + Ok(Some(reply)) => reply, + Ok(None) => { + bail!("Offer creation got no reponse"); + } + Err(err) => { + bail!("Offer creation got error reponse: {:?}", err); + } + }; + + let offer = reply + .get_value("offer") + .expect("Invalid argument") + .get::() + .expect("Invalid argument") + .unwrap(); + + debug!("Created offer {:#?}", offer.get_sdp()); + + webrtcbin + .emit("set-local-description", &[&offer, &None::]) + .expect("Failed to emit set-local-description signal"); + + let _ = websocket_sender + .send(PublisherMessage::Sdp { + type_: String::from("offer"), + sdp: offer.get_sdp().as_text().expect("Invalid offer"), + }) + .await; + + Ok(()) + } +}