Make publish_data wait until the DataChannel's bufferedAmount becomes low. #545
Conversation
It seems like you haven't added any nanpa changeset files to this PR. If this pull request includes changes to code, make sure to add a changeset by writing a file to
Refer to the manpage for more information.
- rtc_events::forward_dc_events(&mut lossy_dc, rtc_emitter.clone());
- rtc_events::forward_dc_events(&mut reliable_dc, rtc_emitter);
+ rtc_events::forward_dc_events(&mut lossy_dc, DataPacketKind::Lossy, rtc_emitter.clone());
+ rtc_events::forward_dc_events(&mut reliable_dc, DataPacketKind::Reliable, rtc_emitter);
was this a separate bug?
No, I just added it as a new argument because the newly added Event handler needed DataPacketKind.
RtcEvent::DataChannelBufferedAmountChange { sent: _, amount, kind } => {
    match kind {
        DataPacketKind::Lossy => {
            // Do nothing at this moment
I agree with your assessment that this issue isn't as pressing on the lossy channel, but one thing we'd want to avoid is the data channel just erroring out if the buffered_amount surpasses an internal maximum value. I think @theomonnom mentioned that right now it will raise an error in this case?
For that reason I think it could also make sense to include the same logic for the lossy data channel.
Yes, I think we should use the same logic to avoid an internal RtcError.
@@ -958,6 +991,9 @@ impl SessionInner {
      kind: DataPacketKind,
  ) -> Result<(), EngineError> {
      self.ensure_publisher_connected(kind).await?;
+     if kind == DataPacketKind::Reliable {
+         self.wait_buffer_low().await?;
+     }
Is it guaranteed that successive calls to publish_data are handled in order here? I'm imagining a scenario where a user doesn't await publish_data and just fires a bunch of them in very quick succession, and we'd want to make sure that the next self.data_channel().send(...) is still processed in the correct order.
I think there are no issues with the usage of notify, but I found one part that isn't ideal. I'll explain it in the comment below.
if amount <= threshold {
    return Ok(());
}
self.reliable_dc_buffered_amount_low_notify.notified().await;
I'm not very familiar with tokio notify, so I'll ask a couple of questions that might be a bit naive:
- What if wait_for_buffer_low is called exactly when amount drops under threshold? Is there a potential for a race here, where the new call will succeed before previous calls to wait_buffer_low will be notified?
- If there are multiple calls to self.reliable_dc_buffered_amount_low_notify.notified().await because the threshold is exceeded, how would subsequent callers get notified? The buffered_amount_changed handler has a notify_one, so it will only notify one caller when the buffered amount changes. I'm thinking of a scenario where the buffer drops below the threshold in buffered_amount_changed and there would be enough headroom to forward all pending packets. We would then still wait for each packet to be sent first, because we don't receive a buffered_amount_changed event until another packet has been sent, right? I'm not sure if that is actually a problem, but I think at least we'd have to be sure that buffered_amount_changed is reliably fired for every packet.
I'm not sure if this is the same as your concern, but I found one part of my code that isn't ideal. The following is the content of wait_for_buffer_low. The code checks the threshold and immediately returns if there's no issue, but there's a possibility that a notify waiter already exists when this code is executed. This isn't ideal, so let me fix it.
rust-sdks/livekit/src/rtc_engine/rtc_session.rs
Lines 1008 to 1012 in 8887378
if amount <= threshold {
    return Ok(());
}
self.reliable_dc_buffered_amount_low_notify.notified().await;
Ok(())
I'm thinking of a scenario where the buffer drops below the threshold in buffered_amount_changed and there would be enough headroom to forward all pending packets. We would then still wait for each packet to be sent first, because we don't receive a buffered_amount_changed event until another packet has been sent, right? I'm not sure if that is actually a problem but I think at least we'd have to be sure that buffered_amount_changed is reliably fired for every packet
Yes, with my current code, calling publish_data multiple times could result in each call waiting for a notify. However, since DataChannel.send sends at least one buffered amount change event every time, I don't think it will end up waiting indefinitely. That said, as you pointed out, I agree that more testing is needed on this point.
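For reference, a minimal sketch of a race-free variant of this waiting logic, under a few assumptions: the buffered amount is mirrored into an atomic, the event handler calls notify_waiters so every pending publisher is woken, and the waiter registers itself before checking the threshold. All type and field names here (BufferGate, low_notify, and so on) are hypothetical, not the actual ones in rtc_session.rs.

use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Notify;

// Hypothetical helper type; the real session keeps equivalent state in SessionInner.
struct BufferGate {
    buffered_amount: AtomicU64,
    threshold: AtomicU64,
    low_notify: Notify,
}

impl BufferGate {
    // Called from the RtcEvent handler on every bufferedAmountChange.
    fn on_buffered_amount_change(&self, amount: u64) {
        self.buffered_amount.store(amount, Ordering::Release);
        if amount <= self.threshold.load(Ordering::Acquire) {
            // Wake every pending publisher, not just one.
            self.low_notify.notify_waiters();
        }
    }

    // Wait until bufferedAmount is at or below the threshold.
    async fn wait_buffer_low(&self) {
        loop {
            // Register as a waiter *before* reading the amount, so a
            // notification that races with the check cannot be lost.
            let notified = self.low_notify.notified();
            tokio::pin!(notified);
            notified.as_mut().enable();

            if self.buffered_amount.load(Ordering::Acquire)
                <= self.threshold.load(Ordering::Acquire)
            {
                return;
            }
            notified.await;
        }
    }
}

With notify_waiters, a single bufferedAmountChange event that drops below the threshold releases all queued publish_data calls at once, which would also cover the headroom scenario described above.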
It's great that it's configurable, but we might want to use a different default threshold value. This discussion here might be helpful for choosing some sane default values?
I have made the following changes:
Regarding 2, I believe that lowering latency is not an option to support lossy data channels. However, in my previous implementation, when multiple data packets were sent in a short period, a slight delay occurred even if the buffer had sufficient capacity. To eliminate this delay, I created a new worker dedicated to data channels, which now manages the buffer by itself.
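A minimal sketch of what such a dedicated worker could look like, assuming an unbounded mpsc queue drained by a single task and reusing the hypothetical BufferGate type sketched above (all names are illustrative, not the PR's actual implementation):

use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};

// Stand-in error type for the sketch; the real code returns EngineError.
#[derive(Debug)]
struct PublishError;

struct PublishJob {
    payload: Vec<u8>,
    done: oneshot::Sender<Result<(), PublishError>>,
}

// A single worker drains the queue, so packets leave in the order they were
// enqueued even when callers fire publish_data without awaiting each call.
async fn data_channel_worker(
    mut rx: mpsc::UnboundedReceiver<PublishJob>,
    gate: Arc<BufferGate>,
    send: impl Fn(&[u8]) -> Result<(), PublishError>,
) {
    while let Some(job) = rx.recv().await {
        // Apply back-pressure before each send.
        gate.wait_buffer_low().await;
        let result = send(&job.payload);
        // Ignore a dropped receiver: the caller may have cancelled publish_data.
        let _ = job.done.send(result);
    }
}

Under a layout like this, ordering comes from the single queue rather than from callers awaiting publish_data.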
For the default value of the buffered amount low threshold, I am still not sure what value would be ideal. 🤔
For a default value, maybe just half of the max value? So
Isn't the max value 65536?
livekit-ffi/protocol/ffi.proto (outdated)
GetDataChannelBufferedAmountLowThresholdRequest get_data_channel_buffered_amount_low_threshold = 46;
SetDataChannelBufferedAmountLowThresholdRequest set_data_channel_buffered_amount_low_threshold = 47;
I don't think we should "request" the buffered amount low threshold. Since data channels are part of the room, let's add it to our RoomInfo?
The issue is that if we do a request, then on the Python side getting this threshold is going to be an async operation, which isn't ideal.
Ideally it is just:
def buffered_amount_low_threshold(self) -> int:
    return self._info.buffered_amount_low_threshold
That makes a lot of sense to me. Let me fix it.
if let Err(_) = tx.send(result) {
    log::error!("failed to send publish_data result");
}
I think we can ignore this failure? Not a big deal if the user cancels their call to publish_data.
Fixed in 0aad6d1
Otherwise lgtm!
That's the theoretical max value for a data packet, I think? Apparently the maximum bufferedAmount is 16 MB in Chrome, so my suggestion was to use half of that as the threshold value, but we might also just do something a lot smaller like 1 or 2 MB to be safe.
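For illustration, a conservative pick from the range discussed above could be expressed like this (the constant name is hypothetical, and the actual default is still up for discussion):

// 2 MiB: well below Chrome's reported 16 MB bufferedAmount ceiling.
const DEFAULT_BUFFERED_AMOUNT_LOW_THRESHOLD: u64 = 2 * 1024 * 1024;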
Updated:
livekit-ffi/protocol/room.proto (outdated)
required DataChannelOptions lossy_dc_options = 4;
required DataChannelOptions reliable_dc_options = 5;
nit: since we have one field, I would have flattened the dc options
Will fix. Thank you!
Fixed in 6c73254.
lgtm!
This pull request modifies publish_data to wait until the bufferedAmount of a DataChannel falls below a specified threshold before sending data, to support DataStream functionality.

The main changes are as follows:
1. Exposed the bufferedAmount property from libwebrtc's DataChannel.
2. Forwarded the bufferedAmountChange callback of the DataChannel to an RtcEvent, allowing the rtc session to retrieve the current bufferedAmount.
3. Made buffered_amount_low_threshold configurable via FFI. (Default: 0)
4. Modified publish_data to monitor the values in 2 and 3 and wait to send data until the bufferedAmount falls below the specified threshold.

Currently, the logic to wait for the buffer to become low has only been implemented for reliable DataChannels. In fact, all the features added in this update are exclusive to reliable DataChannels. Implementing this for lossy DataChannels didn't make sense to me, but it might be worth discussing further.
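As a recap of item 4, a conceptual sketch of the gating step, reusing the hypothetical BufferGate and PublishError types from the sketches above; a boolean stands in for the reliable/lossy kind, and this is not the SDK's actual publish_data implementation.

// Conceptual flow only: gate the send on the reliable channel, pass lossy through.
async fn publish_data_gated(
    gate: &BufferGate,
    reliable: bool,
    send: impl FnOnce() -> Result<(), PublishError>,
) -> Result<(), PublishError> {
    if reliable {
        // Back-pressure only on the reliable channel, as described above.
        gate.wait_buffer_low().await;
    }
    send()
}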