Skip to content

Commit

Permalink
break out code that waits for new discovery handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
kate-goldenring committed Feb 24, 2021
1 parent 8d28ea5 commit 80e02b7
Showing 1 changed file with 55 additions and 38 deletions.
93 changes: 55 additions & 38 deletions agent/src/util/discovery_operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ impl DiscoveryOperator {
.unwrap()
.into_inner(),
))
// Check if is a UDS connection
// Check if it is a UDS connection
} else if endpoint.starts_with(discovery_handler_path) {
let path = endpoint.to_string();
match Endpoint::try_from("lttp://[::]:50051")
Expand All @@ -147,7 +147,7 @@ impl DiscoveryOperator {
{
Ok(channel) => {
trace!(
"get_stream - external discovery handler for protocol {}",
"get_stream - external discovery handler for protocol {} over UDS",
self.config.spec.protocol.name
);
let mut discovery_client = DiscoveryClient::new(channel);
Expand Down Expand Up @@ -519,14 +519,11 @@ pub mod start_discovery {

/// This is spawned as a task for each Configuration and continues to run
/// until the Configuration is deleted, at which point, this function is signaled to stop.
/// In a separate task, it calls connects to each discovery handler in the RegisteredDiscoveryHandlerMap
/// with the same protocol name as the Configuration (Configuration.protocol.name). Then, it listens for
/// updates from the discovery handler on what devices are currently visible to the node.
/// Passes this list to a function that updates the InstanceConnectivityStatus of the Configuration's Instances
/// or deletes Instance CRs if needed. If a new instance becomes visible that isn't in the Configuration's InstanceMap,
/// a DevicePluginService and Instance CRD are created for it, and it is added to the InstanceMap.
///
/// It also spawns a task to check whether Offline Instances have exceeded their grace period, in which case it
/// It consists of three subtasks:
/// 1) Initiates discovery on all already registered discovery handlers in the RegisteredDiscoveryHandlerMap
/// with the same protocol name as the Configuration (Configuration.protocol.name).
/// 2) Listens for new discover handlers to come online for this Configuration and initiates discovery.
/// 3) Checks whether Offline Instances have exceeded their grace period, in which case it
/// deletes the Instance.
pub async fn start_discovery(
discovery_operator: DiscoveryOperator,
Expand All @@ -542,8 +539,9 @@ pub mod start_discovery {
let config_name = config.metadata.name.clone();
let mut tasks = Vec::new();
let discovery_operator = Arc::new(discovery_operator);
let task1_discovery_operator = discovery_operator.clone();

// Call discover on already registered Discovery Handlers for this Configuration's protocol
let task1_discovery_operator = discovery_operator.clone();
tasks.push(tokio::spawn(async move {
do_discover(
task1_discovery_operator,
Expand All @@ -552,39 +550,25 @@ pub mod start_discovery {
.await
.unwrap();
}));

// Listen for new discovery handlers to call discover on
let mut stop_all_discovery_receiver = stop_all_discovery_sender.subscribe();
let mut new_discovery_handler_receiver = new_discovery_handler_sender.subscribe();
let task2_discovery_operator = discovery_operator.clone();
tasks.push(tokio::spawn(async move {
let mut inner_tasks = Vec::new();
loop {
tokio::select! {
_ = try_receive(&mut stop_all_discovery_receiver) => {
trace!("start_discovery - received message to stop discovery for configuration {}", task2_discovery_operator.get_config().metadata.name);
// stop_offline_checks_sender.send(()).unwrap();
task2_discovery_operator.stop_all_discovery().await;
break;
},
result = try_receive(&mut new_discovery_handler_receiver) => {
// check if it is this protocol
if let Ok(protocol) = result {
if protocol == task2_discovery_operator.get_config().spec.protocol.name {
trace!("start_discovery - received new registered discovery handler for configuration {}", task2_discovery_operator.get_config().metadata.name);
let new_discovery_operator = task2_discovery_operator.clone();
inner_tasks.push(tokio::spawn(async move {
do_discover(new_discovery_operator, Arc::new(Box::new(k8s::create_kube_interface()))).await.unwrap();
}));
}
}
}
}
}
futures::future::try_join_all(inner_tasks).await.unwrap();
listen_for_new_discovery_handlers(
task2_discovery_operator,
&mut new_discovery_handler_receiver,
&mut stop_all_discovery_receiver,
)
.await
.unwrap();
}));
let mut stop_all_discovery_receiver = stop_all_discovery_sender.subscribe();
let task3_discovery_operator = discovery_operator.clone();

// Non-local devices are only allowed to be offline for `SHARED_INSTANCE_OFFLINE_GRACE_PERIOD_SECS` minutes before being removed.
// This task periodically checks if devices have been offline for too long.
let mut stop_all_discovery_receiver = stop_all_discovery_sender.subscribe();
let task3_discovery_operator = discovery_operator.clone();
tasks.push(tokio::spawn(async move {
let kube_interface: Arc<Box<dyn k8s::KubeInterface>> = Arc::new(Box::new(k8s::create_kube_interface()));
loop {
Expand All @@ -608,6 +592,39 @@ pub mod start_discovery {
Ok(())
}

/// Waits to be notified of new discovery handlers. If the discovery handler does discovery for this Configuration's protocol,
/// discovery is kicked off.
async fn listen_for_new_discovery_handlers(
discovery_operator: Arc<DiscoveryOperator>,
new_discovery_handler_receiver: &mut broadcast::Receiver<String>,
stop_all_discovery_receiver: &mut broadcast::Receiver<()>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let mut tasks = Vec::new();
loop {
tokio::select! {
_ = try_receive(stop_all_discovery_receiver) => {
trace!("listen_for_new_discovery_handlers - received message to stop discovery for configuration {}", discovery_operator.get_config().metadata.name);
discovery_operator.stop_all_discovery().await;
break;
},
result = try_receive(new_discovery_handler_receiver) => {
// Check if it is this protocol
if let Ok(protocol) = result {
if protocol == discovery_operator.get_config().spec.protocol.name {
trace!("listen_for_new_discovery_handlers - received new registered discovery handler for configuration {}", discovery_operator.get_config().metadata.name);
let new_discovery_operator = discovery_operator.clone();
tasks.push(tokio::spawn(async move {
do_discover(new_discovery_operator, Arc::new(Box::new(k8s::create_kube_interface()))).await.unwrap();
}));
}
}
}
}
}
futures::future::try_join_all(tasks).await?;
Ok(())
}

/// For each Discovery Handler registered for this DiscoveryOperator's protocol,
/// tries to establish connection with the DiscoveryHandler and spawns a discovery thread for each connection.
/// This function also manages the DiscoveryHandlerConnectivityStatus of each Discovery Handler as follows:
Expand All @@ -617,7 +634,6 @@ pub mod start_discovery {
/// /// DiscoveryHandlerConnectivityStatus::Offline if a connection cannot be established via a call to get_stream
/// If a connection cannot be established, continues to try, sleeping between iteration.
/// Removes the discovery handler from the RegisteredDiscoveryHandlerMap if it has been offline for longer than the grace period.
pub async fn do_discover(
discovery_operator: Arc<DiscoveryOperator>,
kube_interface: Arc<Box<dyn k8s::KubeInterface>>,
Expand Down Expand Up @@ -710,6 +726,7 @@ pub mod start_discovery {
Ok(())
}
}

async fn try_receive<T>(
receiver: &mut broadcast::Receiver<T>,
) -> Result<T, tokio::sync::broadcast::RecvError>
Expand Down

0 comments on commit 80e02b7

Please sign in to comment.