// Copyright (C) 2020 Sebastian Dröge // // Licensed under the MIT license, see the LICENSE file or use std::collections::{HashMap, HashSet}; use std::sync::Mutex; use actix::{Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, MessageResult}; use anyhow::{bail, Error}; use log::{debug, error, info}; use crate::publisher::{self, Publisher}; use crate::subscriber::{self, Subscriber}; /// Unique identifier for a `Room`. #[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)] pub struct RoomId(u64); /// Message to create a new `Room` for a `Publisher`. #[derive(Debug)] pub struct CreateRoomMessage { pub publisher: Addr, pub name: String, pub description: Option, } impl Message for CreateRoomMessage { type Result = Result, Error>; } /// Message to find an existing `Room`. #[derive(Debug)] pub struct FindRoomMessage { pub room_id: RoomId, } impl Message for FindRoomMessage { type Result = Option>; } /// Message to list all currently existing `Room`s. #[derive(Debug)] pub struct ListRoomsMessage; impl Message for ListRoomsMessage { type Result = Vec>; } /// Sent from a `Room` to remove itself from the list of `Rooms` after the `Publisher` deleted it. #[derive(Debug)] struct RoomDeletedMessage { room_id: RoomId, } impl Message for RoomDeletedMessage { type Result = (); } /// Actor that keeps track of all currently existing `Room`s. #[derive(Debug)] pub struct Rooms { rooms: Mutex>>, } impl Rooms { /// Create a new `Rooms` instance. pub fn new() -> Self { Self { rooms: Mutex::new(HashMap::new()), } } } impl Actor for Rooms { type Context = Context; } impl Handler for Rooms { type Result = Result, Error>; fn handle(&mut self, msg: CreateRoomMessage, ctx: &mut Context) -> Self::Result { info!("Creating new room for message {:?}", msg); let mut rooms = self.rooms.lock().unwrap(); let room = Room::new(ctx.address(), msg.name, msg.description, msg.publisher); let room_id = room.id; info!("Created new room {:?}", room_id); let room_addr = room.start(); rooms.insert(room_id, room_addr.clone()); Ok(room_addr) } } impl Handler for Rooms { type Result = Option>; fn handle(&mut self, msg: FindRoomMessage, _ctx: &mut Context) -> Self::Result { debug!("Finding room {:?}", msg.room_id); let rooms = self.rooms.lock().unwrap(); rooms.get(&msg.room_id).cloned() } } impl Handler for Rooms { type Result = MessageResult; fn handle(&mut self, _msg: ListRoomsMessage, _ctx: &mut Context) -> Self::Result { debug!("Listing all current rooms"); MessageResult(self.rooms.lock().unwrap().values().cloned().collect()) } } impl Handler for Rooms { type Result = (); fn handle(&mut self, msg: RoomDeletedMessage, _ctx: &mut Context) -> Self::Result { let mut rooms = self.rooms.lock().unwrap(); info!("Room {:?} destroyed", msg.room_id); rooms.remove(&msg.room_id).expect("Room not found"); } } /// Request `RoomInformation` from a given `Room`. #[derive(Debug)] pub struct RoomInformationMessage; /// Room information returned from the `RoomInformationMessage`. #[derive(Debug)] pub struct RoomInformation { pub id: RoomId, pub name: String, pub description: Option, } impl Message for RoomInformationMessage { type Result = RoomInformation; } /// Delete a `Room` from a `Publisher`. #[derive(Debug)] pub struct DeleteRoomMessage { pub publisher: Addr, } impl Message for DeleteRoomMessage { type Result = Result<(), Error>; } /// Join a `Room` by a `Subscriber`. #[derive(Debug)] pub struct JoinRoomMessage { pub subscriber: Addr, } impl Message for JoinRoomMessage { type Result = Result<(), Error>; } /// Leave a `Room` by a `Subscriber`. #[derive(Debug)] pub struct LeaveRoomMessage { pub subscriber: Addr, } impl Message for LeaveRoomMessage { type Result = Result<(), Error>; } /// Actor that manages a `Room` with its `Publisher` and `Subscriber`s. #[derive(Debug)] pub struct Room { id: RoomId, name: String, description: Option, rooms: Addr, publisher: Addr, subscribers: Mutex>>, } impl Room { /// Returns a next valid `RoomId`. fn next_room_id() -> RoomId { // XXX: We assume that no more than 2**64 rooms are ever created use std::sync::atomic::{self, AtomicU64}; static ROOM_ID: AtomicU64 = AtomicU64::new(0); RoomId(ROOM_ID.fetch_add(1, atomic::Ordering::Relaxed)) } /// Create a new `Room`. fn new( rooms: Addr, name: String, description: Option, publisher: Addr, ) -> Self { Room { rooms, id: Self::next_room_id(), name, description, publisher, subscribers: Mutex::new(HashSet::new()), } } } impl Actor for Room { type Context = Context; } impl Handler for Room { type Result = Result<(), Error>; fn handle(&mut self, msg: DeleteRoomMessage, ctx: &mut Context) -> Self::Result { if self.publisher == msg.publisher { info!("Deleting room {:?}", self.id); { let subscribers = self.subscribers.lock().unwrap(); for subscriber in &*subscribers { subscriber.do_send(subscriber::RoomDeletedMessage); } } self.rooms.do_send(RoomDeletedMessage { room_id: self.id }); ctx.stop(); Ok(()) } else { error!( "Tried to delete room {:?} from wrong publisher {:?}", self.id, msg.publisher ); bail!("Deleting room {:?} not permitted", self.id) } } } impl Handler for Room { type Result = Result<(), Error>; fn handle(&mut self, msg: JoinRoomMessage, _ctx: &mut Context) -> Self::Result { info!( "Joining room {:?} by subscriber {:?}", self.id, msg.subscriber ); self.publisher.do_send(publisher::NewSubscriberMessage { subscriber: msg.subscriber.clone(), }); let mut subscribers = self.subscribers.lock().unwrap(); subscribers.insert(msg.subscriber); Ok(()) } } impl Handler for Room { type Result = Result<(), Error>; fn handle(&mut self, msg: LeaveRoomMessage, _ctx: &mut Context) -> Self::Result { info!( "Leaving room {:?} by subscriber {:?}", self.id, msg.subscriber ); self.publisher.do_send(publisher::LeavingSubscriberMessage { subscriber: msg.subscriber.clone(), }); let mut subscribers = self.subscribers.lock().unwrap(); subscribers.remove(&msg.subscriber); Ok(()) } } impl Handler for Room { type Result = MessageResult; fn handle(&mut self, _msg: RoomInformationMessage, _ctx: &mut Context) -> Self::Result { debug!("Returning room information for room {:?}", self.id); MessageResult(RoomInformation { id: self.id, name: self.name.clone(), description: self.description.clone(), }) } }