Skip to content

Commit

Permalink
Re-introduce terminate.
Browse files Browse the repository at this point in the history
  • Loading branch information
ioquatix committed Aug 6, 2024
1 parent 67d11ac commit bdc83da
Showing 1 changed file with 39 additions and 36 deletions.
75 changes: 39 additions & 36 deletions lib/async/scheduler.rb
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,33 @@ def load
# Invoked when the fiber scheduler is being closed.
#
# Executes the run loop until all tasks are finished, then closes the scheduler.
def scheduler_close
def scheduler_close(error = $!)
# If the execution context (thread) was handling an exception, we want to exit as quickly as possible:
unless $!
unless error
self.run
end
ensure
self.close
end

private def shutdown!
# It's critical to stop all tasks. Otherwise they might be holding on to resources which are never closed/released correctly.
self.stop

self.run_loop do
unless @children.nil?
run_once!
end
# Terminate all child tasks.
def terminate
# If that doesn't work, take more serious action:
@children&.each do |child|
child.terminate
end

return @children.nil?
end

# Terminate all child tasks and close the scheduler.
# @public Since `stable-v1`.
def close
self.shutdown!
self.run_loop do
until self.terminate
self.run_once!
end
end

Kernel.raise "Closing scheduler with blocked operations!" if @blocked > 0
ensure
Expand Down Expand Up @@ -288,21 +291,6 @@ def process_wait(pid, flags)
return @selector.process_wait(Fiber.current, pid, flags)
end

# Run one iteration of the event loop.
# Does not handle interrupts.
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
def run_once(timeout = nil)
Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?

# If we are finished, we stop the task tree and exit:
if @children.nil?
return false
end

return run_once!(timeout)
end

# Run one iteration of the event loop.
#
# When terminating the event loop, we already know we are finished. So we don't need to check the task tree. This is a logical requirement because `run_once` ignores transient tasks. For example, a single top level transient task is not enough to keep the reactor running, but during termination we must still process it in order to terminate child tasks.
Expand Down Expand Up @@ -346,6 +334,25 @@ def run_once(timeout = nil)
return true
end

# Run one iteration of the event loop.
# Does not handle interrupts.
# @parameter timeout [Float | Nil] The maximum timeout, or if nil, indefinite.
# @returns [Boolean] Whether there is more work to do.
def run_once(timeout = nil)
Kernel.raise "Running scheduler on non-blocking fiber!" unless Fiber.blocking?

if self.finished?
self.stop
end

# If we are finished, we stop the task tree and exit:
if @children.nil?
return false
end

return run_once!(timeout)
end

# Checks and clears the interrupted state of the scheduler.
# @returns [Boolean] Whether the reactor has been interrupted.
private def interrupted?
Expand All @@ -363,10 +370,8 @@ def run_once(timeout = nil)

# Stop all children, including transient children, ignoring any signals.
def stop
Thread.handle_interrupt(::SignalException => :never) do
@children&.each do |child|
child.stop
end
@children&.each do |child|
child.stop
end
end

Expand All @@ -382,7 +387,9 @@ def stop
end
end
rescue Interrupt => interrupt
self.stop
Thread.handle_interrupt(::SignalException => :never) do
self.stop
end

retry
end
Expand All @@ -398,11 +405,7 @@ def run(...)
initial_task = self.async(...) if block_given?

self.run_loop do
if self.finished?
self.stop
end

run_once!
run_once
end

return initial_task
Expand Down

0 comments on commit bdc83da

Please sign in to comment.