Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc v2: submitAndWatch replace old messages if it's lagging #4901

Merged
merged 9 commits into from
Jul 29, 2024
10 changes: 8 additions & 2 deletions substrate/client/consensus/beefy/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use parking_lot::RwLock;
use sp_consensus_beefy::AuthorityIdBound;
use std::sync::Arc;

use sc_rpc::{utils::pipe_from_stream, SubscriptionTaskExecutor};
use sc_rpc::{
utils::{BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};
use sp_application_crypto::RuntimeAppPublic;
use sp_runtime::traits::Block as BlockT;

Expand Down Expand Up @@ -145,7 +148,10 @@ where
.subscribe(100_000)
.map(|vfp| notification::EncodedVersionedFinalityProof::new::<Block, AuthorityId>(vfp));

sc_rpc::utils::spawn_subscription_task(&self.executor, pipe_from_stream(pending, stream));
sc_rpc::utils::spawn_subscription_task(
&self.executor,
PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
);
}

async fn latest_finalized(&self) -> Result<Block::Hash, Error> {
Expand Down
10 changes: 8 additions & 2 deletions substrate/client/consensus/grandpa/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,10 @@ use finality::{EncodedFinalityProof, RpcFinalityProofProvider};
use notification::JustificationNotification;
use report::{ReportAuthoritySet, ReportVoterState, ReportedRoundStates};
use sc_consensus_grandpa::GrandpaJustificationStream;
use sc_rpc::{utils::pipe_from_stream, SubscriptionTaskExecutor};
use sc_rpc::{
utils::{BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};
use sp_runtime::traits::{Block as BlockT, NumberFor};

/// Provides RPC methods for interacting with GRANDPA.
Expand Down Expand Up @@ -108,7 +111,10 @@ where
},
);

sc_rpc::utils::spawn_subscription_task(&self.executor, pipe_from_stream(pending, stream));
sc_rpc::utils::spawn_subscription_task(
&self.executor,
PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
);
}

async fn prove_finality(
Expand Down
11 changes: 5 additions & 6 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ use codec::Encode;
use futures::{channel::oneshot, future::FutureExt};
use jsonrpsee::{
core::async_trait, server::ResponsePayload, types::SubscriptionId, ConnectionId, Extensions,
MethodResponseFuture, PendingSubscriptionSink, SubscriptionSink,
MethodResponseFuture, PendingSubscriptionSink,
};
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockchainEvents, CallExecutor, ChildInfo, ExecutorProvider, StorageKey,
StorageProvider,
};
use sc_rpc::utils::to_sub_message;
use sc_rpc::utils::Subscription;
use sp_api::CallApiAt;
use sp_blockchain::{Error as BlockChainError, HeaderBackend, HeaderMetadata};
use sp_core::{traits::CallContext, Bytes};
Expand Down Expand Up @@ -158,7 +158,7 @@ impl<BE: Backend<Block>, Block: BlockT, Client> ChainHead<BE, Block, Client> {
}

/// Helper to convert the `subscription ID` to a string.
pub fn read_subscription_id_as_string(sink: &SubscriptionSink) -> String {
pub fn read_subscription_id_as_string(sink: &Subscription) -> String {
match sink.subscription_id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.into_owned().into(),
Expand Down Expand Up @@ -213,7 +213,7 @@ where
return
};

let Ok(sink) = pending.accept().await else { return };
let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };

let sub_id = read_subscription_id_as_string(&sink);
// Keep track of the subscription.
Expand All @@ -223,8 +223,7 @@ where
// Inserting the subscription can only fail if the JsonRPSee generated a duplicate
// subscription ID.
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription already accepted", sub_id);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
let _ = sink.send(&FollowEvent::<String>::Stop).await;
return
};
debug!(target: LOG_TARGET, "[follow][id={:?}] Subscription accepted", sub_id);
Expand Down
22 changes: 8 additions & 14 deletions substrate/client/rpc-spec-v2/src/chain_head/chain_head_follow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ use futures::{
stream::{self, Stream, StreamExt},
};
use futures_util::future::Either;
use jsonrpsee::SubscriptionSink;
use log::debug;
use sc_client_api::{
Backend, BlockBackend, BlockImportNotification, BlockchainEvents, FinalityNotification,
};
use sc_rpc::utils::to_sub_message;
use sc_rpc::utils::Subscription;
use schnellru::{ByLength, LruMap};
use sp_api::CallApiAt;
use sp_blockchain::{
Expand Down Expand Up @@ -597,7 +596,7 @@ where
&mut self,
startup_point: &StartupPoint<Block>,
mut stream: EventStream,
sink: SubscriptionSink,
sink: Subscription,
rx_stop: oneshot::Receiver<()>,
) -> Result<(), SubscriptionManagementError>
where
Expand Down Expand Up @@ -632,23 +631,20 @@ where
self.sub_id,
err
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
_ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};

for event in events {
let msg = to_sub_message(&sink, &event);
if let Err(err) = sink.send(msg).await {
if let Err(err) = sink.send(&event).await {
// Failed to submit event.
debug!(
target: LOG_TARGET,
"[follow][id={:?}] Failed to send event {:?}", self.sub_id, err
);

let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
let _ = sink.send(&FollowEvent::<String>::Stop).await;
// No need to propagate this error further, the client disconnected.
return Ok(())
}
Expand All @@ -662,15 +658,14 @@ where
// - the substrate streams have closed
// - the `Stop` receiver was triggered internally (cannot hold the pinned block guarantee)
// - the client disconnected.
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
let _ = sink.send(&FollowEvent::<String>::Stop).await;
Ok(())
}

/// Generate the block events for the `chainHead_follow` method.
pub async fn generate_events(
&mut self,
sink: SubscriptionSink,
sink: Subscription,
sub_data: InsertedSubscriptionData<Block>,
) -> Result<(), SubscriptionManagementError> {
// Register for the new block and finalized notifications.
Expand Down Expand Up @@ -698,8 +693,7 @@ where
self.sub_id,
err
);
let msg = to_sub_message(&sink, &FollowEvent::<String>::Stop);
let _ = sink.send(msg).await;
let _ = sink.send(&FollowEvent::<String>::Stop).await;
return Err(err)
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use codec::Encode;
use jsonrpsee::rpc_params;
use sc_transaction_pool_api::{ChainEvent, MaintainedTransactionPool};
use sp_core::H256;
use std::sync::Arc;
use std::{sync::Arc, vec};
use substrate_test_runtime_client::AccountKeyring::*;
use substrate_test_runtime_transaction_pool::uxt;

Expand Down Expand Up @@ -149,3 +149,89 @@ async fn tx_with_pruned_best_block() {
let event: TransactionEvent<H256> = get_next_event_sub!(&mut sub);
assert_eq!(event, TransactionEvent::Finalized(TransactionBlock { hash: block_2, index: 0 }));
}

#[tokio::test]
async fn tx_slow_client_replace_old_messages() {
let (api, pool, client, tx_api, _exec_middleware, _pool_middleware) = setup_api_tx();
let block_1_header = api.push_block(1, vec![], true);
client.set_best_block(block_1_header.hash(), 1);

let uxt = uxt(Alice, ALICE_NONCE);
let xt = hex_string(&uxt.encode());

// The subscription itself has a buffer of length 1 and no way to create
// it without a buffer.
//
// Then `transactionWatch` has its own buffer of length 3 which leads to
// that it's limited to 5 items in the tests.
//
// 1. Send will complete immediately
// 2. Send will be pending in the subscription sink (not possible to cancel)
// 3. The rest of messages will be kept in a RingBuffer and older messages are replaced by newer
// items.
let mut sub = tx_api
.subscribe("transactionWatch_v1_submitAndWatch", rpc_params![&xt], 1)
.await
.unwrap();

// Import block 2 with the transaction included.
let block = api.push_block(2, vec![uxt.clone()], true);
let block_hash = block.hash();
let event = ChainEvent::NewBestBlock { hash: block_hash, tree_route: None };
pool.inner_pool.maintain(event).await;

let mut block2_hash = None;

// Import block 2 again without the transaction included.
for _ in 0..10 {
let block_not_imported = api.push_block(2, vec![], true);
let event = ChainEvent::NewBestBlock { hash: block_not_imported.hash(), tree_route: None };
pool.inner_pool.maintain(event).await;

let block2 = api.push_block(2, vec![uxt.clone()], true);
block2_hash = Some(block2.hash());
let event = ChainEvent::NewBestBlock { hash: block2.hash(), tree_route: None };

pool.inner_pool.maintain(event).await;
}

let block2_hash = block2_hash.unwrap();

// Finalize the transaction
let event = ChainEvent::Finalized { hash: block2_hash, tree_route: Arc::from(vec![]) };
pool.inner_pool.maintain(event).await;

// Hack to mimic a slow client.
tokio::time::sleep(std::time::Duration::from_secs(10)).await;

// Read the events.
let mut res: Vec<TransactionEvent<_>> = Vec::new();

while let Some(item) = tokio::time::timeout(std::time::Duration::from_secs(5), sub.next())
.await
.unwrap()
{
let (ev, _) = item.unwrap();
res.push(ev);
}

// BestBlockIncluded(None) is dropped and not seen.
let exp = vec![
// First message
TransactionEvent::Validated,
// Second message
TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash: block_hash,
index: 0,
})),
// Most recent 3 messages.
TransactionEvent::Validated,
TransactionEvent::BestChainBlockIncluded(Some(TransactionBlock {
hash: block2_hash,
index: 0,
})),
TransactionEvent::Finalized(TransactionBlock { hash: block2_hash, index: 0 }),
];

assert_eq!(res, exp);
}
27 changes: 16 additions & 11 deletions substrate/client/rpc-spec-v2/src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
use codec::Decode;
use futures::{StreamExt, TryFutureExt};
use jsonrpsee::{core::async_trait, PendingSubscriptionSink};
use sc_rpc::utils::{pipe_from_stream, to_sub_message};
use sc_rpc::utils::{RingBuffer, Subscription};
use sc_transaction_pool_api::{
error::IntoPoolError, BlockHash, TransactionFor, TransactionPool, TransactionSource,
TransactionStatus,
Expand Down Expand Up @@ -84,16 +84,14 @@ where
Err(e) => {
log::debug!(target: LOG_TARGET, "Extrinsic bytes cannot be decoded: {:?}", e);

let Ok(sink) = pending.accept().await else { return };
let Ok(sink) = pending.accept().await.map(Subscription::from) else { return };

// The transaction is invalid.
let msg = to_sub_message(
&sink,
&TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
let _ = sink
.send(&TransactionEvent::Invalid::<BlockHash<Pool>>(TransactionError {
error: "Extrinsic bytes cannot be decoded".into(),
}),
);
let _ = sink.send(msg).await;
}))
.await;
return
},
};
Expand All @@ -108,16 +106,23 @@ where
.unwrap_or_else(|e| Error::Verification(Box::new(e)))
});

let Ok(sink) = pending.accept().await.map(Subscription::from) else {
return;
};

match submit.await {
Ok(stream) => {
let stream = stream.filter_map(move |event| async move { handle_event(event) });
pipe_from_stream(pending, stream.boxed()).await;
let stream =
stream.filter_map(move |event| async move { handle_event(event) }).boxed();

// If the subscription is too slow older events will be overwritten.
sink.pipe_from_stream(stream, RingBuffer::new(3)).await;
},
Err(err) => {
// We have not created an `Watcher` for the tx. Make sure the
// error is still propagated as an event.
let event: TransactionEvent<<Pool::Block as BlockT>::Hash> = err.into();
pipe_from_stream(pending, futures::stream::once(async { event }).boxed()).await;
_ = sink.send(&event).await;
},
};
};
Expand Down
6 changes: 4 additions & 2 deletions substrate/client/rpc/src/author/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ mod tests;
use std::sync::Arc;

use crate::{
utils::{pipe_from_stream, spawn_subscription_task},
utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};

Expand Down Expand Up @@ -202,7 +202,9 @@ where
},
};

pipe_from_stream(pending, stream).await;
PendingSubscription::from(pending)
.pipe_from_stream(stream, BoundedVecDeque::default())
.await;
};

spawn_subscription_task(&self.executor, fut);
Expand Down
8 changes: 5 additions & 3 deletions substrate/client/rpc/src/chain/chain_full.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use super::{client_err, ChainBackend, Error};
use crate::{
utils::{pipe_from_stream, spawn_subscription_task},
utils::{spawn_subscription_task, BoundedVecDeque, PendingSubscription},
SubscriptionTaskExecutor,
};
use std::{marker::PhantomData, sync::Arc};
Expand Down Expand Up @@ -142,6 +142,8 @@ fn subscribe_headers<Block, Client, F, G, S>(
// we set up the stream and chain it to the stream. Consuming code would need to handle
// duplicates at the beginning of the stream though.
let stream = stream::iter(maybe_header).chain(stream());

spawn_subscription_task(executor, pipe_from_stream(pending, stream));
spawn_subscription_task(
executor,
PendingSubscription::from(pending).pipe_from_stream(stream, BoundedVecDeque::default()),
);
}
Loading
Loading