Skip to content

Commit

Permalink
Catch fatal exceptions in futures (#4223)
Browse files Browse the repository at this point in the history
Fixes #4221

Otherwise they just silently terminate the future and leave downstream
`Await`s hanging, resulting in the process hanging waiting for a future
that will never complete. `def reportFailure` doesn't seem to log stuff
properly when this happens for some reason

Covered with additional integration tests
  • Loading branch information
lihaoyi authored Jan 3, 2025
1 parent c5cf3c8 commit d2df820
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 64 deletions.
23 changes: 23 additions & 0 deletions integration/failure/fatal-error/resources/build.mill
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package build
import mill._

def fatalException(msg: String) = {
// Needs to be a fatal error according to scala.util.control.NonFatal,
// not just any error!
val ex = new java.lang.LinkageError(msg)
assert(!scala.util.control.NonFatal.apply(ex))
ex
}
def fatalTask = Task{
throw fatalException("CUSTOM FATAL ERROR IN TASK")
123
}

def alwaysInvalidates = Task.Input(math.random())
def fatalCloseWorker = Task.Worker{
alwaysInvalidates()
new AutoCloseable {
override def close(): Unit =
throw fatalException("CUSTOM FATAL ERROR ON CLOSE")
}
}
27 changes: 27 additions & 0 deletions integration/failure/fatal-error/src/CompileErrorTests.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package mill.integration

import mill.testkit.UtestIntegrationTestSuite

import utest._

object CompileErrorTests extends UtestIntegrationTestSuite {
val tests: Tests = Tests {
test - integrationTest { tester =>
val res = tester.eval("fatalTask")

assert(res.isSuccess == false)
assert(res.err.contains("""java.lang.LinkageError: CUSTOM FATAL ERROR IN TASK"""))

// Only run this test in client-server mode, since workers are not shutdown
// with `close()` in no-server mode so the error does not trigger
if (clientServerMode) {
// This worker invalidates re-evaluates every time due to being dependent on
// an upstream `Task.Input`. Make sure that a fatal error in the `close()`
// call does not hang the Mill process
tester.eval("fatalCloseWorker")
val res3 = tester.eval("fatalCloseWorker")
assert(res3.err.contains("""java.lang.LinkageError: CUSTOM FATAL ERROR"""))
}
}
}
}
133 changes: 69 additions & 64 deletions main/eval/src/mill/eval/EvaluatorCore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,72 +124,77 @@ private[mill] trait EvaluatorCore extends GroupEvaluator {
)
} else {
futures(terminal) = Future.sequence(deps.map(futures)).map { upstreamValues =>
val countMsg = mill.util.Util.leftPad(
count.getAndIncrement().toString,
terminals.length.toString.length,
'0'
)

val verboseKeySuffix = s"/${terminals0.size}"
logger.setPromptHeaderPrefix(s"$countMsg$verboseKeySuffix")
if (failed.get()) None
else {
val upstreamResults = upstreamValues
.iterator
.flatMap(_.iterator.flatMap(_.newResults))
.toMap

val startTime = System.nanoTime() / 1000

// should we log progress?
val inputResults = for {
target <- group.indexed.filterNot(upstreamResults.contains)
item <- target.inputs.filterNot(group.contains)
} yield upstreamResults(item).map(_._1)
val logRun = inputResults.forall(_.result.isInstanceOf[Result.Success[_]])

val tickerPrefix = terminal.render.collect {
case targetLabel if logRun && logger.enableTicker => targetLabel
}

val contextLogger = new PrefixLogger(
logger0 = logger,
key0 = if (!logger.enableTicker) Nil else Seq(countMsg),
verboseKeySuffix = verboseKeySuffix,
message = tickerPrefix,
noPrefix = exclusive
try {
val countMsg = mill.util.Util.leftPad(
count.getAndIncrement().toString,
terminals.length.toString.length,
'0'
)

val res = evaluateGroupCached(
terminal = terminal,
group = sortedGroups.lookupKey(terminal),
results = upstreamResults,
countMsg = countMsg,
verboseKeySuffix = verboseKeySuffix,
zincProblemReporter = reporter,
testReporter = testReporter,
logger = contextLogger,
classToTransitiveClasses,
allTransitiveClassMethods,
forkExecutionContext,
exclusive
)

if (failFast && res.newResults.values.exists(_.result.asSuccess.isEmpty))
failed.set(true)

val endTime = System.nanoTime() / 1000
val duration = endTime - startTime

val threadId = threadNumberer.getThreadId(Thread.currentThread())
chromeProfileLogger.log(terminal, "job", startTime, duration, threadId, res.cached)

if (!res.cached) uncached.put(terminal, ())
if (res.valueHashChanged) changedValueHash.put(terminal, ())

profileLogger.log(terminal, duration, res, deps)

Some(res)
val verboseKeySuffix = s"/${terminals0.size}"
logger.setPromptHeaderPrefix(s"$countMsg$verboseKeySuffix")
if (failed.get()) None
else {
val upstreamResults = upstreamValues
.iterator
.flatMap(_.iterator.flatMap(_.newResults))
.toMap

val startTime = System.nanoTime() / 1000

// should we log progress?
val inputResults = for {
target <- group.indexed.filterNot(upstreamResults.contains)
item <- target.inputs.filterNot(group.contains)
} yield upstreamResults(item).map(_._1)
val logRun = inputResults.forall(_.result.isInstanceOf[Result.Success[_]])

val tickerPrefix = terminal.render.collect {
case targetLabel if logRun && logger.enableTicker => targetLabel
}

val contextLogger = new PrefixLogger(
logger0 = logger,
key0 = if (!logger.enableTicker) Nil else Seq(countMsg),
verboseKeySuffix = verboseKeySuffix,
message = tickerPrefix,
noPrefix = exclusive
)

val res = evaluateGroupCached(
terminal = terminal,
group = sortedGroups.lookupKey(terminal),
results = upstreamResults,
countMsg = countMsg,
verboseKeySuffix = verboseKeySuffix,
zincProblemReporter = reporter,
testReporter = testReporter,
logger = contextLogger,
classToTransitiveClasses,
allTransitiveClassMethods,
forkExecutionContext,
exclusive
)

if (failFast && res.newResults.values.exists(_.result.asSuccess.isEmpty))
failed.set(true)

val endTime = System.nanoTime() / 1000
val duration = endTime - startTime

val threadId = threadNumberer.getThreadId(Thread.currentThread())
chromeProfileLogger.log(terminal, "job", startTime, duration, threadId, res.cached)

if (!res.cached) uncached.put(terminal, ())
if (res.valueHashChanged) changedValueHash.put(terminal, ())

profileLogger.log(terminal, duration, res, deps)

Some(res)
}
} catch {
case e: Throwable if !scala.util.control.NonFatal(e) =>
throw new Exception(e)
}
}
}
Expand Down

0 comments on commit d2df820

Please sign in to comment.