diff --git a/crates/e2e/src/node_proc.rs b/crates/e2e/src/node_proc.rs index f7358b33e5c..8c8256a3b20 100644 --- a/crates/e2e/src/node_proc.rs +++ b/crates/e2e/src/node_proc.rs @@ -1,4 +1,4 @@ -// Copyright (C) Parity Technologies (UK) Ltd. +// Copyright 2018-2022 Parity Technologies (UK) Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,22 +14,18 @@ use sp_keyring::AccountKeyring; use std::{ - ffi::{ - OsStr, - OsString, - }, - io::{ - BufRead, - BufReader, - Read, - }, + ffi::{OsStr, OsString}, + io::{BufRead, BufReader, Read}, process, + sync::mpsc, }; use subxt::{ - Config, - OnlineClient, + Config, + OnlineClient }; +use tokio::task; + /// Spawn a local substrate node for testing. pub struct TestNodeProcess { proc: process::Child, @@ -60,10 +56,8 @@ where /// Attempt to kill the running substrate process. pub fn kill(&mut self) -> Result<(), String> { - tracing::info!("Killing node process {}", self.proc.id()); if let Err(err) = self.proc.kill() { let err = format!("Error killing node process {}: {}", self.proc.id(), err); - tracing::error!("{}", err); return Err(err) } Ok(()) @@ -134,22 +128,23 @@ where // Wait for RPC port to be logged (it's logged to stderr): let stderr = proc.stderr.take().unwrap(); - let port = find_substrate_port_from_output(stderr); - let url = format!("ws://127.0.0.1:{port}"); + let port_future = + tokio::task::spawn_blocking(move || find_substrate_port_from_output(stderr)); + let port = port_future + .await + .map_err(|_| "Failed to spawn blocking task".to_string())?; + let url = format!("ws://127.0.0.1:{}", port.await); // Connect to the node with a `subxt` client: let client = OnlineClient::from_url(url.clone()).await; match client { - Ok(client) => { - Ok(TestNodeProcess { - proc, - client, - url: url.clone(), - }) - } + Ok(client) => Ok(TestNodeProcess { + proc, + client, + url: url.clone(), + }), Err(err) => { let err = format!("Failed to connect to node rpc at {url}: {err}"); - tracing::error!("{}", err); proc.kill().map_err(|e| { format!("Error killing substrate process '{}': {}", proc.id(), e) })?; @@ -159,37 +154,74 @@ where } } -// Consume a stderr reader from a spawned substrate command and -// locate the port number that is logged out to it. -fn find_substrate_port_from_output(r: impl Read + Send + 'static) -> u16 { - BufReader::new(r) - .lines() - .find_map(|line| { - let line = - line.expect("failed to obtain next line from stdout for port discovery"); - - // does the line contain our port (we expect this specific output from - // substrate). - let line_end = line - .rsplit_once("Listening for new connections on 127.0.0.1:") - .or_else(|| { - line.rsplit_once("Running JSON-RPC WS server: addr=127.0.0.1:") - }) - .or_else(|| line.rsplit_once("Running JSON-RPC server: addr=127.0.0.1:")) - .map(|(_, port_str)| port_str)?; - - // trim non-numeric chars from the end of the port part of the line. - let port_str = line_end.trim_end_matches(|b: char| !b.is_ascii_digit()); - - // expect to have a number here (the chars after '127.0.0.1:') and parse them - // into a u16. - let port_num = port_str.parse().unwrap_or_else(|_| { - panic!("valid port expected for tracing line, got '{port_str}'") - }); - - Some(port_num) - }) - .expect("We should find a port before the reader ends") +/// Extracting the port number from the output of a spawned substrate command. +/// ========================================================================= +/// +/// Here I'm basically reading the output lines until it finds the port number +/// being used by the substrate instance; specifically looking for lines +/// that contain the local host address (127.0.0.1) followed by the port number. +/// +/// # Parameters: +/// - r: A reader that provides access to the output stream +/// of the substrate command. +/// +/// # Returns: +/// - The port number as a `u16`. +/// +/// # Panics: +/// - If no line contains the expected port number format. +/// - If the extracted port string isn't a valid number. +/// +/// # Notes: +/// Due to the asynchronous nature of reading the output and the potential +/// blocking behavior, the idea here is to spawns a blocking task to ensure +/// smooth operations in an asynchronous context; this way, it communicates +/// the found port number back to the main context using a channel. +async fn find_substrate_port_from_output(r: impl Read + Send + 'static) -> u16 { + // This creates a channel to communicate the port number back to the main context. + let (tx, rx) = mpsc::channel(); + + // Using tokio::task to spawn a new task for reading + task::spawn_blocking(move || { + let port = BufReader::new(r) + .lines() + .find_map(|line| { + let line = line + .expect("failed to obtain next line from stdout for port discovery"); + + // does the line contain our port (we expect this specific output from + // substrate). + let line_end = line + .rsplit_once("Listening for new connections on 127.0.0.1:") + .or_else(|| { + line.rsplit_once("Running JSON-RPC WS server: addr=127.0.0.1:") + }) + .or_else(|| { + line.rsplit_once(r"Running JSON-RPC server: addr=127.0.0.1:") + }) + .map(|(_, port_str)| port_str)?; + + // trim non-numeric chars from the end of the port part of the line. + let port_str = line_end.trim_end_matches(|b: char| !b.is_ascii_digit()); + + // expect to have a number here (the chars after '127.0.0.1:') and parse + // them into a u16. + let port_num = port_str.parse().unwrap_or_else(|_| { + panic!("valid port expected for tracing line, got '{port_str}'") + }); + + Some(port_num) + }) + .expect("We should find a port before the reader ends"); + + // This sends the port via the channel + tx.send(port) + .expect("Failed to send port from the reader task"); + }); + + // This receives the port from the channel + rx.recv() + .expect("Failed to receive port from the reader task") } #[cfg(test)]