feat: Add a new cluster state "Unhealthy" #63

Open · wants to merge 14 commits into base: main
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,11 @@ All notable changes to this project will be documented in this file.
- Support configuring the scaler reconcile interval ([#61]).
- Add simple web-based dashboard that shows the current state and query counts of all clusters.
This makes it easier to debug state transitions of clusters ([#62]).
- Add a new cluster state `Unhealthy`.
This state is entered once the readiness check of a `Ready` cluster fails (the check is implemented in the scaler implementation).
The `Unhealthy` state is kept until the scaler marks that Cluster as ready again.
`Unhealthy` clusters won't get any new queries; if all clusters are unhealthy, queries are queued.\
Note: Use the now-configurable scaler reconcile interval to detect cluster state changes quickly ([#63]).
Member commented:
note: I don't quite understand the sentence in line 15. It is either wrong or worded weirdly.

note: markdownlint complains about the `<br>` tag. I think we should remove it.

Member Author replied:
You are right, there was a typo: a9726af

Member Author commented:
TIL a backslash also works: 5e5b022
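For reference, a minimal sketch of the hard-line-break options in CommonMark (the backslash form is presumably what commit 5e5b022 switched to; markdownlint only objects to the raw-HTML variant):

```markdown
Two trailing spaces force a hard line break  
A trailing backslash does the same\
Raw HTML also works, but markdownlint's no-inline-html rule flags it<br>
Last line
```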


### Changed

@@ -21,6 +26,7 @@ All notable changes to this project will be documented in this file.
[#57]: https://github.com/stackabletech/trino-lb/pull/57
[#61]: https://github.com/stackabletech/trino-lb/pull/61
[#62]: https://github.com/stackabletech/trino-lb/pull/62
[#63]: https://github.com/stackabletech/trino-lb/pull/63

## [0.3.2] - 2024-08-20

6 changes: 6 additions & 0 deletions trino-lb-core/src/trino_cluster.rs
@@ -12,6 +12,8 @@ pub enum ClusterState {
Starting,
/// Up and running, ready to get queries
Ready,
/// Up, but not ready to accept queries. It should not be started or stopped, as it's healing itself.
Unhealthy,
/// No new queries should be submitted. Once all running queries are finished and a certain time period has passed
/// go to `Terminating`
Draining {
@@ -30,6 +32,7 @@ impl Display for ClusterState {
ClusterState::Stopped => f.write_str("Stopped"),
ClusterState::Starting => f.write_str("Starting"),
ClusterState::Ready => f.write_str("Ready"),
ClusterState::Unhealthy => f.write_str("Unhealthy"),
ClusterState::Draining { .. } => f.write_str("Draining"),
ClusterState::Terminating => f.write_str("Terminating"),
ClusterState::Deactivated => f.write_str("Deactivated"),
@@ -45,6 +48,7 @@ impl ClusterState {
| ClusterState::Starting
| ClusterState::Terminating => ClusterState::Starting,
ClusterState::Ready | ClusterState::Draining { .. } => ClusterState::Ready,
ClusterState::Unhealthy => ClusterState::Unhealthy,
ClusterState::Deactivated => ClusterState::Deactivated,
}
}
@@ -55,6 +59,7 @@
ClusterState::Unknown
// No, because it is already started
| ClusterState::Starting
| ClusterState::Unhealthy
| ClusterState::Ready
| ClusterState::Terminating
| ClusterState::Deactivated => false,
@@ -64,6 +69,7 @@
pub fn ready_to_accept_queries(&self) -> bool {
match self {
ClusterState::Unknown
| ClusterState::Unhealthy
| ClusterState::Stopped
| ClusterState::Draining { .. }
| ClusterState::Terminating
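The additions above can be condensed into a self-contained sketch (simplified: the real enum has more states, and the real `Draining` variant carries data):

```rust
/// Simplified sketch of trino-lb's `ClusterState` with the new `Unhealthy` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClusterState {
    Starting,
    Ready,
    /// Up, but not ready to accept queries; it is healing itself.
    Unhealthy,
    Draining,
}

impl ClusterState {
    /// In this sketch only `Ready` clusters accept new queries;
    /// `Unhealthy` and `Draining` clusters are skipped by the router.
    fn ready_to_accept_queries(&self) -> bool {
        matches!(self, ClusterState::Ready)
    }
}

fn main() {
    assert!(ClusterState::Ready.ready_to_accept_queries());
    assert!(!ClusterState::Unhealthy.ready_to_accept_queries());
}
```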
36 changes: 29 additions & 7 deletions trino-lb/src/maintenance/query_count_fetcher.rs
@@ -136,11 +136,7 @@ impl QueryCountFetcher {
self.clusters
.iter()
.zip(cluster_states)
.filter_map(|(cluster, state)| match state{
ClusterState::Unknown | ClusterState::Stopped | ClusterState::Starting | ClusterState::Terminating | ClusterState::Deactivated => None,
ClusterState::Ready | ClusterState::Draining{ .. } => Some(cluster),
})
.map(|cluster| self.process_cluster(cluster)),
.map(|(cluster, state)| self.process_cluster(cluster, state))
Member commented:
note: It seems like we are not actually mapping anything here. Does it maybe make more sense to use .for_each() here?

Member Author replied:
Nope, as we are using https://docs.rs/futures/latest/futures/future/fn.join_all.html here

    Checking trino-lb v0.3.2 (/home/sbernauer/stackable/trino-lb/trino-lb)
error[E0308]: mismatched types
   --> trino-lb/src/maintenance/query_count_fetcher.rs:139:54
    |
139 |                         .for_each(|(cluster, state)| self.process_cluster(cluster, state))
    |                                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ expected `()`, found future
    |
note: calling an async function returns a future
   --> trino-lb/src/maintenance/query_count_fetcher.rs:139:54
    |
139 |                         .for_each(|(cluster, state)| self.process_cluster(cluster, state))
    |                                                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
help: consider `await`ing on the `Future`
    |
139 |                         .for_each(|(cluster, state)| self.process_cluster(cluster, state).await)
    |                                                                                          ++++++
help: consider using a semicolon here
    |
139 |                         .for_each(|(cluster, state)| { self.process_cluster(cluster, state); })
    |                                                      +                                     +++

error[E0277]: `()` is not an iterator
   --> trino-lb/src/maintenance/query_count_fetcher.rs:135:30
    |
135 |                   let result = join_all(
    |  ______________________________^
136 | |                     self.clusters
137 | |                         .iter()
138 | |                         .zip(cluster_states)
139 | |                         .for_each(|(cluster, state)| self.process_cluster(cluster, state))
140 | |                 )
    | |_________________^ `()` is not an iterator
    |
    = help: the trait `Iterator` is not implemented for `()`
    = note: required for `()` to implement `IntoIterator`

Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.
error: could not compile `trino-lb` (bin "trino-lb") due to 2 previous errors

We could do something like `futures::stream::iter(self.clusters.iter().zip(cluster_states)).for_each_concurrent(1000000, |(cluster, state)| self.process_cluster(cluster, state)).await`, but
a.) that's a different approach than we take everywhere else, and
b.) `result.len()` breaks, so we would need to do that differently.

Not sure if that's worth it.
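The `map` vs `for_each` distinction the compiler errors hinge on shows up with plain iterators too: `map` yields one value per element for a downstream consumer (here `collect`, analogous to `join_all` for an iterator of futures), while `for_each` returns `()`, leaving nothing to count afterwards. A std-only sketch:

```rust
fn main() {
    let items = vec![1, 2, 3];

    // `map` produces one output per element; `collect` (like `join_all`
    // for an iterator of futures) consumes them and keeps the results.
    let results: Vec<i32> = items.iter().map(|x| x * 2).collect();
    assert_eq!(results.len(), 3); // a `result.len()` is still possible

    // `for_each` runs the closure purely for side effects and returns `()`,
    // so there is nothing left to hand to `collect`/`join_all` afterwards.
    let unit: () = items.iter().for_each(|x| {
        let _doubled = x * 2;
    });
    let _ = unit;
}
```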

)
.await;

@@ -152,8 +148,34 @@ impl QueryCountFetcher {
}
}

#[instrument(skip(self))]
async fn process_cluster(&self, cluster: &TrinoClusterConfig) {
#[instrument(skip(self, cluster), fields(cluster_name = cluster.name))]
async fn process_cluster(&self, cluster: &TrinoClusterConfig, state: ClusterState) {
Comment on lines +157 to +158
Member commented:
note: You should add a small doc comment explaining what this method does.

Member Author replied:
Added in 373ab07

match state {
ClusterState::Ready | ClusterState::Unhealthy | ClusterState::Draining { .. } => {
self.fetch_and_store_query_count(cluster).await;
}
ClusterState::Unknown
| ClusterState::Stopped
| ClusterState::Starting
| ClusterState::Terminating
| ClusterState::Deactivated => {
if let Err(err) = self
.persistence
.set_cluster_query_count(&cluster.name, 0)
.await
Comment on lines +168 to +171
Member commented:
question: Why exactly do we set the number of queries to zero here? Can you add a developer comment here which shortly explains it?

Member Author replied:
I added some rustdoc in 373ab07 explaining it. Is that sufficient in this case?

{
error!(
cluster = cluster.name,
?err,
"QueryCountFetcher: Failed to set current cluster query count to zero"
);
}
}
}
}

#[instrument(skip(self, cluster), fields(cluster_name = cluster.name))]
async fn fetch_and_store_query_count(&self, cluster: &TrinoClusterConfig) {
let cluster_info =
get_cluster_info(&cluster.endpoint, self.ignore_certs, &cluster.credentials).await;

35 changes: 26 additions & 9 deletions trino-lb/src/scaling/mod.rs
@@ -14,7 +14,7 @@ use tokio::{
task::{JoinError, JoinSet},
time,
};
use tracing::{debug, error, info, instrument, Instrument, Span};
use tracing::{debug, error, info, instrument, warn, Instrument, Span};
use trino_lb_core::{
config::{Config, ScalerConfig, ScalerConfigImplementation},
trino_cluster::ClusterState,
@@ -266,7 +266,7 @@ impl Scaler {
Ok(())
}

#[instrument(name = "Scaler::reconcile_cluster_group", skip(self))]
#[instrument(name = "Scaler::reconcile_cluster_group", skip(self, clusters))]
pub async fn reconcile_cluster_group(
self: Arc<Self>,
cluster_group: String,
@@ -435,7 +435,7 @@ impl Scaler {
Ok(())
}

#[instrument(name = "Scaler::get_current_state", skip(self))]
#[instrument(name = "Scaler::get_current_state", skip(self, scaling_config))]
async fn get_current_cluster_state(
self: Arc<Self>,
cluster_name: TrinoClusterName,
@@ -461,8 +461,8 @@
// State not known in persistence, so let's determine current state
match (activated, ready) {
(true, true) => ClusterState::Ready,
// It could also be Terminating, but in that case it would need to be stored as Terminating
// in the persistence
// It could also be Terminating or Unhealthy, but in that case it would need to be stored as
// Terminating or Unhealthy in the persistence
(true, false) => ClusterState::Starting,
// This might happen for very short time periods. E.g. for the Stackable scaler, this can be
// the case when spec.clusterOperation.stopped was just set to true, but trino-operator did
@@ -480,8 +480,18 @@
}
}
ClusterState::Ready => {
// In the future we might want to check if the cluster is healthy and have a state `Unhealthy`.
ClusterState::Ready
if ready {
ClusterState::Ready
} else {
ClusterState::Unhealthy
}
}
ClusterState::Unhealthy => {
if ready {
ClusterState::Ready
} else {
ClusterState::Unhealthy
}
}
ClusterState::Draining {
last_time_seen_with_queries,
@@ -540,7 +550,11 @@ impl Scaler {
Ok((cluster_name, current_state))
}

#[instrument(name = "Scaler::apply_target_states", skip(self))]
#[instrument(
name = "Scaler::apply_cluster_target_state",
skip(self, cluster),
fields(cluster_name = cluster.name)
)]
async fn apply_cluster_target_state(
self: Arc<Self>,
cluster: TrinoCluster,
@@ -555,7 +569,10 @@
ClusterState::Stopped | ClusterState::Terminating => {
scaler.deactivate(&cluster.name).await?;
}
ClusterState::Starting | ClusterState::Ready | ClusterState::Draining { .. } => {
ClusterState::Starting
| ClusterState::Unhealthy
| ClusterState::Ready
| ClusterState::Draining { .. } => {
scaler.activate(&cluster.name).await?;
}
ClusterState::Deactivated => {
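The new `Ready`/`Unhealthy` arms of `get_current_cluster_state` amount to a small transition function; a self-contained sketch (the real method also consults persistence and handles `Starting`, `Draining`, and the other states):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ClusterState {
    Ready,
    Unhealthy,
}

/// Simplified: the next state given the stored state and the result of the
/// scaler's readiness check.
fn next_state(stored: ClusterState, ready: bool) -> ClusterState {
    match (stored, ready) {
        // The readiness check of a Ready cluster failed -> Unhealthy.
        (ClusterState::Ready, false) => ClusterState::Unhealthy,
        // Unhealthy is kept until the cluster reports ready again.
        (ClusterState::Unhealthy, false) => ClusterState::Unhealthy,
        (ClusterState::Unhealthy, true) => ClusterState::Ready,
        (ClusterState::Ready, true) => ClusterState::Ready,
    }
}

fn main() {
    assert_eq!(next_state(ClusterState::Ready, false), ClusterState::Unhealthy);
    assert_eq!(next_state(ClusterState::Unhealthy, true), ClusterState::Ready);
    assert_eq!(next_state(ClusterState::Unhealthy, false), ClusterState::Unhealthy);
}
```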