diff --git a/Cargo.lock b/Cargo.lock index d5c45c8567fdb1169f35320ad36acfeb0ca6dac8..58108c57249f4392b3e2740c9fbc8c0f083bfe13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1010,68 +1010,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "gstreamer-app" -version = "0.16.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "330e3e40c0e27680e52e38c599353dddfb14c373a148dd9154243f7cf92ba3c2" -dependencies = [ - "bitflags", - "futures-core", - "futures-sink", - "glib", - "glib-sys", - "gobject-sys", - "gstreamer", - "gstreamer-app-sys", - "gstreamer-base", - "gstreamer-sys", - "libc", - "once_cell", -] - -[[package]] -name = "gstreamer-app-sys" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "813f64275c9e7b33b828b9efcf9dfa64b95996766d4de996e84363ac65b87e3d" -dependencies = [ - "glib-sys", - "gstreamer-base-sys", - "gstreamer-sys", - "libc", - "system-deps", -] - -[[package]] -name = "gstreamer-base" -version = "0.16.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "082f83c11a9d568e94e3a29d9fb22bf86dda44becfd4ba1d18446766b678f07c" -dependencies = [ - "bitflags", - "glib", - "glib-sys", - "gobject-sys", - "gstreamer", - "gstreamer-base-sys", - "gstreamer-sys", - "libc", -] - -[[package]] -name = "gstreamer-base-sys" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a4b7b6dc2d6e160a1ae28612f602bd500b3fa474ce90bf6bb2f08072682beef5" -dependencies = [ - "glib-sys", - "gobject-sys", - "gstreamer-sys", - "libc", - "system-deps", -] - [[package]] name = "gstreamer-sdp" version = "0.16.3" @@ -2550,7 +2488,6 @@ dependencies = [ "futures", "glib", "gstreamer", - "gstreamer-app", "gstreamer-webrtc", "log", "openssl", diff --git a/server/src/publisher.rs b/server/src/publisher.rs index 46b73019f0af38da633e047f1cc12ed4b0a37cc6..0962811db538a30c21f3e3bd73ba1cf54d5179bf 100644 --- a/server/src/publisher.rs +++ b/server/src/publisher.rs @@ -6,12 +6,12 @@ use crate::config::Config; use crate::rooms::{Room, Rooms}; use crate::subscriber::Subscriber; -use std::sync::{Arc, Mutex}; - -use anyhow::Error; +use anyhow::{format_err, Error}; -use actix::{Actor, Addr, AsyncContext, Handler, Message, StreamHandler}; +use std::sync::{Arc, Mutex}; +use actix::{Actor, Addr, Handler, Message, StreamHandler, WeakAddr}; +use actix_web::dev::ConnectionInfo; use actix_web_actors::ws; use log::{debug, trace}; @@ -20,29 +20,41 @@ use log::{debug, trace}; #[derive(Debug)] pub struct Publisher { cfg: Arc, - rooms: Addr, + rooms: WeakAddr, + remote_addr: String, room: Mutex>>, } impl Publisher { /// Create a new `Publisher` actor. - pub fn new(cfg: Arc, rooms: Addr) -> Self { - Publisher { + pub fn new( + cfg: Arc, + rooms: Addr, + connection_info: &ConnectionInfo, + ) -> Result { + debug!("Creating new publisher {:?}", connection_info); + + let remote_addr = connection_info + .realip_remote_addr() + .ok_or_else(|| format_err!("WebSocket connection without remote address"))?; + + Ok(Publisher { cfg, - rooms, + rooms: rooms.downgrade(), + remote_addr: String::from(remote_addr), room: Mutex::new(None), - } + }) } } impl Actor for Publisher { type Context = ws::WebsocketContext; - fn stopped(&mut self, ctx: &mut Self::Context) { + fn stopped(&mut self, _ctx: &mut Self::Context) { // Drop reference to the joined room, if any self.room.lock().unwrap().take(); - trace!("Publisher {:?} stopped", ctx.address()); + trace!("Publisher {} stopped", self.remote_addr); } } diff --git a/server/src/rooms.rs b/server/src/rooms.rs index df8f3990b0e7eb53815eb98f97f18313537f37ef..beaf042efcd16afa9e78a27926a298c3bf3db321 100644 --- a/server/src/rooms.rs +++ b/server/src/rooms.rs @@ -6,7 +6,9 @@ use std::collections::{HashMap, HashSet}; use std::fmt; use std::sync::Mutex; -use actix::{Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, MessageResult}; +use actix::{ + Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, MessageResult, WeakAddr, +}; use anyhow::{bail, Error}; use log::{debug, error, info, trace}; @@ -188,7 +190,7 @@ pub struct Room { name: String, description: Option, - rooms: Addr, + rooms: WeakAddr, publisher: Addr, subscribers: Mutex>>, @@ -203,7 +205,7 @@ impl Room { publisher: Addr, ) -> Self { Room { - rooms, + rooms: rooms.downgrade(), id: RoomId(uuid::Uuid::new_v4()), name, description, @@ -241,7 +243,10 @@ impl Handler for Room { subscriber.do_send(subscriber::RoomDeletedMessage); } } - self.rooms.do_send(RoomDeletedMessage { room_id: self.id }); + + if let Some(rooms) = self.rooms.upgrade() { + rooms.do_send(RoomDeletedMessage { room_id: self.id }); + } ctx.stop(); diff --git a/server/src/server.rs b/server/src/server.rs index 1c131e47f1e62e0d267e6601d05dfec6b910d53c..ec21795d3c5233bbf9451b2bdfec1f0de1dcf9bc 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -12,6 +12,8 @@ use actix_files::NamedFile; use actix_web::{web, App, HttpRequest, HttpResponse, HttpServer, Responder}; use actix_web_actors::ws; +use log::error; + /// Serve `index.html` for path `/`. async fn index(cfg: web::Data) -> Result { let full_path = cfg.static_files.join("index.html"); @@ -30,16 +32,32 @@ async fn ws( stream: web::Payload, ) -> impl Responder { match path.as_str() { - "publish" => ws::start( - Publisher::new(cfg.into_inner(), rooms.as_ref().clone()), - &req, - stream, - ), - "subscribe" => ws::start( - Subscriber::new(cfg.into_inner(), rooms.as_ref().clone()), - &req, - stream, - ), + "publish" => { + let publisher = Publisher::new( + cfg.into_inner(), + rooms.as_ref().clone(), + &req.connection_info(), + ) + .map_err(|err| { + error!("Failed to create publisher: {}", err); + HttpResponse::InternalServerError() + })?; + + ws::start(publisher, &req, stream) + } + "subscribe" => { + let subscriber = Subscriber::new( + cfg.into_inner(), + rooms.as_ref().clone(), + &req.connection_info(), + ) + .map_err(|err| { + error!("Failed to create subscriber: {}", err); + HttpResponse::InternalServerError() + })?; + + ws::start(subscriber, &req, stream) + } _ => Ok(HttpResponse::NotFound().finish()), } } diff --git a/server/src/subscriber.rs b/server/src/subscriber.rs index 131433fe40b351b9a5f79e291b3cd596f31c51c8..fe934a8193ca99b289847725c1f5e74dd302ee2d 100644 --- a/server/src/subscriber.rs +++ b/server/src/subscriber.rs @@ -5,10 +5,12 @@ use crate::config::Config; use crate::rooms::{Room, Rooms}; -use std::sync::{Arc, Mutex}; +use anyhow::{format_err, Error}; -use actix::{Actor, Addr, AsyncContext, Handler, Message, StreamHandler}; +use std::sync::{Arc, Mutex}; +use actix::{Actor, Addr, Handler, Message, StreamHandler, WeakAddr}; +use actix_web::dev::ConnectionInfo; use actix_web_actors::ws; use log::{debug, trace}; @@ -17,29 +19,41 @@ use log::{debug, trace}; #[derive(Debug)] pub struct Subscriber { cfg: Arc, - rooms: Addr, - room: Mutex>>, + rooms: WeakAddr, + remote_addr: String, + room: Mutex>>, } impl Subscriber { /// Create a new `Subscriber` actor. - pub fn new(cfg: Arc, rooms: Addr) -> Self { - Subscriber { + pub fn new( + cfg: Arc, + rooms: Addr, + connection_info: &ConnectionInfo, + ) -> Result { + debug!("Creating new subscriber {:?}", connection_info); + + let remote_addr = connection_info + .realip_remote_addr() + .ok_or_else(|| format_err!("WebSocket connection without remote address"))?; + + Ok(Subscriber { cfg, - rooms, + rooms: rooms.downgrade(), + remote_addr: String::from(remote_addr), room: Mutex::new(None), - } + }) } } impl Actor for Subscriber { type Context = ws::WebsocketContext; - fn stopped(&mut self, ctx: &mut Self::Context) { + fn stopped(&mut self, _ctx: &mut Self::Context) { // Drop reference to the joined room, if any self.room.lock().unwrap().take(); - trace!("Subscriber {:?} stopped", ctx.address()); + trace!("Subscriber {} stopped", self.remote_addr); } }