Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions app/models/organization.rb
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ def remaining_billing_entities
# logo :string
# name :string not null
# net_payment_term :integer default(0), not null
# pre_filter_events :boolean default(FALSE), not null
# premium_integrations :string default([]), not null, is an Array
# state :string
# tax_identification_number :string
Expand Down
113 changes: 100 additions & 13 deletions app/services/events/billing_period_filter_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,127 @@

module Events
class BillingPeriodFilterService < BaseService
Result = BaseResult[:charge_ids]
Result = BaseResult[:charges]

def initialize(subscription:, boundaries:)
@subscription = subscription
@boundaries = boundaries
super
end

# Return the list of charges and filters that will be used in the billing or usage computation
# The result will be a hash where the key is the charge id and the value is an array of filter ids
# filter ids could also include "nil" as a default filter
def call
values = plan.charges.joins(:billable_metric)
.where(billable_metrics: {code: distinct_event_codes})
.or(plan.charges.joins(:billable_metric).where(billable_metrics: {recurring: true}))
.pluck("DISTINCT(charges.id)")

result.charge_ids = values

result.charges = deduplicate_filters(charges_and_filters)
result
end

private

attr_reader :subscription, :boundaries

delegate :plan, to: :subscription
delegate :plan, :organization, to: :subscription

def distinct_event_codes
Events::Stores::StoreFactory.new_instance(
organization: subscription.organization,
def event_store
@event_store ||= Events::Stores::StoreFactory.new_instance(
organization: organization,
subscription:,
boundaries: {
from_datetime: boundaries.charges_from_datetime,
to_datetime: boundaries.charges_to_datetime
}
).distinct_codes
)
end

def distinct_event_codes
event_store.distinct_codes
end

def charges_and_filters
return charges_and_filters_from_event_codes unless should_check_clickhouse_events?

charges_and_filters_from_clickhouse_events
end

def should_check_clickhouse_events?
organization.clickhouse_events_store? && organization.pre_filter_events?
end

# Return the list of all charges and filters that matches the event codes received in the period
# It also includes the recurring charges and filters
# The result will be a hash where the key is the charge id and the value is an array of filter ids
# filter ids also include "nil" as a default filter
def charges_and_filters_from_event_codes
plan.charges.joins(:billable_metric).left_joins(:filters)
.where(billable_metrics: {code: distinct_event_codes})
.or(plan.charges.joins(:billable_metric).where(billable_metrics: {recurring: true}))
.group("charges.id, charge_filters.id")
.pluck("charges.id", "charge_filters.id")
.then { group_by_charge_id(it) }
.then { add_default_filter(it) }
end

# Return the list of charges and filters that matches the event enriched in clickhouse for the period
# It also includes the recurring charges and filters
# The result will be a hash where the key is the charge id and the value is an array of filter ids
# filter ids also include "nil" as a default filter when applicable
def charges_and_filters_from_clickhouse_events
values = event_store.distinct_charges_and_filters

charge_filter_id = values.map(&:last).reject(&:blank?)
charge_ids = values.map(&:first).uniq

existing_charge_ids = plan.charges.where(id: charge_ids).pluck(:id)
existing_charge_filters = plan.charges.joins(:filters)
.where(charge_filters: {id: charge_filter_id})
.pluck("charge_filters.id")

result = all_recurring_charges_and_filters

values.each do |charge_id, filter_id|
# Charge has been removed from the plan
next unless existing_charge_ids.include?(charge_id)

# Charge has no filters or only default bucket received usage in the period
if filter_id.blank?
result[charge_id] << nil
next
end

# Keep only existing filters
next unless existing_charge_filters.include?(filter_id)
result[charge_id] << filter_id
end

result
end

def all_recurring_charges_and_filters
plan.charges.joins(:billable_metric).left_joins(:filters)
.where(billable_metrics: {recurring: true})
.pluck("charges.id", "filters.id")
.then { group_by_charge_id(it) }
.then { add_default_filter(it) }
end

# Group all charges and filters by charge_id
def group_by_charge_id(rows)
rows.each_with_object(Hash.new { |h, k| h[k] = [] }) do |(charge_id, filter_id), hash|
hash[charge_id] << filter_id
end
end

# Include "default" bucket for recurring charges
def add_default_filter(charges_and_filters)
charges_and_filters.each_value { it << nil }
charges_and_filters
end

# Make sure all filters are unique for each charge
def deduplicate_filters(charges_and_filters)
charges_and_filters.each_value(&:uniq!)
charges_and_filters
end
end
end
8 changes: 8 additions & 0 deletions app/services/events/stores/clickhouse_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ def distinct_codes
end
end

def distinct_charges_and_filters
::Clickhouse::EventsEnrichedExpanded
.where(subscription_id: subscription.id)
.where(organization_id: subscription.organization_id)
.where(timestamp: from_datetime..to_datetime)
.pluck("DISTINCT(charge_filter_id)", "charge_id").map(&:reverse)
end

def events_values(limit: nil, force_from: false, exclude_event: false)
Events::Stores::Utils::ClickhouseConnection.with_retry do
scope = events(force_from:, ordered: true)
Expand Down
12 changes: 8 additions & 4 deletions app/services/fees/charge_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ def initialize(
boundaries:,
context: nil,
cache_middleware: nil,
bypass_aggregation: false,
filtered_aggregations: nil,
apply_taxes: false,
calculate_projected_usage: false,
with_zero_units_filters: true
Expand All @@ -29,7 +29,7 @@ def initialize(
)

# Allow the service to ignore events aggregation
@bypass_aggregation = bypass_aggregation
@filtered_aggregations = filtered_aggregations

super(nil)
end
Expand Down Expand Up @@ -68,7 +68,8 @@ def call

private

attr_accessor :invoice, :charge, :subscription, :boundaries, :context, :current_usage, :currency, :cache_middleware, :bypass_aggregation, :apply_taxes, :calculate_projected_usage, :with_zero_units_filters
attr_accessor :invoice, :charge, :subscription, :boundaries, :context, :current_usage, :currency, :cache_middleware,
:filtered_aggregations, :apply_taxes, :calculate_projected_usage, :with_zero_units_filters

delegate :billable_metric, to: :charge
delegate :organization, to: :subscription
Expand Down Expand Up @@ -305,6 +306,9 @@ def already_billed?
end

def aggregator(charge_filter:)
aggregate = true
aggregate = filtered_aggregations.include?(charge_filter&.id) unless filtered_aggregations.nil?

BillableMetrics::AggregationFactory.new_instance(
charge:,
current_usage:,
Expand All @@ -316,7 +320,7 @@ def aggregator(charge_filter:)
max_timestamp: boundaries.max_timestamp
},
filters: aggregation_filters(charge_filter:),
bypass_aggregation:
bypass_aggregation: !aggregate
)
end

Expand Down
12 changes: 9 additions & 3 deletions app/services/invoices/calculate_fees_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def charge_boundaries_valid?(boundaries)
def create_charges_fees(subscription, boundaries)
return unless charge_boundaries_valid?(boundaries)

filters = event_filters(subscription, boundaries)
filters = event_filters(subscription, boundaries).charges

subscription
.plan
Expand All @@ -116,8 +116,14 @@ def create_charges_fees(subscription, boundaries)
.find_each do |charge|
next if should_not_create_charge_fee?(charge, subscription)

bypass_aggregation = !filters.charge_ids.include?(charge.id)
Fees::ChargeService.call(invoice:, charge:, subscription:, boundaries:, context:, bypass_aggregation:).raise_if_error!
Fees::ChargeService.call!(
invoice:,
charge:,
subscription:,
boundaries:,
context:,
filtered_aggregations: filters[charge.id] || []
)
end
end

Expand Down
14 changes: 5 additions & 9 deletions app/services/invoices/customer_usage_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -91,22 +91,19 @@ def compute_usage
def compute_charge_fees
fees = []

filters = event_filters(subscription, boundaries)
filters = event_filters(subscription, boundaries).charges

subscription
.plan
.charges
.joins(:billable_metric)
.includes(:taxes, billable_metric: :organization, filters: {values: :billable_metric_filter})
.find_each do |charge|
bypass_aggregation = !filters.charge_ids.include?(charge.id)
fees += charge_usage(charge, bypass_aggregation)
end
.find_each { fees += charge_usage(it, filters[it.id] || []) }

fees.sort_by { |f| f.billable_metric.name.downcase }
end

def charge_usage(charge, bypass_aggregation)
def charge_usage(charge, applied_filters)
cache_middleware = Subscriptions::ChargeCacheMiddleware.new(
subscription:,
charge:,
Expand All @@ -118,7 +115,7 @@ def charge_usage(charge, bypass_aggregation)
applied_boundaries = boundaries.dup.tap { it.max_timestamp = max_timestamp } if max_timestamp

Fees::ChargeService
.call(
.call!(
invoice:,
charge:,
subscription:,
Expand All @@ -127,9 +124,8 @@ def charge_usage(charge, bypass_aggregation)
cache_middleware:,
calculate_projected_usage:,
with_zero_units_filters:,
bypass_aggregation:
filtered_aggregations: applied_filters
)
.raise_if_error!
.fees
end

Expand Down
17 changes: 16 additions & 1 deletion app/services/invoices/progressive_billing_service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,17 @@ def create_generating_invoice
end

def create_fees
filters = event_filters(subscription, boundaries).charges

charges.find_each do |charge|
Fees::ChargeService.call(invoice:, charge:, subscription:, context: :finalize, boundaries:).raise_if_error!
Fees::ChargeService.call!(
invoice:,
charge:,
subscription:,
context: :finalize,
boundaries:,
filtered_aggregations: filters[charge.id] || []
)
end
end

Expand Down Expand Up @@ -155,5 +164,11 @@ def create_applied_prepaid_credit

invoice.total_amount_cents -= prepaid_credit_result.prepaid_credit_amount_cents
end

def event_filters(subscription, boundaries)
Events::BillingPeriodFilterService.call!(
subscription:, boundaries:
)
end
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# frozen_string_literal: true

class AddPreFilterEventsToOrganizations < ActiveRecord::Migration[8.0]
def change
add_column :organizations, :pre_filter_events, :boolean, default: false, null: false
end
end
2 changes: 2 additions & 0 deletions db/structure.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2582,6 +2582,7 @@ CREATE TABLE public.organizations (
hmac_key character varying NOT NULL,
authentication_methods character varying[] DEFAULT '{email_password,google_oauth}'::character varying[] NOT NULL,
audit_logs_period integer DEFAULT 30,
pre_filter_events boolean DEFAULT false NOT NULL,
CONSTRAINT check_organizations_on_invoice_grace_period CHECK ((invoice_grace_period >= 0)),
CONSTRAINT check_organizations_on_net_payment_term CHECK ((net_payment_term >= 0))
);
Expand Down Expand Up @@ -10594,6 +10595,7 @@ SET search_path TO "$user", public;

INSERT INTO "schema_migrations" (version) VALUES
('20251204142205'),
('20251204101451'),
('20251202141759'),
('20251128102055'),
('20251127145819'),
Expand Down
Loading
Loading