Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
somnusfish committed Mar 30, 2022
1 parent 38bd652 commit d0d7eb7
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 28 deletions.
66 changes: 42 additions & 24 deletions agent/src/agentclient.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@ const AGENT_SLEEP_DURATION: Duration = Duration::from_secs(5);
/// The module-wide result type.
pub type Result<T> = std::result::Result<T, agentclient_error::Error>;

/// Error bookkeeping that is passed along with a shadow state when a
/// shadow status is built.
///
/// The derived `Default` yields a crash count of `0` and no failure timestamp.
#[derive(Debug, Default, Clone)]
struct ShadowErrorInfo {
    /// Crash counter forwarded into the shadow status.
    pub crash_count: u32,
    /// Timestamp of a state transition failure, if one has been recorded.
    pub state_transition_failure_timestamp: Option<DateTime<Utc>>,
}

impl ShadowErrorInfo {
    /// Builds a `ShadowErrorInfo` from an explicit crash count and an
    /// optional state transition failure timestamp.
    fn new(crash_count: u32, state_transition_failure_timestamp: Option<DateTime<Utc>>) -> Self {
        Self {
            crash_count,
            state_transition_failure_timestamp,
        }
    }
}

#[derive(Clone)]
pub struct BrupopAgent<T: APIServerClient> {
k8s_client: kube::client::Client,
Expand Down Expand Up @@ -162,11 +177,10 @@ impl<T: APIServerClient> BrupopAgent<T> {

/// Gather metadata about the system, node status provided by spec.
#[instrument(skip(self, state), err)]
async fn gather_system_metadata(
async fn shadow_status_with_refreshed_system_matadata(
&self,
state: BottlerocketShadowState,
crash_count: u32,
state_transition_failure_timestamp: Option<DateTime<Utc>>,
shadow_error_info: ShadowErrorInfo,
) -> Result<BottlerocketShadowStatus> {
let os_info = get_os_info()
.await
Expand All @@ -184,8 +198,8 @@ impl<T: APIServerClient> BrupopAgent<T> {
os_info.version_id.clone(),
update_version,
state,
crash_count,
state_transition_failure_timestamp,
shadow_error_info.crash_count,
shadow_error_info.state_transition_failure_timestamp,
))
}

Expand Down Expand Up @@ -227,7 +241,10 @@ impl<T: APIServerClient> BrupopAgent<T> {
#[instrument(skip(self), err)]
async fn initialize_metadata_shadow(&self) -> Result<()> {
let update_node_status = self
.gather_system_metadata(BottlerocketShadowState::Idle, 0, None)
.shadow_status_with_refreshed_system_matadata(
BottlerocketShadowState::Idle,
ShadowErrorInfo::default(),
)
.await?;

self.update_metadata_shadow(update_node_status).await?;
Expand Down Expand Up @@ -285,16 +302,15 @@ impl<T: APIServerClient> BrupopAgent<T> {
&self,
bottlerocket_shadow: &BottlerocketShadow,
state: BottlerocketShadowState,
crash_count: u32,
state_transition_failure_timestamp: Option<DateTime<Utc>>,
shadow_error_info: ShadowErrorInfo,
) -> Result<()> {
let bottlerocket_shadow_status = bottlerocket_shadow
.status
.as_ref()
.context(agentclient_error::MissingBottlerocketShadowStatus)?;

let updated_node_status = self
.gather_system_metadata(state, crash_count, state_transition_failure_timestamp)
.shadow_status_with_refreshed_system_matadata(state, shadow_error_info)
.await?;

if updated_node_status != *bottlerocket_shadow_status {
Expand Down Expand Up @@ -337,7 +353,7 @@ impl<T: APIServerClient> BrupopAgent<T> {
BottlerocketShadowState::Idle => match bottlerocket_shadow_status.current_state {
BottlerocketShadowState::ErrorReset => {
self.handle_recover().await?;
event!(Level::INFO, "Recover from ErrorReset");
event!(Level::INFO, "Recovered from ErrorReset");
}
_ => {
event!(
Expand Down Expand Up @@ -368,8 +384,10 @@ impl<T: APIServerClient> BrupopAgent<T> {
self.update_status_in_shadow(
bottlerocket_shadow,
bottlerocket_shadow_spec.state.clone(),
bottlerocket_shadow_status.crash_count(),
bottlerocket_shadow_status.failure_timestamp().unwrap(),
ShadowErrorInfo::new(
bottlerocket_shadow_status.crash_count(),
bottlerocket_shadow_status.failure_timestamp().unwrap(),
),
)
.await?;
} else {
Expand All @@ -396,7 +414,7 @@ impl<T: APIServerClient> BrupopAgent<T> {
}
}
} else {
event!(Level::INFO, "Did not detect action demand.");
event!(Level::DEBUG, "Did not detect action demand.");
}
Ok(())
}
Expand Down Expand Up @@ -441,26 +459,24 @@ impl<T: APIServerClient> BrupopAgent<T> {

match self.handle_state_transition(&bottlerocket_shadow).await {
Ok(()) => {
let crash_count;
let state_transition_failure_timestamp;
let shadow_error_info;
match bottlerocket_shadow_status.current_state {
// Reset crash_count and state_transition_failure_timestamp for a successful update loop
BottlerocketShadowState::MonitoringUpdate => {
crash_count = 0;
state_transition_failure_timestamp = None;
shadow_error_info = ShadowErrorInfo::default();
}
_ => {
crash_count = bottlerocket_shadow_status.crash_count();
state_transition_failure_timestamp =
bottlerocket_shadow_status.failure_timestamp().unwrap();
shadow_error_info = ShadowErrorInfo::new(
bottlerocket_shadow_status.crash_count(),
bottlerocket_shadow_status.failure_timestamp().unwrap(),
)
}
}
match self
.update_status_in_shadow(
&bottlerocket_shadow,
bottlerocket_shadow.spec.state.clone(),
crash_count,
state_transition_failure_timestamp,
shadow_error_info,
)
.await
{
Expand Down Expand Up @@ -493,8 +509,10 @@ impl<T: APIServerClient> BrupopAgent<T> {
.update_status_in_shadow(
&bottlerocket_shadow,
BottlerocketShadowState::ErrorReset,
bottlerocket_shadow_status.crash_count() + 1,
Some(Utc::now()),
ShadowErrorInfo::new(
bottlerocket_shadow_status.crash_count() + 1,
Some(Utc::now()),
),
)
.await
{
Expand Down
2 changes: 1 addition & 1 deletion controller/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl<T: BottlerocketShadowClient> BrupopController<T> {
async fn find_and_update_ready_node(&self) -> Option<BottlerocketShadow> {
let mut shadows: Vec<BottlerocketShadow> = self.all_nodes();

shadows.sort_by(|a, b| a.compare_status(b));
shadows.sort_by(|a, b| a.compare_crash_count(b));
for brs in shadows {
// If we determine that the spec should change, this node is a candidate to begin updating.
let next_spec = determine_next_node_spec(&brs);
Expand Down
7 changes: 5 additions & 2 deletions controller/src/statemachine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub fn determine_next_node_spec(brs: &BottlerocketShadow) -> BottlerocketShadowS
if node_status.current_version() != target_version {
// Node crashed before but has reached its time to retry,
// or node just started or completed without crashing
if is_time_reached(node_status) {
if node_allowed_to_update(node_status) {
BottlerocketShadowSpec::new_starting_now(
BottlerocketShadowState::StagedUpdate,
Some(target_version.clone()),
Expand Down Expand Up @@ -72,7 +72,10 @@ pub fn determine_next_node_spec(brs: &BottlerocketShadow) -> BottlerocketShadowS
}
}

fn is_time_reached(node_status: &BottlerocketShadowStatus) -> bool {
/// Returns whether or not an Idle node is allowed to enter an update workflow.
/// This returns false if the node has previously encountered an error and not yet
/// passed its retry timer.
fn node_allowed_to_update(node_status: &BottlerocketShadowStatus) -> bool {
if let Some(crash_time) = node_status.failure_timestamp().unwrap() {
let time_gap = (Utc::now() - crash_time).num_minutes();
exponential_backoff_time_with_upper_limit(
Expand Down
2 changes: 1 addition & 1 deletion models/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ impl BottlerocketShadow {
/// Order BottlerocketShadow based on crash_count in status
/// to determine the priority to be handled by the controller.
/// Uninitialized status should be considered as lowest priority.
pub fn compare_status(&self, other: &Self) -> Ordering {
pub fn compare_crash_count(&self, other: &Self) -> Ordering {
match (self.status.as_ref(), other.status.as_ref()) {
(None, None) => Ordering::Equal,
(Some(_), None) => Ordering::Less,
Expand Down

0 comments on commit d0d7eb7

Please sign in to comment.