Snippets Groups Projects
Commit bdde2015 authored by Sebastian Dröge's avatar Sebastian Dröge
Browse files

Minor cleanup of Rooms management actors

parent 072192da
No related merge requests found
......@@ -10,11 +10,11 @@ use std::sync::{Arc, Mutex};
use anyhow::Error;
use actix::{Actor, Addr, Handler, Message, StreamHandler};
use actix::{Actor, Addr, AsyncContext, Handler, Message, StreamHandler};
use actix_web_actors::ws;
use log::debug;
use log::{debug, trace};
/// Actor that represents a WebRTC publisher.
#[derive(Debug)]
......@@ -37,6 +37,13 @@ impl Publisher {
impl Actor for Publisher {
type Context = ws::WebsocketContext<Self>;
fn stopped(&mut self, ctx: &mut Self::Context) {
// Drop reference to the joined room, if any
self.room.lock().unwrap().take();
trace!("Publisher {:?} stopped", ctx.address());
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Publisher {
......
......@@ -7,7 +7,7 @@ use std::sync::Mutex;
use actix::{Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, MessageResult};
use anyhow::{bail, Error};
use log::{debug, error, info};
use log::{debug, error, info, trace};
use crate::publisher::{self, Publisher};
use crate::subscriber::{self, Subscriber};
......@@ -218,32 +218,37 @@ impl Room {
impl Actor for Room {
type Context = Context<Self>;
fn stopped(&mut self, _ctx: &mut Self::Context) {
trace!("Room {:?} stopped", self.id);
}
}
impl Handler<DeleteRoomMessage> for Room {
type Result = Result<(), Error>;
fn handle(&mut self, msg: DeleteRoomMessage, ctx: &mut Context<Self>) -> Self::Result {
if self.publisher == msg.publisher {
info!("Deleting room {:?}", self.id);
{
let subscribers = self.subscribers.lock().unwrap();
for subscriber in &*subscribers {
subscriber.do_send(subscriber::RoomDeletedMessage);
}
}
self.rooms.do_send(RoomDeletedMessage { room_id: self.id });
ctx.stop();
Ok(())
} else {
if self.publisher != msg.publisher {
error!(
"Tried to delete room {:?} from wrong publisher {:?}",
self.id, msg.publisher
);
bail!("Deleting room {:?} not permitted", self.id)
bail!("Deleting room {:?} not permitted", self.id);
}
info!("Deleting room {:?}", self.id);
{
let mut subscribers = self.subscribers.lock().unwrap();
for subscriber in subscribers.drain() {
subscriber.do_send(subscriber::RoomDeletedMessage);
}
}
self.rooms.do_send(RoomDeletedMessage { room_id: self.id });
ctx.stop();
Ok(())
}
}
......@@ -276,13 +281,25 @@ impl Handler<LeaveRoomMessage> for Room {
self.id, msg.subscriber
);
{
let mut subscribers = self.subscribers.lock().unwrap();
if !subscribers.remove(&msg.subscriber) {
error!(
"Room {:?} didn't have subscriber {:?}",
self.id, msg.subscriber
);
bail!(
"Room {:?} didn't have subscriber {:?}",
self.id,
msg.subscriber
);
}
}
self.publisher.do_send(publisher::LeavingSubscriberMessage {
subscriber: msg.subscriber.clone(),
});
let mut subscribers = self.subscribers.lock().unwrap();
subscribers.remove(&msg.subscriber);
Ok(())
}
}
......
......@@ -7,11 +7,11 @@ use crate::rooms::{Room, Rooms};
use std::sync::{Arc, Mutex};
use actix::{Actor, Addr, Handler, Message, StreamHandler};
use actix::{Actor, Addr, AsyncContext, Handler, Message, StreamHandler};
use actix_web_actors::ws;
use log::debug;
use log::{debug, trace};
/// Actor that represents a WebRTC subscriber.
#[derive(Debug)]
......@@ -34,6 +34,13 @@ impl Subscriber {
impl Actor for Subscriber {
type Context = ws::WebsocketContext<Self>;
fn stopped(&mut self, ctx: &mut Self::Context) {
// Drop reference to the joined room, if any
self.room.lock().unwrap().take();
trace!("Subscriber {:?} stopped", ctx.address());
}
}
impl StreamHandler<Result<ws::Message, ws::ProtocolError>> for Subscriber {
......@@ -60,6 +67,9 @@ impl Handler<RoomDeletedMessage> for Subscriber {
) -> Self::Result {
debug!("Room deleted");
// Drop reference to the joined room
self.room.lock().unwrap().take();
// TODO
}
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment