From 667ac8d2c32689f135615c9d98079a7952eae182 Mon Sep 17 00:00:00 2001 From: Eric Semeniuc <3838856+esemeniuc@users.noreply.github.com> Date: Sat, 2 Dec 2023 17:08:09 +0100 Subject: [PATCH] [JIT-1708] Fix TOC TOU condition for relayer and block engine config (#491) --- core/src/proxy/block_engine_stage.rs | 45 +++++++++++++++++----------- core/src/proxy/relayer_stage.rs | 27 ++++++++++------- 2 files changed, 43 insertions(+), 29 deletions(-) diff --git a/core/src/proxy/block_engine_stage.rs b/core/src/proxy/block_engine_stage.rs index 4128f5379f..5dd8510bad 100644 --- a/core/src/proxy/block_engine_stage.rs +++ b/core/src/proxy/block_engine_stage.rs @@ -148,9 +148,11 @@ impl BlockEngineStage { while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination - if !Self::is_valid_block_engine_config(&block_engine_config.lock().unwrap()) { + let local_block_engine_config = block_engine_config.lock().unwrap().clone(); + if !Self::is_valid_block_engine_config(&local_block_engine_config) { sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( + &local_block_engine_config, &block_engine_config, &cluster_info, &bundle_tx, @@ -183,7 +185,8 @@ impl BlockEngineStage { } async fn connect_auth_and_stream( - block_engine_config: &Arc>, + local_block_engine_config: &BlockEngineConfig, + global_block_engine_config: &Arc>, cluster_info: &Arc, bundle_tx: &Sender>, packet_tx: &Sender, @@ -194,17 +197,20 @@ impl BlockEngineStage { ) -> crate::proxy::Result<()> { // Get a copy of configs here in case they have changed at runtime let keypair = cluster_info.keypair().clone(); - let local_config = block_engine_config.lock().unwrap().clone(); - - let mut backend_endpoint = Endpoint::from_shared(local_config.block_engine_url.clone()) - .map_err(|_| { - ProxyError::BlockEngineConnectionError(format!( - "invalid block engine url value: {}", - local_config.block_engine_url - )) - })? - .tcp_keepalive(Some(Duration::from_secs(60))); - if local_config.block_engine_url.starts_with("https") { + + let mut backend_endpoint = + Endpoint::from_shared(local_block_engine_config.block_engine_url.clone()) + .map_err(|_| { + ProxyError::BlockEngineConnectionError(format!( + "invalid block engine url value: {}", + local_block_engine_config.block_engine_url + )) + })? + .tcp_keepalive(Some(Duration::from_secs(60))); + if local_block_engine_config + .block_engine_url + .starts_with("https") + { backend_endpoint = backend_endpoint .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { @@ -214,7 +220,10 @@ impl BlockEngineStage { })?; } - debug!("connecting to auth: {}", local_config.block_engine_url); + debug!( + "connecting to auth: {}", + local_block_engine_config.block_engine_url + ); let auth_channel = timeout(*connection_timeout, backend_endpoint.connect()) .await .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? @@ -232,13 +241,13 @@ impl BlockEngineStage { datapoint_info!( "block_engine_stage-tokens_generated", - ("url", local_config.block_engine_url, String), + ("url", local_block_engine_config.block_engine_url, String), ("count", 1, i64), ); debug!( "connecting to block engine: {}", - local_config.block_engine_url + local_block_engine_config.block_engine_url ); let block_engine_channel = timeout(*connection_timeout, backend_endpoint.connect()) .await @@ -255,8 +264,8 @@ impl BlockEngineStage { bundle_tx, block_engine_client, packet_tx, - &local_config, - block_engine_config, + local_block_engine_config, + global_block_engine_config, banking_packet_sender, exit, block_builder_fee_info, diff --git a/core/src/proxy/relayer_stage.rs b/core/src/proxy/relayer_stage.rs index 3c754fb9e4..e640bd7554 100644 --- a/core/src/proxy/relayer_stage.rs +++ b/core/src/proxy/relayer_stage.rs @@ -147,9 +147,11 @@ impl RelayerStage { while !exit.load(Ordering::Relaxed) { // Wait until a valid config is supplied (either initially or by admin rpc) // Use if!/else here to avoid extra CONNECTION_BACKOFF wait on successful termination - if !Self::is_valid_relayer_config(&relayer_config.lock().unwrap()) { + let local_relayer_config = relayer_config.lock().unwrap().clone(); + if !Self::is_valid_relayer_config(&local_relayer_config) { sleep(CONNECTION_BACKOFF).await; } else if let Err(e) = Self::connect_auth_and_stream( + &local_relayer_config, &relayer_config, &cluster_info, &heartbeat_tx, @@ -181,7 +183,8 @@ impl RelayerStage { } async fn connect_auth_and_stream( - relayer_config: &Arc>, + local_relayer_config: &RelayerConfig, + global_relayer_config: &Arc>, cluster_info: &Arc, heartbeat_tx: &Sender, packet_tx: &Sender, @@ -191,17 +194,16 @@ impl RelayerStage { ) -> crate::proxy::Result<()> { // Get a copy of configs here in case they have changed at runtime let keypair = cluster_info.keypair().clone(); - let local_config = relayer_config.lock().unwrap().clone(); - let mut backend_endpoint = Endpoint::from_shared(local_config.relayer_url.clone()) + let mut backend_endpoint = Endpoint::from_shared(local_relayer_config.relayer_url.clone()) .map_err(|_| { ProxyError::RelayerConnectionError(format!( "invalid relayer url value: {}", - local_config.relayer_url + local_relayer_config.relayer_url )) })? .tcp_keepalive(Some(Duration::from_secs(60))); - if local_config.relayer_url.starts_with("https") { + if local_relayer_config.relayer_url.starts_with("https") { backend_endpoint = backend_endpoint .tls_config(tonic::transport::ClientTlsConfig::new()) .map_err(|_| { @@ -211,7 +213,7 @@ impl RelayerStage { })?; } - debug!("connecting to auth: {}", local_config.relayer_url); + debug!("connecting to auth: {}", local_relayer_config.relayer_url); let auth_channel = timeout(*connection_timeout, backend_endpoint.connect()) .await .map_err(|_| ProxyError::AuthenticationConnectionTimeout)? @@ -229,11 +231,14 @@ impl RelayerStage { datapoint_info!( "relayer_stage-tokens_generated", - ("url", local_config.relayer_url, String), + ("url", local_relayer_config.relayer_url, String), ("count", 1, i64), ); - debug!("connecting to relayer: {}", local_config.relayer_url); + debug!( + "connecting to relayer: {}", + local_relayer_config.relayer_url + ); let relayer_channel = timeout(*connection_timeout, backend_endpoint.connect()) .await .map_err(|_| ProxyError::RelayerConnectionTimeout)? @@ -250,8 +255,8 @@ impl RelayerStage { heartbeat_tx, packet_tx, banking_packet_sender, - &local_config, - relayer_config, + local_relayer_config, + global_relayer_config, exit, auth_client, access_token,