diff --git a/crates/core/tedge_actors/src/actors.rs b/crates/core/tedge_actors/src/actors.rs index 1922ffab99c..37cd56afaa5 100644 --- a/crates/core/tedge_actors/src/actors.rs +++ b/crates/core/tedge_actors/src/actors.rs @@ -78,6 +78,7 @@ pub mod tests { async fn running_an_actor_without_a_runtime() { let mut box_builder = SimpleMessageBoxBuilder::new("test", 16); let mut client_message_box = box_builder.new_client_box(NoConfig); + let mut runtime_box = box_builder.get_signal_sender(); let actor_message_box = box_builder.build(); let actor = Echo { messages: actor_message_box, @@ -93,7 +94,10 @@ pub mod tests { assert_eq!(client_message_box.recv().await, Some("World".to_string())); // When there is no more input message senders - client_message_box.close_sender(); + runtime_box + .send(RuntimeRequest::Shutdown) + .await + .expect("fail to shutdown"); // The actor stops actor_task diff --git a/crates/core/tedge_actors/src/channels.rs b/crates/core/tedge_actors/src/channels.rs index 0f6fe6f99ee..66f4f8eca1b 100644 --- a/crates/core/tedge_actors/src/channels.rs +++ b/crates/core/tedge_actors/src/channels.rs @@ -19,9 +19,6 @@ pub trait Sender: 'static + Send + Sync { /// Clone this sender in order to send messages to the same receiver from another actor fn sender_clone(&self) -> DynSender; - - /// Closes this channel from the sender side, preventing any new messages. - fn close_sender(&mut self); } impl Clone for DynSender { @@ -46,10 +43,6 @@ impl> Sender for mpsc::Sender { fn sender_clone(&self) -> DynSender { Box::new(self.clone()) } - - fn close_sender(&mut self) { - self.close_channel(); - } } /// Make a `DynSender` from a `DynSender` @@ -77,10 +70,6 @@ impl> Sender for DynSender { fn sender_clone(&self) -> DynSender { Box::new(self.as_ref().sender_clone()) } - - fn close_sender(&mut self) { - self.as_mut().close_sender() - } } /// A sender that discards messages instead of sending them @@ -95,8 +84,6 @@ impl Sender for NullSender { fn sender_clone(&self) -> DynSender { Box::new(NullSender) } - - fn close_sender(&mut self) {} } impl From for DynSender { @@ -142,10 +129,6 @@ where cast: self.cast.clone(), }) } - - fn close_sender(&mut self) { - self.inner.as_mut().close_sender() - } } impl From> for DynSender diff --git a/crates/core/tedge_actors/src/message_boxes.rs b/crates/core/tedge_actors/src/message_boxes.rs index 38d386a2488..c604b451e5c 100644 --- a/crates/core/tedge_actors/src/message_boxes.rs +++ b/crates/core/tedge_actors/src/message_boxes.rs @@ -240,10 +240,6 @@ impl Sender for LoggingSender::close_sender(&mut self.sender) - } } pub fn log_message_sent(target: &str, message: I) { @@ -320,10 +316,6 @@ impl Sender for SimpleMessageBox DynSender { self.output_sender.sender_clone() } - - fn close_sender(&mut self) { - self.output_sender.close_sender() - } } pub struct CombinedReceiver { diff --git a/crates/core/tedge_actors/src/servers/keyed_messages.rs b/crates/core/tedge_actors/src/servers/keyed_messages.rs index f366d2a62df..bdab5493908 100644 --- a/crates/core/tedge_actors/src/servers/keyed_messages.rs +++ b/crates/core/tedge_actors/src/servers/keyed_messages.rs @@ -29,10 +29,6 @@ impl Sender for KeyedSender { sender: self.sender.clone(), }) } - - fn close_sender(&mut self) { - self.sender.close_channel() - } } /// A vector of senders addressed using a sender id attached to each message @@ -60,10 +56,4 @@ impl Sender<(usize, M)> for SenderVec { let senders = self.senders.iter().map(|r| r.sender_clone()).collect(); Box::new(SenderVec { senders }) } - - fn close_sender(&mut self) { - self.senders - .iter_mut() - .for_each(|s| s.as_mut().close_sender()) - } } diff --git a/crates/core/tedge_actors/src/test_helpers.rs b/crates/core/tedge_actors/src/test_helpers.rs index 69857732c5b..5d2c289ea8c 100644 --- a/crates/core/tedge_actors/src/test_helpers.rs +++ b/crates/core/tedge_actors/src/test_helpers.rs @@ -362,10 +362,6 @@ where fn sender_clone(&self) -> DynSender { self.inner.sender_clone() } - - fn close_sender(&mut self) { - self.inner.close_sender() - } } impl AsRef for TimedMessageBox { diff --git a/crates/extensions/c8y_mapper_ext/src/tests.rs b/crates/extensions/c8y_mapper_ext/src/tests.rs index 34a9fddf2a2..bf9a3f62094 100644 --- a/crates/extensions/c8y_mapper_ext/src/tests.rs +++ b/crates/extensions/c8y_mapper_ext/src/tests.rs @@ -2210,8 +2210,6 @@ async fn inventory_registers_unknown_entity_once() { mqtt.send(measurement_message.clone()).await.unwrap(); } - mqtt.close_sender(); - let mut messages = vec![]; while let Some(WrappedInput::Message(msg)) = mqtt.recv_message().await { messages.push(msg); diff --git a/crates/extensions/tedge_timer_ext/src/builder.rs b/crates/extensions/tedge_timer_ext/src/builder.rs index 9b7e8c4832a..72b8c9bd21e 100644 --- a/crates/extensions/tedge_timer_ext/src/builder.rs +++ b/crates/extensions/tedge_timer_ext/src/builder.rs @@ -86,10 +86,6 @@ impl Sender> for TimeoutSender { inner: self.inner.sender_clone(), }) } - - fn close_sender(&mut self) { - self.inner.as_mut().close_sender() - } } /// A Sender that translates timeout requests on the wire @@ -113,8 +109,4 @@ impl Sender> for SetTimeoutSender { inner: self.inner.sender_clone(), }) } - - fn close_sender(&mut self) { - self.inner.as_mut().close_sender() - } } diff --git a/crates/extensions/tedge_timer_ext/src/tests.rs b/crates/extensions/tedge_timer_ext/src/tests.rs index 20faa1a25fa..cff8d95fae5 100644 --- a/crates/extensions/tedge_timer_ext/src/tests.rs +++ b/crates/extensions/tedge_timer_ext/src/tests.rs @@ -108,6 +108,7 @@ async fn should_shutdown_even_if_there_are_pending_timers() { #[tokio::test] async fn should_process_all_pending_timers_on_end_of_inputs() { let mut client_box_builder = SimpleMessageBoxBuilder::new("Test timers", 16); + let mut runtime_box = client_box_builder.get_signal_sender(); let _ = spawn_timer_actor(&mut client_box_builder).await; let mut client_box = client_box_builder.build(); @@ -129,7 +130,10 @@ async fn should_process_all_pending_timers_on_end_of_inputs() { .unwrap(); // Then close the stream of requests - client_box.close_sender(); + runtime_box + .send(RuntimeRequest::Shutdown) + .await + .expect("fail to shutdown"); // The actor timer is expected to shutdown, // but *only* after all the pending requests have been processed.