Skip to content

Commit

Permalink
Simplify the logic to adapt dyn senders
Browse files Browse the repository at this point in the history
- The `Clone` trait is no more implemented by `DynSender`
  This was the main reason preventing the implementation
  of useful `Into` convertions between misc `DynSender`.
- The `adapt()` function as been deprecated
  as this was a workaround the lack of `Into` convertions between misc `DynSender`.
- The `sender_clone()` method is to be used both to `clone` and adapt a
  sender.

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Feb 15, 2024
1 parent 4f22ca2 commit b1d30f9
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 88 deletions.
6 changes: 3 additions & 3 deletions crates/core/tedge_actors/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ pub mod tests {
let actor_task = spawn(async move { actor.run().await });

spawn(async move {
let mut sender: DynSender<&str> = adapt(&input_sender);
let mut sender: DynSender<&str> = input_sender.sender_clone();
sender.send("Do this").await.expect("sent");
sender.send("Do nothing").await.expect("sent");
sender.send("Do that and this").await.expect("sent");
Expand Down Expand Up @@ -190,8 +190,8 @@ pub mod tests {
impl SpecificMessageBox {
fn new_box(capacity: usize, output: DynSender<DoMsg>) -> (DynSender<String>, Self) {
let (sender, input) = mpsc::channel(capacity);
let peer_1 = adapt(&output);
let peer_2 = adapt(&output);
let peer_1 = output.sender_clone();
let peer_2 = output.sender_clone();
let message_box = SpecificMessageBox {
input,
peer_1,
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/builders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub trait MessageSink<M: Message, Config> {
N: Message,
M: From<N>,
{
source.register_peer(self.get_config(), crate::adapt(&self.get_sender()))
source.register_peer(self.get_config(), self.get_sender().sender_clone())
}

/// Add a source of messages to the actor under construction, the messages being translated on the fly.
Expand Down
90 changes: 31 additions & 59 deletions crates/core/tedge_actors/src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,41 +39,55 @@ impl<M, S: Clone + Sender<M>> CloneSender<M> for S {
}
}

impl<M: 'static> Clone for DynSender<M> {
fn clone(&self) -> Self {
self.sender_clone()
impl<M, S: Clone + Sender<M>> From<S> for DynSender<M> {
fn from(sender: S) -> Self {
Box::new(sender)
}
}

/// An `mpsc::Sender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
impl<M: Message, N: Message + Into<M>> From<mpsc::Sender<M>> for DynSender<N> {
fn from(sender: mpsc::Sender<M>) -> Self {
Box::new(sender)
/// A `DynSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(self.as_mut().send(message.into()).await?)
}
}

#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
impl<M: Message, N: Message + Into<M>> Sender<N> for Box<dyn Sender<M>> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(SinkExt::send(&mut self, message.into()).await?)
Ok(self.as_mut().send(message.into()).await?)
}
}

/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
impl<M: Message, N: Message + Into<M>> From<mpsc::UnboundedSender<M>> for DynSender<N> {
fn from(sender: mpsc::UnboundedSender<M>) -> Self {
Box::new(sender)
#[async_trait]
impl<M: Message, N: Message + Into<M>> CloneSender<N> for DynSender<M> {
fn sender_clone(&self) -> DynSender<N> {
Box::new(self.as_ref().sender_clone())
}

fn sender(&self) -> Box<dyn Sender<N>> {
Box::new(self.as_ref().sender())
}
}

/// An `mpsc::Sender<M>` is a `DynSender<M>`
#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(SinkExt::send(&mut self, message.into()).await?)
}
}

/// An `mpsc::UnboundedSender<M>` is a `DynSender<N>` provided `N` implements `Into<M>`
#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::UnboundedSender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(SinkExt::send(&mut self, message.into()).await?)
}
}

/// An `oneshot::Sender<M>` is a `Sender<N>` provided `N` implements `Into<M>`
/// A `oneshot::Sender<M>` is a `Sender<N>` provided `N` implements `Into<M>`
///
/// There is one caveat. The `oneshot::Sender::send()` method consumes the sender,
/// hence the one shot sender is wrapped inside an `Option`.
Expand All @@ -92,29 +106,6 @@ impl<M: Message, N: Message + Into<M>> Sender<N> for Option<oneshot::Sender<M>>
}
}

/// Make a `DynSender<N>` from a `DynSender<M>`
///
/// This is a workaround to the fact the compiler rejects a From implementation:
///
/// ```shell
///
/// impl<M: Message, N: Message + Into<M>> From<DynSender<M>> for DynSender<N> {
/// | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
/// |
/// = note: conflicting implementation in crate `core`:
/// - impl<T> From<T> for T;
/// ```
pub fn adapt<M: Message, N: Message + Into<M>>(sender: &DynSender<M>) -> DynSender<N> {
Box::new(sender.clone())
}

#[async_trait]
impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
async fn send(&mut self, message: N) -> Result<(), ChannelError> {
Ok(self.as_mut().send(message.into()).await?)
}
}

/// A sender that discards messages instead of sending them
#[derive(Clone)]
pub struct NullSender;
Expand All @@ -126,12 +117,6 @@ impl<M: Message> Sender<M> for NullSender {
}
}

impl<M: Message> From<NullSender> for DynSender<M> {
fn from(sender: NullSender) -> Self {
Box::new(sender)
}
}

/// A sender that transforms the messages on the fly
pub struct MappingSender<F, M> {
inner: DynSender<M>,
Expand All @@ -141,7 +126,7 @@ pub struct MappingSender<F, M> {
impl<F, M: 'static> Clone for MappingSender<F, M> {
fn clone(&self) -> Self {
MappingSender {
inner: self.inner.clone(),
inner: self.inner.sender_clone(),
cast: self.cast.clone(),
}
}
Expand Down Expand Up @@ -173,19 +158,6 @@ where
}
}

impl<M, N, NS, F> From<MappingSender<F, N>> for DynSender<M>
where
M: Message,
N: Message,
NS: Iterator<Item = N> + Send,
F: Fn(M) -> NS,
F: 'static + Sync + Send,
{
fn from(value: MappingSender<F, N>) -> Self {
Box::new(value)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -236,8 +208,8 @@ mod tests {
impl From<DynSender<Msg>> for Peers {
fn from(recipient: DynSender<Msg>) -> Self {
Peers {
peer_1: adapt(&recipient),
peer_2: adapt(&recipient),
peer_1: recipient.sender_clone(),
peer_2: recipient.sender_clone(),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/core/tedge_actors/src/servers/keyed_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct SenderVec<M> {
impl<M: 'static> Clone for SenderVec<M> {
fn clone(&self) -> Self {
SenderVec {
senders: self.senders.clone(),
senders: self.senders.iter().map(|s| s.sender_clone()).collect(),
}
}
}
Expand Down
17 changes: 9 additions & 8 deletions crates/core/tedge_actors/src/servers/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,20 @@ impl<Request: Message, Response: Message> ClientMessageBox<Request, Response> {

/// A [Sender] used by a client to send requests to a server,
/// redirecting the responses to another recipient.
#[derive(Clone)]
pub struct RequestSender<Request: 'static, Response: 'static> {
sender: DynSender<RequestEnvelope<Request, Response>>,
reply_to: DynSender<Response>,
}

impl<Request, Response> Clone for RequestSender<Request, Response> {
fn clone(&self) -> Self {
RequestSender {
sender: self.sender.sender_clone(),
reply_to: self.reply_to.sender_clone(),
}
}
}

#[async_trait]
impl<Request: Message, Response: Message> Sender<Request> for RequestSender<Request, Response> {
async fn send(&mut self, request: Request) -> Result<(), ChannelError> {
Expand All @@ -65,13 +73,6 @@ impl<Request: Message, Response: Message> Sender<Request> for RequestSender<Requ
}
}

/* Adding this prevents to derive Clone for RequestSender! Why?
impl<Request: Message, Response: Message> From<RequestSender<Request,Response>> for DynSender<Request> {
fn from(sender: RequestSender<Request,Response>) -> Self {
Box::new(sender)
}
}*/

/// An actor that wraps a request-response server
///
/// Requests are processed in turn, leading either to a response or an error.
Expand Down
6 changes: 3 additions & 3 deletions crates/core/tedge_agent/src/operation_file_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ use camino::Utf8PathBuf;
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use tedge_actors::adapt;
use tedge_actors::fan_in_message_type;
use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::LoggingReceiver;
Expand Down Expand Up @@ -295,11 +295,11 @@ impl FileCacheActorBuilder {
let message_box = SimpleMessageBoxBuilder::new("RestartManager", 10);

let download_sender =
downloader_actor.connect_consumer(NoConfig, adapt(&message_box.get_sender()));
downloader_actor.connect_consumer(NoConfig, message_box.get_sender().sender_clone());

let mqtt_sender = mqtt_actor.connect_consumer(
Self::subscriptions(&mqtt_schema),
adapt(&message_box.get_sender()),
message_box.get_sender().sender_clone(),
);

Self {
Expand Down
21 changes: 14 additions & 7 deletions crates/extensions/c8y_mapper_ext/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ use c8y_http_proxy::messages::C8YRestRequest;
use c8y_http_proxy::messages::C8YRestResult;
use std::path::PathBuf;
use std::time::Duration;
use tedge_actors::adapt;
use tedge_actors::fan_in_message_type;
use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::LoggingSender;
use tedge_actors::MessageReceiver;
Expand Down Expand Up @@ -306,14 +306,21 @@ impl C8yMapperBuilder {

let box_builder = SimpleMessageBoxBuilder::new("CumulocityMapper", 16);

let mqtt_publisher =
mqtt.connect_consumer(config.topics.clone(), adapt(&box_builder.get_sender()));
let mqtt_publisher = mqtt.connect_consumer(
config.topics.clone(),
box_builder.get_sender().sender_clone(),
);
let http_proxy = C8YHttpProxy::new("C8yMapper => C8YHttpProxy", http);
let timer_sender = timer.connect_consumer(NoConfig, adapt(&box_builder.get_sender()));
let upload_sender = uploader.connect_consumer(NoConfig, adapt(&box_builder.get_sender()));
let timer_sender =
timer.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
let upload_sender =
uploader.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
let download_sender =
downloader.connect_consumer(NoConfig, adapt(&box_builder.get_sender()));
fs_watcher.register_peer(config.ops_dir.clone(), adapt(&box_builder.get_sender()));
downloader.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());
fs_watcher.register_peer(
config.ops_dir.clone(),
box_builder.get_sender().sender_clone(),
);
let auth_proxy = ProxyUrlGenerator::new(
config.auth_proxy_addr.clone(),
config.auth_proxy_port,
Expand Down
8 changes: 4 additions & 4 deletions crates/extensions/tedge_log_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ pub use actor::*;
pub use config::*;
use log_manager::LogPluginConfig;
use std::path::PathBuf;
use tedge_actors::adapt;
use tedge_actors::Builder;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::LoggingSender;
Expand Down Expand Up @@ -53,15 +53,15 @@ impl LogManagerBuilder {
let box_builder = SimpleMessageBoxBuilder::new("Log Manager", 16);
let mqtt_publisher = mqtt.connect_consumer(
Self::subscriptions(&config),
adapt(&box_builder.get_sender()),
box_builder.get_sender().sender_clone(),
);
fs_notify.register_peer(
LogManagerBuilder::watched_directory(&config),
adapt(&box_builder.get_sender()),
box_builder.get_sender().sender_clone(),
);

let upload_sender =
uploader_actor.connect_consumer(NoConfig, adapt(&box_builder.get_sender()));
uploader_actor.connect_consumer(NoConfig, box_builder.get_sender().sender_clone());

Ok(Self {
config,
Expand Down
12 changes: 10 additions & 2 deletions crates/extensions/tedge_timer_ext/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use async_trait::async_trait;
use std::convert::Infallible;
use tedge_actors::Builder;
use tedge_actors::ChannelError;
use tedge_actors::CloneSender;
use tedge_actors::DynSender;
use tedge_actors::Message;
use tedge_actors::NoConfig;
Expand Down Expand Up @@ -75,7 +76,7 @@ struct TimeoutSender<T: Message> {
impl<T: Message> Clone for TimeoutSender<T> {
fn clone(&self) -> Self {
TimeoutSender {
inner: self.inner.clone(),
inner: self.inner.sender_clone(),
}
}
}
Expand All @@ -94,11 +95,18 @@ impl<T: Message> Sender<Timeout<AnyPayload>> for TimeoutSender<T> {
///
/// This sender receives `SetTimeout<T>` requests from some actor,
/// and translates then forwards these messages to the timer actor expecting`Timeout<AnyPayload>`
#[derive(Clone)]
struct SetTimeoutSender {
inner: DynSender<SetTimeout<AnyPayload>>,
}

impl Clone for SetTimeoutSender {
fn clone(&self) -> Self {
SetTimeoutSender {
inner: self.inner.sender_clone(),
}
}
}

#[async_trait]
impl<T: Message> Sender<SetTimeout<T>> for SetTimeoutSender {
async fn send(&mut self, request: SetTimeout<T>) -> Result<(), ChannelError> {
Expand Down

0 comments on commit b1d30f9

Please sign in to comment.