Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fetch and save missing accounts #9

Merged
merged 4 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 119 additions & 32 deletions crates/core/src/rpc/accounts_data.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,20 @@
use crate::rpc::utils::{transform_account_to_ui_account, verify_pubkey};
use std::sync::Arc;

use crate::rpc::utils::verify_pubkey;
use crate::rpc::State;

use jsonrpc_core::futures::future;
use jsonrpc_core::futures::future::{self, join_all};
use jsonrpc_core::BoxFuture;
use jsonrpc_core::Result;
use jsonrpc_core::{Error, Result};
use jsonrpc_derive::rpc;
use solana_account_decoder::parse_token::UiTokenAmount;
use solana_account_decoder::UiAccount;
use solana_account_decoder::{encode_ui_account, UiAccount, UiAccountEncoding};
use solana_client::rpc_config::RpcAccountInfoConfig;
use solana_client::rpc_response::RpcBlockCommitment;
use solana_client::rpc_response::RpcResponseContext;
use solana_rpc_client_api::response::Response as RpcResponse;
use solana_runtime::commitment::BlockCommitmentArray;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::{clock::Slot, commitment_config::CommitmentConfig};

use super::RunloopContext;

Expand Down Expand Up @@ -75,35 +76,61 @@ impl AccountsData for SurfpoolAccountsDataRpc {
pubkey_str: String,
config: Option<RpcAccountInfoConfig>,
) -> BoxFuture<Result<RpcResponse<Option<UiAccount>>>> {
println!(
"get_account_info rpc request received: {:?} {:?}",
pubkey_str, config
);
let config = config.unwrap_or_default();
let pubkey = match verify_pubkey(&pubkey_str) {
Ok(res) => res,
Err(e) => return Box::pin(future::err(e)),
};

let config = config.unwrap_or_default();
let state_reader = match meta.get_state() {
Ok(res) => res,
Err(e) => return Box::pin(future::err(e.into())),
};

let res = match transform_account_to_ui_account(
&state_reader.svm.get_account(&pubkey),
&config,
) {
Ok(res) => Ok(res),
Err(e) => return Box::pin(future::err(e.into())),
};

let res = res.map(|value| RpcResponse {
context: RpcResponseContext::new(state_reader.epoch_info.absolute_slot),
value,
});

Box::pin(future::ready(res))
let account = state_reader.svm.get_account(&pubkey);

// Drop the lock on the state while we fetch accounts
let absolute_slot = state_reader.epoch_info.absolute_slot;
let rpc_client = state_reader.rpc_client.clone();
let encoding = config.encoding.clone();
let data_slice = config.data_slice.clone();
drop(state_reader);

Box::pin(async move {
let account = if let None = account {
// Fetch and save the missing account
if let Some(fetched_account) = rpc_client.get_account(&pubkey).await.ok() {
let mut state_reader = meta.get_state_mut()?;
state_reader
.svm
.set_account(pubkey, fetched_account.clone())
.map_err(|err| {
Error::invalid_params(format!(
"failed to save fetched account {pubkey:?}: {err:?}"
))
})?;

Some(fetched_account)
} else {
None
}
} else {
account
};

Ok(RpcResponse {
context: RpcResponseContext::new(absolute_slot),
value: account.map(|account| {
encode_ui_account(
&pubkey,
&account,
encoding.unwrap_or(UiAccountEncoding::Base64),
None,
data_slice,
)
}),
})
})
Comment on lines +79 to +133
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Async-based account retrieval logic looks solid but silently discards fetch errors.

  1. Dropping the lock before partially fetching prevents blocking other threads, which is good. However, in line 102 (.ok()), you ignore the Err variant from RPC. Consider at least logging the error or returning a more informative response to the client.
  2. Parallel checks for missing accounts: if multiple callers fetch the same missing account simultaneously, you may get redundant RPC calls. A concurrency-aware memoization strategy could help prevent race conditions or repeated requests.
  3. Partial error vs. partial success: If the account retrieval fails, you return None for the account in your RpcResponse. This is acceptable as a fallback but can be confusing to the caller. You might offer an explicit error field in the response.

Below is a suggested diff to handle the remote fetch error more explicitly (applicable to the relevant lines near 102):

- if let Some(fetched_account) = rpc_client.get_account(&pubkey).await.ok() {
+ let remote_fetch = rpc_client.get_account(&pubkey).await;
+ if let Ok(fetched_account) = remote_fetch {
    let mut state_reader = meta.get_state_mut()?;
    ...
  } else if let Err(e) = remote_fetch {
    // Log or handle the specific error here
    warn!("Failed to fetch account from remote: {:?}", e);
  }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let config = config.unwrap_or_default();
let pubkey = match verify_pubkey(&pubkey_str) {
Ok(res) => res,
Err(e) => return Box::pin(future::err(e)),
};
let config = config.unwrap_or_default();
let state_reader = match meta.get_state() {
Ok(res) => res,
Err(e) => return Box::pin(future::err(e.into())),
};
let res = match transform_account_to_ui_account(
&state_reader.svm.get_account(&pubkey),
&config,
) {
Ok(res) => Ok(res),
Err(e) => return Box::pin(future::err(e.into())),
};
let res = res.map(|value| RpcResponse {
context: RpcResponseContext::new(state_reader.epoch_info.absolute_slot),
value,
});
Box::pin(future::ready(res))
let account = state_reader.svm.get_account(&pubkey);
// Drop the lock on the state while we fetch accounts
let absolute_slot = state_reader.epoch_info.absolute_slot;
let rpc_client = state_reader.rpc_client.clone();
let encoding = config.encoding.clone();
let data_slice = config.data_slice.clone();
drop(state_reader);
Box::pin(async move {
let account = if let None = account {
// Fetch and save the missing account
if let Some(fetched_account) = rpc_client.get_account(&pubkey).await.ok() {
let mut state_reader = meta.get_state_mut()?;
state_reader
.svm
.set_account(pubkey, fetched_account.clone())
.map_err(|err| {
Error::invalid_params(format!(
"failed to save fetched account {pubkey:?}: {err:?}"
))
})?;
Some(fetched_account)
} else {
None
}
} else {
account
};
Ok(RpcResponse {
context: RpcResponseContext::new(absolute_slot),
value: account.map(|account| {
encode_ui_account(
&pubkey,
&account,
encoding.unwrap_or(UiAccountEncoding::Base64),
None,
data_slice,
)
}),
})
})
let config = config.unwrap_or_default();
let pubkey = match verify_pubkey(&pubkey_str) {
Ok(res) => res,
Err(e) => return Box::pin(future::err(e)),
};
let state_reader = match meta.get_state() {
Ok(res) => res,
Err(e) => return Box::pin(future::err(e.into())),
};
let account = state_reader.svm.get_account(&pubkey);
// Drop the lock on the state while we fetch accounts
let absolute_slot = state_reader.epoch_info.absolute_slot;
let rpc_client = state_reader.rpc_client.clone();
let encoding = config.encoding.clone();
let data_slice = config.data_slice.clone();
drop(state_reader);
Box::pin(async move {
let account = if let None = account {
// Fetch and save the missing account
+ let remote_fetch = rpc_client.get_account(&pubkey).await;
+ if let Ok(fetched_account) = remote_fetch {
let mut state_reader = meta.get_state_mut()?;
state_reader
.svm
.set_account(pubkey, fetched_account.clone())
.map_err(|err| {
Error::invalid_params(format!(
"failed to save fetched account {pubkey:?}: {err:?}"
))
})?;
+ Some(fetched_account)
+ } else if let Err(e) = remote_fetch {
+ // Log or handle the specific error here
+ warn!("Failed to fetch account from remote: {:?}", e);
+ None
+ } else {
+ None
}
} else {
account
};
Ok(RpcResponse {
context: RpcResponseContext::new(absolute_slot),
value: account.map(|account| {
encode_ui_account(
&pubkey,
&account,
encoding.unwrap_or(UiAccountEncoding::Base64),
None,
data_slice,
)
}),
})
})

}

fn get_multiple_accounts(
Expand All @@ -118,19 +145,79 @@ impl AccountsData for SurfpoolAccountsDataRpc {
Err(e) => return Box::pin(future::err(e.into())),
};

let res = pubkeys
let accounts = match pubkeys
.iter()
.map(|s| {
let pk = verify_pubkey(s)?;
transform_account_to_ui_account(&state_reader.svm.get_account(&pk), &config)
Ok((pk, state_reader.svm.get_account(&pk)))
})
.collect::<Result<Vec<_>>>()
.map(|value| RpcResponse {
context: RpcResponseContext::new(state_reader.epoch_info.absolute_slot),
value,
});
{
Ok(accs) => accs,
Err(e) => return Box::pin(future::err(e.into())),
};

Box::pin(future::ready(res))
// Drop the lock on the state while we fetch accounts
let absolute_slot = state_reader.epoch_info.absolute_slot;
let rpc_client = state_reader.rpc_client.clone();
let encoding = config.encoding.clone();
let data_slice = config.data_slice.clone();
drop(state_reader);

Box::pin(async move {
// Fetch all accounts at once and then only use those that are missing
let fetched_accounts = join_all(accounts.iter().map(|(pk, _)| {
let rpc_client = Arc::clone(&rpc_client);

async move { (pk, rpc_client.get_account(&pk).await.ok()) }
}))
.await;

// Save missing fetched accounts
let mut state_reader = meta.get_state_mut()?;
let combined_accounts = accounts
.iter()
.zip(fetched_accounts)
.map(|((pk, local), (_, fetched_account))| {
if local.is_some() {
Ok((pk, local.clone()))
} else if let Some(account) = fetched_account {
state_reader
.svm
.set_account(*pk, account.clone())
.map_err(|err| {
Error::invalid_params(format!(
"failed to save fetched account {pk:?}: {err:?}"
))
})?;
Ok((pk, Some(account)))
} else {
Ok((pk, None))
}
})
.collect::<Result<Vec<_>>>()?;

Ok(RpcResponse {
context: RpcResponseContext::new(absolute_slot),
value: combined_accounts
.into_iter()
.map(|(pk, account)| {
let encoding = encoding.clone();
let data_slice = data_slice.clone();

account.map(|account| {
encode_ui_account(
&pk,
&account,
encoding.unwrap_or(UiAccountEncoding::Base64),
None,
data_slice,
)
})
})
.collect::<Vec<_>>(),
})
})
}

fn get_block_commitment(
Expand Down
73 changes: 2 additions & 71 deletions crates/core/src/rpc/utils.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use std::{any::type_name, io::Write, sync::Arc};
use std::{any::type_name, sync::Arc};

use base64::prelude::*;
use bincode::Options;
use jsonrpc_core::{Error, Result};
use solana_account::Account;
use solana_account_decoder::{UiAccount, UiAccountData, UiAccountEncoding};
use solana_client::{
rpc_config::{RpcAccountInfoConfig, RpcTokenAccountsFilter},
rpc_config::RpcTokenAccountsFilter,
rpc_custom_error::RpcCustomError,
rpc_filter::RpcFilterType,
rpc_request::{TokenAccountsFilter, MAX_GET_CONFIRMED_SIGNATURES_FOR_ADDRESS2_LIMIT},
Expand Down Expand Up @@ -170,70 +168,3 @@ where
})
.map(|output| (wire_output, output))
}

pub fn transform_account_to_ui_account(
account: &Option<Account>,
config: &RpcAccountInfoConfig,
) -> Result<Option<UiAccount>> {
if let Some(account) = account {
Ok(Some(UiAccount {
lamports: account.lamports,
owner: account.owner.to_string(),
data: {
let account_data = if let Some(data_slice) = config.data_slice {
let end =
std::cmp::min(account.data.len(), data_slice.offset + data_slice.length);
account.data.clone()[data_slice.offset..end].to_vec()
} else {
account.data.clone()
};

match config.encoding {
Some(UiAccountEncoding::Base58) => UiAccountData::Binary(
bs58::encode(account_data).into_string(),
UiAccountEncoding::Base58,
),
Some(UiAccountEncoding::Base64) => UiAccountData::Binary(
BASE64_STANDARD.encode(account_data),
UiAccountEncoding::Base64,
),
Some(UiAccountEncoding::Base64Zstd) => {
let mut data = Vec::with_capacity(account_data.len());

// Default compression level
match zstd::Encoder::new(&mut data, 0).and_then(|mut encoder| {
encoder
.write_all(&account_data)
.and_then(|_| encoder.finish())
}) {
Ok(_) => UiAccountData::Binary(
BASE64_STANDARD.encode(&data),
UiAccountEncoding::Base64Zstd,
),
// Falling back on standard base64 encoding if compression failed
Err(err) => {
eprintln!("Zstd compression failed: {err}");
UiAccountData::Binary(
BASE64_STANDARD.encode(&account_data),
UiAccountEncoding::Base64,
)
}
}
}
None => UiAccountData::Binary(
bs58::encode(account_data.clone()).into_string(),
UiAccountEncoding::Base58,
),
encoding => Err(jsonrpc_core::Error::invalid_params(format!(
"Encoding {encoding:?} is not supported yet."
)))?,
}
},
executable: account.executable,
rent_epoch: account.rent_epoch,
space: Some(account.data.len() as u64),
}))
} else {
Ok(None)
}
}
12 changes: 7 additions & 5 deletions crates/core/src/simnet/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@ use crossbeam_channel::Sender;
use jsonrpc_core::MetaIoHandler;
use jsonrpc_http_server::{DomainsValidation, ServerBuilder};
use litesvm::LiteSVM;
use solana_rpc_client::rpc_client::RpcClient;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::{
clock::Clock, epoch_info::EpochInfo, pubkey::Pubkey, transaction::VersionedTransaction,
};
use std::{
net::SocketAddr,
sync::{mpsc::channel, Arc, RwLock},
sync::{Arc, RwLock},
thread::sleep,
time::Duration,
};
Expand All @@ -27,6 +27,7 @@ pub struct GlobalState {
pub svm: LiteSVM,
pub transactions_processed: u64,
pub epoch_info: EpochInfo,
pub rpc_client: Arc<RpcClient>,
}

#[derive(Debug)]
Expand All @@ -52,15 +53,16 @@ pub async fn start(
let svm = LiteSVM::new();

// Todo: should check config first
let rpc_client = RpcClient::new(&config.simnet.remote_rpc_url);
let epoch_info = rpc_client.get_epoch_info()?;
let rpc_client = Arc::new(RpcClient::new(config.simnet.remote_rpc_url.clone()));
let epoch_info = rpc_client.get_epoch_info().await?;
// Question: can the value `slots_in_epoch` fluctuate over time?
let slots_in_epoch = epoch_info.slots_in_epoch;

let context = GlobalState {
svm,
transactions_processed: 0,
epoch_info: epoch_info.clone(),
rpc_client: rpc_client.clone(),
};

let context = Arc::new(RwLock::new(context));
Expand Down Expand Up @@ -118,7 +120,7 @@ pub async fn start(
let program_id = &message.account_keys[instruction.program_id_index as usize];
if ctx.svm.get_account(&program_id).is_none() {
// println!("Retrieving account from Mainnet: {:?}", program_id);
let res = rpc_client.get_account(&program_id);
let res = rpc_client.get_account(&program_id).await;
let event = match res {
Ok(account) => {
let _ = ctx.svm.set_account(*program_id, account);
Expand Down