Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of share_room_key #2862

Merged
merged 11 commits into from
Nov 20, 2023
45 changes: 23 additions & 22 deletions crates/matrix-sdk-crypto/src/identities/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -431,28 +431,6 @@ impl Device {
self.inner.encrypt(self.verification_machine.store.inner(), event_type, content).await
}

pub(crate) async fn maybe_encrypt_room_key(
&self,
session: OutboundGroupSession,
) -> OlmResult<MaybeEncryptedRoomKey> {
let content = session.as_content().await;
let message_index = session.message_index().await;
let event_type = content.event_type();

match self.encrypt(event_type, content).await {
Ok((session, encrypted)) => Ok(MaybeEncryptedRoomKey::Encrypted {
share_info: ShareInfo::new_shared(session.sender_key().to_owned(), message_index),
used_session: session,
message: encrypted.cast(),
}),

Err(OlmError::MissingSession | OlmError::EventError(EventError::MissingSenderKey)) => {
Ok(MaybeEncryptedRoomKey::Withheld { code: WithheldCode::NoOlm })
}
Err(e) => Err(e),
}
}

/// Encrypt the given inbound group session as a forwarded room key for this
/// device.
pub async fn encrypt_room_key_for_forwarding(
Expand Down Expand Up @@ -788,6 +766,29 @@ impl ReadOnlyDevice {
}
}

pub(crate) async fn maybe_encrypt_room_key(
&self,
store: &CryptoStoreWrapper,
session: OutboundGroupSession,
) -> OlmResult<MaybeEncryptedRoomKey> {
let content = session.as_content().await;
let message_index = session.message_index().await;
let event_type = content.event_type();

match self.encrypt(store, event_type, content).await {
Ok((session, encrypted)) => Ok(MaybeEncryptedRoomKey::Encrypted {
share_info: ShareInfo::new_shared(session.sender_key().to_owned(), message_index),
used_session: session,
message: encrypted.cast(),
}),

Err(OlmError::MissingSession | OlmError::EventError(EventError::MissingSenderKey)) => {
Ok(MaybeEncryptedRoomKey::Withheld { code: WithheldCode::NoOlm })
}
Err(e) => Err(e),
}
}

/// Update a device with a new device keys struct.
pub(crate) fn update_device(&mut self, device_keys: &DeviceKeys) -> Result<(), SignatureError> {
self.verify_device_keys(device_keys)?;
Expand Down
22 changes: 16 additions & 6 deletions crates/matrix-sdk-crypto/src/session_manager/group_sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use crate::{
error::{EventError, MegolmResult, OlmResult},
identities::device::MaybeEncryptedRoomKey,
olm::{InboundGroupSession, OutboundGroupSession, Session, ShareInfo, ShareState},
store::{Changes, Result as StoreResult, Store},
store::{Changes, CryptoStoreWrapper, Result as StoreResult, Store},
types::events::{room::encrypted::RoomEncryptedEventContent, room_key_withheld::WithheldCode},
Device, EncryptionSettings, OlmError, ToDeviceRequest,
};
Expand Down Expand Up @@ -263,6 +263,7 @@ impl GroupSessionManager {
/// Encrypt the given content for the given devices and create a to-device
/// requests that sends the encrypted content to them.
async fn encrypt_session_for(
store: Arc<CryptoStoreWrapper>,
group_session: OutboundGroupSession,
devices: Vec<Device>,
) -> OlmResult<(
Expand All @@ -283,14 +284,21 @@ impl GroupSessionManager {
let mut share_infos = BTreeMap::new();
let mut withheld_devices = Vec::new();

let encrypt = |device: Device, session: OutboundGroupSession| async move {
let encryption_result = device.maybe_encrypt_room_key(session).await?;
// XXX is there a way to do this that doesn't involve cloning the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think there is, you are moving the Device into a spawned task. The task itself might be moved between threads.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my thinking went: we have an Arc<CryptoStoreWrapper> which is passed into this method, and which we know will live as long as any of those tasks (because the tasks are all spawned and join_all-ed inside this method). So shouldn't I be able to borrow a reference to the CryptoStoreWrapper contained in that Arc, and just pass a copy of that reference into each spawned task?

// `Arc<CryptoStoreWrapper>` fore each device?
let encrypt = |store: Arc<CryptoStoreWrapper>,
device: Device,
session: OutboundGroupSession| async move {
let encryption_result =
device.inner.maybe_encrypt_room_key(store.as_ref(), session).await?;

Ok::<_, OlmError>(DeviceResult { device, maybe_encrypted_room_key: encryption_result })
};

let tasks: Vec<_> =
devices.iter().map(|d| spawn(encrypt(d.clone(), group_session.clone()))).collect();
let tasks: Vec<_> = devices
.iter()
.map(|d| spawn(encrypt(store.clone(), d.clone(), group_session.clone())))
.collect();

let results = join_all(tasks).await;

Expand Down Expand Up @@ -428,12 +436,13 @@ impl GroupSessionManager {
}

async fn encrypt_request(
store: Arc<CryptoStoreWrapper>,
chunk: Vec<Device>,
outbound: OutboundGroupSession,
sessions: GroupSessionCache,
) -> OlmResult<(Vec<Session>, Vec<(Device, WithheldCode)>)> {
let (id, request, share_infos, used_sessions, no_olm) =
Self::encrypt_session_for(outbound.clone(), chunk).await?;
Self::encrypt_session_for(store, outbound.clone(), chunk).await?;

if !request.messages.is_empty() {
trace!(
Expand Down Expand Up @@ -520,6 +529,7 @@ impl GroupSessionManager {
.chunks(Self::MAX_TO_DEVICE_MESSAGES)
.map(|chunk| {
spawn(Self::encrypt_request(
self.store.crypto_store(),
chunk.to_vec(),
group_session.clone(),
self.sessions.clone(),
Expand Down
4 changes: 4 additions & 0 deletions crates/matrix-sdk-crypto/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,10 @@ impl Store {
) -> Result<RoomKeyImportResult> {
self.import_room_keys(exported_keys, false, progress_listener).await
}

pub(crate) fn crypto_store(&self) -> Arc<CryptoStoreWrapper> {
self.inner.store.clone()
}
}

impl Deref for Store {
Expand Down