Skip to content

Commit

Permalink
Add metrics for relayer + block engine proxy (#149)
Browse files Browse the repository at this point in the history
  • Loading branch information
buffalu authored Sep 25, 2022
1 parent 743a715 commit 54104db
Show file tree
Hide file tree
Showing 3 changed files with 359 additions and 308 deletions.
214 changes: 101 additions & 113 deletions core/src/proxy/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,45 @@ pub(crate) mod token_manager {
auth_service_endpoint: Endpoint,
access_token: Arc<Mutex<Token>>,
cluster_info: Arc<ClusterInfo>,
retry_interval: Duration,
exit: Arc<AtomicBool>,
) {
const RETRY_INTERVAL: Duration = Duration::from_secs(5);
const SLEEP_INTERVAL: Duration = Duration::from_secs(60);

let mut num_refresh_loop_errors: u64 = 0;
let mut num_connect_errors: u64 = 0;
while !exit.load(Ordering::Relaxed) {
sleep(RETRY_INTERVAL).await;

match auth_service_endpoint.connect().await {
Ok(channel) => {
if let Err(e) = auth_tokens_update_loop_helper(
AuthServiceClient::new(channel),
auth_service_endpoint.uri().to_string(),
(access_token.clone(), Token::default()),
cluster_info.clone(),
Duration::from_secs(10),
SLEEP_INTERVAL,
exit.clone(),
)
.await
{
error!("auth_refresh_loop error: {:?}", e);
sleep(retry_interval).await;
num_refresh_loop_errors += 1;
datapoint_error!(
"auth_tokens_update_loop-refresh_loop_error",
("url", auth_service_endpoint.uri().to_string(), String),
("count", num_refresh_loop_errors, i64),
("error", e.to_string(), String)
);
}
}
Err(e) => {
error!(
"error connecting to auth service url: {} error: {}",
auth_service_endpoint.uri(),
e
num_connect_errors += 1;
datapoint_error!(
"auth_tokens_update_loop-refresh_connect_error",
("url", auth_service_endpoint.uri().to_string(), String),
("count", num_connect_errors, i64),
("error", e.to_string(), String)
);
sleep(retry_interval).await;
}
}
}
Expand All @@ -85,71 +98,71 @@ pub(crate) mod token_manager {
/// Responsible for keeping generating and refreshing the access token.
async fn auth_tokens_update_loop_helper(
mut auth_service_client: AuthServiceClient<Channel>,
url: String,
(access_token, mut refresh_token): (Arc<Mutex<Token>>, Token),
cluster_info: Arc<ClusterInfo>,
sleep_interval: Duration,
exit: Arc<AtomicBool>,
) -> crate::proxy::Result<()> {
const REFRESH_WITHIN_SECS: i64 = 300;
let mut num_full_refreshes = 0;
let mut num_refresh_access_token = 0;

while !exit.load(Ordering::Relaxed) {
let access_token_expiry: i64 = {
if let Some(ts) = access_token.lock().unwrap().expires_at_utc.as_ref() {
ts.seconds
} else {
0
}
};
let refresh_token_expiry: i64 = {
if let Some(ts) = refresh_token.expires_at_utc.as_ref() {
ts.seconds
} else {
0
}
};
let access_token_expiry: i64 = access_token
.lock()
.unwrap()
.expires_at_utc
.as_ref()
.map(|ts| ts.seconds)
.unwrap_or_default();
let refresh_token_expiry = refresh_token
.expires_at_utc
.as_ref()
.map(|ts| ts.seconds)
.unwrap_or_default();

let now = Utc::now().timestamp();
let should_refresh_access = {
let delta = access_token_expiry.checked_sub(now);
if delta.is_none() {
return Err(ProxyError::InvalidData(
"Received invalid access_token expiration".to_string(),
));
}
delta.unwrap() <= 300
};
let should_generate_new_tokens = {
let delta = refresh_token_expiry.checked_sub(now);
if delta.is_none() {
return Err(ProxyError::InvalidData(
"Received invalid refresh_token expiration".to_string(),
));
}
delta.unwrap() <= 300
};

let should_refresh_access = access_token_expiry.checked_sub(now).ok_or_else(|| {
ProxyError::InvalidData("Received invalid access_token expiration".to_string())
})? <= REFRESH_WITHIN_SECS;
let should_generate_new_tokens =
refresh_token_expiry.checked_sub(now).ok_or_else(|| {
ProxyError::InvalidData("Received invalid refresh_token expiration".to_string())
})? <= REFRESH_WITHIN_SECS;

match (should_refresh_access, should_generate_new_tokens) {
// Generate new tokens if the refresh_token is close to being expired.
(_, true) => {
let kp = cluster_info.keypair().clone();
match generate_auth_tokens(&mut auth_service_client, kp.as_ref()).await {
Ok((new_access_token, new_refresh_token)) => {
*access_token.lock().unwrap() = new_access_token.clone();
refresh_token = new_refresh_token;
}
Err(e) => {
return Err(e);
}
}

let (new_access_token, new_refresh_token) =
generate_auth_tokens(&mut auth_service_client, kp.as_ref()).await?;

*access_token.lock().unwrap() = new_access_token.clone();
refresh_token = new_refresh_token;

num_full_refreshes += 1;
datapoint_info!(
"auth_tokens_update_loop-tokens_generated",
("url", url, String),
("count", num_full_refreshes, i64),
);
}
// Invoke the refresh_access_token method if the access_token is close to being expired.
(true, _) => {
match refresh_access_token(&mut auth_service_client, refresh_token.clone())
.await
{
Ok(new_access_token) => {
*access_token.lock().unwrap() = new_access_token;
}
Err(e) => return Err(e),
}
let new_access_token =
refresh_access_token(&mut auth_service_client, refresh_token.clone())
.await?;
*access_token.lock().unwrap() = new_access_token;

num_refresh_access_token += 1;
datapoint_info!(
"auth_tokens_update_loop-refresh_access_token",
("url", url, String),
("count", num_refresh_access_token, i64),
);
}
// Sleep and do nothing if neither token is close to expired,
(false, false) => sleep(sleep_interval).await,
Expand All @@ -171,14 +184,8 @@ pub(crate) mod token_manager {
})
.await
{
Ok(resp) => validate_token(resp.into_inner().access_token).map_err(|e| {
error!("invalid access_token");
e
}),
Err(e) => {
debug!("error refreshing access token: {}", e);
Err(ProxyError::GrpcError(e))
}
Ok(resp) => get_validated_token(resp.into_inner().access_token),
Err(e) => Err(ProxyError::GrpcError(e)),
}
}

Expand All @@ -191,69 +198,50 @@ pub(crate) mod token_manager {
Token, /* access_token */
Token, /* refresh_token */
)> {
let challenge = match auth_service_client
let challenge_response = auth_service_client
.generate_auth_challenge(GenerateAuthChallengeRequest {
role: Role::Validator as i32,
pubkey: keypair.pubkey().as_ref().to_vec(),
})
.await
{
Ok(resp) => Ok(format!(
"{}-{}",
keypair.pubkey(),
resp.into_inner().challenge
)),
Err(e) => {
debug!("error generating auth challenge: {}", e);
Err(ProxyError::GrpcError(e))
}
}?;
.await?;

let signed_challenge = keypair.sign_message(challenge.as_bytes()).as_ref().to_vec();
match auth_service_client
let formatted_challenge = format!(
"{}-{}",
keypair.pubkey(),
challenge_response.into_inner().challenge
);
let signed_challenge = keypair
.sign_message(formatted_challenge.as_bytes())
.as_ref()
.to_vec();

let auth_tokens = auth_service_client
.generate_auth_tokens(GenerateAuthTokensRequest {
challenge,
challenge: formatted_challenge,
client_pubkey: keypair.pubkey().as_ref().to_vec(),
signed_challenge,
})
.await
{
Ok(resp) => {
let inner = resp.into_inner();
.await?;

let access_token = validate_token(inner.access_token).map_err(|e| {
error!("invalid access_token");
e
})?;
let refresh_token = validate_token(inner.refresh_token).map_err(|e| {
error!("invalid access_token");
e
})?;
let inner = auth_tokens.into_inner();
let access_token = get_validated_token(inner.access_token)?;
let refresh_token = get_validated_token(inner.refresh_token)?;

Ok((access_token, refresh_token))
}
Err(e) => {
debug!("error generating auth tokens: {}", e);
Err(ProxyError::GrpcError(e))
}
}
Ok((access_token, refresh_token))
}

/// An invalid token is one where any of its fields are None or the token itself is None.
/// Performs the necessary validations on the auth tokens before returning,
/// i.e. it is safe to call .unwrap() on the token fields from the call-site.
fn validate_token(maybe_token: Option<Token>) -> crate::proxy::Result<Token> {
match maybe_token {
Some(token) => {
if token.expires_at_utc.is_none() {
Err(ProxyError::InvalidData(
"expires_at_utc field is null".to_string(),
))
} else {
Ok(token)
}
}
None => Err(ProxyError::InvalidData("received a null token".to_string())),
fn get_validated_token(maybe_token: Option<Token>) -> crate::proxy::Result<Token> {
let token = maybe_token
.ok_or_else(|| ProxyError::InvalidData("received a null token".to_string()))?;
if token.expires_at_utc.is_none() {
Err(ProxyError::InvalidData(
"expires_at_utc field is null".to_string(),
))
} else {
Ok(token)
}
}
}
Loading

0 comments on commit 54104db

Please sign in to comment.