Skip to content

Commit

Permalink
Remove c8y firmware dependency on tedge_timer_ext
Browse files Browse the repository at this point in the history
Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Mar 7, 2024
1 parent 098beb2 commit ece233d
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 118 deletions.
2 changes: 0 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/extensions/c8y_firmware_manager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ tedge_api = { workspace = true }
tedge_config = { workspace = true }
tedge_downloader_ext = { workspace = true }
tedge_mqtt_ext = { workspace = true }
tedge_timer_ext = { workspace = true }
tedge_utils = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
Expand Down
5 changes: 0 additions & 5 deletions crates/extensions/c8y_firmware_manager/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ use tedge_downloader_ext::DownloadRequest;
use tedge_downloader_ext::DownloadResult;
use tedge_mqtt_ext::MqttMessage;
use tedge_mqtt_ext::Topic;
use tedge_timer_ext::SetTimeout;
use tedge_timer_ext::Timeout;
use tedge_utils::file::move_file;
use tedge_utils::file::FileError;
use tedge_utils::file::PermissionEntry;
Expand All @@ -50,9 +48,6 @@ use crate::mpsc;
use crate::operation::FirmwareOperationEntry;
use crate::operation::OperationKey;

pub type OperationSetTimeout = SetTimeout<OperationKey>;
pub type OperationTimeout = Timeout<OperationKey>;

pub type IdDownloadResult = (String, DownloadResult);
pub type IdDownloadRequest = (String, DownloadRequest);

Expand Down
4 changes: 0 additions & 4 deletions crates/extensions/c8y_firmware_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ mod tests;
use crate::actor::IdDownloadRequest;
use crate::actor::IdDownloadResult;
use crate::actor::OperationOutcome;
use crate::actor::OperationSetTimeout;
use crate::actor::OperationTimeout;
use actor::FirmwareInput;
use actor::FirmwareManagerActor;
use c8y_http_proxy::credentials::JwtResult;
Expand All @@ -22,7 +20,6 @@ use tedge_actors::ClientMessageBox;
use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::LoggingReceiver;
use tedge_actors::NoConfig;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::Service;
Expand All @@ -48,7 +45,6 @@ impl FirmwareManagerBuilder {
config: FirmwareManagerConfig,
mqtt_actor: &mut impl ServiceProvider<MqttMessage, MqttMessage, TopicFilter>,
jwt_actor: &mut impl Service<(), JwtResult>,
_timer_actor: &mut impl ServiceProvider<OperationSetTimeout, OperationTimeout, NoConfig>,
downloader_actor: &mut impl Service<IdDownloadRequest, IdDownloadResult>,
) -> Result<FirmwareManagerBuilder, FileError> {
Self::init(&config.data_dir)?;
Expand Down
127 changes: 26 additions & 101 deletions crates/extensions/c8y_firmware_manager/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use super::*;
use crate::actor::OperationSetTimeout;
use crate::actor::OperationTimeout;
use crate::operation::OperationKey;
use assert_json_diff::assert_json_include;
use c8y_api::smartrest::topic::C8yTopic;
use c8y_http_proxy::credentials::JwtRequest;
Expand All @@ -25,7 +22,6 @@ use tedge_api::DownloadError;
use tedge_downloader_ext::DownloadResponse;
use tedge_mqtt_ext::Topic;
use tedge_test_utils::fs::TempTedgeDir;
use tedge_timer_ext::Timeout;
use tokio::fs::remove_dir_all;
use tokio::task::JoinHandle;

Expand All @@ -46,13 +42,8 @@ const DEFAULT_REQUEST_TIMEOUT_SEC: Duration = Duration::from_secs(3600);
async fn handle_request_child_device_without_new_download() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();

let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;

// Publish firmware update operation to child device.
publish_smartrest_firmware_operation(&mut mqtt_message_box).await?;
Expand Down Expand Up @@ -105,13 +96,8 @@ async fn handle_request_child_device_without_new_download() -> Result<(), DynErr
async fn resend_firmware_update_request_child_device() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();

let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;

// Publish firmware update operation to child device.
publish_smartrest_firmware_operation(&mut mqtt_message_box).await?;
Expand Down Expand Up @@ -182,13 +168,8 @@ async fn resend_firmware_update_request_child_device() -> Result<(), DynError> {
async fn handle_request_child_device_with_new_download() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();

let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;

// Publish firmware update operation to child device.
publish_smartrest_firmware_operation(&mut mqtt_message_box).await?;
Expand Down Expand Up @@ -246,13 +227,8 @@ async fn handle_request_child_device_with_new_download() -> Result<(), DynError>
async fn handle_request_child_device_with_failed_download() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();

let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;

// Publish firmware update operation to child device.
publish_smartrest_firmware_operation(&mut mqtt_message_box).await?;
Expand Down Expand Up @@ -294,13 +270,8 @@ async fn handle_request_child_device_with_failed_download() -> Result<(), DynErr
async fn create_download_request_with_c8y_auth() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();

let (
_handle,
mut mqtt_message_box,
mut jwt_message_box,
mut _timer_message_box,
mut downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;
let (_handle, mut mqtt_message_box, mut jwt_message_box, mut downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;

let c8y_download_url = format!("http://{C8Y_HOST}/file/end/point");
let token = "token";
Expand Down Expand Up @@ -338,13 +309,8 @@ async fn create_download_request_with_c8y_auth() -> Result<(), DynError> {
async fn handle_response_successful_child_device() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();

let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;

// On startup, SmartREST 500 should be sent by firmware manager.
mqtt_message_box.skip(1).await;
Expand Down Expand Up @@ -383,13 +349,8 @@ async fn handle_response_successful_child_device() -> Result<(), DynError> {
#[tokio::test]
async fn handle_response_executing_and_failed_child_device() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();
let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;

// On startup, SmartREST 500 should be sent by firmware manager.
mqtt_message_box.skip(1).await;
Expand Down Expand Up @@ -430,13 +391,8 @@ async fn handle_response_executing_and_failed_child_device() -> Result<(), DynEr
#[tokio::test]
async fn ignore_response_with_invalid_status_child_device() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();
let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;

// On startup, SmartREST 500 should be sent by firmware manager.
mqtt_message_box.skip(1).await;
Expand All @@ -461,13 +417,8 @@ async fn ignore_response_with_invalid_status_child_device() -> Result<(), DynErr
#[tokio::test]
async fn handle_response_with_invalid_operation_id_child_device() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();
let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;

// Publish firmware update operation to child device.
publish_smartrest_firmware_operation(&mut mqtt_message_box).await?;
Expand All @@ -489,13 +440,8 @@ async fn handle_response_with_invalid_operation_id_child_device() -> Result<(),
#[tokio::test]
async fn handle_request_timeout_child_device() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();
let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, Duration::from_secs(1), true).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, Duration::from_secs(1), true).await?;

// On startup, SmartREST 500 should be sent by firmware manager.
mqtt_message_box.skip(1).await;
Expand Down Expand Up @@ -529,13 +475,8 @@ async fn handle_request_timeout_child_device() -> Result<(), DynError> {
async fn handle_child_response_while_busy_downloading() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();

let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, true).await?;

// Ignore SmartREST 500.
mqtt_message_box.skip(1).await;
Expand Down Expand Up @@ -583,13 +524,8 @@ async fn required_init_state_recreated_on_startup() -> Result<(), DynError> {
let mut ttd = TempTedgeDir::new();

//Start the mapper without pre-creating the required directories like {data.path}/cache, {data.path}/firmware etc
let (
handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;
let (handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;

// Assert that the startup succeeds and the plugin requests for pending operations
mqtt_message_box
Expand All @@ -610,13 +546,8 @@ async fn required_init_state_recreated_on_startup() -> Result<(), DynError> {
handle.abort();

// Start the mapper again
let (
_handle,
mut mqtt_message_box,
mut _jwt_message_box,
mut _timer_message_box,
mut _downloader_message_box,
) = spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;
let (_handle, mut mqtt_message_box, mut _jwt_message_box, mut _downloader_message_box) =
spawn_firmware_manager(&mut ttd, DEFAULT_REQUEST_TIMEOUT_SEC, false).await?;

// Assert that the startup succeeds again and the mapper requests for pending operations
mqtt_message_box
Expand Down Expand Up @@ -686,7 +617,6 @@ async fn spawn_firmware_manager(
JoinHandle<Result<(), RuntimeError>>,
TimedMessageBox<SimpleMessageBox<MqttMessage, MqttMessage>>,
TimedMessageBox<FakeServerBox<JwtRequest, JwtResult>>,
SimpleMessageBox<OperationSetTimeout, OperationTimeout>,
TimedMessageBox<FakeServerBox<IdDownloadRequest, IdDownloadResult>>,
),
DynError,
Expand All @@ -712,24 +642,20 @@ async fn spawn_firmware_manager(
let mut mqtt_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage> =
SimpleMessageBoxBuilder::new("MQTT", 5);
let mut jwt_builder: FakeServerBoxBuilder<JwtRequest, JwtResult> = FakeServerBox::builder();
let mut timer_builder: SimpleMessageBoxBuilder<OperationSetTimeout, OperationTimeout> =
SimpleMessageBoxBuilder::new("Timer", 5);
let mut downloader_builder: FakeServerBoxBuilder<IdDownloadRequest, IdDownloadResult> =
FakeServerBox::builder();

let firmware_manager_builder = FirmwareManagerBuilder::try_new(
config,
&mut mqtt_builder,
&mut jwt_builder,
&mut timer_builder,
&mut downloader_builder,
)
.unwrap();

let mqtt_message_box = mqtt_builder.build().with_timeout(TEST_TIMEOUT_MS);
let jwt_message_box = jwt_builder.build().with_timeout(TEST_TIMEOUT_MS);
// Cannot change the timer box to TimedMessageBox as SetTimeout doesn't have std::cmp:Eq implementation
let timer_message_box = timer_builder.build();
let downloader_message_box = downloader_builder.build().with_timeout(TEST_TIMEOUT_MS);

let firmware_manager_actor = firmware_manager_builder.build();
Expand All @@ -739,7 +665,6 @@ async fn spawn_firmware_manager(
handle,
mqtt_message_box,
jwt_message_box,
timer_message_box,
downloader_message_box,
))
}
1 change: 0 additions & 1 deletion plugins/c8y_firmware_plugin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ tedge_downloader_ext = { workspace = true }
tedge_health_ext = { workspace = true }
tedge_mqtt_ext = { workspace = true }
tedge_signal_ext = { workspace = true }
tedge_timer_ext = { workspace = true }
tokio = { workspace = true, features = ["rt", "rt-multi-thread"] }
tracing = { workspace = true }

Expand Down
4 changes: 0 additions & 4 deletions plugins/c8y_firmware_plugin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ use tedge_downloader_ext::DownloaderActor;
use tedge_health_ext::HealthMonitorBuilder;
use tedge_mqtt_ext::MqttActorBuilder;
use tedge_signal_ext::SignalActor;
use tedge_timer_ext::TimerActor;
use tracing::log::warn;

const PLUGIN_NAME: &str = "c8y-firmware-plugin";
Expand Down Expand Up @@ -83,7 +82,6 @@ async fn run_with(tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
// Create actor instances
let mqtt_config = tedge_config.mqtt_config()?;
let mut jwt_actor = C8YJwtRetriever::builder(mqtt_config.clone());
let mut timer_actor = TimerActor::builder();
let identity = tedge_config.http.client.auth.identity()?;
let mut downloader_actor = DownloaderActor::new(identity).builder();
let mut mqtt_actor = MqttActorBuilder::new(mqtt_config.clone().with_session_name(PLUGIN_NAME));
Expand Down Expand Up @@ -121,7 +119,6 @@ async fn run_with(tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
firmware_manager_config,
&mut mqtt_actor,
&mut jwt_actor,
&mut timer_actor,
&mut downloader_actor,
)?;

Expand All @@ -134,7 +131,6 @@ async fn run_with(tedge_config: TEdgeConfig) -> Result<(), anyhow::Error> {
runtime.spawn(jwt_actor).await?;
runtime.spawn(downloader_actor).await?;
runtime.spawn(firmware_actor).await?;
runtime.spawn(timer_actor).await?;
runtime.spawn(health_actor).await?;

runtime.run_to_completion().await?;
Expand Down

0 comments on commit ece233d

Please sign in to comment.