Fix random test failure due to port collision #156
Hey @josecelano, I see that we have two cases where we can run into port collisions:

Questions

Solutions
fn port_tcp_is_available(port: u16) -> bool {
    match TcpListener::bind(("127.0.0.1", port)) {
        Ok(_) => true,
        Err(_) => false,
    }
}

What do you think?
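For example, the check could back a helper that keeps drawing random ports until it finds a free one. A minimal sketch (the random_free_port name and the 49152..=65535 range are my assumptions, not existing helpers in the repository):

use std::net::TcpListener;

use rand::Rng;

// Hypothetical helper: keep drawing ports from the ephemeral range
// until one of them is currently free.
fn random_free_port() -> u16 {
    let mut rng = rand::thread_rng();
    loop {
        let port: u16 = rng.gen_range(49152..=65535);
        // Note: another test could still grab the port between this check
        // and the moment the server actually binds to it.
        if TcpListener::bind(("127.0.0.1", port)).is_ok() {
            return port;
        }
    }
}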
I settled for solution 1. Apparently it is an absolute pain to share functionality between tests that live in different files. I wanted to separate the ephemeral_configuration and random_port functions from the production code and still be able to share them with our unit and integration tests. However, this proved way more difficult than anticipated, if not impossible. I'm working on an alternate fix by creating a "test-utilities" crate and importing it as a dev-dependency.
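In Cargo.toml that would look roughly like this (the package name matches the test-helpers crate used in the later snippets; the path is just an assumption):

[dev-dependencies]
torrust-tracker-test-helpers = { path = "packages/test-helpers" }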
Hi @WarmBeer, port collisions happen:
Answers
Solutions

At the beginning, I thought that the "singleton vector" solution could be the right one, but now I think this is a problem of several threads competing for an operating system resource (ports), and we should solve it as if we were installing those clients manually. I mean, the client should be responsible for asking the OS for a free port. I suppose that is feasible. That would keep tests decoupled.

There is another alternative solution. We could try to use fewer ports. In fact, tests should run faster if we share the same tracker between tests. But I do not like this solution because you can end up having hidden coupling between tests. We can only create 16383 (65535−49152) tests :-) using a different port.
Hey @WarmBeer, I think there is one more solution, maybe the simplest one: we could just catch the error and retry with a new random port. Anyway, I like what you are doing in the PR. I think that approach could be very useful to extract functionality from tests that could potentially be published as an independent package. But I do not know how you will solve the problem that all tests (potentially running in independent threads) have to share the same
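A minimal sketch of the catch-and-retry idea, assuming the rand crate is available. Unlike a separate availability check, it keeps the listener it managed to bind, so there is no window for another test to steal the port:

use std::net::TcpListener;

use rand::Rng;

// Keep retrying with a new random port from the ephemeral range until
// binding succeeds, then hand the bound listener to the server.
fn bind_to_random_port() -> TcpListener {
    loop {
        let port: u16 = rand::thread_rng().gen_range(49152..=65535);
        match TcpListener::bind(("127.0.0.1", port)) {
            Ok(listener) => return listener,
            // The port was already taken; try another one.
            Err(_) => continue,
        }
    }
}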
Apparently we can just set the port to "0" to let the OS pick a random unused port. Got this solution from here.
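A self-contained illustration of that trick (not the tracker's code): bind to port 0 and ask the listener which port the OS actually assigned:

use std::net::TcpListener;

fn main() {
    // Port 0 tells the OS to pick any free port.
    let listener = TcpListener::bind("127.0.0.1:0").unwrap();
    // local_addr() reveals the port that was actually assigned.
    println!("bound to {}", listener.local_addr().unwrap());
}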
Hi @da2ce7, @josecelano
Unfortunately this led to another problem. Because we set the port to "0", we no longer know up front which port the API server will actually listen on. If we follow this test, for example:

pub fn tracker_configuration() -> Arc<Configuration> {
    Arc::new(ephemeral())
}

pub async fn start_default_api() -> Server {
    let configuration = tracker_configuration();
    start_custom_api(configuration.clone()).await
}

pub async fn start_custom_api(configuration: Arc<Configuration>) -> Server {
    let server = start(&configuration);
    tracker_apis::start_job(&configuration.http_api, server.tracker.clone()).await;
    server
}

fn start(configuration: &Arc<Configuration>) -> Server {
    let connection_info = ConnectionInfo::authenticated(
        &configuration.http_api.bind_address.clone(),
        &configuration.http_api.access_tokens.get_key_value("admin").unwrap().1.clone(),
    );

    // Set the time of Torrust app starting
    lazy_static::initialize(&static_time::TIME_AT_APP_START);

    // Initialize the Ephemeral Instance Random Seed
    lazy_static::initialize(&ephemeral_instance_keys::RANDOM_SEED);

    // Initialize stats tracker
    let (stats_event_sender, stats_repository) = Keeper::new_active_instance();

    // Initialize Torrust tracker
    let tracker = match tracker::Tracker::new(configuration, Some(stats_event_sender), stats_repository) {
        Ok(tracker) => Arc::new(tracker),
        Err(error) => {
            panic!("{}", error)
        }
    };

    // Initialize logging
    logging::setup(configuration);

    Server {
        tracker,
        connection_info,
    }
}
...
#[tokio::test]
async fn should_authenticate_requests_by_using_a_token_query_param() {
    let api_server = start_default_api().await;

    let token = api_server.get_connection_info().api_token.unwrap();

    let response = Client::new(api_server.get_connection_info())
        .get_request_with_query("stats", Query::params([QueryParam::new("token", &token)].to_vec()))
        .await;

    assert_eq!(response.status(), 200);
}

The test uses the connection info built from the configured bind_address (see start() above), which no longer tells us the port the server is actually listening on. To get access to the actual bind_address of the API server, we need to somehow access the socket the server really bound to.

This brings us to the next problem. The API server is being spawned on a separate thread by the following code:

pub async fn start_job(config: &HttpApi, tracker: Arc<tracker::Tracker>) -> JoinHandle<()> {
    let bind_addr = config
        .bind_address
        .parse::<std::net::SocketAddr>()
        .expect("Tracker API bind_address invalid.");
    let ssl_enabled = config.ssl_enabled;
    let ssl_cert_path = config.ssl_cert_path.clone();
    let ssl_key_path = config.ssl_key_path.clone();

    let (tx, rx) = oneshot::channel::<ApiServerJobStarted>();

    // Run the API server
    let join_handle = tokio::spawn(async move {
        if !ssl_enabled {
            info!("Starting Torrust APIs server on: http://{}", bind_addr);

            let handle = server::start(bind_addr, &tracker);

            tx.send(ApiServerJobStarted()).expect("the API server should not be dropped");

            if let Ok(()) = handle.await {
                info!("Torrust APIs server on http://{} stopped", bind_addr);
            }
        } else if ssl_enabled && ssl_cert_path.is_some() && ssl_key_path.is_some() {
            info!("Starting Torrust APIs server on: https://{}", bind_addr);

            let ssl_config = RustlsConfig::from_pem_file(ssl_cert_path.unwrap(), ssl_key_path.unwrap())
                .await
                .unwrap();

            let handle = server::start_tls(bind_addr, ssl_config, &tracker);

            tx.send(ApiServerJobStarted()).expect("the API server should not be dropped");

            if let Ok(()) = handle.await {
                info!("Torrust APIs server on https://{} stopped", bind_addr);
            }
        }
    });

    // Wait until the APIs server job is running
    match rx.await {
        Ok(_msg) => info!("Torrust APIs server started"),
        Err(e) => panic!("the API server was dropped: {e}"),
    }

    join_handle
}
...
pub fn start(socket_addr: SocketAddr, tracker: &Arc<Tracker>) -> impl Future<Output = hyper::Result<()>> {
    let app = router(tracker);

    let server = axum::Server::bind(&socket_addr).serve(app.into_make_service());

    server.with_graceful_shutdown(async move {
        tokio::signal::ctrl_c().await.expect("Failed to listen to shutdown signal.");
        info!("Stopping Torrust APIs server on http://{} ...", socket_addr);
    })
}

This makes it impossible for us to access the inner axum::Server, and therefore the address it actually bound to, from outside.

Solution

I think it would be nice if we could wrap all the server logic into a finite state machine. This makes it a lot easier to manage and test. I have made a working implementation:

use std::net::{SocketAddr, TcpListener};
use std::sync::Arc;

use axum_server::tls_rustls::RustlsConfig;
use axum_server::Handle;
use futures::Future;
use log::info;
use tokio::signal;
use tokio::task::JoinHandle;
use warp::hyper;

use super::routes::router;
use crate::tracker::Tracker;

#[derive(Debug)]
pub enum Error {
    Error(String)
}

pub struct ApiServer<S> {
    cfg: torrust_tracker_configuration::HttpApi,
    tracker: Arc<Tracker>,
    state: S
}

struct Running {
    bind_address: SocketAddr,
    stop_job_sender: tokio::sync::oneshot::Sender<u8>,
    job: JoinHandle<()>
}

struct Stopped;

impl ApiServer<Stopped> {
    pub fn new(cfg: torrust_tracker_configuration::HttpApi, tracker: Arc<Tracker>) -> Self {
        Self {
            cfg,
            tracker,
            state: Stopped {}
        }
    }

    pub fn start(self) -> Result<ApiServer<Running>, Error> {
        let listener = TcpListener::bind(&self.cfg.bind_address)
            .map_err(|e| Error::Error(e.to_string()))?;

        let bind_address = listener.local_addr()
            .map_err(|e| Error::Error(e.to_string()))?;

        let tracker = self.tracker.clone();

        let (sender, receiver) = tokio::sync::oneshot::channel::<u8>();

        let job = tokio::spawn(async move {
            // TODO: Listen for shutdown signals.
            start_from_tcp_listener(listener, &tracker, receiver).await.unwrap();
        });

        let running_api_server: ApiServer<Running> = ApiServer {
            cfg: self.cfg,
            tracker: self.tracker,
            state: Running {
                bind_address,
                stop_job_sender: sender,
                job
            }
        };

        Ok(running_api_server)
    }
}

impl ApiServer<Running> {
    pub async fn stop(self) -> Result<ApiServer<Stopped>, Error> {
        self.state.stop_job_sender.send(1)
            .map_err(|e| Error::Error(e.to_string()))?;

        self.state.job.await;

        let stopped_api_server: ApiServer<Stopped> = ApiServer {
            cfg: self.cfg,
            tracker: self.tracker,
            state: Stopped {}
        };

        Ok(stopped_api_server)
    }
}

pub fn start_from_tcp_listener(tcp_listener: TcpListener, tracker: &Arc<Tracker>, stop_receiver: tokio::sync::oneshot::Receiver<u8>) -> impl Future<Output = hyper::Result<()>> {
    let app = router(tracker);

    let context = tcp_listener.local_addr().expect("Could not get context.");

    axum::Server::from_tcp(tcp_listener)
        .expect("Could not bind to tcp listener.")
        .serve(app.into_make_service())
        .with_graceful_shutdown(shutdown_signal(stop_receiver, context.to_string()))
}

async fn shutdown_signal(stop_receiver: tokio::sync::oneshot::Receiver<u8>, context: String) {
    let stop = async {
        stop_receiver
            .await
            .expect("Failed to install stop signal.")
    };

    let ctrl_c = async {
        signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        signal::unix::signal(signal::unix::SignalKind::terminate())
            .expect("failed to install signal handler")
            .recv()
            .await;
    };

    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = stop => {},
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    info!("Stop signal received, starting graceful shutdown for: {context}");
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use torrust_tracker_configuration::Configuration;
    use torrust_tracker_test_helpers::configuration::ephemeral;

    use crate::apis::server::ApiServer;
    use crate::tracker;
    use crate::tracker::{statistics};

    fn tracker_configuration() -> Arc<Configuration> {
        Arc::new(ephemeral())
    }

    #[tokio::test]
    async fn it_should_be_able_to_start_from_stopped_state_and_then_stop_again() {
        let cfg = tracker_configuration();

        let tracker = Arc::new(
            tracker::Tracker::new(&cfg, None, statistics::Repo::new()).unwrap(),
        );

        let stopped_api_server = ApiServer::new(cfg.http_api.clone(), tracker);

        let running_api_server_result = stopped_api_server.start();

        assert!(running_api_server_result.is_ok());

        let running_api_server = running_api_server_result.unwrap();

        assert!(running_api_server.stop().await.is_ok());
    }
}

What do you think?
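For example, with a small accessor on the running state (hypothetical, not part of the implementation above), a test could discover the port the OS actually picked:

impl ApiServer<Running> {
    // Hypothetical accessor so tests can target the real socket address.
    pub fn bind_address(&self) -> std::net::SocketAddr {
        self.state.bind_address
    }
}

#[tokio::test]
async fn a_test_could_target_the_actual_bind_address() {
    let cfg = tracker_configuration();
    let tracker = Arc::new(tracker::Tracker::new(&cfg, None, statistics::Repo::new()).unwrap());

    let running_api_server = ApiServer::new(cfg.http_api.clone(), tracker).start().unwrap();

    // The OS-assigned port is now known, even if the configuration said port 0.
    let base_url = format!("http://{}", running_api_server.bind_address());
    // ... issue requests against base_url ...

    running_api_server.stop().await.unwrap();
}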
Hey @WarmBeer, I like this approach. In fact, the current HTTP server is already using a struct. You solved the problem by extracting the server dependency (the TcpListener) out of the spawned job.

Since we only have two states, and one state is "empty" (without an internal state), I was wondering whether we could simplify it, but I only see these alternatives:

pub struct RunningApiServer {
    api_server: Arc<ApiServer>,
    bind_address: SocketAddr,
    stop_job_sender: tokio::sync::oneshot::Sender<u8>,
    job: JoinHandle<()>
}

pub struct ApiServer {
    cfg: torrust_tracker_configuration::HttpApi,
    tracker: Arc<Tracker>,
}

or:

pub struct ApiServer {
    api_server: Arc<ApiServer>,
    bind_address: Option<SocketAddr>,
    stop_job_sender: Option<tokio::sync::oneshot::Sender<u8>>,
    job: Option<JoinHandle<()>>
}

or:

pub struct ApiServerHandler {
    cfg: torrust_tracker_configuration::HttpApi,
    tracker: Arc<Tracker>,
    api_server_handle: Option<ApiServerHandle>,
}

impl ApiServerHandler {
    pub fn start(...) -> {}
    pub fn stop(...) -> {}
}

pub struct ApiServerHandle {
    bind_address: SocketAddr,
    stop_job_sender: tokio::sync::oneshot::Sender<u8>,
    job: JoinHandle<()>
}

We have two states: "ON" and "OFF", but conceptually for me, the "OFF" state means you do not have that running thread. It's more like an
@WarmBeer @da2ce7 I've been trying to think about why I see something wrong in @WarmBeer's proposal. I think the problem is I see two different parts in the ApiServer struct: a fixed part (the configuration and the tracker) and the part that changes with the state. I would like to write something like:

pub struct ApiServer {
    cfg: torrust_tracker_configuration::HttpApi,
    tracker: Arc<Tracker>,
    server_state: State,
}

enum State {
    Running {
        bind_address: SocketAddr,
        stop_job_sender: tokio::sync::oneshot::Sender<u8>,
        job: JoinHandle<()>
    },
    Stopped
}

so that only the state changes, but that's not possible. This article talks about that: https://blog.yoshuawuyts.com/state-machines/

I think the main problem is that the states do not have any behaviour in common that would allow us to use a trait. They are different because they have different internal data and because of the possible transitions. So this is the only way I see to remove that "fixed" part from the state:

pub struct ApiServer<S> {
    state: S
}

struct Running {
    bind_address: SocketAddr,
    stop_job_sender: tokio::sync::oneshot::Sender<u8>,
    job: JoinHandle<()>
}

struct Stopped;

pub struct ApiServerHandler {
    cfg: torrust_tracker_configuration::HttpApi,
    tracker: Arc<Tracker>
}

impl ApiServer<Stopped> {
    pub fn new() -> Self {
        Self {
            state: Stopped {}
        }
    }
}
impl ApiServerHandler {
    pub fn start(&self) -> Result<ApiServer<Running>, Error> {
        let listener = TcpListener::bind(&self.cfg.bind_address)
            .map_err(|e| Error::Error(e.to_string()))?;

        let bind_address = listener.local_addr()
            .map_err(|e| Error::Error(e.to_string()))?;

        let tracker = self.tracker.clone();

        let (sender, receiver) = tokio::sync::oneshot::channel::<u8>();

        let job = tokio::spawn(async move {
            // TODO: Listen for shutdown signals.
            start_from_tcp_listener(listener, &tracker, receiver).await.unwrap();
        });

        let running_api_server: ApiServer<Running> = ApiServer {
            state: Running {
                bind_address,
                stop_job_sender: sender,
                job
            }
        };

        Ok(running_api_server)
    }

    pub async fn stop(running_api_server: ApiServer<Running>) -> Result<ApiServer<Stopped>, Error> {
        // It only consumes the state converting it into the new state
        // this could be `running_api_server.stop(...)?`
        running_api_server.state.stop_job_sender.send(1)
            .map_err(|e| Error::Error(e.to_string()))?;

        running_api_server.state.job.await;

        let stopped_api_server: ApiServer<Stopped> = ApiServer {
            state: Stopped {}
        };

        Ok(stopped_api_server)
    }
}

With this option, you do not need to move the context (config and tracker) around every time you change the state. But anyway, someone must own the state, so I guess I will end up with the same problem (template for the
By the way, I think you only need the configuration and tracker in the Stopped state:

pub struct ApiServer<S> {
    state: S
}

struct Running {
    bind_address: SocketAddr,
    stop_job_sender: tokio::sync::oneshot::Sender<u8>,
    job: JoinHandle<()>
}

struct Stopped {
    cfg: torrust_tracker_configuration::HttpApi,
    tracker: Arc<Tracker>,
}

Because it's the info we need to start the server. The "running" state has it because it has to pass it to the stopped state. It is only a temporary holder for that state. Maybe you can pass it only to the start() method:

impl ApiServer<Stopped> {
    pub fn new() -> Self {
        Self {
            state: Stopped {}
        }
    }

    pub fn start(self, cfg: torrust_tracker_configuration::HttpApi, tracker: Arc<Tracker>) -> Result<ApiServer<Running>, Error> {
        let listener = TcpListener::bind(&cfg.bind_address)
            .map_err(|e| Error::Error(e.to_string()))?;

        let bind_address = listener.local_addr()
            .map_err(|e| Error::Error(e.to_string()))?;

        let (sender, receiver) = tokio::sync::oneshot::channel::<u8>();

        let job = tokio::spawn(async move {
            // TODO: Listen for shutdown signals.
            start_from_tcp_listener(listener, &tracker, receiver).await.unwrap();
        });

        let running_api_server: ApiServer<Running> = ApiServer {
            state: Running {
                bind_address,
                stop_job_sender: sender,
                job
            }
        };

        Ok(running_api_server)
    }
}
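Used from a test, that variant would look roughly like this (just a sketch; ownership of the configuration and tracker moves into start()):

let cfg = tracker_configuration();
let tracker = Arc::new(tracker::Tracker::new(&cfg, None, statistics::Repo::new()).unwrap());

// The stopped server owns nothing; everything it needs is passed to start().
let running_api_server = ApiServer::new().start(cfg.http_api.clone(), tracker).unwrap();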
Storing the configuration and tracker in the shared state is not absolutely necessary, but I think it just makes it easier to stop and start the server like that. Alternatively, you could pass the configuration and tracker when calling start() on the stopped state, like you propose. In the case of our integration tests, we can then move ownership of the configuration and tracker to the
If we are not storing the configuration in the running state, the state is no longer circular; perhaps we should have three uni-directional states:
@da2ce7 why? Do you consider a running instance with a different configuration a different state? If yes, we could keep the configuration in all states but remove the tracker. Anyway, right now, I do not think we need instances that remember their configuration. I think in all cases we just start the server, we use it, and it's automatically stopped when it's dropped. Maybe in the future we could have cases where we want to reuse the same configuration but, for some reason, we want to stop the server temporarily. In fact, I do not know where we are going to call the
Hi @josecelano,
There are three cases where the server will stop:
In the event that a server is not stopped from the test itself (by calling
I see that the state is either a circle or a process:
The question is: do we start, stop and then restart the server in the same handler? Or is it a one-way process?
I think all use cases are a one-way process now. I do not see any reason to restart the server; the only reason I can think of would be to apply a new configuration. UPDATE: and maybe in that case, we should start a new server in parallel and stop the old one when the new one is ready to accept requests.
Thank you @WarmBeer. In fact, I was thinking only about testing. I think the server is automatically dropped when the test finishes. I guess tests are running in parallel on different threads, and when the parent thread is stopped, I suppose the child server is also stopped. But I'm not sure how that works. My point was we are not going to call the
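For what it's worth: dropping a tokio JoinHandle only detaches the task; the spawned server really stops because #[tokio::test] gives each test its own runtime, which is shut down when the test ends. If we wanted the cleanup to be explicit, a small abort-on-drop guard would be one option (hypothetical, not current tracker code):

use tokio::task::JoinHandle;

// Hypothetical guard: abort the spawned server task as soon as the
// test's server value goes out of scope.
pub struct RunningServerGuard {
    pub job: JoinHandle<()>,
}

impl Drop for RunningServerGuard {
    fn drop(&mut self) {
        self.job.abort();
    }
}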
For the API testing, we are using an independent API server for each test because:
We assign a random port for each API server and test with a function like this:
We have reached a high number of parallel running tests, which produces this conflict too often.
We could create a centralised, free-of-race-conditions service which assigns free port numbers (see the sketch after this list).
We could also reuse the API server for tests that use the same configuration and do not conflict, but I would like to avoid that because it can generate hidden coupling between tests.
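For the centralised port assignment idea mentioned above, one way such a service could look (a sketch, not what this PR implements):

use std::sync::atomic::{AtomicU16, Ordering};

// Process-wide counter: every test gets a different port, so parallel
// tests never race for the same one.
static NEXT_PORT: AtomicU16 = AtomicU16::new(49152);

pub fn next_free_port() -> u16 {
    NEXT_PORT.fetch_add(1, Ordering::SeqCst)
}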