Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a config for enforcing a minimum job reserve interval (default: 0 for now) #48

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Gemfile.lock
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
PATH
remote: .
specs:
delayed (0.5.5)
delayed (0.6.0)
activerecord (>= 5.2)
concurrent-ruby

Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,10 @@ Delayed::Worker.read_ahead = 5

# If a worker finds no jobs, it will sleep this number of seconds in between attempts:
Delayed::Worker.sleep_delay = 5

# Until version 1.0, the worker will not sleep at all between attemps if it finds jobs.
# This can be configured by setting the minimum reserve interval:
Delayed::Worker.min_reserve_interval = 0.5 # seconds
```

If a job fails, it will be rerun up to 25 times (with an exponential back-off). Jobs will also
Expand Down
2 changes: 1 addition & 1 deletion delayed.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ['lib']
spec.summary = 'a multi-threaded, SQL-driven ActiveJob backend used at Betterment to process millions of background jobs per day'

spec.version = '0.5.5'
spec.version = '0.6.0'
spec.metadata = {
'changelog_uri' => 'https://github.com/betterment/delayed/blob/main/CHANGELOG.md',
'bug_tracker_uri' => 'https://github.com/betterment/delayed/issues',
Expand Down
2 changes: 1 addition & 1 deletion lib/delayed/runnable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def start
def on_exit!; end

def interruptable_sleep(seconds)
pipe[0].wait_readable(seconds)
pipe[0].wait_readable(seconds) if seconds.positive?
end

def stop
Expand Down
9 changes: 9 additions & 0 deletions lib/delayed/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class Worker
include Runnable

cattr_accessor :sleep_delay, instance_writer: false, default: 5
cattr_accessor :min_reserve_interval, instance_writer: false, default: 0
cattr_accessor :max_attempts, instance_writer: false, default: 25
cattr_accessor :max_claims, instance_writer: false, default: 5
cattr_accessor :max_run_time, instance_writer: false, default: 20.minutes
Expand Down Expand Up @@ -92,6 +93,7 @@ def work_off(num = 100)
total = 0

while total < num
start = clock_time
jobs = reserve_jobs
break if jobs.empty?

Expand All @@ -107,6 +109,9 @@ def work_off(num = 100)
pool.wait_for_termination

break if stop? # leave if we're exiting

elapsed = clock_time - start
interruptable_sleep(self.class.min_reserve_interval - elapsed)
end

[success.value, total - success.value]
Expand Down Expand Up @@ -227,5 +232,9 @@ def reserve_jobs
def reload!
Rails.application.reloader.reload! if defined?(Rails.application.reloader) && Rails.application.reloader.check!
end

def clock_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
end
end
4 changes: 2 additions & 2 deletions spec/delayed/tasks_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def stub_env(key, value)
.to change { Delayed::Worker.min_priority }.from(nil).to(6)
.and change { Delayed::Worker.max_priority }.from(nil).to(8)
.and change { Delayed::Worker.queues }.from([]).to(%w(foo bar))
.and change { Delayed::Worker.sleep_delay }.from(5).to(1)
.and change { Delayed::Worker.sleep_delay }.from(TEST_SLEEP_DELAY).to(1)
.and change { Delayed::Worker.read_ahead }.from(5).to(3)
.and change { Delayed::Worker.max_claims }.from(5).to(3)
end
Expand Down Expand Up @@ -96,7 +96,7 @@ def stub_env(key, value)
.to change { Delayed::Worker.min_priority }.from(nil).to(6)
.and change { Delayed::Worker.max_priority }.from(nil).to(8)
.and change { Delayed::Worker.queues }.from([]).to(%w(foo))
.and change { Delayed::Worker.sleep_delay }.from(5).to(1)
.and change { Delayed::Worker.sleep_delay }.from(TEST_SLEEP_DELAY).to(1)
.and change { Delayed::Worker.read_ahead }.from(5).to(3)
.and change { Delayed::Worker.max_claims }.from(5).to(3)
end
Expand Down
14 changes: 14 additions & 0 deletions spec/helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,11 @@ class SingletonClass
include Singleton
end

# Negative values are treated as sleep(0),
# so we can use different values to test the sleep behavior:
TEST_MIN_RESERVE_INTERVAL = -10
TEST_SLEEP_DELAY = -100

RSpec.configure do |config|
config.around(:each) do |example|
aj_priority_was = ActiveJob::Base.priority
Expand All @@ -113,6 +118,14 @@ class SingletonClass
queues_was = Delayed::Worker.queues
read_ahead_was = Delayed::Worker.read_ahead
sleep_delay_was = Delayed::Worker.sleep_delay
min_reserve_interval_was = Delayed::Worker.min_reserve_interval

if Gem.loaded_specs['delayed'].version >= Gem::Version.new('1.0') && min_reserve_interval_was.zero?
raise "Min reserve interval should be nonzero in v1.0 release"
end

Delayed::Worker.sleep_delay = TEST_SLEEP_DELAY
Delayed::Worker.min_reserve_interval = TEST_MIN_RESERVE_INTERVAL

example.run
ensure
Expand All @@ -130,6 +143,7 @@ class SingletonClass
Delayed::Worker.queues = queues_was
Delayed::Worker.read_ahead = read_ahead_was
Delayed::Worker.sleep_delay = sleep_delay_was
Delayed::Worker.min_reserve_interval = min_reserve_interval_was

Delayed::Job.delete_all
end
Expand Down
106 changes: 57 additions & 49 deletions spec/worker_spec.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
require 'helper'

describe Delayed::Worker do
before do
described_class.sleep_delay = 0
end

describe 'start' do
it 'runs the :execute lifecycle hook' do
performances = []
Expand Down Expand Up @@ -32,62 +28,74 @@
allow(subject).to receive(:interruptable_sleep).and_call_original
end

context 'when there are no jobs' do
before do
allow(Delayed::Job).to receive(:reserve).and_return([])
end
around do |example|
max_claims_was = described_class.max_claims
described_class.max_claims = max_claims
example.run
ensure
described_class.max_claims = max_claims_was
end

it 'does not log and then sleeps' do
before do
allow(Delayed::Job).to receive(:reserve).and_return((0...jobs_returned).map { job }, [])
end

let(:max_claims) { 1 }
let(:jobs_returned) { 1 }
let(:job) do
instance_double(
Delayed::Job,
id: 123,
max_run_time: 10,
name: 'MyJob',
run_at: Delayed::Job.db_time_now,
created_at: Delayed::Job.db_time_now,
priority: Delayed::Priority.interactive,
queue: 'testqueue',
attempts: 0,
invoke_job: true,
destroy: true,
)
end

it 'logs the count and sleeps only within the loop' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/)
expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL))
expect(subject).not_to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY)
end

context 'when no jobs are returned' do
let(:jobs_returned) { 0 }

it 'does not log and then sleeps only outside of the loop' do
subject.run!
expect(Delayed.logger).not_to have_received(:info)
expect(subject).to have_received(:interruptable_sleep)
expect(subject).to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY)
end
end

context 'when there is a job worked off' do
around do |example|
max_claims_was = described_class.max_claims
described_class.max_claims = max_claims
example.run
ensure
described_class.max_claims = max_claims_was
end

before do
allow(Delayed::Job).to receive(:reserve).and_return([job], [])
end

let(:max_claims) { 1 }
let(:job) do
instance_double(
Delayed::Job,
id: 123,
max_run_time: 10,
name: 'MyJob',
run_at: Delayed::Job.db_time_now,
created_at: Delayed::Job.db_time_now,
priority: Delayed::Priority.interactive,
queue: 'testqueue',
attempts: 0,
invoke_job: true,
destroy: true,
)
end
context 'when max_claims is 3 and 3 jobs are returned' do
let(:max_claims) { 3 }
let(:jobs_returned) { 3 }

it 'logs the count and does not sleep' do
it 'logs the count and sleeps only in the loop' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/)
expect(subject).not_to have_received(:interruptable_sleep)
expect(Delayed.logger).to have_received(:info).with(/3 jobs processed/)
expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL))
expect(subject).not_to have_received(:interruptable_sleep).with(TEST_SLEEP_DELAY)
end
end

context 'when max_claims is 2' do
let(:max_claims) { 2 }
context 'when max_claims is 3 and 2 jobs are returned' do
let(:max_claims) { 3 }
let(:jobs_returned) { 2 }

it 'logs the count and sleeps' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/1 jobs processed/)
expect(subject).to have_received(:interruptable_sleep)
end
it 'logs the count and sleeps both in the loop and outside of the loop' do
subject.run!
expect(Delayed.logger).to have_received(:info).with(/2 jobs processed/)
expect(subject).to have_received(:interruptable_sleep).once.with(a_value_within(1).of(TEST_MIN_RESERVE_INTERVAL))
expect(subject).to have_received(:interruptable_sleep).once.with(TEST_SLEEP_DELAY)
end
end
end
Expand Down
Loading