
Make publish_data wait until the DataChannel's bufferedAmount becomes low. #545

Merged
merged 23 commits into main from typester/data-stream on Jan 17, 2025

Conversation

typester
Member

@typester typester commented Jan 11, 2025

This pull request modifies publish_data to wait until the bufferedAmount of a DataChannel falls below a specified threshold before sending data, to support DataStream functionality.

The main changes are as follows:

  1. Expose the bufferedAmount property from libwebrtc's DataChannel.
  2. Forward the bufferedAmountChange callback of the DataChannel to an RtcEvent, allowing the rtc session to retrieve the current bufferedAmount.
  3. Expose the ability for users to set and get the buffered_amount_low_threshold via FFI. (Default: 0)
  4. Update publish_data to monitor the values in 2 and 3 and wait to send data until the bufferedAmount falls below the specified threshold.

Currently, the logic to wait for the buffer to become low has only been implemented for reliable DataChannels. In fact, all the features added in this update are exclusive to reliable DataChannels. Implementing this for lossy DataChannels didn’t make sense to me, but it might be worth discussing further.
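As a rough sketch of the waiting step described above, assuming the current bufferedAmount is mirrored into an atomic counter and a tokio Notify is signalled whenever the DataChannel reports a buffered-amount change (names here are illustrative, not the exact ones in this PR):

use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Notify;

// Block until the channel's buffered amount has dropped to the threshold.
// `notify` is signalled from the forwarded bufferedAmountChange event.
async fn wait_buffer_low(buffered: &AtomicU64, threshold: u64, notify: &Notify) {
    loop {
        if buffered.load(Ordering::Acquire) <= threshold {
            return;
        }
        notify.notified().await;
    }
}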

Contributor

ilo-nanpa bot commented Jan 11, 2025

it seems like you haven't added any nanpa changeset files to this PR.

if this pull request includes changes to code, make sure to add a changeset, by writing a file to .nanpa/<unique-name>.kdl:

minor type="added" "Introduce frobnication algorithm"

refer to the manpage for more information.

@typester typester changed the title from "WIP: DataChannel related utility functions" to "Make publish_data wait until the DataChannel's bufferedAmount becomes low." Jan 14, 2025
- rtc_events::forward_dc_events(&mut lossy_dc, rtc_emitter.clone());
- rtc_events::forward_dc_events(&mut reliable_dc, rtc_emitter);
+ rtc_events::forward_dc_events(&mut lossy_dc, DataPacketKind::Lossy, rtc_emitter.clone());
+ rtc_events::forward_dc_events(&mut reliable_dc, DataPacketKind::Reliable, rtc_emitter);
Contributor

was this a separate bug?

Member Author

No, I just added it as a new argument because the newly added Event handler needed DataPacketKind.

@typester typester marked this pull request as ready for review January 14, 2025 06:26
RtcEvent::DataChannelBufferedAmountChange { sent: _, amount, kind } => {
    match kind {
        DataPacketKind::Lossy => {
            // Do nothing at this moment
Contributor

I agree with your assessment that this issue isn't as pressing on the lossy channel, but one thing we'd want to avoid is the data channel just erroring out if the buffered_amount surpasses an internal maximum value. I think @theomonnom mentioned that right now it will raise an error in this case?
For that reason I think it could also make sense to include the same logic for the lossy data channel.

Member

Yes, I think we should use the same logic to avoid an internal RtcError.

@@ -958,6 +991,9 @@ impl SessionInner {
         kind: DataPacketKind,
     ) -> Result<(), EngineError> {
         self.ensure_publisher_connected(kind).await?;
+        if kind == DataPacketKind::Reliable {
+            self.wait_buffer_low().await?;
+        }
Contributor

@lukasIO lukasIO Jan 14, 2025

Is it guaranteed that successive calls to publish_data are handled in order here?
I'm imagining a scenario where a user doesn't await publish_data and just fires a bunch of them in very quick succession, and we'd want to make sure that the next self.data_channel().send(...) is still processed in the correct order.

Member Author

I think there are no issues with the usage of notify, but I found one part that isn't ideal. I'll explain it in the comment below.

if amount <= threshold {
    return Ok(());
}
self.reliable_dc_buffered_amount_low_notify.notified().await;
Contributor

@lukasIO lukasIO Jan 14, 2025

I'm not very familiar with tokio notify, so I'll ask a couple of questions that might be a bit naive:

  • what if wait_buffer_low is called exactly when the amount drops under the threshold? Is there a potential for a race here, where the new call succeeds before previous calls to wait_buffer_low are notified?
  • if there are multiple calls to self.reliable_dc_buffered_amount_low_notify.notified().await because the threshold is exceeded, how would subsequent callers get notified? buffered_amount_changed uses notify_one, so it will only notify one caller when the buffered amount changes. I'm thinking of a scenario where the buffer drops below the threshold in buffered_amount_changed and there would be enough headroom to forward all pending packets. We would then still wait for each packet to be sent first, because we don't receive a buffered_amount_changed event until another packet has been sent, right? I'm not sure if that is actually a problem, but I think at least we'd have to be sure that buffered_amount_changed is reliably fired for every packet.

Member Author

I'm not sure if this is the same as your concern, but I found one part of my code that isn't ideal. The following is the content of wait_buffer_low. The code checks the threshold and returns immediately if there's no issue, but a notify waiter may already exist when this code is executed (i.e. an earlier call could still be waiting while a newer call returns right away). This isn't ideal, so let me fix it.

if amount <= threshold {
    return Ok(());
}
self.reliable_dc_buffered_amount_low_notify.notified().await;
Ok(())


I'm thinking of a scenario where the buffer drops below the threshold in buffered_amount_changed and there would be enough headroom to forward all pending packets. We would then still wait for each packet to be sent first, because we don't receive a buffered_amount_changed event until another packet has been sent, right? I'm not sure if that is actually a problem but I think at least we'd have to be sure that buffered_amount_changed is reliably fired for every packet

Yes, with my current code, calling publish_data multiple times could result in each call waiting for a notify. However, since DataChannel.send emits at least one buffered amount change event every time, I don't think it will end up waiting indefinitely. That said, as you pointed out, I agree that more testing is needed on this point.
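For illustration, one way to close the gap mentioned above is to register the waiter before checking the amount, so a notification that fires between the check and the await is not lost (hypothetical names again, building on the sketch in the PR description; Notified::enable is the tokio API for this pattern):

use std::sync::atomic::{AtomicU64, Ordering};
use tokio::sync::Notify;

async fn wait_buffer_low(buffered: &AtomicU64, threshold: u64, notify: &Notify) {
    loop {
        // Register the waiter first...
        let notified = notify.notified();
        tokio::pin!(notified);
        notified.as_mut().enable();
        // ...then check, so a notify_one() that fires in between is not missed.
        if buffered.load(Ordering::Acquire) <= threshold {
            return;
        }
        notified.await;
    }
}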

@lukasIO
Contributor

lukasIO commented Jan 14, 2025

Expose the ability for users to set and get the buffered_amount_low_threshold via FFI. (Default: 0)

It's great that it's configurable, but we might want to use a different threshold default value. This discussion here might be helpful for choosing some sane default values?

@typester
Member Author

I have made the following changes:

  1. Updated the FFI functions to set and get the buffered amount low threshold, extending support to lossy data channels in addition to the already supported reliable data channels.
  2. Created a dedicated worker task for data channels to enable a more robust message-sending mechanism.

Regarding 2, I believe that adding latency is not acceptable when supporting lossy data channels. However, in my previous implementation, when multiple data packets were sent in a short period, a slight delay occurred even if the buffer had sufficient capacity.

To eliminate this delay, I created a new worker dedicated to data channels, which now manages the buffer by itself.
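A rough sketch of what such a worker can look like, for illustration only (the request/callback types here are made up; the real task also tracks the buffered amount per channel):

use tokio::sync::{mpsc, oneshot};

// Hypothetical request type: publish_data just enqueues one of these, and the
// worker owns the actual DataChannel send, which keeps sends ordered even when
// callers don't await publish_data.
struct DataRequest {
    payload: Vec<u8>,
    done: oneshot::Sender<Result<(), String>>,
}

async fn data_channel_task(
    mut rx: mpsc::UnboundedReceiver<DataRequest>,
    mut send: impl FnMut(Vec<u8>) -> Result<(), String>,
) {
    while let Some(req) = rx.recv().await {
        // Only wait when the buffer is actually above the threshold, so
        // back-to-back publishes with enough headroom go out without delay.
        // wait_buffer_low(...).await;
        let result = send(req.payload);
        // The caller may have dropped its receiver; ignore that case.
        let _ = req.done.send(result);
    }
}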

@typester
Member Author

As for the default value for the buffered amount low threshold, I am still not sure what value would be ideal. 🤔

@typester typester requested review from lukasIO and bcherry January 15, 2025 19:59
@lukasIO
Contributor

lukasIO commented Jan 16, 2025

For a default value, maybe just half of the max value? So 8192?

@theomonnom
Member

theomonnom commented Jan 16, 2025

For a default value, maybe just half of the max value? So 8192?

Isn't the max value 65536?

Comment on lines 120 to 121
GetDataChannelBufferedAmountLowThresholdRequest get_data_channel_buffered_amount_low_threshold = 46;
SetDataChannelBufferedAmountLowThresholdRequest set_data_channel_buffered_amount_low_threshold = 47;
Member

@theomonnom theomonnom Jan 16, 2025

I don't think we should "request" the buffered amount low threshold. Since data channels are part of the room, let's add it to our RoomInfo?

The issue is that if we do a request, then on the Python side getting this threshold becomes an async operation, which isn't ideal.

Ideally it is just:

def buffered_amount_low_threshold(self) -> int:
	return self._info.buffered_amount_low_threshold

Member Author

That makes a lot of sense to me. Let me fix it.

Comment on lines 605 to 607
if let Err(_) = tx.send(result) {
    log::error!("failed to send publish_data result");
}
Member

I think we can ignore this failure? Not a big deal if the user cancels their call to publish_data.
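For illustration, with the tx/result from the snippet above that would just become:

// The receiver is only gone if the caller dropped or cancelled its
// publish_data future, which is fine to ignore.
let _ = tx.send(result);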

Member Author

Fixed in 0aad6d1

Member

@theomonnom theomonnom left a comment

Otherwise lgtm!

@lukasIO
Contributor

lukasIO commented Jan 16, 2025

For a default value, maybe just half of the max value? So 8192?

Isn't the max value 65536?

that's the theoretical max value for a data packet, I think?
But I was also missing a couple of zeros in my value; I meant kB.

Apparently the maximum bufferedAmount is 16MB in Chrome, so my suggestion was to use half of that as the threshold value, but we might also just do something a lot smaller like 1 or 2MB to be safe.
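For reference, the numbers being discussed here as illustrative Rust constants (not what the PR finally ships):

// Chrome's maximum bufferedAmount per data channel, per the discussion above.
const MAX_BUFFERED_AMOUNT: u64 = 16 * 1024 * 1024; // 16 MB
// Possible defaults floated in this thread: half of the max, or something smaller.
const HALF_OF_MAX: u64 = MAX_BUFFERED_AMOUNT / 2;   // 8 MB (the "8192" kB above)
const CONSERVATIVE: u64 = 2 * 1024 * 1024;          // 2 MB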

@typester
Member Author

Updated:

  • Removed the Set function for buffered_amount_low_threshold from FFI; instead, added it to RoomInfo.
  • Added a BufferedAmountLowThresholdUpdated event so that the FFI client can update its room info.

@typester typester requested a review from theomonnom January 16, 2025 21:05
Comment on lines 380 to 381
required DataChannelOptions lossy_dc_options = 4;
required DataChannelOptions reliable_dc_options = 5;
Member

@theomonnom theomonnom Jan 16, 2025

nit: since we only have one field, I would have flattened the dc options

Member Author

Will fix. Thank you!

Member Author

Fixed in 6c73254.

Member

@theomonnom theomonnom left a comment

lgtm!

@typester typester merged commit f162413 into main Jan 17, 2025
11 of 16 checks passed
@typester typester deleted the typester/data-stream branch January 17, 2025 17:07