From a81cfb2a4ae47d50322c6df00c56a704add88a36 Mon Sep 17 00:00:00 2001 From: smudge Date: Wed, 18 Dec 2024 10:23:13 -0500 Subject: [PATCH 1/3] Add min_reserve_interval config to workers --- lib/delayed/runnable.rb | 2 +- lib/delayed/worker.rb | 9 ++++ spec/delayed/tasks_spec.rb | 4 +- spec/helper.rb | 10 ++++ spec/worker_spec.rb | 106 ++++++++++++++++++++----------------- 5 files changed, 79 insertions(+), 52 deletions(-) diff --git a/lib/delayed/runnable.rb b/lib/delayed/runnable.rb index b467ccfd..afa5869b 100644 --- a/lib/delayed/runnable.rb +++ b/lib/delayed/runnable.rb @@ -21,7 +21,7 @@ def start def on_exit!; end def interruptable_sleep(seconds) - pipe[0].wait_readable(seconds) + pipe[0].wait_readable(seconds) if seconds.positive? end def stop diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index f9751742..b2ca131d 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -12,6 +12,7 @@ class Worker include Runnable cattr_accessor :sleep_delay, instance_writer: false, default: 5 + cattr_accessor :min_reserve_interval, instance_writer: false, default: 1 cattr_accessor :max_attempts, instance_writer: false, default: 25 cattr_accessor :max_claims, instance_writer: false, default: 5 cattr_accessor :max_run_time, instance_writer: false, default: 20.minutes @@ -92,6 +93,7 @@ def work_off(num = 100) total = 0 while total < num + start = clock_time jobs = reserve_jobs break if jobs.empty? @@ -107,6 +109,9 @@ def work_off(num = 100) pool.wait_for_termination break if stop? # leave if we're exiting + + elapsed = clock_time - start + interruptable_sleep(self.class.min_reserve_interval - elapsed) end [success.value, total - success.value] @@ -227,5 +232,9 @@ def reserve_jobs def reload! Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check! end + + def clock_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end end end diff --git a/spec/delayed/tasks_spec.rb b/spec/delayed/tasks_spec.rb index 077fa835..d792ba3f 100644 --- a/spec/delayed/tasks_spec.rb +++ b/spec/delayed/tasks_spec.rb @@ -64,7 +64,7 @@ def stub_env(key, value) .to change { Delayed::Worker.min_priority }.from(nil).to(6) .and change { Delayed::Worker.max_priority }.from(nil).to(8) .and change { Delayed::Worker.queues }.from([]).to(%w(foo bar)) - .and change { Delayed::Worker.sleep_delay }.from(5).to(1) + .and change { Delayed::Worker.sleep_delay }.from(TEST_SLEEP_DELAY).to(1) .and change { Delayed::Worker.read_ahead }.from(5).to(3) .and change { Delayed::Worker.max_claims }.from(5).to(3) end @@ -96,7 +96,7 @@ def stub_env(key, value) .to change { Delayed::Worker.min_priority }.from(nil).to(6) .and change { Delayed::Worker.max_priority }.from(nil).to(8) .and change { Delayed::Worker.queues }.from([]).to(%w(foo)) - .and change { Delayed::Worker.sleep_delay }.from(5).to(1) + .and change { Delayed::Worker.sleep_delay }.from(TEST_SLEEP_DELAY).to(1) .and change { Delayed::Worker.read_ahead }.from(5).to(3) .and change { Delayed::Worker.max_claims }.from(5).to(3) end diff --git a/spec/helper.rb b/spec/helper.rb index d3947fd5..6ad99932 100644 --- a/spec/helper.rb +++ b/spec/helper.rb @@ -97,6 +97,11 @@ class SingletonClass include Singleton end +# Negative values are treated as sleep(0), +# so we can use different values to test the sleep behavior: +TEST_MIN_RESERVE_INTERVAL = -10 +TEST_SLEEP_DELAY = -100 + RSpec.configure do |config| config.around(:each) do |example| aj_priority_was = ActiveJob::Base.priority @@ -113,6 +118,10 @@ class SingletonClass queues_was = Delayed::Worker.queues read_ahead_was = Delayed::Worker.read_ahead sleep_delay_was = Delayed::Worker.sleep_delay + min_reserve_interval_was = Delayed::Worker.min_reserve_interval + + Delayed::Worker.sleep_delay = TEST_SLEEP_DELAY + Delayed::Worker.min_reserve_interval = TEST_MIN_RESERVE_INTERVAL example.run ensure @@ -130,6 +139,7 @@ class SingletonClass Delayed::Worker.queues = queues_was Delayed::Worker.read_ahead = read_ahead_was Delayed::Worker.sleep_delay = sleep_delay_was + Delayed::Worker.min_reserve_interval = min_reserve_interval_was Delayed::Job.delete_all end diff --git a/spec/worker_spec.rb b/spec/worker_spec.rb index b641fe48..47ececa8 100644 --- a/spec/worker_spec.rb +++ b/spec/worker_spec.rb @@ -1,10 +1,6 @@ require 'helper' describe Delayed::Worker do - before do - described_class.sleep_delay = 0 - end - describe 'start' do it 'runs the :execute lifecycle hook' do performances = [] @@ -32,62 +28,74 @@ allow(subject).to receive(:interruptable_sleep).and_call_original end - context 'when there are no jobs' do - before do - allow(Delayed::Job).to receive(:reserve).and_return([]) - end + around do |example| + max_claims_was = described_class.max_claims + described_class.max_claims = max_claims + example.run + ensure + described_class.max_claims = max_claims_was + end - it 'does not log and then sleeps' do + before do + allow(Delayed::Job).to receive(:reserve).and_return((0...jobs_returned).map { job }, []) + end + + let(:max_claims) { 1 } + let(:jobs_returned) { 1 } + let(:job) do + instance_double( + Delayed::Job, + id: 123, + max_run_time: 10, + name: 'MyJob', + run_at: Delayed::Job.db_time_now, + created_at: Delayed::Job.db_time_now, + priority: Delayed::Priority.interactive, + queue: 'testqueue', + attempts: 0, + invoke_job: true, + destroy: true, + ) + end + + it 'logs the count and sleeps only within the loop' do + subject.run! + expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/) + expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL)) + expect(subject).not_to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY) + end + + context 'when no jobs are returned' do + let(:jobs_returned) { 0 } + + it 'does not log and then sleeps only outside of the loop' do subject.run! expect(Delayed.logger).not_to have_received(:info) - expect(subject).to have_received(:interruptable_sleep) + expect(subject).to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY) end end - context 'when there is a job worked off' do - around do |example| - max_claims_was = described_class.max_claims - described_class.max_claims = max_claims - example.run - ensure - described_class.max_claims = max_claims_was - end - - before do - allow(Delayed::Job).to receive(:reserve).and_return([job], []) - end - - let(:max_claims) { 1 } - let(:job) do - instance_double( - Delayed::Job, - id: 123, - max_run_time: 10, - name: 'MyJob', - run_at: Delayed::Job.db_time_now, - created_at: Delayed::Job.db_time_now, - priority: Delayed::Priority.interactive, - queue: 'testqueue', - attempts: 0, - invoke_job: true, - destroy: true, - ) - end + context 'when max_claims is 3 and 3 jobs are returned' do + let(:max_claims) { 3 } + let(:jobs_returned) { 3 } - it 'logs the count and does not sleep' do + it 'logs the count and sleeps only in the loop' do subject.run! - expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/) - expect(subject).not_to have_received(:interruptable_sleep) + expect(Delayed.logger).to have_received(:info).with(/3 jobs processed/) + expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL)) + expect(subject).not_to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY) end + end - context 'when max_claims is 2' do - let(:max_claims) { 2 } + context 'when max_claims is 3 and 2 jobs are returned' do + let(:max_claims) { 3 } + let(:jobs_returned) { 2 } - it 'logs the count and sleeps' do - subject.run! - expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/) - expect(subject).to have_received(:interruptable_sleep) - end + it 'logs the count and sleeps both in the loop and outside of the loop' do + subject.run! + expect(Delayed.logger).to have_received(:info).with(/2 jobs processed/) + expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL)) + expect(subject).to have_received(:interruptable_sleep).once.with(TEST_SLEEP_DELAY) end end end From f8ac3349f135e983944b9d64cfc8abde4e3eee76 Mon Sep 17 00:00:00 2001 From: smudge Date: Wed, 18 Dec 2024 10:28:40 -0500 Subject: [PATCH 2/3] Bump version --- Gemfile.lock | 2 +- delayed.gemspec | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index a8773ad4..773ef54a 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -1,7 +1,7 @@ PATH remote: . specs: - delayed (0.5.5) + delayed (0.6.0) activerecord (>= 5.2) concurrent-ruby diff --git a/delayed.gemspec b/delayed.gemspec index 866141fe..de651a36 100644 --- a/delayed.gemspec +++ b/delayed.gemspec @@ -18,7 +18,7 @@ Gem::Specification.new do |spec| spec.require_paths = ['lib'] spec.summary = 'a multi-threaded, SQL-driven ActiveJob backend used at Betterment to process millions of background jobs per day' - spec.version = '0.5.5' + spec.version = '0.6.0' spec.metadata = { 'changelog_uri' => 'https://github.com/betterment/delayed/blob/main/CHANGELOG.md', 'bug_tracker_uri' => 'https://github.com/betterment/delayed/issues', From 200beb5dc2f3f727a4e5ef8e341590373ac30159 Mon Sep 17 00:00:00 2001 From: smudge Date: Wed, 18 Dec 2024 11:53:29 -0500 Subject: [PATCH 3/3] Default to 0 (until v1.0) and document in README --- README.md | 4 ++++ lib/delayed/worker.rb | 2 +- spec/helper.rb | 4 ++++ 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index fd9f292b..beab83ec 100644 --- a/README.md +++ b/README.md @@ -432,6 +432,10 @@ Delayed::Worker.read_ahead = 5 # If a worker finds no jobs, it will sleep this number of seconds in between attempts: Delayed::Worker.sleep_delay = 5 + +# Until version 1.0, the worker will not sleep at all between attemps if it finds jobs. +# This can be configured by setting the minimum reserve interval: +Delayed::Worker.min_reserve_interval = 0.5 # seconds ``` If a job fails, it will be rerun up to 25 times (with an exponential back-off). Jobs will also diff --git a/lib/delayed/worker.rb b/lib/delayed/worker.rb index b2ca131d..c8cff374 100644 --- a/lib/delayed/worker.rb +++ b/lib/delayed/worker.rb @@ -12,7 +12,7 @@ class Worker include Runnable cattr_accessor :sleep_delay, instance_writer: false, default: 5 - cattr_accessor :min_reserve_interval, instance_writer: false, default: 1 + cattr_accessor :min_reserve_interval, instance_writer: false, default: 0 cattr_accessor :max_attempts, instance_writer: false, default: 25 cattr_accessor :max_claims, instance_writer: false, default: 5 cattr_accessor :max_run_time, instance_writer: false, default: 20.minutes diff --git a/spec/helper.rb b/spec/helper.rb index 6ad99932..baec5905 100644 --- a/spec/helper.rb +++ b/spec/helper.rb @@ -120,6 +120,10 @@ class SingletonClass sleep_delay_was = Delayed::Worker.sleep_delay min_reserve_interval_was = Delayed::Worker.min_reserve_interval + if Gem.loaded_specs['delayed'].version >= Gem::Version.new('1.0') && min_reserve_interval_was.zero? + raise "Min reserve interval should be nonzero in v1.0 release" + end + Delayed::Worker.sleep_delay = TEST_SLEEP_DELAY Delayed::Worker.min_reserve_interval = TEST_MIN_RESERVE_INTERVAL