Simplify the operation handling code by replacing fragmented control flow with regular async/await #2881
Conversation
```diff
@@ -60,15 +64,36 @@ pub(crate) type IdDownloadRequest = (CmdId, DownloadRequest);
 fan_in_message_type!(C8yMapperInput[MqttMessage, FsWatchEvent, SyncComplete, IdUploadResult, IdDownloadResult] : Debug);
 type C8yMapperOutput = MqttMessage;

+#[derive(Clone)]
+struct MqttPublisher(Arc<Mutex<LoggingSender<MqttMessage>>>);
```
Any specific reason you couldn't just freely clone the senders and use them independently? I see that you're already planning on exploring that option, but just wondering why you chose this path first. Faced any issues with cloned senders?
I did that first because I knew that I'd need `Mutex`es for synchronization of other parts, and to be honest, I forgot that our senders are `Clone`, so I thought `Mutex` was the only option, but that's not the case.
> to be honest, I forgot that our senders are `Clone`, so I thought `Mutex` is the only option, but it's not the case

That's not entirely correct. Because our `LoggingSender<MqttMessage>` internally uses `futures::channel::mpsc::Sender`, whose `send` takes `&mut self`, our methods also need to be `&mut self` if we want to use the sender. However, if we put the sender behind a `Mutex`, we can implement `send(&self)`, allowing methods of the worker to also be `&self`.
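To illustrate, a minimal self-contained sketch, with a bare `futures` mpsc sender standing in for `LoggingSender` (the names and simplifications here are illustrative, not the PR's actual code):

```rust
use std::sync::Arc;

use futures::channel::mpsc;
use futures::SinkExt;
use tokio::sync::Mutex;

#[derive(Clone)]
struct MqttPublisher(Arc<Mutex<mpsc::Sender<String>>>);

impl MqttPublisher {
    // `&self` is enough here: the mutex provides the exclusive access
    // that `mpsc::Sender::send(&mut self)` requires.
    async fn send(&self, message: String) -> Result<(), mpsc::SendError> {
        let mut sender = self.0.lock().await;
        // The lock is held only for the duration of this one send,
        // so other workers are blocked only briefly.
        sender.send(message).await
    }
}
```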
```rust
while let Some(event) = messages.recv().await {
    let worker = Arc::clone(&worker);

    tokio::spawn(async move {
```
I had a different model in mind where we have independent workers (maybe even actors) for each operation handler, with minimal coordination with the top-level mapper actor/worker, eliminating the need to share and lock the top-level converter. This mapper actor would still coordinate the MQTT messages, convert them to typed operation messages, and dispatch those to the respective operation workers. Once a worker gets an operation message, it should be able to freely exchange messages with the other actors, like the upload/download actors. Extending the usage of the `RequestEnvelope` beyond the HTTP actors would be key to enabling this.
> I had a different model in mind where we have independent workers (maybe even actors) for each operation handler, with minimal coordination with the top-level mapper actor/worker, eliminating the need to share and lock the top-level converter.
Yes, the idea would be to decouple the operation handling as much as possible from `CumulocityConverter`, so it can run separately as a new task, or maybe as a new actor, without locking the rest of the converter. Right now, basically all the functionality is implemented in `CumulocityConverter` in `&mut self` methods, so it's impossible to run anything concurrently without locking.

Ideally, each operation handler should be as independent as possible, and we should be able to easily reason about each in isolation, without the control flow jumping between `actor.rs`, `converter.rs`, and e.g. `log_upload.rs`.
Right now they're just tokio tasks, because my current goal is to just move the boundary on which we block, but the handlers could definitely be extracted out of the main actor.
> This mapper actor would still coordinate the MQTT messages, convert them to typed operation messages, and dispatch those to the respective operation workers. Once a worker gets an operation message, it should be able to freely exchange messages with the other actors, like the upload/download actors. Extending the usage of the RequestEnvelope beyond the HTTP actors would be key to enabling this.
That's right, the actor which owns the MQTT message box should route these operation state change messages to the operation handlers, which should then be able to do blocking work, like waiting for an upload, without synchronizing with the main actor. But I'd say that we overuse messages a bit. It's fine to have actors that manage shared mutable state, but for downloading/uploading, what shared state is there that needs to be managed between different downloads/uploads? Why shouldn't we use the downloader/uploader directly instead of going via the actor?

And I believe that sometimes we use the actor only so that we can mock it in tests. Instead of having an interface that we can mock, we effectively mock the interface by simulating messages being sent in the message boxes. I think this way of testing is a bit tedious, because you need to handle and ignore unrelated messages, and sometimes, when messages are not being sent by the actor under test, you need to use timeouts, and so on. I think there's room for improvement there.

But that's a discussion for another time, and I expect we'll discuss it more in the future. For the purposes of this PR, the important part is allowing handlers to await one of two things: tasks they themselves spawn, or requests sent to other actors via `RequestEnvelope`.
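To sketch the second of these (with illustrative names and types, not the actual `tedge_actors` definitions): an envelope pairs a request with a one-shot reply channel, so the handler can await the response inline instead of routing it through the top-level message loop:

```rust
use tokio::sync::{mpsc, oneshot};

struct DownloadRequest { url: String }
struct DownloadResult { path: String }

// The envelope carries the request plus a channel for the single reply.
struct Envelope {
    request: DownloadRequest,
    reply_to: oneshot::Sender<DownloadResult>,
}

async fn handle_operation(actor: mpsc::Sender<Envelope>) -> Option<DownloadResult> {
    let (reply_to, reply) = oneshot::channel();
    let request = DownloadRequest { url: "https://example.com/file".into() };
    // Send the request to the peer actor, then await its reply right here.
    actor.send(Envelope { request, reply_to }).await.ok()?;
    reply.await.ok()
}
```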
> Why shouldn't we use the downloader/uploader directly instead of going via the actor?
One additional reason, other than message visibility and testability, is controlling bandwidth usage. We don't want too many uploads/downloads happening in parallel, to reduce the load on the network and the disk. Routing them via a centralised downloader/uploader lets us better control the maximum parallelism allowed.
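For example, a central worker could cap parallelism with a semaphore (a minimal sketch assuming tokio; the limit and function names are illustrative):

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

const MAX_PARALLEL_DOWNLOADS: usize = 4; // illustrative limit

async fn throttled_download(semaphore: Arc<Semaphore>, url: String) {
    // Waits here if MAX_PARALLEL_DOWNLOADS transfers are already in flight.
    let _permit = semaphore.acquire().await.expect("semaphore not closed");
    download(url).await;
    // The permit is released when `_permit` is dropped.
}

async fn download(_url: String) { /* actual transfer elided */ }

// Setup: one shared semaphore for all handlers.
// let semaphore = Arc::new(Semaphore::new(MAX_PARALLEL_DOWNLOADS));
```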
```diff
-    url: tedge_file_url.clone(),
-    file_dir: destination_dir,
+let downloader =
+    Downloader::new(destination_path.clone(), self.config.identity.clone());
```
I believe we can continue using the downloader actor here by connecting a `ClientMessageBox` to it, so that we can await the response receiver inline. The `c8y_firmware_manager` is already doing the same.
That's right, we could use the `ClientMessageBox` here, but I also wanted to explore whether we could just use the downloader directly, without relying on an outside actor to do it. I'd say it has the advantage of decreasing coupling. The disadvantage is that the messages are no longer visible, but does any code other than the testing code need them to be visible? If not, then maybe we don't need to go via the actor.

But the main reason I did it like this was to demonstrate that after the refactor this becomes possible, whereas currently it would block. We can now easily swap this for an `EnvelopeSender` if we want to go via the actor.
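As a sketch of that swap, going via the uploader actor but still awaiting inline (the `await_response` signature and the `IdUploadRequest` shape are assumptions here, mirroring `IdDownloadRequest = (CmdId, DownloadRequest)` from the diff above):

```rust
use tedge_actors::ClientMessageBox;

async fn upload_operation_file(
    uploader: &mut ClientMessageBox<IdUploadRequest, IdUploadResult>,
    cmd_id: CmdId,
    request: UploadRequest,
) -> IdUploadResult {
    // The response arrives right here, in the same function that sent the
    // request, keeping the handler's control flow linear instead of bouncing
    // back through the top-level receive loop.
    uploader
        .await_response((cmd_id, request))
        .await
        .expect("uploader actor is alive")
}
```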
An additional benefit of going via the downloader actor is described in this comment: #2881 (comment)
```diff
@@ -375,7 +309,7 @@ impl C8yMapperBuilder {
     mqtt.connect_sink(config.topics.clone(), &box_builder.get_sender());
     let http_proxy = C8YHttpProxy::new(http);
     let timer_sender = timer.connect_client(box_builder.get_sender().sender_clone());
-    let upload_sender = uploader.connect_client(box_builder.get_sender().sender_clone());
+    let uploader = ClientMessageBox::new(uploader);
```
We should be able to do the same for the downloader as well.
We can; the explanation for why it's not done yet is in this comment: #2881 (comment)
To be more confident about future changes to the c8y mapper and the converter, we need tests checking that operations happen concurrently. This change introduces a test that only covers the config_snapshot operation, but it fails, as expected, when the regressions introduced by the previous commits in this PR are present.

Signed-off-by: Marcel Guzik <[email protected]>
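A generic sketch of the shape of such a test, with the mapper internals stubbed out; only the timeout-based assertion pattern is the point, not the actual test in this commit:

```rust
use std::time::Duration;
use tokio::time::{sleep, timeout};

async fn slow_operation() {
    sleep(Duration::from_secs(60)).await; // stands in for a long download
}

async fn fast_operation() -> &'static str {
    "config_snapshot done"
}

#[tokio::test]
async fn operations_run_concurrently() {
    // Start an operation that stays in flight for a long time.
    let _slow = tokio::spawn(slow_operation());

    // If operations were serialised behind a single lock, the second
    // operation would not finish within the timeout.
    let result = timeout(Duration::from_secs(5), fast_operation()).await;
    assert_eq!(result.unwrap(), "config_snapshot done");
}
```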
Closing in favour of #2904, which is a more promising and feasible approach.
Proposed changes
This PR should make the operation handling simpler to reason about and change, by allowing operation handlers to use `.await` without blocking. The scope of the PR is to show the idea working on the `config_snapshot` and `config_update` operations.

Below I describe the current state, which is subject to change.

- `Mutex`es are used to guard the `MqttPublisher` and the `CumulocityConverter`:
  - `struct MqttPublisher(Arc<Mutex<LoggingSender<MqttMessage>>>)` with `fn send(&self)` is OK, as we lock the mutex to use the sender and unlock it immediately after sending, so it shouldn't block. The alternative would be to clone the sender for each worker, and I will explore that later.
  - `Mutex<CumulocityConverter>` ensures that only a single worker can use `CumulocityConverter` at any given time, which can lead to deadlocks if we're not careful, and currently results in the same behaviour as before, i.e. blocking in the operation handling code blocks the processing of other messages, because the lock is held across await points. This is temporary and will have to be fixed, but it is used right now so I could show how the full operation handling function looks without the current fragmentation. Also, a test demonstrating that the converter doesn't block should be added.
- The `Identity` type is exposed from the `download` crate, so I can use it in the next commit.
- `Downloader` is used directly in the operation handler, to show how we can use it without going through the actor: if we only want to download a file via HTTP, and this download shouldn't influence anything else, we can use it directly.
- The `UploaderActor` is used via a `ClientMessageBox`, where we receive the response directly in the same place where we `send()`, instead of going to the top-level receiver in the top-level actor, which then needs to decide to call an operation handling function again. This way we still use an uploader actor and have messages sent between the two, but we don't need to fragment our control flow. This was done to show that we can still send and receive messages from other actors from within the proposed new operation handlers.

Next steps
- Replace the lock on the MQTT sender with cloned senders (can be done, but will require `&mut self`)
- Change `CumulocityConverter` so operation handling doesn't block (see the sketch below)
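A minimal sketch of the blocking problem behind that last step, with simplified types and a tokio `Mutex` standing in for the real converter lock:

```rust
use std::sync::Arc;
use tokio::sync::Mutex;

struct Converter;

async fn handle_operation(converter: Arc<Mutex<Converter>>) {
    let mut guard = converter.lock().await;
    // Any await while `guard` is alive keeps the converter locked, so every
    // other worker that needs the converter also waits for this download.
    slow_download(&mut guard).await;
    // The lock is released only here, when `guard` is dropped.
}

async fn slow_download(_converter: &mut Converter) { /* elided */ }
```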
so operation handling doesn't blockTypes of changes
Paste Link to the issue
Checklist
cargo fmt
as mentioned in CODING_GUIDELINEScargo clippy
as mentioned in CODING_GUIDELINESFurther comments