Skip to content

Commit

Permalink
(Cont) Deprecate keyed message boxes
Browse files Browse the repository at this point in the history
Update all actors interacting with a server actor

Signed-off-by: Didier Wenzek <[email protected]>
  • Loading branch information
didier-wenzek committed Feb 19, 2024
1 parent 40538f5 commit dff9778
Show file tree
Hide file tree
Showing 19 changed files with 287 additions and 185 deletions.
11 changes: 9 additions & 2 deletions crates/core/tedge_agent/src/tedge_operation_converter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::LoggingSender;
use tedge_actors::NoConfig;
use tedge_actors::NoMessage;
use tedge_actors::ReplyToRequester;
use tedge_actors::RequestEnvelope;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::ServiceProvider;
Expand Down Expand Up @@ -47,7 +50,11 @@ impl TedgeOperationConverterBuilder {
software_actor: &mut impl ServiceProvider<SoftwareCommand, SoftwareCommand, NoConfig>,
restart_actor: &mut impl ServiceProvider<RestartCommand, RestartCommand, NoConfig>,
mqtt_actor: &mut impl ServiceProvider<MqttMessage, MqttMessage, TopicFilter>,
script_runner: &mut impl ServiceProvider<Execute, std::io::Result<Output>, NoConfig>,
script_runner: &mut impl ServiceProvider<
RequestEnvelope<Execute, std::io::Result<Output>>,
NoMessage,
ReplyToRequester,
>,
) -> Self {
let (input_sender, input_receiver) = mpsc::unbounded();
let (signal_sender, signal_receiver) = mpsc::channel(10);
Expand All @@ -72,7 +79,7 @@ impl TedgeOperationConverterBuilder {
);
let mqtt_publisher = LoggingSender::new("MqttPublisher".into(), mqtt_publisher);

let script_runner = ClientMessageBox::new("Operation Script Runner", script_runner);
let script_runner = ClientMessageBox::new(script_runner);

for capability in Self::capabilities() {
let operation = capability.to_string();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use tedge_actors::Actor;
use tedge_actors::Builder;
use tedge_actors::DynError;
use tedge_actors::MessageReceiver;
use tedge_actors::NoMessage;
use tedge_actors::RequestEnvelope;
use tedge_actors::Sender;
use tedge_actors::SimpleMessageBox;
use tedge_actors::SimpleMessageBoxBuilder;
Expand Down Expand Up @@ -253,8 +255,10 @@ async fn spawn_mqtt_operation_converter(
SimpleMessageBoxBuilder::new("Restart", 5);
let mut mqtt_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage> =
SimpleMessageBoxBuilder::new("MQTT", 5);
let mut script_builder: SimpleMessageBoxBuilder<Execute, std::io::Result<Output>> =
SimpleMessageBoxBuilder::new("Script", 5);
let mut script_builder: SimpleMessageBoxBuilder<
RequestEnvelope<Execute, std::io::Result<Output>>,
NoMessage,
> = SimpleMessageBoxBuilder::new("Script", 5);

let workflows = WorkflowSupervisor::default();

Expand Down
2 changes: 1 addition & 1 deletion crates/extensions/c8y_auth_proxy/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ impl C8yAuthProxyBuilder {
let app_data = AppData {
is_https: true,
host: config.c8y.http.or_config_not_set()?.to_string(),
token_manager: TokenManager::new(JwtRetriever::new("C8Y-PROXY => JWT", jwt)).shared(),
token_manager: TokenManager::new(JwtRetriever::new(jwt)).shared(),
};
let bind = &config.c8y.proxy.bind;
let (signal_sender, signal_receiver) = mpsc::channel(10);
Expand Down
3 changes: 1 addition & 2 deletions crates/extensions/c8y_auth_proxy/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1023,8 +1023,7 @@ mod tests {
let state = AppData {
is_https: false,
host: target_host.into(),
token_manager: TokenManager::new(JwtRetriever::new("TEST => JWT", &mut retriever))
.shared(),
token_manager: TokenManager::new(JwtRetriever::new(&mut retriever)).shared(),
};
let trust_store = ca_dir
.as_ref()
Expand Down
11 changes: 9 additions & 2 deletions crates/extensions/c8y_firmware_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ use tedge_actors::DynSender;
use tedge_actors::LinkError;
use tedge_actors::LoggingReceiver;
use tedge_actors::NoConfig;
use tedge_actors::NoMessage;
use tedge_actors::ReplyToRequester;
use tedge_actors::RequestEnvelope;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::ServiceProvider;
Expand All @@ -48,7 +51,11 @@ impl FirmwareManagerBuilder {
pub fn try_new(
config: FirmwareManagerConfig,
mqtt_actor: &mut impl ServiceProvider<MqttMessage, MqttMessage, TopicFilter>,
jwt_actor: &mut impl ServiceProvider<JwtRequest, JwtResult, NoConfig>,
jwt_actor: &mut impl ServiceProvider<
RequestEnvelope<JwtRequest, JwtResult>,
NoMessage,
ReplyToRequester,
>,
timer_actor: &mut impl ServiceProvider<OperationSetTimeout, OperationTimeout, NoConfig>,
downloader_actor: &mut impl ServiceProvider<IdDownloadRequest, IdDownloadResult, NoConfig>,
) -> Result<FirmwareManagerBuilder, FileError> {
Expand All @@ -64,7 +71,7 @@ impl FirmwareManagerBuilder {

let mqtt_publisher =
mqtt_actor.connect_consumer(Self::subscriptions(), input_sender.clone().into());
let jwt_retriever = JwtRetriever::new("Firmware => JWT", jwt_actor);
let jwt_retriever = JwtRetriever::new(jwt_actor);
let timer_sender = timer_actor.connect_consumer(NoConfig, input_sender.clone().into());
let download_sender = downloader_actor.connect_consumer(NoConfig, input_sender.into());
Ok(Self {
Expand Down
7 changes: 4 additions & 3 deletions crates/extensions/c8y_firmware_manager/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use serde_json::json;
use sha256::digest;
use std::io;
use std::time::Duration;
use tedge_actors::test_helpers::FakeServerBox;
use tedge_actors::test_helpers::FakeServerBoxBuilder;
use tedge_actors::test_helpers::MessageReceiverExt;
use tedge_actors::test_helpers::TimedMessageBox;
use tedge_actors::Actor;
Expand Down Expand Up @@ -695,7 +697,7 @@ async fn spawn_firmware_manager(
(
JoinHandle<Result<(), RuntimeError>>,
TimedMessageBox<SimpleMessageBox<MqttMessage, MqttMessage>>,
TimedMessageBox<SimpleMessageBox<JwtRequest, JwtResult>>,
TimedMessageBox<FakeServerBox<JwtRequest, JwtResult>>,
SimpleMessageBox<OperationSetTimeout, OperationTimeout>,
TimedMessageBox<SimpleMessageBox<IdDownloadRequest, IdDownloadResult>>,
),
Expand All @@ -721,8 +723,7 @@ async fn spawn_firmware_manager(

let mut mqtt_builder: SimpleMessageBoxBuilder<MqttMessage, MqttMessage> =
SimpleMessageBoxBuilder::new("MQTT", 5);
let mut jwt_builder: SimpleMessageBoxBuilder<JwtRequest, JwtResult> =
SimpleMessageBoxBuilder::new("JWT", 1);
let mut jwt_builder: FakeServerBoxBuilder<JwtRequest, JwtResult> = FakeServerBox::builder();
let mut timer_builder: SimpleMessageBoxBuilder<OperationSetTimeout, OperationTimeout> =
SimpleMessageBoxBuilder::new("Timer", 5);
let mut downloader_builder: SimpleMessageBoxBuilder<IdDownloadRequest, IdDownloadResult> =
Expand Down
1 change: 1 addition & 0 deletions crates/extensions/c8y_http_proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ tokio = { workspace = true, features = ["macros"] }
mockito = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
tedge_actors = { workspace = true, features = ["test-helpers"] }
tedge_http_ext = { workspace = true, features = ["test_helpers"] }
time = { workspace = true }

Expand Down
19 changes: 10 additions & 9 deletions crates/extensions/c8y_http_proxy/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use tedge_actors::fan_in_message_type;
use tedge_actors::Actor;
use tedge_actors::ClientMessageBox;
use tedge_actors::MessageReceiver;
use tedge_actors::RequestEnvelope;
use tedge_actors::RuntimeError;
use tedge_actors::RuntimeRequest;
use tedge_actors::Sender;
Expand Down Expand Up @@ -66,14 +67,10 @@ pub struct C8YHttpProxyMessageBox {
pub(crate) jwt: JwtRetriever,
}

#[derive(Debug)]
pub struct C8YRestRequestWithClientId(usize, C8YRestRequest);
pub type C8YRestRequestEnvelope = RequestEnvelope<C8YRestRequest, C8YRestResult>;

#[derive(Debug)]
pub struct C8YRestResponseWithClientId(usize, C8YRestResult);

fan_in_message_type!(C8YHttpProxyInput[C8YRestRequestWithClientId, HttpResult, JwtResult] : Debug);
fan_in_message_type!(C8YHttpProxyOutput[C8YRestResponseWithClientId, HttpRequest, JwtRequest] : Debug);
fan_in_message_type!(C8YHttpProxyInput[C8YRestRequestEnvelope, HttpResult, JwtResult] : Debug);
fan_in_message_type!(C8YHttpProxyOutput[HttpRequest, JwtRequest] : Debug);

#[async_trait]
impl Actor for C8YHttpProxyActor {
Expand All @@ -84,7 +81,11 @@ impl Actor for C8YHttpProxyActor {
async fn run(mut self) -> Result<(), RuntimeError> {
self.init().await.map_err(Box::new)?;

while let Some((client_id, request)) = self.peers.clients.recv().await {
while let Some(RequestEnvelope {
request,
mut reply_to,
}) = self.peers.clients.recv().await
{
let result = match request {
C8YRestRequest::GetJwtToken(_) => self
.get_and_set_jwt_token()
Expand Down Expand Up @@ -122,7 +123,7 @@ impl Actor for C8YHttpProxyActor {
.await
.map(|response| response.into()),
};
self.peers.clients.send((client_id, result)).await?;
reply_to.send(result).await?;
}
Ok(())
}
Expand Down
13 changes: 9 additions & 4 deletions crates/extensions/c8y_http_proxy/src/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ use c8y_api::json_c8y::C8yUpdateSoftwareListResponse;
use std::path::Path;
use std::path::PathBuf;
use tedge_actors::ClientMessageBox;
use tedge_actors::NoConfig;
use tedge_actors::NoMessage;
use tedge_actors::ReplyToRequester;
use tedge_actors::RequestEnvelope;
use tedge_actors::ServiceProvider;
use tedge_utils::file::PermissionEntry;

Expand All @@ -25,10 +27,13 @@ pub struct C8YHttpProxy {

impl C8YHttpProxy {
pub fn new(
client_name: &str,
proxy_builder: &mut impl ServiceProvider<C8YRestRequest, C8YRestResult, NoConfig>,
proxy_builder: &mut impl ServiceProvider<
RequestEnvelope<C8YRestRequest, C8YRestResult>,
NoMessage,
ReplyToRequester,
>,
) -> Self {
let c8y = ClientMessageBox::new(client_name, proxy_builder);
let c8y = ClientMessageBox::new(proxy_builder);
C8YHttpProxy { c8y }
}

Expand Down
27 changes: 23 additions & 4 deletions crates/extensions/c8y_http_proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ use tedge_actors::Builder;
use tedge_actors::ClientMessageBox;
use tedge_actors::DynSender;
use tedge_actors::NoConfig;
use tedge_actors::NoMessage;
use tedge_actors::ReplyToRequester;
use tedge_actors::RequestEnvelope;
use tedge_actors::RuntimeRequest;
use tedge_actors::RuntimeRequestSink;
use tedge_actors::ServerMessageBoxBuilder;
Expand Down Expand Up @@ -94,12 +97,16 @@ pub struct C8YHttpProxyBuilder {
impl C8YHttpProxyBuilder {
pub fn new(
config: C8YHttpConfig,
http: &mut impl ServiceProvider<HttpRequest, HttpResult, NoConfig>,
jwt: &mut impl ServiceProvider<(), JwtResult, NoConfig>,
http: &mut impl ServiceProvider<
RequestEnvelope<HttpRequest, HttpResult>,
NoMessage,
ReplyToRequester,
>,
jwt: &mut impl ServiceProvider<RequestEnvelope<(), JwtResult>, NoMessage, ReplyToRequester>,
) -> Self {
let clients = ServerMessageBoxBuilder::new("C8Y-REST", 10);
let http = ClientMessageBox::new("C8Y-REST => HTTP", http);
let jwt = JwtRetriever::new("C8Y-REST => JWT", jwt);
let http = ClientMessageBox::new(http);
let jwt = JwtRetriever::new(jwt);
C8YHttpProxyBuilder {
config,
clients,
Expand Down Expand Up @@ -137,6 +144,18 @@ impl ServiceProvider<C8YRestRequest, C8YRestResult, NoConfig> for C8YHttpProxyBu
}
}

impl ServiceProvider<RequestEnvelope<C8YRestRequest, C8YRestResult>, NoMessage, ReplyToRequester>
for C8YHttpProxyBuilder
{
fn connect_consumer(
&mut self,
config: ReplyToRequester,
response_sender: DynSender<NoMessage>,
) -> DynSender<RequestEnvelope<C8YRestRequest, C8YRestResult>> {
self.clients.connect_consumer(config, response_sender)
}
}

impl RuntimeRequestSink for C8YHttpProxyBuilder {
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
self.clients.get_signal_sender()
Expand Down
Loading

0 comments on commit dff9778

Please sign in to comment.