Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics to relayer_stage and block_engine_stage #149

Merged
merged 1 commit into from
Sep 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 101 additions & 113 deletions core/src/proxy/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,32 +51,45 @@ pub(crate) mod token_manager {
auth_service_endpoint: Endpoint,
access_token: Arc<Mutex<Token>>,
cluster_info: Arc<ClusterInfo>,
retry_interval: Duration,
exit: Arc<AtomicBool>,
) {
const RETRY_INTERVAL: Duration = Duration::from_secs(5);
const SLEEP_INTERVAL: Duration = Duration::from_secs(60);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 60s instead of 10s?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

less log spam


let mut num_refresh_loop_errors: u64 = 0;
let mut num_connect_errors: u64 = 0;
while !exit.load(Ordering::Relaxed) {
sleep(RETRY_INTERVAL).await;

match auth_service_endpoint.connect().await {
Ok(channel) => {
if let Err(e) = auth_tokens_update_loop_helper(
AuthServiceClient::new(channel),
auth_service_endpoint.uri().to_string(),
(access_token.clone(), Token::default()),
cluster_info.clone(),
Duration::from_secs(10),
SLEEP_INTERVAL,
exit.clone(),
)
.await
{
error!("auth_refresh_loop error: {:?}", e);
sleep(retry_interval).await;
num_refresh_loop_errors += 1;
datapoint_error!(
"auth_tokens_update_loop-refresh_loop_error",
("url", auth_service_endpoint.uri().to_string(), String),
("count", num_refresh_loop_errors, i64),
("error", e.to_string(), String)
);
}
}
Err(e) => {
error!(
"error connecting to auth service url: {} error: {}",
auth_service_endpoint.uri(),
e
num_connect_errors += 1;
datapoint_error!(
"auth_tokens_update_loop-refresh_connect_error",
("url", auth_service_endpoint.uri().to_string(), String),
("count", num_connect_errors, i64),
("error", e.to_string(), String)
);
sleep(retry_interval).await;
}
}
}
Expand All @@ -85,71 +98,71 @@ pub(crate) mod token_manager {
/// Responsible for keeping generating and refreshing the access token.
async fn auth_tokens_update_loop_helper(
mut auth_service_client: AuthServiceClient<Channel>,
url: String,
(access_token, mut refresh_token): (Arc<Mutex<Token>>, Token),
cluster_info: Arc<ClusterInfo>,
sleep_interval: Duration,
exit: Arc<AtomicBool>,
) -> crate::proxy::Result<()> {
const REFRESH_WITHIN_SECS: i64 = 300;
let mut num_full_refreshes = 0;
let mut num_refresh_access_token = 0;

while !exit.load(Ordering::Relaxed) {
let access_token_expiry: i64 = {
if let Some(ts) = access_token.lock().unwrap().expires_at_utc.as_ref() {
ts.seconds
} else {
0
}
};
let refresh_token_expiry: i64 = {
if let Some(ts) = refresh_token.expires_at_utc.as_ref() {
ts.seconds
} else {
0
}
};
let access_token_expiry: i64 = access_token
.lock()
.unwrap()
.expires_at_utc
.as_ref()
.map(|ts| ts.seconds)
.unwrap_or_default();
let refresh_token_expiry = refresh_token
.expires_at_utc
.as_ref()
.map(|ts| ts.seconds)
.unwrap_or_default();

let now = Utc::now().timestamp();
let should_refresh_access = {
let delta = access_token_expiry.checked_sub(now);
if delta.is_none() {
return Err(ProxyError::InvalidData(
"Received invalid access_token expiration".to_string(),
));
}
delta.unwrap() <= 300
};
let should_generate_new_tokens = {
let delta = refresh_token_expiry.checked_sub(now);
if delta.is_none() {
return Err(ProxyError::InvalidData(
"Received invalid refresh_token expiration".to_string(),
));
}
delta.unwrap() <= 300
};

let should_refresh_access = access_token_expiry.checked_sub(now).ok_or_else(|| {
ProxyError::InvalidData("Received invalid access_token expiration".to_string())
})? <= REFRESH_WITHIN_SECS;
let should_generate_new_tokens =
refresh_token_expiry.checked_sub(now).ok_or_else(|| {
ProxyError::InvalidData("Received invalid refresh_token expiration".to_string())
})? <= REFRESH_WITHIN_SECS;

match (should_refresh_access, should_generate_new_tokens) {
// Generate new tokens if the refresh_token is close to being expired.
(_, true) => {
let kp = cluster_info.keypair().clone();
match generate_auth_tokens(&mut auth_service_client, kp.as_ref()).await {
Ok((new_access_token, new_refresh_token)) => {
*access_token.lock().unwrap() = new_access_token.clone();
refresh_token = new_refresh_token;
}
Err(e) => {
return Err(e);
}
}

let (new_access_token, new_refresh_token) =
generate_auth_tokens(&mut auth_service_client, kp.as_ref()).await?;

*access_token.lock().unwrap() = new_access_token.clone();
refresh_token = new_refresh_token;

num_full_refreshes += 1;
datapoint_info!(
"auth_tokens_update_loop-tokens_generated",
("url", url, String),
("count", num_full_refreshes, i64),
);
}
// Invoke the refresh_access_token method if the access_token is close to being expired.
(true, _) => {
match refresh_access_token(&mut auth_service_client, refresh_token.clone())
.await
{
Ok(new_access_token) => {
*access_token.lock().unwrap() = new_access_token;
}
Err(e) => return Err(e),
}
let new_access_token =
refresh_access_token(&mut auth_service_client, refresh_token.clone())
.await?;
*access_token.lock().unwrap() = new_access_token;

num_refresh_access_token += 1;
datapoint_info!(
"auth_tokens_update_loop-refresh_access_token",
("url", url, String),
("count", num_refresh_access_token, i64),
);
}
// Sleep and do nothing if neither token is close to expired,
(false, false) => sleep(sleep_interval).await,
Expand All @@ -171,14 +184,8 @@ pub(crate) mod token_manager {
})
.await
{
Ok(resp) => validate_token(resp.into_inner().access_token).map_err(|e| {
error!("invalid access_token");
e
}),
Err(e) => {
debug!("error refreshing access token: {}", e);
Err(ProxyError::GrpcError(e))
}
Ok(resp) => get_validated_token(resp.into_inner().access_token),
Err(e) => Err(ProxyError::GrpcError(e)),
}
}

Expand All @@ -191,69 +198,50 @@ pub(crate) mod token_manager {
Token, /* access_token */
Token, /* refresh_token */
)> {
let challenge = match auth_service_client
let challenge_response = auth_service_client
.generate_auth_challenge(GenerateAuthChallengeRequest {
role: Role::Validator as i32,
pubkey: keypair.pubkey().as_ref().to_vec(),
})
.await
{
Ok(resp) => Ok(format!(
"{}-{}",
keypair.pubkey(),
resp.into_inner().challenge
)),
Err(e) => {
debug!("error generating auth challenge: {}", e);
Err(ProxyError::GrpcError(e))
}
}?;
.await?;

let signed_challenge = keypair.sign_message(challenge.as_bytes()).as_ref().to_vec();
match auth_service_client
let formatted_challenge = format!(
"{}-{}",
keypair.pubkey(),
challenge_response.into_inner().challenge
);
let signed_challenge = keypair
.sign_message(formatted_challenge.as_bytes())
.as_ref()
.to_vec();

let auth_tokens = auth_service_client
.generate_auth_tokens(GenerateAuthTokensRequest {
challenge,
challenge: formatted_challenge,
client_pubkey: keypair.pubkey().as_ref().to_vec(),
signed_challenge,
})
.await
{
Ok(resp) => {
let inner = resp.into_inner();
.await?;

let access_token = validate_token(inner.access_token).map_err(|e| {
error!("invalid access_token");
e
})?;
let refresh_token = validate_token(inner.refresh_token).map_err(|e| {
error!("invalid access_token");
e
})?;
let inner = auth_tokens.into_inner();
let access_token = get_validated_token(inner.access_token)?;
let refresh_token = get_validated_token(inner.refresh_token)?;

Ok((access_token, refresh_token))
}
Err(e) => {
debug!("error generating auth tokens: {}", e);
Err(ProxyError::GrpcError(e))
}
}
Ok((access_token, refresh_token))
}

/// An invalid token is one where any of its fields are None or the token itself is None.
/// Performs the necessary validations on the auth tokens before returning,
/// i.e. it is safe to call .unwrap() on the token fields from the call-site.
fn validate_token(maybe_token: Option<Token>) -> crate::proxy::Result<Token> {
match maybe_token {
Some(token) => {
if token.expires_at_utc.is_none() {
Err(ProxyError::InvalidData(
"expires_at_utc field is null".to_string(),
))
} else {
Ok(token)
}
}
None => Err(ProxyError::InvalidData("received a null token".to_string())),
fn get_validated_token(maybe_token: Option<Token>) -> crate::proxy::Result<Token> {
let token = maybe_token
.ok_or_else(|| ProxyError::InvalidData("received a null token".to_string()))?;
if token.expires_at_utc.is_none() {
Err(ProxyError::InvalidData(
"expires_at_utc field is null".to_string(),
))
} else {
Ok(token)
}
}
}
Loading