Skip to content

Commit

Permalink
Deprecate Sender::sender_clone()
Browse files Browse the repository at this point in the history
This method was used only in 3 tests, while adding extra complexity.

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Jan 23, 2024
1 parent 6e5e373 commit 1709df2
Show file tree
Hide file tree
Showing 8 changed files with 10 additions and 51 deletions.
6 changes: 5 additions & 1 deletion crates/core/tedge_actors/src/actors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ pub mod tests {
async fn running_an_actor_without_a_runtime() {
let mut box_builder = SimpleMessageBoxBuilder::new("test", 16);
let mut client_message_box = box_builder.new_client_box(NoConfig);
let mut runtime_box = box_builder.get_signal_sender();
let actor_message_box = box_builder.build();
let actor = Echo {
messages: actor_message_box,
Expand All @@ -93,7 +94,10 @@ pub mod tests {
assert_eq!(client_message_box.recv().await, Some("World".to_string()));

// When there is no more input message senders
client_message_box.close_sender();
runtime_box
.send(RuntimeRequest::Shutdown)
.await
.expect("fail to shutdown");

// The actor stops
actor_task
Expand Down
17 changes: 0 additions & 17 deletions crates/core/tedge_actors/src/channels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ pub trait Sender<M>: 'static + Send + Sync {

/// Clone this sender in order to send messages to the same receiver from another actor
fn sender_clone(&self) -> DynSender<M>;

/// Closes this channel from the sender side, preventing any new messages.
fn close_sender(&mut self);
}

impl<M: Message> Clone for DynSender<M> {
Expand All @@ -46,10 +43,6 @@ impl<M: Message, N: Message + Into<M>> Sender<N> for mpsc::Sender<M> {
fn sender_clone(&self) -> DynSender<N> {
Box::new(self.clone())
}

fn close_sender(&mut self) {
self.close_channel();
}
}

/// Make a `DynSender<N>` from a `DynSender<M>`
Expand Down Expand Up @@ -77,10 +70,6 @@ impl<M: Message, N: Message + Into<M>> Sender<N> for DynSender<M> {
fn sender_clone(&self) -> DynSender<N> {
Box::new(self.as_ref().sender_clone())
}

fn close_sender(&mut self) {
self.as_mut().close_sender()
}
}

/// A sender that discards messages instead of sending them
Expand All @@ -95,8 +84,6 @@ impl<M: Message> Sender<M> for NullSender {
fn sender_clone(&self) -> DynSender<M> {
Box::new(NullSender)
}

fn close_sender(&mut self) {}
}

impl<M: Message> From<NullSender> for DynSender<M> {
Expand Down Expand Up @@ -142,10 +129,6 @@ where
cast: self.cast.clone(),
})
}

fn close_sender(&mut self) {
self.inner.as_mut().close_sender()
}
}

impl<M, N, NS, F> From<MappingSender<F, N>> for DynSender<M>
Expand Down
8 changes: 0 additions & 8 deletions crates/core/tedge_actors/src/message_boxes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,6 @@ impl<Output: Debug + Send + Sync + 'static> Sender<Output> for LoggingSender<Out
sender: self.sender.clone(),
})
}

fn close_sender(&mut self) {
Sender::<Output>::close_sender(&mut self.sender)
}
}

pub fn log_message_sent<I: Debug>(target: &str, message: I) {
Expand Down Expand Up @@ -320,10 +316,6 @@ impl<Input: Message, Output: Message> Sender<Output> for SimpleMessageBox<Input,
fn sender_clone(&self) -> DynSender<Output> {
self.output_sender.sender_clone()
}

fn close_sender(&mut self) {
self.output_sender.close_sender()
}
}

pub struct CombinedReceiver<Input> {
Expand Down
10 changes: 0 additions & 10 deletions crates/core/tedge_actors/src/servers/keyed_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ impl<K: Message + Clone, M: Message> Sender<M> for KeyedSender<K, M> {
sender: self.sender.clone(),
})
}

fn close_sender(&mut self) {
self.sender.close_channel()
}
}

/// A vector of senders addressed using a sender id attached to each message
Expand Down Expand Up @@ -60,10 +56,4 @@ impl<M: Message> Sender<(usize, M)> for SenderVec<M> {
let senders = self.senders.iter().map(|r| r.sender_clone()).collect();
Box::new(SenderVec { senders })
}

fn close_sender(&mut self) {
self.senders
.iter_mut()
.for_each(|s| s.as_mut().close_sender())
}
}
4 changes: 0 additions & 4 deletions crates/core/tedge_actors/src/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,10 +362,6 @@ where
fn sender_clone(&self) -> DynSender<M> {
self.inner.sender_clone()
}

fn close_sender(&mut self) {
self.inner.close_sender()
}
}

impl<T> AsRef<T> for TimedMessageBox<T> {
Expand Down
2 changes: 0 additions & 2 deletions crates/extensions/c8y_mapper_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2210,8 +2210,6 @@ async fn inventory_registers_unknown_entity_once() {
mqtt.send(measurement_message.clone()).await.unwrap();
}

mqtt.close_sender();

let mut messages = vec![];
while let Some(WrappedInput::Message(msg)) = mqtt.recv_message().await {
messages.push(msg);
Expand Down
8 changes: 0 additions & 8 deletions crates/extensions/tedge_timer_ext/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ impl<T: Message> Sender<Timeout<AnyPayload>> for TimeoutSender<T> {
inner: self.inner.sender_clone(),
})
}

fn close_sender(&mut self) {
self.inner.as_mut().close_sender()
}
}

/// A Sender that translates timeout requests on the wire
Expand All @@ -113,8 +109,4 @@ impl<T: Message> Sender<SetTimeout<T>> for SetTimeoutSender {
inner: self.inner.sender_clone(),
})
}

fn close_sender(&mut self) {
self.inner.as_mut().close_sender()
}
}
6 changes: 5 additions & 1 deletion crates/extensions/tedge_timer_ext/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ async fn should_shutdown_even_if_there_are_pending_timers() {
#[tokio::test]
async fn should_process_all_pending_timers_on_end_of_inputs() {
let mut client_box_builder = SimpleMessageBoxBuilder::new("Test timers", 16);
let mut runtime_box = client_box_builder.get_signal_sender();
let _ = spawn_timer_actor(&mut client_box_builder).await;
let mut client_box = client_box_builder.build();

Expand All @@ -129,7 +130,10 @@ async fn should_process_all_pending_timers_on_end_of_inputs() {
.unwrap();

// Then close the stream of requests
client_box.close_sender();
runtime_box
.send(RuntimeRequest::Shutdown)
.await
.expect("fail to shutdown");

// The actor timer is expected to shutdown,
// but *only* after all the pending requests have been processed.
Expand Down

0 comments on commit 1709df2

Please sign in to comment.