diff --git a/crates/core/tedge_actors/src/builders.rs b/crates/core/tedge_actors/src/builders.rs index 43814fde19f..e6e072124aa 100644 --- a/crates/core/tedge_actors/src/builders.rs +++ b/crates/core/tedge_actors/src/builders.rs @@ -77,6 +77,7 @@ //! using an `impl From for SinkMessage`. This flexibility allows an actor to receive //! messages from several independent sources (see the [fan_in_message_type](crate::fan_in_message_type) macro). use crate::mpsc; +use crate::CloneSender; use crate::DynSender; use crate::LoggingReceiver; use crate::LoggingSender; @@ -84,7 +85,6 @@ use crate::MappingSender; use crate::Message; use crate::NullSender; use crate::RuntimeRequest; -use crate::Sender; use crate::SimpleMessageBox; use std::convert::Infallible; use std::fmt::Debug; diff --git a/crates/core/tedge_actors/src/channels.rs b/crates/core/tedge_actors/src/channels.rs index 66f4f8eca1b..c0ed7ed532f 100644 --- a/crates/core/tedge_actors/src/channels.rs +++ b/crates/core/tedge_actors/src/channels.rs @@ -7,21 +7,38 @@ use futures::SinkExt; /// A sender of messages of type `M` /// -/// Actors don't access directly the `mpsc::Sender` of their peers, +/// Actors don't access directly to the `mpsc::Sender` of their peers, /// but use intermediate senders that adapt the messages when sent. -pub type DynSender = Box>; +pub type DynSender = Box>; #[async_trait] pub trait Sender: 'static + Send + Sync { /// Send a message to the receiver behind this sender, /// returning an error if the receiver is no more expecting messages async fn send(&mut self, message: M) -> Result<(), ChannelError>; +} +pub trait CloneSender: Sender { /// Clone this sender in order to send messages to the same receiver from another actor fn sender_clone(&self) -> DynSender; + + /// Clone a cast of this sender into a `Box>` + /// + /// This is a workaround for https://github.com/rust-lang/rust/issues/65991 + fn sender(&self) -> Box>; } -impl Clone for DynSender { +impl> CloneSender for S { + fn sender_clone(&self) -> DynSender { + Box::new(self.clone()) + } + + fn sender(&self) -> Box> { + Box::new(self.clone()) + } +} + +impl Clone for DynSender { fn clone(&self) -> Self { self.sender_clone() } @@ -39,10 +56,6 @@ impl> Sender for mpsc::Sender { async fn send(&mut self, message: N) -> Result<(), ChannelError> { Ok(SinkExt::send(&mut self, message.into()).await?) } - - fn sender_clone(&self) -> DynSender { - Box::new(self.clone()) - } } /// Make a `DynSender` from a `DynSender` @@ -66,13 +79,10 @@ impl> Sender for DynSender { async fn send(&mut self, message: N) -> Result<(), ChannelError> { Ok(self.as_mut().send(message.into()).await?) } - - fn sender_clone(&self) -> DynSender { - Box::new(self.as_ref().sender_clone()) - } } /// A sender that discards messages instead of sending them +#[derive(Clone)] pub struct NullSender; #[async_trait] @@ -80,10 +90,6 @@ impl Sender for NullSender { async fn send(&mut self, _message: M) -> Result<(), ChannelError> { Ok(()) } - - fn sender_clone(&self) -> DynSender { - Box::new(NullSender) - } } impl From for DynSender { @@ -98,6 +104,15 @@ pub struct MappingSender { cast: std::sync::Arc, } +impl Clone for MappingSender { + fn clone(&self) -> Self { + MappingSender { + inner: self.inner.clone(), + cast: self.cast.clone(), + } + } +} + impl MappingSender { pub fn new(inner: DynSender, cast: F) -> Self { MappingSender { @@ -122,13 +137,6 @@ where } Ok(()) } - - fn sender_clone(&self) -> DynSender { - Box::new(MappingSender { - inner: self.inner.sender_clone(), - cast: self.cast.clone(), - }) - } } impl From> for DynSender diff --git a/crates/core/tedge_actors/src/message_boxes.rs b/crates/core/tedge_actors/src/message_boxes.rs index c604b451e5c..0a02223c3a3 100644 --- a/crates/core/tedge_actors/src/message_boxes.rs +++ b/crates/core/tedge_actors/src/message_boxes.rs @@ -91,6 +91,7 @@ //! use crate::channels::Sender; use crate::ChannelError; +use crate::CloneSender; use crate::DynSender; use crate::Message; use crate::RuntimeRequest; @@ -212,34 +213,27 @@ pub struct LoggingSender { sender: DynSender, } -impl LoggingSender { - pub fn new(name: String, sender: DynSender) -> Self { - Self { name, sender } - } -} - impl Clone for LoggingSender { fn clone(&self) -> Self { - Self { + LoggingSender { name: self.name.clone(), sender: self.sender.sender_clone(), } } } +impl LoggingSender { + pub fn new(name: String, sender: DynSender) -> Self { + Self { name, sender } + } +} + #[async_trait] -impl Sender for LoggingSender { +impl Sender for LoggingSender { async fn send(&mut self, message: Output) -> Result<(), ChannelError> { log_message_sent(&self.name, &message); self.sender.send(message).await } - - fn sender_clone(&self) -> DynSender { - Box::new(LoggingSender { - name: self.name.clone(), - sender: self.sender.clone(), - }) - } } pub fn log_message_sent(target: &str, message: I) { @@ -253,7 +247,7 @@ pub fn log_message_sent(target: &str, message: I) { /// - Log sent messages when pushed into the target box /// /// Such a box is connected to peer actors using a [SimpleMessageBoxBuilder](crate::SimpleMessageBoxBuilder). -pub struct SimpleMessageBox { +pub struct SimpleMessageBox { input_receiver: LoggingReceiver, output_sender: LoggingSender, } @@ -312,9 +306,15 @@ impl Sender for SimpleMessageBox Result<(), ChannelError> { self.output_sender.send(message).await } +} +impl CloneSender for SimpleMessageBox { fn sender_clone(&self) -> DynSender { - self.output_sender.sender_clone() + CloneSender::sender_clone(&self.output_sender) + } + + fn sender(&self) -> Box> { + CloneSender::sender(&self.output_sender) } } diff --git a/crates/core/tedge_actors/src/servers/builders.rs b/crates/core/tedge_actors/src/servers/builders.rs index 40107ff06da..5a95f9bdb52 100644 --- a/crates/core/tedge_actors/src/servers/builders.rs +++ b/crates/core/tedge_actors/src/servers/builders.rs @@ -2,6 +2,7 @@ use crate::mpsc; use crate::Actor; use crate::Builder; use crate::ClientId; +use crate::CloneSender; use crate::ConcurrentServerActor; use crate::ConcurrentServerMessageBox; use crate::DynSender; @@ -13,7 +14,6 @@ use crate::NoConfig; use crate::RuntimeError; use crate::RuntimeRequest; use crate::RuntimeRequestSink; -use crate::Sender; use crate::SenderVec; use crate::Server; use crate::ServerActor; diff --git a/crates/core/tedge_actors/src/servers/keyed_messages.rs b/crates/core/tedge_actors/src/servers/keyed_messages.rs index bdab5493908..c71c45f83ee 100644 --- a/crates/core/tedge_actors/src/servers/keyed_messages.rs +++ b/crates/core/tedge_actors/src/servers/keyed_messages.rs @@ -6,11 +6,20 @@ use crate::Sender; use async_trait::async_trait; /// A sender that adds a key to messages on the fly -pub struct KeyedSender { +pub struct KeyedSender { key: K, sender: mpsc::Sender<(K, M)>, } +impl Clone for KeyedSender { + fn clone(&self) -> Self { + KeyedSender { + key: self.key.clone(), + sender: self.sender.clone(), + } + } +} + impl KeyedSender { pub fn new_sender(key: K, sender: mpsc::Sender<(K, M)>) -> DynSender { Box::new(KeyedSender { key, sender }) @@ -22,20 +31,21 @@ impl Sender for KeyedSender { async fn send(&mut self, message: M) -> Result<(), ChannelError> { self.sender.send((self.key.clone(), message)).await } - - fn sender_clone(&self) -> DynSender { - Box::new(KeyedSender { - key: self.key.clone(), - sender: self.sender.clone(), - }) - } } /// A vector of senders addressed using a sender id attached to each message -pub struct SenderVec { +pub struct SenderVec { senders: Vec>, } +impl Clone for SenderVec { + fn clone(&self) -> Self { + SenderVec { + senders: self.senders.clone(), + } + } +} + impl SenderVec { pub fn new_sender(senders: Vec>) -> DynSender<(usize, M)> { Box::new(SenderVec { senders }) @@ -51,9 +61,4 @@ impl Sender<(usize, M)> for SenderVec { } Ok(()) } - - fn sender_clone(&self) -> DynSender<(usize, M)> { - let senders = self.senders.iter().map(|r| r.sender_clone()).collect(); - Box::new(SenderVec { senders }) - } } diff --git a/crates/core/tedge_actors/src/servers/message_boxes.rs b/crates/core/tedge_actors/src/servers/message_boxes.rs index bdaf1489bb6..db5aff08442 100644 --- a/crates/core/tedge_actors/src/servers/message_boxes.rs +++ b/crates/core/tedge_actors/src/servers/message_boxes.rs @@ -22,7 +22,7 @@ pub type ServerMessageBox = pub type ClientId = usize; /// A message box for services that handles requests concurrently -pub struct ConcurrentServerMessageBox { +pub struct ConcurrentServerMessageBox { /// Max concurrent requests max_concurrency: usize, @@ -119,7 +119,7 @@ impl ConcurrentServerMessageBox { +pub struct ClientMessageBox { messages: SimpleMessageBox, } diff --git a/crates/core/tedge_actors/src/test_helpers.rs b/crates/core/tedge_actors/src/test_helpers.rs index 5d2c289ea8c..9be0260775b 100644 --- a/crates/core/tedge_actors/src/test_helpers.rs +++ b/crates/core/tedge_actors/src/test_helpers.rs @@ -318,6 +318,15 @@ pub struct TimedMessageBox { inner: T, } +impl Clone for TimedMessageBox { + fn clone(&self) -> Self { + TimedMessageBox { + timeout: self.timeout, + inner: self.inner.clone(), + } + } +} + #[async_trait] impl MessageReceiver for TimedMessageBox where @@ -358,10 +367,6 @@ where async fn send(&mut self, message: M) -> Result<(), ChannelError> { self.inner.send(message).await } - - fn sender_clone(&self) -> DynSender { - self.inner.sender_clone() - } } impl AsRef for TimedMessageBox { diff --git a/crates/core/tedge_api/src/message_log.rs b/crates/core/tedge_api/src/message_log.rs index 7eeaf58d359..2d5fe9d1584 100644 --- a/crates/core/tedge_api/src/message_log.rs +++ b/crates/core/tedge_api/src/message_log.rs @@ -84,7 +84,7 @@ impl MessageLogWriter { let mut writer = BufWriter::new(file); if file_is_empty { - let version_info = json!({"version": LOG_FORMAT_VERSION}).to_string(); + let version_info = json!({ "version": LOG_FORMAT_VERSION }).to_string(); writeln!(writer, "{}", version_info)?; } diff --git a/crates/extensions/c8y_mapper_ext/src/converter.rs b/crates/extensions/c8y_mapper_ext/src/converter.rs index 7b006ad8ffb..673ee9ff9db 100644 --- a/crates/extensions/c8y_mapper_ext/src/converter.rs +++ b/crates/extensions/c8y_mapper_ext/src/converter.rs @@ -1706,6 +1706,7 @@ pub(crate) mod tests { use std::collections::HashMap; use std::str::FromStr; use tedge_actors::Builder; + use tedge_actors::CloneSender; use tedge_actors::LoggingSender; use tedge_actors::MessageReceiver; use tedge_actors::Sender; @@ -3213,7 +3214,7 @@ pub(crate) mod tests { for i in 0..3 { let measurement_message = Message::new( &Topic::new_unchecked("te/custom/child1///m/environment"), - json!({"temperature": i}).to_string(), + json!({ "temperature": i }).to_string(), ); let mapped_messages = converter.convert(&measurement_message).await; assert!( diff --git a/crates/extensions/tedge_timer_ext/src/builder.rs b/crates/extensions/tedge_timer_ext/src/builder.rs index 72b8c9bd21e..431a5fd3c93 100644 --- a/crates/extensions/tedge_timer_ext/src/builder.rs +++ b/crates/extensions/tedge_timer_ext/src/builder.rs @@ -72,6 +72,14 @@ struct TimeoutSender { inner: DynSender>, } +impl Clone for TimeoutSender { + fn clone(&self) -> Self { + TimeoutSender { + inner: self.inner.clone(), + } + } +} + #[async_trait] impl Sender> for TimeoutSender { async fn send(&mut self, message: Timeout) -> Result<(), ChannelError> { @@ -80,18 +88,13 @@ impl Sender> for TimeoutSender { } Ok(()) } - - fn sender_clone(&self) -> DynSender> { - Box::new(TimeoutSender { - inner: self.inner.sender_clone(), - }) - } } /// A Sender that translates timeout requests on the wire /// /// This sender receives `SetTimeout` requests from some actor, /// and translates then forwards these messages to the timer actor expecting`Timeout` +#[derive(Clone)] struct SetTimeoutSender { inner: DynSender>, } @@ -103,10 +106,4 @@ impl Sender> for SetTimeoutSender { let event: AnyPayload = Box::new(request.event); self.inner.send(SetTimeout { duration, event }).await } - - fn sender_clone(&self) -> DynSender> { - Box::new(SetTimeoutSender { - inner: self.inner.sender_clone(), - }) - } }