refactor: Simplify actor request handlers #2600
@@ -154,3 +155,14 @@ impl<Request: Message, Response: Message> ServiceProvider<Request, Response, NoC
})
}
}

/// A connector to a [Server] expecting Request and returning Response.
pub trait Service<Request: Message, Response: Message>:
This trait was added just for aesthetics, to look better when used in actor builders along with `ServiceProvider`s, right?
If by aesthetics you mean a nicer type name, then yes. Also, the clients of a server actor don't need to know that their requests are wrapped into envelopes with a reply-to address.
Once this PR is merged, we can go further. Indeed, the current `ServiceProvider` / `ServiceConsumer` abstraction can now be deprecated in favor of `MessageSource` / `MessageSink`.
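To illustrate the point about hidden envelopes, here is a minimal sketch using only `std` channels and threads. It is not the `tedge_actors` API: `RequestEnvelope` here is a simplified stand-in, and `ServiceHandle`, `call` and `spawn_length_server` are hypothetical names introduced for the example.

```rust
use std::sync::mpsc;
use std::thread;

/// Simplified stand-in (assumption): a request bundled with the sender
/// the server must use to reply.
struct RequestEnvelope<Request, Response> {
    request: Request,
    reply_to: mpsc::Sender<Response>,
}

/// Hypothetical client-side handle: callers only see request and response
/// types, never the envelope or the reply-to channel.
struct ServiceHandle<Request, Response> {
    server: mpsc::Sender<RequestEnvelope<Request, Response>>,
}

impl<Request, Response> ServiceHandle<Request, Response> {
    /// Wrap the request with a fresh reply-to sender and wait for the answer.
    /// Returns `None` if the server is gone.
    fn call(&self, request: Request) -> Option<Response> {
        let (reply_to, response) = mpsc::channel();
        self.server.send(RequestEnvelope { request, reply_to }).ok()?;
        response.recv().ok()
    }
}

/// Toy server: answers each request with the length of the string,
/// replying on the sender that came with the request.
fn spawn_length_server() -> ServiceHandle<String, usize> {
    let (server, requests) = mpsc::channel::<RequestEnvelope<String, usize>>();
    thread::spawn(move || {
        // The loop ends once every client handle has been dropped.
        for RequestEnvelope { request, reply_to } in requests {
            // A client that gave up has dropped its receiver; ignore that error.
            let _ = reply_to.send(request.len());
        }
    });
    ServiceHandle { server }
}

fn main() {
    let service = spawn_length_server();
    assert_eq!(service.call("firmware".to_string()), Some(8));
}
```

The server keeps no list of pre-registered clients: each response goes to whatever sender arrived with the request.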
ReplyToRequester,
>,
jwt: &mut impl ServiceProvider<RequestEnvelope<(), JwtResult>, NoMessage, ReplyToRequester>,
http: &mut impl Service<HttpRequest, HttpResult>,
Seeing that makes me appreciate that dummy `Service` abstraction a lot more, hiding all the unnecessary boilerplate like `NoMessage`, `NoConfig` etc.
That's the point of this abstraction. Not so dummy ;-).
mut self,
operation_id: String,
smartrest_request: SmartRestFirmwareRequest,
mut input_receiver: mpsc::Receiver<FirmwareOperationResponse>,
- mut input_receiver: mpsc::Receiver<FirmwareOperationResponse>,
+ mut response_receiver: mpsc::Receiver<FirmwareOperationResponse>,
I will do that in a final commit for this PR.
Done with a final commit: 0bd66fa
Ok(())
}
}

pub struct FirmwareManagerMessageBox {
input_receiver: LoggingReceiver<FirmwareInput>,
struct FirmwareManagerWorker {
- struct FirmwareManagerWorker {
+ /// A worker, spawned per request, that can handle a single firmware update request for a child device end-to-end
+ struct FirmwareManagerWorker {
Just trying to highlight that the scope of this worker is per request.
Done with a final commit: 0bd66fa
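The per-request scope highlighted in this thread can be sketched as follows. This is a sketch under stated assumptions, not the code of this PR: OS threads and `std` channels stand in for async tasks and actor message boxes, `Worker`, `run` and `process_requests` are hypothetical names, and the fields of `RequestOutcome` are assumed.

```rust
use std::sync::mpsc;
use std::thread;

/// Outcome reported by a worker once its request is fully processed or failed.
/// The fields are assumptions made for this sketch.
#[derive(Debug, PartialEq)]
struct RequestOutcome {
    operation_id: String,
    result: Result<(), String>,
}

/// Hypothetical worker: cloneable, holding no per-request state,
/// only what is needed to report back.
#[derive(Clone)]
struct Worker {
    outcome_sender: mpsc::Sender<RequestOutcome>,
}

impl Worker {
    /// Handle a single request end-to-end, then report the outcome.
    fn run(self, operation_id: String, firmware_url: String) {
        // Stand-in for the download and the exchange with the child device.
        let result = if firmware_url.is_empty() {
            Err("missing firmware url".to_string())
        } else {
            Ok(())
        };
        let _ = self.outcome_sender.send(RequestOutcome { operation_id, result });
    }
}

/// Actor side: spawn one worker per request, then collect the outcomes.
fn process_requests(requests: Vec<(String, String)>) -> Vec<RequestOutcome> {
    let (outcome_sender, outcomes) = mpsc::channel();
    let worker = Worker { outcome_sender };
    for (operation_id, firmware_url) in requests {
        let worker = worker.clone();
        thread::spawn(move || worker.run(operation_id, firmware_url));
    }
    // Drop the original sender so the channel closes once all workers are done.
    drop(worker);
    let mut done: Vec<RequestOutcome> = outcomes.into_iter().collect();
    done.sort_by(|a, b| a.operation_id.cmp(&b.operation_id));
    done
}

fn main() {
    let outcomes = process_requests(vec![
        ("op-1".to_string(), "http://example.invalid/fw.bin".to_string()),
        ("op-2".to_string(), String::new()),
    ]);
    assert_eq!(outcomes.len(), 2);
    assert_eq!(outcomes[0].result, Ok(()));
    assert!(outcomes[1].result.is_err());
}
```

Because each worker owns everything it needs for its request, the spawning side does not have to cache pending requests while a download is in progress.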
I re-reviewed only c8y_firmware_manager. I have some trivial comments, so I would approve it.
// 2. Operation timeouts from the TimerActor for requests for which the child devices don't respond within the timeout window
// 3. Download results from the DownloaderActor for firmware download requests

// 3. RequestOutcome sent back by the background workers once the firmware request has been fully processed or failed
Trivial ;)
- // 3. RequestOutcome sent back by the background workers once the firmware request has been fully processed or failed
+ // 2. RequestOutcome sent back by the background workers once the firmware request has been fully processed or failed
Fixed: e37bc2e
}
FirmwareInput::IdDownloadResult((id, result)) => {
self.process_downloaded_firmware(&id, result).await?
FirmwareInput::OperationOutcome(outcome) => {
Ah nice, timeout moved to the worker!
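Once the timeout lives in the worker, waiting for the child device can be a plain bounded wait rather than a separate timer message. A minimal sketch, assuming a `std` channel in place of the worker's response receiver; `WaitError` and `await_child_response` are hypothetical names, not code from this PR.

```rust
use std::sync::mpsc;
use std::time::Duration;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum WaitError {
    /// No response arrived within the given time limit.
    TimedOut(Duration),
    /// The sending side was dropped, so no response can ever arrive.
    ChildGone,
}

/// Wait for the child device response, giving up after `time_limit`.
fn await_child_response(
    responses: &mpsc::Receiver<String>,
    time_limit: Duration,
) -> Result<String, WaitError> {
    responses.recv_timeout(time_limit).map_err(|err| match err {
        mpsc::RecvTimeoutError::Timeout => WaitError::TimedOut(time_limit),
        mpsc::RecvTimeoutError::Disconnected => WaitError::ChildGone,
    })
}

fn main() {
    let (sender, responses) = mpsc::channel();
    let limit = Duration::from_millis(10);

    // Nothing has been sent yet: the bounded wait ends with a timeout.
    assert_eq!(
        await_child_response(&responses, limit),
        Err(WaitError::TimedOut(limit))
    );

    // A response that arrives in time is returned as-is.
    sender.send("successful".to_string()).unwrap();
    assert_eq!(
        await_child_response(&responses, limit),
        Ok("successful".to_string())
    );
}
```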
},

#[error("Child device {child_id} did not respond within the timeout interval of {time_limit_sec}sec. Operation ID={operation_id}")]
FromTimeout {
(trivial) I think we use the `From` prefix for errors inherited from somewhere else. Hence, you don't need `From` here.
- FromTimeout {
+ ExceedTimelimit {
Fixed: e37bc2e
@@ -272,6 +251,7 @@ async fn handle_request_child_device_with_failed_download() -> Result<(), DynErr
// Assert EXECUTING SmartREST MQTT message and FAILED SmartREST MQTT message due to missing 'cache' directory.
mqtt_message_box
.assert_received([
// FIXME: executing status is no more sent when download fails
You fixed it right?
Yes, this has been fixed by 2f10f93
This method was used only in 3 tests, while adding extra complexity. Signed-off-by: Didier Wenzek <[email protected]>
Signed-off-by: Didier Wenzek <[email protected]>
Signed-off-by: Didier Wenzek <[email protected]>
- The `Clone` trait is no longer implemented by `DynSender`. This was the main reason preventing the implementation of useful `Into` conversions between misc `DynSender`.
- The `adapt()` function has been deprecated, as it was a workaround for the lack of `Into` conversions between misc `DynSender`.
- The `sender_clone()` method is to be used both to `clone` and adapt a sender.
Signed-off-by: Didier Wenzek <[email protected]>
Instead of using an array of clients identified by their indexes, the client address is attached to the requests. Signed-off-by: Didier Wenzek <[email protected]>
This is an intermediate step. The next one will be to spawn a task to handle the download and subsequent steps so that the manager is not blocked during that time. The failing test (handle_child_response_while_busy_downloading) is a witness of this missing requirement. Signed-off-by: Didier Wenzek <[email protected]>
The idea is to have a worker which is Clone with methods that can be spawned in background tasks. The actor itself keeps only stateful things that cannot be shared (the message receiver and the cache of pending operations). This is an intermediate step. All the methods are for now still attached to the actor. The next step will be to move out from the actor to the worker all the methods that don't touch the actor state. Signed-off-by: Didier Wenzek <[email protected]>
…less methods Signed-off-by: Didier Wenzek <[email protected]>
Signed-off-by: Didier Wenzek <[email protected]>
Execute the whole firmware operation in a single async method, i.e. with no state cached in the actor. Signed-off-by: Didier Wenzek <[email protected]>
The commit message has been fixed.
Signed-off-by: Didier Wenzek <[email protected]>
Proposed changes
In this PR, @Bravo555 rightly argued that using async/await would be a more maintainable way to handle concurrent download and upload tasks in the c8y mapper, compared to the current design where an actor has to explicitly maintain the intermediate states of these concurrent tasks (i.e. sending a request to a server actor, persisting in the actor state the fact that a response is expected from that actor along with all the data required to process the response, proceeding with other messages, and finally receiving and processing the response with the restored state).
This PR proposes a way to better combine async/await tasks and actors.
- […] `tedge_actors::Sender` which are not necessarily `Clone`.
- […] `tedge_actors` traits. This is currently the 4th commit; before merging I will have to move this commit to 3rd position.
- […] `oneshot::Sender` into a `tedge_actors::Sender`, which can then be used in […]
- […] `RequestEnvelope<Request, Response>`: messages wrapping a request with a `tedge_actors::Sender` to be used for the response.
- […] `KeyedMessageBox`es and to replace them with boxes expecting `RequestEnvelope`s. A server no longer has a fixed number of pre-registered clients nor a specific sender for its responses. Instead, the clients send a `reply_to` response sender along with their requests.

One test is still failing and will have to be fixed: the timer actor behaves as expected on shutdown but the test fails to notice that the actor has been dropped as expected. Fixed by 5126f58

The POC is applied to the `c8y_firmware_manager`. The point is to remove the need for `reqs_pending_download: HashMap<String, SmartRestFirmwareRequest>`, which stores the data needed to resume the request handling once the firmware has been successfully downloaded. We want to replace this explicit state management with regular async/await code.

As the firmware actor now waits for the firmware to be fully downloaded before sending a message to the child device, the firmware actor ignores any other requests until the firmware is fully downloaded. This issue is captured by the `handle_child_response_while_busy_downloading` test, which is now failing. The goal of the following steps is to make this test pass again.

- […] `struct Worker` which is `Clone` and can be spawned in background tasks.
- […] `FirmwareManagerWorker` has no method and only holds data used by the `FirmwareManagerActor`. The following commit 1cd4f03 fixes that: all the methods that don't access the actor state are moved to the worker.
- […] `handle_child_response_while_busy_downloading` test.
- […] `FirmwareManagerWorker` has been interleaved with the `FirmwareManagerActor` implementation. This will have to be fixed before merge.

Types of changes
Paste Link to the issue
Checklist
- […] `cargo fmt` as mentioned in CODING_GUIDELINES
- […] `cargo clippy` as mentioned in CODING_GUIDELINES

Further comments