Skip to content

Commit

Permalink
throw root stage error through query_result_fetcher channel
Browse files Browse the repository at this point in the history
  • Loading branch information
stdrc committed Oct 8, 2024
1 parent 20699d4 commit 851adc9
Showing 1 changed file with 17 additions and 3 deletions.
20 changes: 17 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ impl StageRunner {

let shutdown_rx0 = shutdown_rx.clone();

expr_context_scope(expr_context, async {
let result = expr_context_scope(expr_context, async {
let executor = executor.build().await?;
let chunk_stream = executor.execute();
let cancelled = pin!(shutdown_rx.cancelled());
Expand All @@ -682,7 +682,19 @@ impl StageRunner {
}
}
Ok(())
}).await?;
}).await;

if let Err(err) = &result {
// If we encountered error when executing root stage locally, we have to notify the result fetcher, which is
// returned by `distribute_execute` and being listened by the FE handler task. Otherwise the FE handler cannot
// properly throw the error to the PG client.
if let Err(_e) = result_tx
.send(Err(TaskExecutionError(err.to_report_string())))
.await
{
warn!("Send task execution failed");
}
}

// Terminated by other tasks execution error, so no need to return error here.
match shutdown_rx0.message() {
Expand All @@ -701,7 +713,9 @@ impl StageRunner {
self.stage.id
);

Ok(())
// We still have to throw the error in this current task, so that `StageRunner::run` can further
// send `Failed` event to stop other stages.
result.map(|_| ())
}

async fn schedule_tasks_for_all(&mut self, shutdown_rx: ShutdownToken) -> SchedulerResult<()> {
Expand Down

0 comments on commit 851adc9

Please sign in to comment.