diff --git a/lib/async/scheduler.rb b/lib/async/scheduler.rb index 0f883c51..d7ec45d5 100644 --- a/lib/async/scheduler.rb +++ b/lib/async/scheduler.rb @@ -75,30 +75,33 @@ def load # Invoked when the fiber scheduler is being closed. # # Executes the run loop until all tasks are finished, then closes the scheduler. - def scheduler_close + def scheduler_close(error = $!) # If the execution context (thread) was handling an exception, we want to exit as quickly as possible: - unless $! + unless error self.run end ensure self.close end - private def shutdown! - # It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly. - self.stop - - self.run_loop do - unless @children.nil? - run_once! - end + # Terminate all child tasks. + def terminate + # If that doesn't work, take more serious action: + @children&.each do |child| + child.terminate end + + return @children.nil? end # Terminate all child tasks and close the scheduler. # @public Since `stable-v1`. def close - self.shutdown! + self.run_loop do + until self.terminate + self.run_once! + end + end Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0 ensure @@ -288,21 +291,6 @@ def process_wait(pid, flags) return @selector.process_wait(Fiber.current, pid, flags) end - # Run one iteration of the event loop. - # Does not handle interrupts. - # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. - # @returns [Boolean] Whether there is more work to do. - def run_once(timeout = nil) - Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? - - # If we are finished, we stop the task tree and exit: - if @children.nil? - return false - end - - return run_once!(timeout) - end - # Run one iteration of the event loop. # # When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks. @@ -346,6 +334,25 @@ def run_once(timeout = nil) return true end + # Run one iteration of the event loop. + # Does not handle interrupts. + # @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite. + # @returns [Boolean] Whether there is more work to do. + def run_once(timeout = nil) + Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking? + + if self.finished? + self.stop + end + + # If we are finished, we stop the task tree and exit: + if @children.nil? + return false + end + + return run_once!(timeout) + end + # Checks and clears the interrupted state of the scheduler. # @returns [Boolean] Whether the reactor has been interrupted. private def interrupted? @@ -363,10 +370,8 @@ def run_once(timeout = nil) # Stop all children, including transient children, ignoring any signals. def stop - Thread.handle_interrupt(::SignalException => :never) do - @children&.each do |child| - child.stop - end + @children&.each do |child| + child.stop end end @@ -382,7 +387,9 @@ def stop end end rescue Interrupt => interrupt - self.stop + Thread.handle_interrupt(::SignalException => :never) do + self.stop + end retry end @@ -398,11 +405,7 @@ def run(...) initial_task = self.async(...) if block_given? self.run_loop do - if self.finished? - self.stop - end - - run_once! + run_once end return initial_task