From bdde201550510288d32f8d657a4c99eac9953370 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 14 Sep 2020 17:26:28 +0300 Subject: [PATCH] Minor cleanup of Rooms management actors --- server/src/publisher.rs | 11 ++++++-- server/src/rooms.rs | 55 ++++++++++++++++++++++++++-------------- server/src/subscriber.rs | 14 ++++++++-- 3 files changed, 57 insertions(+), 23 deletions(-) diff --git a/server/src/publisher.rs b/server/src/publisher.rs index e66c134..46b7301 100644 --- a/server/src/publisher.rs +++ b/server/src/publisher.rs @@ -10,11 +10,11 @@ use std::sync::{Arc, Mutex}; use anyhow::Error; -use actix::{Actor, Addr, Handler, Message, StreamHandler}; +use actix::{Actor, Addr, AsyncContext, Handler, Message, StreamHandler}; use actix_web_actors::ws; -use log::debug; +use log::{debug, trace}; /// Actor that represents a WebRTC publisher. #[derive(Debug)] @@ -37,6 +37,13 @@ impl Publisher { impl Actor for Publisher { type Context = ws::WebsocketContext; + + fn stopped(&mut self, ctx: &mut Self::Context) { + // Drop reference to the joined room, if any + self.room.lock().unwrap().take(); + + trace!("Publisher {:?} stopped", ctx.address()); + } } impl StreamHandler> for Publisher { diff --git a/server/src/rooms.rs b/server/src/rooms.rs index 1e53cac..8fdc635 100644 --- a/server/src/rooms.rs +++ b/server/src/rooms.rs @@ -7,7 +7,7 @@ use std::sync::Mutex; use actix::{Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, MessageResult}; use anyhow::{bail, Error}; -use log::{debug, error, info}; +use log::{debug, error, info, trace}; use crate::publisher::{self, Publisher}; use crate::subscriber::{self, Subscriber}; @@ -218,32 +218,37 @@ impl Room { impl Actor for Room { type Context = Context; + + fn stopped(&mut self, _ctx: &mut Self::Context) { + trace!("Room {:?} stopped", self.id); + } } impl Handler for Room { type Result = Result<(), Error>; fn handle(&mut self, msg: DeleteRoomMessage, ctx: &mut Context) -> Self::Result { - if self.publisher == msg.publisher { - info!("Deleting room {:?}", self.id); - - { - let subscribers = self.subscribers.lock().unwrap(); - for subscriber in &*subscribers { - subscriber.do_send(subscriber::RoomDeletedMessage); - } - } - self.rooms.do_send(RoomDeletedMessage { room_id: self.id }); - - ctx.stop(); - Ok(()) - } else { + if self.publisher != msg.publisher { error!( "Tried to delete room {:?} from wrong publisher {:?}", self.id, msg.publisher ); - bail!("Deleting room {:?} not permitted", self.id) + bail!("Deleting room {:?} not permitted", self.id); + } + + info!("Deleting room {:?}", self.id); + + { + let mut subscribers = self.subscribers.lock().unwrap(); + for subscriber in subscribers.drain() { + subscriber.do_send(subscriber::RoomDeletedMessage); + } } + self.rooms.do_send(RoomDeletedMessage { room_id: self.id }); + + ctx.stop(); + + Ok(()) } } @@ -276,13 +281,25 @@ impl Handler for Room { self.id, msg.subscriber ); + { + let mut subscribers = self.subscribers.lock().unwrap(); + if !subscribers.remove(&msg.subscriber) { + error!( + "Room {:?} didn't have subscriber {:?}", + self.id, msg.subscriber + ); + bail!( + "Room {:?} didn't have subscriber {:?}", + self.id, + msg.subscriber + ); + } + } + self.publisher.do_send(publisher::LeavingSubscriberMessage { subscriber: msg.subscriber.clone(), }); - let mut subscribers = self.subscribers.lock().unwrap(); - subscribers.remove(&msg.subscriber); - Ok(()) } } diff --git a/server/src/subscriber.rs b/server/src/subscriber.rs index 8348f43..131433f 100644 --- a/server/src/subscriber.rs +++ b/server/src/subscriber.rs @@ -7,11 +7,11 @@ use crate::rooms::{Room, Rooms}; use std::sync::{Arc, Mutex}; -use actix::{Actor, Addr, Handler, Message, StreamHandler}; +use actix::{Actor, Addr, AsyncContext, Handler, Message, StreamHandler}; use actix_web_actors::ws; -use log::debug; +use log::{debug, trace}; /// Actor that represents a WebRTC subscriber. #[derive(Debug)] @@ -34,6 +34,13 @@ impl Subscriber { impl Actor for Subscriber { type Context = ws::WebsocketContext; + + fn stopped(&mut self, ctx: &mut Self::Context) { + // Drop reference to the joined room, if any + self.room.lock().unwrap().take(); + + trace!("Subscriber {:?} stopped", ctx.address()); + } } impl StreamHandler> for Subscriber { @@ -60,6 +67,9 @@ impl Handler for Subscriber { ) -> Self::Result { debug!("Room deleted"); + // Drop reference to the joined room + self.room.lock().unwrap().take(); + // TODO } } -- GitLab