Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Simplify actor request handlers #2600

Merged
Merged
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions crates/core/tedge_actors/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub mod tests {
async fn running_an_actor_without_a_runtime() {
let mut box_builder = SimpleMessageBoxBuilder::new("test", 16);
let mut client_message_box = box_builder.new_client_box(NoConfig);
let mut runtime_box = box_builder.get_signal_sender();
let actor_message_box = box_builder.build();
let actor = Echo {
messages: actor_message_box,
Expand All @@ -93,7 +94,10 @@ pub mod tests {
assert_eq!(client_message_box.recv().await, Some("World".to_string()));

// When there is no more input message senders
client_message_box.close_sender();
runtime_box
.send(RuntimeRequest::Shutdown)
.await
.expect("fail to shutdown");

// The actor stops
actor_task
Expand All @@ -116,7 +120,7 @@ pub mod tests {
let actor_task = spawn(async move { actor.run().await });

spawn(async move {
let mut sender: DynSender<&str> = adapt(&input_sender);
let mut sender: DynSender<&str> = input_sender.sender_clone();
sender.send("Do this").await.expect("sent");
sender.send("Do nothing").await.expect("sent");
sender.send("Do that and this").await.expect("sent");
Expand Down Expand Up @@ -186,8 +190,8 @@ pub mod tests {
impl SpecificMessageBox {
fn new_box(capacity: usize, output: DynSender<DoMsg>) -> (DynSender<String>, Self) {
let (sender, input) = mpsc::channel(capacity);
let peer_1 = adapt(&output);
let peer_2 = adapt(&output);
let peer_1 = output.sender_clone();
let peer_2 = output.sender_clone();
let message_box = SpecificMessageBox {
input,
peer_1,
Expand Down
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@
//! using an `impl From<SourceMessage> for SinkMessage`. This flexibility allows an actor to receive
//! messages from several independent sources (see the [fan_in_message_type](crate::fan_in_message_type) macro).
use crate::mpsc;
use crate::CloneSender;
use crate::DynSender;
use crate::LoggingReceiver;
use crate::LoggingSender;
use crate::MappingSender;
use crate::Message;
use crate::NullSender;
use crate::RuntimeRequest;
use crate::Sender;
use crate::SimpleMessageBox;
use std::convert::Infallible;
use std::fmt::Debug;
Expand Down Expand Up @@ -130,7 +130,7 @@ pub trait MessageSink<M: Message, Config> {
N: Message,
M: From<N>,
{
source.register_peer(self.get_config(), crate::adapt(&self.get_sender()))
source.register_peer(self.get_config(), self.get_sender().sender_clone())
}

/// Add a source of messages to the actor under construction, the messages being translated on the fly.
Expand Down
153 changes: 64 additions & 89 deletions crates/core/tedge_actors/src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,128 +3,118 @@ use crate::ChannelError;
use crate::Message;
use async_trait::async_trait;
use futures::channel::mpsc;
use futures::channel::oneshot;
use futures::SinkExt;

/// A sender of messages of type `M`
///
/// Actors don't access directly the `mpsc::Sender` of their peers,
/// Actors don't get direct access to the `mpsc::Sender` of their peers,
/// but use intermediate senders that adapt the messages when sent.
pub type DynSender<M> = Box<dyn Sender<M>>;
pub type DynSender<M> = Box<dyn CloneSender<M>>;

#[async_trait]
pub trait Sender<M>: 'static + Send + Sync {
/// Send a message to the receiver behind this sender,
/// returning an error if the receiver is no more expecting messages
async fn send(&mut self, message: M) -> Result<(), ChannelError>;
}

pub trait CloneSender<M>: Sender<M> {
/// Clone this sender in order to send messages to the same receiver from another actor
fn sender_clone(&self) -> DynSender<M>;

/// Closes this channel from the sender side, preventing any new messages.
fn close_sender(&mut self);
/// Clone a cast of this sender into a `Box<dyn Sender<M>>`
///
/// This is a workaround for https://github.com/rust-lang/rust/issues/65991
fn sender(&self) -> Box<dyn Sender<M>>;
}

impl<M: Message> Clone for DynSender<M> {
fn clone(&self) -> Self {
self.sender_clone()
impl<M, S: Clone + Sender<M>> CloneSender<M> for S {
fn sender_clone(&self) -> DynSender<M> {
Box::new(self.clone())
}

fn sender(&self) -> Box<dyn Sender<M>> {
Box::new(self.clone())
}
}

/// An `mpsc::Sender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
impl<M: Message, N: Message + Into<M>> From<mpsc::Sender<M>> for DynSender<N> {
fn from(sender: mpsc::Sender<M>) -> Self {
impl<M, S: Clone + Sender<M>> From<S> for DynSender<M> {
fn from(sender: S) -> Self {
Box::new(sender)
}
}

/// A `DynSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(SinkExt::send(&mut self, message.into()).await?)
Ok(self.as_mut().send(message.into()).await?)
}
}

#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for Box<dyn Sender<M>> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(self.as_mut().send(message.into()).await?)
}
}

#[async_trait]
impl<M: Message, N: Message + Into<M>> CloneSender<N> for DynSender<M> {
fn sender_clone(&self) -> DynSender<N> {
Box::new(self.clone())
Box::new(self.as_ref().sender_clone())
}

fn close_sender(&mut self) {
self.close_channel();
fn sender(&self) -> Box<dyn Sender<N>> {
Box::new(self.as_ref().sender())
}
}

/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
impl<M: Message, N: Message + Into<M>> From<mpsc::UnboundedSender<M>> for DynSender<N> {
fn from(sender: mpsc::UnboundedSender<M>) -> Self {
Box::new(sender)
/// An `mpsc::Sender<M>` is a `DynSender<M>`
#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(SinkExt::send(&mut self, message.into()).await?)
}
}

/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::UnboundedSender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(SinkExt::send(&mut self, message.into()).await?)
}

fn sender_clone(&self) -> DynSender<N> {
Box::new(self.clone())
}

fn close_sender(&mut self) {
self.close_channel();
}
}

/// Make a `DynSender<N>` from a `DynSender<M>`
///
/// This is a workaround to the fact the compiler rejects a From implementation:
/// A `oneshot::Sender<M>` is a `Sender<N>` provided `N` implements `Into<M>`
///
/// ```shell
/// There is one caveat. The `oneshot::Sender::send()` method consumes the sender,
/// hence the one shot sender is wrapped inside an `Option`.
///
/// impl<M: Message, N: Message + Into<M>> From<DynSender<M>> for DynSender<N> {
/// | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/// |
/// = note: conflicting implementation in crate `core`:
/// - impl<T> From<T> for T;
/// ```
pub fn adapt<M: Message, N: Message + Into<M>>(sender: &DynSender<M>) -> DynSender<N> {
Box::new(sender.clone())
}

/// Such a [Sender] can only be used once:
/// - it cannot be cloned
/// - any message sent after a first one will be silently ignored
/// - a message sent while the receiver has been drop will also be silently ignored
#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
impl<M: Message, N: Message + Into<M>> Sender<N> for Option<oneshot::Sender<M>> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(self.as_mut().send(message.into()).await?)
}

fn sender_clone(&self) -> DynSender<N> {
Box::new(self.as_ref().sender_clone())
}

fn close_sender(&mut self) {
self.as_mut().close_sender()
if let Some(sender) = self.take() {
let _ = sender.send(message.into());
}
Ok(())
}
}

/// A sender that discards messages instead of sending them
#[derive(Clone)]
pub struct NullSender;

#[async_trait]
impl<M: Message> Sender<M> for NullSender {
async fn send(&mut self, _message: M) -> Result<(), ChannelError> {
Ok(())
}

fn sender_clone(&self) -> DynSender<M> {
Box::new(NullSender)
}

fn close_sender(&mut self) {}
}

impl<M: Message> From<NullSender> for DynSender<M> {
fn from(sender: NullSender) -> Self {
Box::new(sender)
}
}

/// A sender that transforms the messages on the fly
Expand All @@ -133,6 +123,15 @@ pub struct MappingSender<F, M> {
cast: std::sync::Arc<F>,
}

impl<F, M: 'static> Clone for MappingSender<F, M> {
fn clone(&self) -> Self {
MappingSender {
inner: self.inner.sender_clone(),
cast: self.cast.clone(),
}
}
}

impl<F, M> MappingSender<F, M> {
pub fn new(inner: DynSender<M>, cast: F) -> Self {
MappingSender {
Expand All @@ -157,30 +156,6 @@ where
}
Ok(())
}

fn sender_clone(&self) -> DynSender<M> {
Box::new(MappingSender {
inner: self.inner.sender_clone(),
cast: self.cast.clone(),
})
}

fn close_sender(&mut self) {
self.inner.as_mut().close_sender()
}
}

impl<M, N, NS, F> From<MappingSender<F, N>> for DynSender<M>
where
M: Message,
N: Message,
NS: Iterator<Item = N> + Send,
F: Fn(M) -> NS,
F: 'static + Sync + Send,
{
fn from(value: MappingSender<F, N>) -> Self {
Box::new(value)
}
}

#[cfg(test)]
Expand Down Expand Up @@ -233,8 +208,8 @@ mod tests {
impl From<DynSender<Msg>> for Peers {
fn from(recipient: DynSender<Msg>) -> Self {
Peers {
peer_1: adapt(&recipient),
peer_2: adapt(&recipient),
peer_1: recipient.sender_clone(),
peer_2: recipient.sender_clone(),
}
}
}
Expand Down
Loading