Skip to content

Commit

Permalink
feat(sdk)!: retry broadcast operations (#2337)
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek authored Nov 26, 2024
1 parent 7393162 commit 840dcec
Show file tree
Hide file tree
Showing 21 changed files with 333 additions and 192 deletions.
2 changes: 1 addition & 1 deletion packages/dapi-grpc/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ impl MappingConfig {
create_dir_all(&self.out_dir)?;

self.builder
.compile(&[self.protobuf_file], &self.proto_includes)
.compile_protos(&[self.protobuf_file], &self.proto_includes)
}
}

Expand Down
17 changes: 15 additions & 2 deletions packages/rs-dapi-client/src/dapi_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,11 @@ impl DapiRequestExecutor for DapiClient {
.address_list
.write()
.expect("can't get address list for write");

tracing::warn!(
?address,
?error,
"received server error, banning address"
);
address_list.ban_address(&address).map_err(|error| {
ExecutionError {
inner: DapiClientError::AddressList(error),
Expand All @@ -236,9 +240,18 @@ impl DapiRequestExecutor for DapiClient {
address: Some(address.clone()),
}
})?;
} else {
tracing::debug!(
?address,
?error,
"received server error, we should ban the node but banning is disabled"
);
}
} else {
tracing::trace!(?error, "received error");
tracing::debug!(
?error,
"received server error, most likely the request is invalid"
);
}
}
};
Expand Down
73 changes: 73 additions & 0 deletions packages/rs-dapi-client/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,18 @@ where
/// Result of request execution
pub type ExecutionResult<R, E> = Result<ExecutionResponse<R>, ExecutionError<E>>;

impl<R, E> From<ExecutionResponse<R>> for ExecutionResult<R, E> {
fn from(response: ExecutionResponse<R>) -> Self {
ExecutionResult::<R, E>::Ok(response)
}
}

impl<R, E> From<ExecutionError<E>> for ExecutionResult<R, E> {
fn from(e: ExecutionError<E>) -> Self {
ExecutionResult::<R, E>::Err(e)
}
}

impl<R, E> IntoInner<Result<R, E>> for ExecutionResult<R, E> {
fn into_inner(self) -> Result<R, E> {
match self {
Expand All @@ -145,3 +157,64 @@ where
}
}
}

/// Convert Result<T,TE> to ExecutionResult<R,E>, taking context from ExecutionResponse.
pub trait WrapToExecutionResult<R, RE, W>: Sized {
/// Convert self (eg. some [Result]) to [ExecutionResult], taking context information from `W` (eg. ExecutionResponse).
///
/// This function simplifies processing of results by wrapping them into ExecutionResult.
/// It is useful when you have execution result retrieved in previous step and you want to
/// add it to the result of the current step.
///
/// Useful when chaining multiple commands and you want to keep track of retries and address.
///
/// ## Example
///
/// ```rust
/// use rs_dapi_client::{ExecutionResponse, ExecutionResult, WrapToExecutionResult};
///
/// fn some_request() -> ExecutionResult<i8, String> {
/// Ok(ExecutionResponse {
/// inner: 42,
/// retries: 123,
/// address: "http://127.0.0.1".parse().expect("create mock address"),
/// })
/// }
///
/// fn next_step() -> Result<i32, String> {
/// Err("next error".to_string())
/// }
///
/// let response = some_request().expect("request should succeed");
/// let result: ExecutionResult<i32, String> = next_step().wrap_to_execution_result(&response);
///
/// if let ExecutionResult::Err(error) = result {
/// assert_eq!(error.inner, "next error");
/// assert_eq!(error.retries, 123);
/// } else {
/// panic!("Expected error");
/// }
/// ```
fn wrap_to_execution_result(self, result: &W) -> ExecutionResult<R, RE>;
}

impl<R, RE, TR, IR, IRE> WrapToExecutionResult<R, RE, ExecutionResponse<TR>> for Result<IR, IRE>
where
R: From<IR>,
RE: From<IRE>,
{
fn wrap_to_execution_result(self, result: &ExecutionResponse<TR>) -> ExecutionResult<R, RE> {
match self {
Ok(r) => ExecutionResult::Ok(ExecutionResponse {
inner: r.into(),
retries: result.retries,
address: result.address.clone(),
}),
Err(e) => ExecutionResult::Err(ExecutionError {
inner: e.into(),
retries: result.retries,
address: Some(result.address.clone()),
}),
}
}
}
1 change: 1 addition & 0 deletions packages/rs-dapi-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ pub use dapi_client::{DapiClient, DapiClientError};
pub use dump::DumpData;
pub use executor::{
DapiRequestExecutor, ExecutionError, ExecutionResponse, ExecutionResult, InnerInto, IntoInner,
WrapToExecutionResult,
};
use futures::{future::BoxFuture, FutureExt};
pub use request_settings::RequestSettings;
Expand Down
5 changes: 4 additions & 1 deletion packages/rs-dapi-client/src/request_settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ const DEFAULT_BAN_FAILED_ADDRESS: bool = true;
pub struct RequestSettings {
/// Timeout for establishing a connection.
pub connect_timeout: Option<Duration>,
/// Timeout for a request.
/// Timeout for single request (soft limit).
///
/// Note that the total maximum time of execution can exceed `(timeout + connect_timeout) * retries`
/// as it accounts for internal processing time between retries.
pub timeout: Option<Duration>,
/// Number of retries in case of failed requests. If max retries reached, the last error is returned.
/// 1 means one request and one retry in case of error, etc.
Expand Down
34 changes: 32 additions & 2 deletions packages/rs-dapi-client/src/transport/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,38 @@ impl CanRetry for dapi_grpc::tonic::Status {
}
}

/// A shortcut to link between gRPC request type, response type, client and its
/// method in order to represent it in a form of types and data.
/// Macro to implement the `TransportRequest` trait for a given request type, response type, client type, and settings.
///
/// # Parameters
///
/// - `$request:ty`: The request type for which the `TransportRequest` trait will be implemented.
/// - `$response:ty`: The response type returned by the transport request.
/// - `$client:ty`: The client type used to execute the transport request (eg. generated by `tonic` crate).
/// - `$settings:expr`: The settings to be used for the transport request; these settings will override client's
/// default settings, but can still be overriden by arguments to
/// the [`DapiRequestExecutor::execute`](crate::DapiRequestExecutor::execute) method.
/// - `$($method:tt)+`: The method of `$client` to be called to execute the request.
///
/// # Example
///
/// ```compile_fail
/// impl_transport_request_grpc!(
/// MyRequestType,
/// MyResponseType,
/// MyClientType,
/// my_settings,
/// my_method
/// );
/// ```
///
/// This will generate an implementation of the `TransportRequest` trait for `MyRequestType`
/// that uses `MyClientType` to execute the `my_method` method, with the specified `my_settings`.
///
/// The generated implementation will:
/// - Define the associated types `Client` and `Response`.
/// - Set the `SETTINGS_OVERRIDES` constant to the provided settings.
/// - Implement the `method_name` function to return the name of the method as a string.
/// - Implement the `execute_transport` function to execute the transport request using the provided client and settings.
macro_rules! impl_transport_request_grpc {
($request:ty, $response:ty, $client:ty, $settings:expr, $($method:tt)+) => {
impl TransportRequest for $request {
Expand Down
4 changes: 2 additions & 2 deletions packages/rs-dpp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ dashcore = { git = "https://github.com/dashpay/rust-dashcore", features = [
"signer",
"serde",
"bls",
"eddsa"
"eddsa",
], default-features = false, tag = "0.32.0" }
env_logger = { version = "0.11" }
getrandom = { version = "0.2", features = ["js"] }
Expand All @@ -56,7 +56,7 @@ platform-version = { path = "../rs-platform-version" }
platform-versioning = { path = "../rs-platform-versioning" }
platform-serialization = { path = "../rs-platform-serialization" }
platform-serialization-derive = { path = "../rs-platform-serialization-derive" }
derive_more = { version = "1.0", features = ["from", "display"] }
derive_more = { version = "1.0", features = ["from", "display", "try_into"] }
nohash-hasher = "0.2.0"
rust_decimal = "1.29.1"
rust_decimal_macros = "1.29.1"
Expand Down
2 changes: 1 addition & 1 deletion packages/rs-dpp/src/state_transition/proof_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::voting::votes::Vote;
use platform_value::Identifier;
use std::collections::BTreeMap;

#[derive(Debug)]
#[derive(Debug, strum::Display, derive_more::TryInto)]
pub enum StateTransitionProofResult {
VerifiedDataContract(DataContract),
VerifiedIdentity(Identity),
Expand Down
2 changes: 1 addition & 1 deletion packages/rs-sdk/src/core/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl Sdk {
self.execute(core_transactions_stream, RequestSettings::default())
.await
.into_inner()
.map_err(|e| Error::DapiClientError(e.to_string()))
.map_err(|e| e.into())
}

/// Waits for a response for the asset lock proof
Expand Down
17 changes: 15 additions & 2 deletions packages/rs-sdk/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
//! Definitions of errors
use dapi_grpc::tonic::Code;
use dpp::consensus::ConsensusError;
use dpp::serialization::PlatformDeserializable;
use dpp::version::PlatformVersionError;
Expand Down Expand Up @@ -56,6 +57,10 @@ pub enum Error {
/// SDK operation timeout reached error
#[error("SDK operation timeout {} secs reached: {1}", .0.as_secs())]
TimeoutReached(Duration, String),

/// Returned when an attempt is made to create an object that already exists in the system
#[error("Object already exists: {0}")]
AlreadyExists(String),
/// Generic error
// TODO: Use domain specific errors instead of generic ones
#[error("SDK error: {0}")]
Expand All @@ -78,6 +83,7 @@ pub enum Error {
impl From<DapiClientError> for Error {
fn from(value: DapiClientError) -> Self {
if let DapiClientError::Transport(TransportError::Grpc(status)) = &value {
// If we have some consensus error metadata, we deserialize it and return as ConsensusError
if let Some(consensus_error_value) = status
.metadata()
.get_bin("dash-serialized-consensus-error-bin")
Expand All @@ -88,11 +94,18 @@ impl From<DapiClientError> for Error {
.map(|consensus_error| {
Self::Protocol(ProtocolError::ConsensusError(Box::new(consensus_error)))
})
.unwrap_or_else(Self::Protocol);
.unwrap_or_else(|e| {
tracing::debug!("Failed to deserialize consensus error: {}", e);
Self::Protocol(e)
});
}
// Otherwise we parse the error code and act accordingly
if status.code() == Code::AlreadyExists {
return Self::AlreadyExists(status.message().to_string());
}
}

Self::DapiClientError(format!("{:?}", value))
Self::DapiClientError(value.to_string())
}
}

Expand Down
3 changes: 0 additions & 3 deletions packages/rs-sdk/src/platform/transition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
pub mod broadcast;
pub(crate) mod broadcast_identity;
pub mod broadcast_request;
pub(crate) mod context;
pub mod purchase_document;
pub mod put_contract;
pub mod put_document;
Expand All @@ -16,6 +15,4 @@ pub mod update_price_of_document;
pub mod vote;
pub mod withdraw_from_identity;

pub use context::*;

pub use txid::TxId;
Loading

0 comments on commit 840dcec

Please sign in to comment.