rooms.rs 7.67 KiB
Newer Older
// Copyright (C) 2020 Sebastian Dröge <sebastian@centricular.com>
//
// Licensed under the MIT license, see the LICENSE file or <http://opensource.org/licenses/MIT>

use std::collections::{HashMap, HashSet};
use std::sync::Mutex;

use actix::{Actor, ActorContext, Addr, AsyncContext, Context, Handler, Message, MessageResult};
use anyhow::{bail, Error};
use log::{debug, error, info};

use crate::publisher::{self, Publisher};
use crate::subscriber::{self, Subscriber};

/// Unique identifier for a `Room`.
#[derive(Debug, PartialEq, Eq, Copy, Clone, Hash)]
pub struct RoomId(u64);

/// Message to create a new `Room` for a `Publisher`.
#[derive(Debug)]
pub struct CreateRoomMessage {
    pub publisher: Addr<Publisher>,
    pub name: String,
    pub description: Option<String>,
}

impl Message for CreateRoomMessage {
    type Result = Result<Addr<Room>, Error>;
}

/// Message to find an existing `Room`.
#[derive(Debug)]
pub struct FindRoomMessage {
    pub room_id: RoomId,
}

impl Message for FindRoomMessage {
    type Result = Option<Addr<Room>>;
}

/// Message to list all currently existing `Room`s.
#[derive(Debug)]
pub struct ListRoomsMessage;

impl Message for ListRoomsMessage {
    type Result = Vec<Addr<Room>>;
}

/// Sent from a `Room` to remove itself from the list of `Rooms` after the `Publisher` deleted it.
#[derive(Debug)]
struct RoomDeletedMessage {
    room_id: RoomId,
}

impl Message for RoomDeletedMessage {
    type Result = ();
}

/// Actor that keeps track of all currently existing `Room`s.
#[derive(Debug)]
pub struct Rooms {
    rooms: Mutex<HashMap<RoomId, Addr<Room>>>,
}

impl Rooms {
    /// Create a new `Rooms` instance.
    pub fn new() -> Self {
        Self {
            rooms: Mutex::new(HashMap::new()),
        }
    }
}

impl Actor for Rooms {
    type Context = Context<Self>;
}

impl Handler<CreateRoomMessage> for Rooms {
    type Result = Result<Addr<Room>, Error>;

    fn handle(&mut self, msg: CreateRoomMessage, ctx: &mut Context<Self>) -> Self::Result {
        info!("Creating new room for message {:?}", msg);

        let mut rooms = self.rooms.lock().unwrap();

        let room = Room::new(ctx.address(), msg.name, msg.description, msg.publisher);
        let room_id = room.id;

        info!("Created new room {:?}", room_id);

        let room_addr = room.start();
        rooms.insert(room_id, room_addr.clone());

        Ok(room_addr)
    }
}

impl Handler<FindRoomMessage> for Rooms {
    type Result = Option<Addr<Room>>;

    fn handle(&mut self, msg: FindRoomMessage, _ctx: &mut Context<Self>) -> Self::Result {
        debug!("Finding room {:?}", msg.room_id);

        let rooms = self.rooms.lock().unwrap();
        rooms.get(&msg.room_id).cloned()
    }
}

impl Handler<ListRoomsMessage> for Rooms {
    type Result = MessageResult<ListRoomsMessage>;

    fn handle(&mut self, _msg: ListRoomsMessage, _ctx: &mut Context<Self>) -> Self::Result {
        debug!("Listing all current rooms");

        MessageResult(self.rooms.lock().unwrap().values().cloned().collect())
    }
}

impl Handler<RoomDeletedMessage> for Rooms {
    type Result = ();

    fn handle(&mut self, msg: RoomDeletedMessage, _ctx: &mut Context<Self>) -> Self::Result {
        let mut rooms = self.rooms.lock().unwrap();

        info!("Room {:?} destroyed", msg.room_id);

        rooms.remove(&msg.room_id).expect("Room not found");
    }
}

/// Request `RoomInformation` from a given `Room`.
#[derive(Debug)]
pub struct RoomInformationMessage;

/// Room information returned from the `RoomInformationMessage`.
#[derive(Debug)]
pub struct RoomInformation {
    pub id: RoomId,
    pub name: String,
    pub description: Option<String>,
}

impl Message for RoomInformationMessage {
    type Result = RoomInformation;
}

/// Delete a `Room` from a `Publisher`.
#[derive(Debug)]
pub struct DeleteRoomMessage {
    pub publisher: Addr<Publisher>,
}

impl Message for DeleteRoomMessage {
    type Result = Result<(), Error>;
}

/// Join a `Room` by a `Subscriber`.
#[derive(Debug)]
pub struct JoinRoomMessage {
    pub subscriber: Addr<Subscriber>,
}

impl Message for JoinRoomMessage {
    type Result = Result<(), Error>;
}

/// Leave a `Room` by a `Subscriber`.
#[derive(Debug)]
pub struct LeaveRoomMessage {
    pub subscriber: Addr<Subscriber>,
}

impl Message for LeaveRoomMessage {
    type Result = Result<(), Error>;
}

/// Actor that manages a `Room` with its `Publisher` and `Subscriber`s.
#[derive(Debug)]
pub struct Room {
    id: RoomId,
    name: String,
    description: Option<String>,

    rooms: Addr<Rooms>,

    publisher: Addr<Publisher>,
    subscribers: Mutex<HashSet<Addr<Subscriber>>>,
}

impl Room {
    /// Returns a next valid `RoomId`.
    fn next_room_id() -> RoomId {
        // XXX: We assume that no more than 2**64 rooms are ever created
        use std::sync::atomic::{self, AtomicU64};

        static ROOM_ID: AtomicU64 = AtomicU64::new(0);

        RoomId(ROOM_ID.fetch_add(1, atomic::Ordering::Relaxed))
    }

    /// Create a new `Room`.
    fn new(
        rooms: Addr<Rooms>,
        name: String,
        description: Option<String>,
        publisher: Addr<Publisher>,
    ) -> Self {
        Room {
            rooms,
            id: Self::next_room_id(),
            name,
            description,
            publisher,
            subscribers: Mutex::new(HashSet::new()),
        }
    }
}

impl Actor for Room {
    type Context = Context<Self>;
}

impl Handler<DeleteRoomMessage> for Room {
    type Result = Result<(), Error>;

    fn handle(&mut self, msg: DeleteRoomMessage, ctx: &mut Context<Self>) -> Self::Result {
        if self.publisher == msg.publisher {
            info!("Deleting room {:?}", self.id);

            {
                let subscribers = self.subscribers.lock().unwrap();
                for subscriber in &*subscribers {
                    subscriber.do_send(subscriber::RoomDeletedMessage);
                }
            }
            self.rooms.do_send(RoomDeletedMessage { room_id: self.id });

            ctx.stop();
            Ok(())
        } else {
            error!(
                "Tried to delete room {:?} from wrong publisher {:?}",
                self.id, msg.publisher
            );
            bail!("Deleting room {:?} not permitted", self.id)
        }
    }
}

impl Handler<JoinRoomMessage> for Room {
    type Result = Result<(), Error>;

    fn handle(&mut self, msg: JoinRoomMessage, _ctx: &mut Context<Self>) -> Self::Result {
        info!(
            "Joining room {:?} by subscriber {:?}",
            self.id, msg.subscriber
        );

        self.publisher.do_send(publisher::NewSubscriberMessage {
            subscriber: msg.subscriber.clone(),
        });

        let mut subscribers = self.subscribers.lock().unwrap();
        subscribers.insert(msg.subscriber);

        Ok(())
    }
}

impl Handler<LeaveRoomMessage> for Room {
    type Result = Result<(), Error>;

    fn handle(&mut self, msg: LeaveRoomMessage, _ctx: &mut Context<Self>) -> Self::Result {
        info!(
            "Leaving room {:?} by subscriber {:?}",
            self.id, msg.subscriber
        );

        self.publisher.do_send(publisher::LeavingSubscriberMessage {
            subscriber: msg.subscriber.clone(),
        });

        let mut subscribers = self.subscribers.lock().unwrap();
        subscribers.remove(&msg.subscriber);

        Ok(())
    }
}

impl Handler<RoomInformationMessage> for Room {
    type Result = MessageResult<RoomInformationMessage>;

    fn handle(&mut self, _msg: RoomInformationMessage, _ctx: &mut Context<Self>) -> Self::Result {
        debug!("Returning room information for room {:?}", self.id);

        MessageResult(RoomInformation {
            id: self.id,
            name: self.name.clone(),
            description: self.description.clone(),
        })
    }
}