Skip to content

Commit

Permalink
Split trait Sender into Sender + CloneSender
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Jan 23, 2024
1 parent 1709df2 commit 8f76478
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 75 deletions.
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@
//! using an `impl From<SourceMessage> for SinkMessage`. This flexibility allows an actor to receive
//! messages from several independent sources (see the [fan_in_message_type](crate::fan_in_message_type) macro).
use crate::mpsc;
use crate::CloneSender;
use crate::DynSender;
use crate::LoggingReceiver;
use crate::LoggingSender;
use crate::MappingSender;
use crate::Message;
use crate::NullSender;
use crate::RuntimeRequest;
use crate::Sender;
use crate::SimpleMessageBox;
use std::convert::Infallible;
use std::fmt::Debug;
Expand Down
52 changes: 30 additions & 22 deletions crates/core/tedge_actors/src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,38 @@ use futures::SinkExt;

/// A sender of messages of type `M`
///
/// Actors don't access directly the `mpsc::Sender` of their peers,
/// Actors don't access directly to the `mpsc::Sender` of their peers,
/// but use intermediate senders that adapt the messages when sent.
pub type DynSender<M> = Box<dyn Sender<M>>;
pub type DynSender<M> = Box<dyn CloneSender<M>>;

#[async_trait]
pub trait Sender<M>: 'static + Send + Sync {
/// Send a message to the receiver behind this sender,
/// returning an error if the receiver is no more expecting messages
async fn send(&mut self, message: M) -> Result<(), ChannelError>;
}

pub trait CloneSender<M>: Sender<M> {
/// Clone this sender in order to send messages to the same receiver from another actor
fn sender_clone(&self) -> DynSender<M>;

/// Clone a cast of this sender into a `Box<dyn Sender<M>>`
///
/// This is a workaround for https://github.com/rust-lang/rust/issues/65991
fn sender(&self) -> Box<dyn Sender<M>>;
}

impl<M: Message> Clone for DynSender<M> {
impl<M, S: Clone + Sender<M>> CloneSender<M> for S {
fn sender_clone(&self) -> DynSender<M> {
Box::new(self.clone())
}

fn sender(&self) -> Box<dyn Sender<M>> {
Box::new(self.clone())
}
}

impl<M: 'static> Clone for DynSender<M> {
fn clone(&self) -> Self {
self.sender_clone()
}
Expand All @@ -39,10 +56,6 @@ impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(SinkExt::send(&mut self, message.into()).await?)
}

fn sender_clone(&self) -> DynSender<N> {
Box::new(self.clone())
}
}

/// Make a `DynSender<N>` from a `DynSender<M>`
Expand All @@ -66,24 +79,17 @@ impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(self.as_mut().send(message.into()).await?)
}

fn sender_clone(&self) -> DynSender<N> {
Box::new(self.as_ref().sender_clone())
}
}

/// A sender that discards messages instead of sending them
#[derive(Clone)]
pub struct NullSender;

#[async_trait]
impl<M: Message> Sender<M> for NullSender {
async fn send(&mut self, _message: M) -> Result<(), ChannelError> {
Ok(())
}

fn sender_clone(&self) -> DynSender<M> {
Box::new(NullSender)
}
}

impl<M: Message> From<NullSender> for DynSender<M> {
Expand All @@ -98,6 +104,15 @@ pub struct MappingSender<F, M> {
cast: std::sync::Arc<F>,
}

impl<F, M: 'static> Clone for MappingSender<F, M> {
fn clone(&self) -> Self {
MappingSender {
inner: self.inner.clone(),
cast: self.cast.clone(),
}
}
}

impl<F, M> MappingSender<F, M> {
pub fn new(inner: DynSender<M>, cast: F) -> Self {
MappingSender {
Expand All @@ -122,13 +137,6 @@ where
}
Ok(())
}

fn sender_clone(&self) -> DynSender<M> {
Box::new(MappingSender {
inner: self.inner.sender_clone(),
cast: self.cast.clone(),
})
}
}

impl<M, N, NS, F> From<MappingSender<F, N>> for DynSender<M>
Expand Down
34 changes: 17 additions & 17 deletions crates/core/tedge_actors/src/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
//!
use crate::channels::Sender;
use crate::ChannelError;
use crate::CloneSender;
use crate::DynSender;
use crate::Message;
use crate::RuntimeRequest;
Expand Down Expand Up @@ -212,34 +213,27 @@ pub struct LoggingSender<Output> {
sender: DynSender<Output>,
}

impl<Output> LoggingSender<Output> {
pub fn new(name: String, sender: DynSender<Output>) -> Self {
Self { name, sender }
}
}

impl<Output: 'static> Clone for LoggingSender<Output> {
fn clone(&self) -> Self {
Self {
LoggingSender {
name: self.name.clone(),
sender: self.sender.sender_clone(),
}
}
}

impl<Output> LoggingSender<Output> {
pub fn new(name: String, sender: DynSender<Output>) -> Self {
Self { name, sender }
}
}

#[async_trait]
impl<Output: Debug + Send + Sync + 'static> Sender<Output> for LoggingSender<Output> {
impl<Output: Message + Debug> Sender<Output> for LoggingSender<Output> {
async fn send(&mut self, message: Output) -> Result<(), ChannelError> {
log_message_sent(&self.name, &message);
self.sender.send(message).await
}

fn sender_clone(&self) -> DynSender<Output> {
Box::new(LoggingSender {
name: self.name.clone(),
sender: self.sender.clone(),
})
}
}

pub fn log_message_sent<I: Debug>(target: &str, message: I) {
Expand All @@ -253,7 +247,7 @@ pub fn log_message_sent<I: Debug>(target: &str, message: I) {
/// - Log sent messages when pushed into the target box
///
/// Such a box is connected to peer actors using a [SimpleMessageBoxBuilder](crate::SimpleMessageBoxBuilder).
pub struct SimpleMessageBox<Input: Debug, Output> {
pub struct SimpleMessageBox<Input: Debug, Output: Debug> {
input_receiver: LoggingReceiver<Input>,
output_sender: LoggingSender<Output>,
}
Expand Down Expand Up @@ -312,9 +306,15 @@ impl<Input: Message, Output: Message> Sender<Output> for SimpleMessageBox<Input,
async fn send(&mut self, message: Output) -> Result<(), ChannelError> {
self.output_sender.send(message).await
}
}

impl<Input: Message, Output: Message> CloneSender<Output> for SimpleMessageBox<Input, Output> {
fn sender_clone(&self) -> DynSender<Output> {
self.output_sender.sender_clone()
CloneSender::sender_clone(&self.output_sender)
}

fn sender(&self) -> Box<dyn Sender<Output>> {
CloneSender::sender(&self.output_sender)
}
}

Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/servers/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use crate::mpsc;
use crate::Actor;
use crate::Builder;
use crate::ClientId;
use crate::CloneSender;
use crate::ConcurrentServerActor;
use crate::ConcurrentServerMessageBox;
use crate::DynSender;
Expand All @@ -13,7 +14,6 @@ use crate::NoConfig;
use crate::RuntimeError;
use crate::RuntimeRequest;
use crate::RuntimeRequestSink;
use crate::Sender;
use crate::SenderVec;
use crate::Server;
use crate::ServerActor;
Expand Down
33 changes: 19 additions & 14 deletions crates/core/tedge_actors/src/servers/keyed_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,20 @@ use crate::Sender;
use async_trait::async_trait;

/// A sender that adds a key to messages on the fly
pub struct KeyedSender<K: Message + Clone, M: Message> {
pub struct KeyedSender<K, M> {
key: K,
sender: mpsc::Sender<(K, M)>,
}

impl<K: Clone, M> Clone for KeyedSender<K, M> {
fn clone(&self) -> Self {
KeyedSender {
key: self.key.clone(),
sender: self.sender.clone(),
}
}
}

impl<K: Message + Clone, M: Message> KeyedSender<K, M> {
pub fn new_sender(key: K, sender: mpsc::Sender<(K, M)>) -> DynSender<M> {
Box::new(KeyedSender { key, sender })
Expand All @@ -22,20 +31,21 @@ impl<K: Message + Clone, M: Message> Sender<M> for KeyedSender<K, M> {
async fn send(&mut self, message: M) -> Result<(), ChannelError> {
self.sender.send((self.key.clone(), message)).await
}

fn sender_clone(&self) -> DynSender<M> {
Box::new(KeyedSender {
key: self.key.clone(),
sender: self.sender.clone(),
})
}
}

/// A vector of senders addressed using a sender id attached to each message
pub struct SenderVec<M: Message> {
pub struct SenderVec<M> {
senders: Vec<DynSender<M>>,
}

impl<M: 'static> Clone for SenderVec<M> {
fn clone(&self) -> Self {
SenderVec {
senders: self.senders.clone(),
}
}
}

impl<M: Message> SenderVec<M> {
pub fn new_sender(senders: Vec<DynSender<M>>) -> DynSender<(usize, M)> {
Box::new(SenderVec { senders })
Expand All @@ -51,9 +61,4 @@ impl<M: Message> Sender<(usize, M)> for SenderVec<M> {
}
Ok(())
}

fn sender_clone(&self) -> DynSender<(usize, M)> {
let senders = self.senders.iter().map(|r| r.sender_clone()).collect();
Box::new(SenderVec { senders })
}
}
4 changes: 2 additions & 2 deletions crates/core/tedge_actors/src/servers/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub type ServerMessageBox<Request, Response> =
pub type ClientId = usize;

/// A message box for services that handles requests concurrently
pub struct ConcurrentServerMessageBox<Request: Debug, Response> {
pub struct ConcurrentServerMessageBox<Request: Debug, Response: Debug> {
/// Max concurrent requests
max_concurrency: usize,

Expand Down Expand Up @@ -119,7 +119,7 @@ impl<Request: Message, Response: Message> ConcurrentServerMessageBox<Request, Re
/// and synchronously wait for its response using the `await_response` function.
///
/// Note that this message box sends requests and receive responses.
pub struct ClientMessageBox<Request, Response: Debug> {
pub struct ClientMessageBox<Request: Message, Response: Message + Debug> {
messages: SimpleMessageBox<Response, Request>,
}

Expand Down
13 changes: 9 additions & 4 deletions crates/core/tedge_actors/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,15 @@ pub struct TimedMessageBox<T> {
inner: T,
}

impl<T: Clone> Clone for TimedMessageBox<T> {
fn clone(&self) -> Self {
TimedMessageBox {
timeout: self.timeout,
inner: self.inner.clone(),
}
}
}

#[async_trait]
impl<T, M> MessageReceiver<M> for TimedMessageBox<T>
where
Expand Down Expand Up @@ -358,10 +367,6 @@ where
async fn send(&mut self, message: M) -> Result<(), ChannelError> {
self.inner.send(message).await
}

fn sender_clone(&self) -> DynSender<M> {
self.inner.sender_clone()
}
}

impl<T> AsRef<T> for TimedMessageBox<T> {
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_api/src/message_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ impl MessageLogWriter {
let mut writer = BufWriter::new(file);

if file_is_empty {
let version_info = json!({"version": LOG_FORMAT_VERSION}).to_string();
let version_info = json!({ "version": LOG_FORMAT_VERSION }).to_string();
writeln!(writer, "{}", version_info)?;
}

Expand Down
3 changes: 2 additions & 1 deletion crates/extensions/c8y_mapper_ext/src/converter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1706,6 +1706,7 @@ pub(crate) mod tests {
use std::collections::HashMap;
use std::str::FromStr;
use tedge_actors::Builder;
use tedge_actors::CloneSender;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
use tedge_actors::Sender;
Expand Down Expand Up @@ -3213,7 +3214,7 @@ pub(crate) mod tests {
for i in 0..3 {
let measurement_message = Message::new(
&Topic::new_unchecked("te/custom/child1///m/environment"),
json!({"temperature": i}).to_string(),
json!({ "temperature": i }).to_string(),
);
let mapped_messages = converter.convert(&measurement_message).await;
assert!(
Expand Down
21 changes: 9 additions & 12 deletions crates/extensions/tedge_timer_ext/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ struct TimeoutSender<T: Message> {
inner: DynSender<Timeout<T>>,
}

impl<T: Message> Clone for TimeoutSender<T> {
fn clone(&self) -> Self {
TimeoutSender {
inner: self.inner.clone(),
}
}
}

#[async_trait]
impl<T: Message> Sender<Timeout<AnyPayload>> for TimeoutSender<T> {
async fn send(&mut self, message: Timeout<AnyPayload>) -> Result<(), ChannelError> {
Expand All @@ -80,18 +88,13 @@ impl<T: Message> Sender<Timeout<AnyPayload>> for TimeoutSender<T> {
}
Ok(())
}

fn sender_clone(&self) -> DynSender<Timeout<AnyPayload>> {
Box::new(TimeoutSender {
inner: self.inner.sender_clone(),
})
}
}

/// A Sender that translates timeout requests on the wire
///
/// This sender receives `SetTimeout<T>` requests from some actor,
/// and translates then forwards these messages to the timer actor expecting`Timeout<AnyPayload>`
#[derive(Clone)]
struct SetTimeoutSender {
inner: DynSender<SetTimeout<AnyPayload>>,
}
Expand All @@ -103,10 +106,4 @@ impl<T: Message> Sender<SetTimeout<T>> for SetTimeoutSender {
let event: AnyPayload = Box::new(request.event);
self.inner.send(SetTimeout { duration, event }).await
}

fn sender_clone(&self) -> DynSender<SetTimeout<T>> {
Box::new(SetTimeoutSender {
inner: self.inner.sender_clone(),
})
}
}

0 comments on commit 8f76478

Please sign in to comment.