Skip to content

Commit

Permalink
Avoid eagerly loading hierarchical settings: move to a lazy-load approach
Browse files Browse the repository at this point in the history
  • Loading branch information
jonatas committed Feb 25, 2025
1 parent a78398d commit 373a860
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 48 deletions.
51 changes: 25 additions & 26 deletions examples/all_in_one/all_in_one.rb
Original file line number Diff line number Diff line change
Expand Up @@ -12,42 +12,18 @@
# ruby all_in_one.rb postgres://user:pass@host:port/db_name
ActiveRecord::Base.establish_connection(ARGV.last)

# Recreate the events hypertable, the way a migration would.
ActiveRecord::Base.connection.instance_exec do
  # Send ActiveRecord's log output to the console.
  ActiveRecord::Base.logger = Logger.new(STDOUT)

  # Start from a clean slate; cascade: true asks the drop to cascade.
  drop_table(:events, if_exists: true, cascade: true)

  create_table(
    :events,
    id: false,
    hypertable: { time_column: 'time', chunk_time_interval: '1 day' }
  ) do |table|
    table.timestamptz :time, null: false, default: -> { 'now()' }
    table.string :identifier, null: false
    table.jsonb :payload
  end
end

# Simple example: an Event model backed by a hypertable, whose scopes are
# registered with the continuous aggregates helper.
class Event < ActiveRecord::Base
extend Timescaledb::ActsAsHypertable
include Timescaledb::ContinuousAggregatesHelper

# Hypertable settings for this model: "time" is the time column and
# "identifier" is the segment-by column.
acts_as_hypertable time_column: "time",
segment_by: "identifier",
value_column: "cast(payload->>'price' as float)"
segment_by: "identifier"

# Row counts restricted to the "click" and "views" identifiers.
scope :count_clicks, -> { select("count(*)").where(identifier: "click") }
scope :count_views, -> { select("count(*)").where(identifier: "views") }
# Rows whose identifier is "purchase".
scope :purchase, -> { where(identifier: "purchase") }

# stats_agg over the configured value_column, restricted to purchases.
scope :purchase_stats, -> { purchase.select("stats_agg(#{value_column}) as stats_agg") }

scope :stats, -> { select("average(stats_agg), stddev(stats_agg)") } # just for descendants aggregated classes

# Scopes to register as continuous aggregates, the timeframes requested for
# each of them, and a refresh policy keyed by timeframe.
continuous_aggregates scopes: [:count_clicks, :count_views, :purchase_stats],
continuous_aggregates scopes: [:count_clicks, :count_views],
timeframes: [:minute, :hour, :day],
refresh_policy: {
minute: {
Expand All @@ -68,6 +44,29 @@ class Event < ActiveRecord::Base
}
end

# Rebuild the schema the way a migration would: remove Event's continuous
# aggregates and the events table, then create the hypertable again.
ActiveRecord::Base.connection.instance_exec do
  # Send ActiveRecord's log output to the console.
  ActiveRecord::Base.logger = Logger.new(STDOUT)

  Event.drop_continuous_aggregates
  drop_table(:events, if_exists: true, cascade: true)

  create_table(
    :events,
    id: false,
    hypertable: {
      time_column: 'time',
      chunk_time_interval: '1 day',
      compress_after: '7 days',
      compress_orderby: 'time',
      compress_segmentby: 'identifier'
    }
  ) do |table|
    table.timestamptz :time, null: false, default: -> { 'now()' }
    table.string :identifier, null: false
    table.jsonb :payload
  end
end

# With the table in place, create the continuous aggregates for Event.
ActiveRecord::Base.connection.instance_exec { Event.create_continuous_aggregates }
Expand Down
53 changes: 33 additions & 20 deletions lib/timescaledb/continuous_aggregates_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -136,34 +136,47 @@ def define_continuous_aggregate_classes
class_name = "#{aggregate_name}_per_#{timeframe}".classify
const_set(class_name, Class.new(base_model) do
class << self
attr_accessor :config, :timeframe, :base_query, :base_model
attr_accessor :config, :timeframe, :base_query, :base_model, :previous_timeframe, :interval, :aggregate_name, :prev_klass
end

self.table_name = _table_name
self.config = config
self.timeframe = timeframe
self.previous_timeframe = previous_timeframe
self.aggregate_name = aggregate_name

interval = "'1 #{timeframe.to_s}'"
self.interval = "'1 #{timeframe.to_s}'"
self.base_model = base_model
tb = "time_bucket(#{interval}, #{time_column})"
if previous_timeframe
prev_klass = base_model.const_get("#{aggregate_name}_per_#{previous_timeframe}".classify)
select_clause = base_model.apply_rollup_rules("#{config[:select]}")
# Note there's no where clause here, because we're using the previous timeframe's data
self.base_query = "SELECT #{tb} as #{time_column}, #{select_clause} FROM \"#{prev_klass.table_name}\" GROUP BY #{[tb, *config[:group_by]].join(', ')}"
else
scope = base_model.public_send(config[:scope_name])
config[:select] = scope.select_values.select{|e|!e.downcase.start_with?("time_bucket")}.join(', ')
config[:group_by] = scope.group_values
config[:where] = if scope.where_values_hash.present?
scope.where_values_hash.to_sql
elsif scope.where_clause.ast.present? && scope.where_clause.ast.to_sql.present?
scope.where_clause.ast.to_sql

# Looks up the aggregate class generated for the previous timeframe.
#
# The constant name is "<aggregate_name>_per_<previous_timeframe>" run through
# String#classify, resolved on base_model.
def self.prev_klass
  parent_const = "#{aggregate_name}_per_#{previous_timeframe}".classify
  base_model.const_get(parent_const)
end

# Builds (once) and returns the SQL SELECT that defines this continuous aggregate.
#
# The statement is assembled on the first call and memoized in @base_query, so
# the base model scope is only evaluated when the query is actually needed.
#
# * First timeframe (no previous_timeframe): the query is derived from the base
#   model scope named by config[:scope_name]; config[:select], config[:group_by]
#   and config[:where] are filled in as a side effect.
# * Later timeframes: the query rolls up the previous timeframe's aggregate,
#   rewriting the select list through base_model.apply_rollup_rules.
#
# @return [String] the SQL statement
def self.base_query
  @base_query ||= begin
    bucket = "time_bucket(#{interval}, #{time_column})"
    if previous_timeframe
      parent = prev_klass
      # config[:select] and config[:group_by] are only written by the
      # first-timeframe branch below. Build the parent's query first so they
      # are populated no matter which class is asked for its query first.
      parent.base_query
      select_clause = base_model.apply_rollup_rules("#{config[:select]}")
      # Note there's no where clause here, because we're using the previous timeframe's data
      "SELECT #{bucket} as #{time_column}, #{select_clause} FROM \"#{parent.table_name}\" GROUP BY #{[bucket, *config[:group_by]].join(', ')}"
    else
      scope = base_model.public_send(config[:scope_name])
      # The bucket expression is added by this method, so drop any the scope already selects.
      config[:select] = scope.select_values.reject { |e| e.downcase.start_with?("time_bucket") }.join(', ')
      config[:group_by] = scope.group_values
      config[:where] =
        if scope.where_values_hash.present?
          # Double embedded apostrophes so values such as "o'neil" stay valid SQL literals.
          quote = ->(value) { "'#{value.to_s.gsub("'", "''")}'" }
          scope.where_values_hash.map do |column, value|
            case value
            when nil
              "#{column} IS NULL"
            when Array
              # An empty list can never match; "1=0" keeps the statement valid.
              value.empty? ? "1=0" : "#{column} IN (#{value.map(&quote).join(', ')})"
            else
              "#{column} = #{quote.call(value)}"
            end
          end.join(' AND ')
        elsif scope.where_clause.ast.present? && scope.where_clause.ast.to_sql.present?
          scope.where_clause.ast.to_sql
        end

      clauses = ["SELECT #{bucket} as #{time_column}, #{config[:select]}"]
      clauses << "FROM \"#{base_model.table_name}\""
      clauses << "WHERE #{config[:where]}" if config[:where]
      clauses << "GROUP BY #{[bucket, *config[:group_by]].join(', ')}"
      clauses.join(' ')
    end
  end
end

def self.refresh!(start_time = nil, end_time = nil)
Expand Down
7 changes: 5 additions & 2 deletions spec/timescaledb/continuous_aggregates_helper_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,20 +65,23 @@
expected_config = {
scope_name: :total,
select: "count(*) as total",
group_by: [],
where: nil,
group_by: [],
refresh_policy: {
minute: { start_offset: "10 minutes", end_offset: "1 minute", schedule_interval: "1 minute" },
hour: { start_offset: "4 hour", end_offset: "1 hour", schedule_interval: "1 hour" },
day: { start_offset: "3 day", end_offset: "1 day", schedule_interval: "1 hour" },
month: { start_offset: "3 month", end_offset: "1 hour", schedule_interval: "1 hour" }
}
}
base_query = test_class::TotalPerMinute.base_query
expect(base_query).to eq("SELECT time_bucket('1 minute', ts) as ts, count(*) as total FROM \"hypertable_with_continuous_aggregates\" GROUP BY time_bucket('1 minute', ts)")
expect(test_class::TotalPerMinute.config).to eq(expected_config)
end

it "sets the where clause for each aggregate" do
  # config[:where] is filled in lazily while base_query is being built, so the
  # query has to be generated before the config can be inspected.
  base_query = test_class::PurchaseStatsPerMinute.base_query
  expect(base_query).to include("WHERE (identifier = 'purchase')")
  expect(test_class::PurchaseStatsPerMinute.config[:where]).to eq("(identifier = 'purchase')")
end


Expand Down

0 comments on commit 373a860

Please sign in to comment.