Skip to content

Commit 829712f

Browse files
authored
feat(sync-service): All configuring process GC settings for ShapeLogCollector (#3545)
Add an env var for permanent configuration of per-process flags to configure gc settings and add to ShapeLogCollector Also add a simple call to manually set values on the running SLC
1 parent 45e3490 commit 829712f

File tree

8 files changed

+152
-6
lines changed

8 files changed

+152
-6
lines changed

packages/sync-service/config/runtime.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,8 @@ config :electric,
247247
nil
248248
),
249249
process_registry_partitions: env!("ELECTRIC_TWEAKS_PROCESS_REGISTRY_PARTITIONS", :integer, nil),
250+
process_spawn_opts:
251+
env!("ELECTRIC_PROCESS_SPAWN_OPTS", &Electric.Config.parse_spawn_opts!/1, %{}),
250252
http_api_num_acceptors: env!("ELECTRIC_TWEAKS_HTTP_API_NUM_ACCEPTORS", :integer, 100),
251253
conn_max_requests: env!("ELECTRIC_TWEAKS_CONN_MAX_REQUESTS", :integer, nil),
252254
tcp_send_timeout:

packages/sync-service/lib/electric/application.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,8 @@ defmodule Electric.Application do
142142
cleanup_interval_ms: get_env(opts, :cleanup_interval_ms),
143143
shape_hibernate_after: get_env(opts, :shape_hibernate_after),
144144
shape_enable_suspend?: get_env(opts, :shape_enable_suspend?),
145-
conn_max_requests: get_env(opts, :conn_max_requests)
145+
conn_max_requests: get_env(opts, :conn_max_requests),
146+
process_spawn_opts: get_env(opts, :process_spawn_opts)
146147
],
147148
manual_table_publishing?: get_env(opts, :manual_table_publishing?)
148149
)

packages/sync-service/lib/electric/config.ex

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,12 @@ defmodule Electric.Config do
9393
conn_max_requests: 50,
9494
## Performance tweaks
9595
publication_alter_debounce_ms: 0,
96+
# allow for configuring per-process `Process.spawn_opt()`. In the form
97+
# %{process_id :: atom() => [Process.spawn_opt()]}
98+
# See `Process.flag/2`
99+
#
100+
# e.g. %{shape_log_collector: [min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024]}
101+
process_spawn_opts: %{},
96102
## Misc
97103
process_registry_partitions: &Electric.Config.Defaults.process_registry_partitions/0,
98104
feature_flags: if(Mix.env() == :test, do: @known_feature_flags, else: []),
@@ -466,6 +472,69 @@ defmodule Electric.Config do
466472
end
467473
end
468474

475+
@valid_spawn_opts ~w[min_bin_vheap_size min_heap_size priority fullsweep_after message_queue_data]
476+
477+
@doc """
478+
Parse `spawn_opts` from environment variable to keyword list suitable for passing to `GenServer.start_link/2`
479+
480+
## Examples
481+
482+
iex> parse_spawn_opts!(~S({"shape_log_collector":{"min_heap_size":234,"min_bin_vheap_size":123,"message_queue_data":"on_heap","priority":"high","fullsweep_after":104}}))
483+
%{shape_log_collector: [fullsweep_after: 104, message_queue_data: :on_heap, min_bin_vheap_size: 123, min_heap_size: 234, priority: :high]}
484+
485+
iex> parse_spawn_opts!(~S({"shape_log_collector":{"monkey":123,"message_queue_data":"on_fire","min_bin_vheap_size":-1,"priority":"high"}}))
486+
%{shape_log_collector: [priority: :high]}
487+
488+
iex> parse_spawn_opts!("")
489+
%{}
490+
491+
iex> parse_spawn_opts!("{}")
492+
%{}
493+
"""
494+
def parse_spawn_opts!("") do
495+
%{}
496+
end
497+
498+
def parse_spawn_opts!(str) do
499+
str
500+
|> Jason.decode!()
501+
|> then(fn
502+
opts when is_map(opts) ->
503+
for {process_name, process_opts} <- opts, is_map(process_opts), into: %{} do
504+
opts =
505+
for {opt_key, opt_val} <- process_opts,
506+
opt_key in @valid_spawn_opts,
507+
key = String.to_atom(opt_key),
508+
val = validate_spawn_opt(key, opt_val) do
509+
{key, val}
510+
end
511+
512+
{String.to_atom(process_name), opts}
513+
end
514+
515+
_invalid ->
516+
raise ArgumentError, message: "Invalid spawn opts: #{inspect(str)}"
517+
end)
518+
|> tap(fn process_spawn_opts ->
519+
if map_size(process_spawn_opts) > 0 do
520+
Logger.info("Process spawn opts: #{inspect(process_spawn_opts)}")
521+
end
522+
end)
523+
end
524+
525+
defp validate_spawn_opt(:min_bin_vheap_size, val) when is_integer(val) and val >= 0, do: val
526+
defp validate_spawn_opt(:min_heap_size, val) when is_integer(val) and val >= 0, do: val
527+
528+
defp validate_spawn_opt(:priority, val) when val in ["low", "normal", "high"],
529+
do: String.to_atom(val)
530+
531+
defp validate_spawn_opt(:fullsweep_after, val) when is_integer(val) and val >= 0, do: val
532+
533+
defp validate_spawn_opt(:message_queue_data, val) when val in ["off_heap", "on_heap"],
534+
do: String.to_atom(val)
535+
536+
defp validate_spawn_opt(_, _), do: nil
537+
469538
@doc false
470539
# helper function for use in doc tests
471540
def deobfuscate({:ok, connection_opts}),

packages/sync-service/lib/electric/replication/shape_log_collector.ex

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,13 @@ defmodule Electric.Replication.ShapeLogCollector do
3838
when is_map_key(state, :last_processed_lsn) and not is_nil(state.last_processed_lsn)
3939

4040
def start_link(opts) do
41-
with {:ok, opts} <- NimbleOptions.validate(opts, @schema) do
42-
GenServer.start_link(__MODULE__, Map.new(opts), name: name(opts[:stack_id]))
41+
with {:ok, opts} <- NimbleOptions.validate(Map.new(opts), @schema) do
42+
stack_id = opts[:stack_id]
43+
44+
GenServer.start_link(__MODULE__, opts,
45+
name: name(stack_id),
46+
spawn_opt: Electric.StackConfig.spawn_opts(stack_id, :shape_log_collector)
47+
)
4348
end
4449
end
4550

@@ -93,6 +98,34 @@ defmodule Electric.Replication.ShapeLogCollector do
9398
GenServer.call(server(server_ref), :active_shapes)
9499
end
95100

101+
@doc """
102+
Set process flags on the given ShapeLogCollector process.
103+
104+
Accepts a list of flags to set, see `Process.flag/2` for valid settings.
105+
106+
Doesn't crash if given an invalid flag or value - instead returns the list of
107+
invalid flags.
108+
109+
iex> ShapeLogCollector.set_process_flags("my-stack-id", min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024)
110+
{:ok, settings: [min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024], invalid: []}
111+
"""
112+
def set_process_flags(server_ref, flags) do
113+
GenServer.call(server(server_ref), {:set_process_flags, flags}, :infinity)
114+
end
115+
116+
def get_process_flags(server_ref) do
117+
if pid = server(server_ref) |> GenServer.whereis() do
118+
{:garbage_collection, gc_flags} = :erlang.process_info(pid, :garbage_collection)
119+
{:priority, priority} = :erlang.process_info(pid, :priority)
120+
121+
{:ok,
122+
[priority: priority] ++
123+
Keyword.take(gc_flags, [:min_bin_vheap_size, :min_heap_size, :fullsweep_after])}
124+
else
125+
:error
126+
end
127+
end
128+
96129
def init(opts) do
97130
activate_mocked_functions_from_test_process()
98131

@@ -234,6 +267,20 @@ defmodule Electric.Replication.ShapeLogCollector do
234267
{:reply, Filter.active_shapes(state.filter), state}
235268
end
236269

270+
def handle_call({:set_process_flags, flags}, _from, state) do
271+
{settings, invalid} =
272+
Enum.flat_map_reduce(flags, [], fn {flag, value}, invalid ->
273+
try do
274+
{[{flag, Process.flag(flag, value)}], invalid}
275+
rescue
276+
ArgumentError ->
277+
{[], [flag | invalid]}
278+
end
279+
end)
280+
281+
{:reply, {:ok, [settings: settings, invalid: invalid]}, state}
282+
end
283+
237284
def handle_cast({:writer_flushed, shape_id, offset}, state) do
238285
{:noreply,
239286
state

packages/sync-service/lib/electric/stack_config.ex

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ defmodule Electric.StackConfig do
99
:ets.lookup_element(table(stack_id), key, 2, default)
1010
end
1111

12+
def spawn_opts(stack_id, process_name) do
13+
stack_id
14+
|> lookup(:process_spawn_opts, %{})
15+
|> Map.get(process_name, [])
16+
end
17+
1218
def lookup!(stack_id, key) do
1319
:ets.lookup_element(table(stack_id), key, 2)
1420
rescue
@@ -25,7 +31,8 @@ defmodule Electric.StackConfig do
2531
shape_hibernate_after: Electric.Config.default(:shape_hibernate_after),
2632
shape_enable_suspend?: Electric.Config.default(:shape_enable_suspend?),
2733
chunk_bytes_threshold: Electric.ShapeCache.LogChunker.default_chunk_size_threshold(),
28-
feature_flags: []
34+
feature_flags: [],
35+
process_spawn_opts: %{}
2936
]
3037
end
3138

packages/sync-service/lib/electric/stack_supervisor.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,8 @@ defmodule Electric.StackSupervisor do
135135
conn_max_requests: [
136136
type: :pos_integer,
137137
default: Electric.Config.default(:conn_max_requests)
138-
]
138+
],
139+
process_spawn_opts: [type: :map, default: %{}]
139140
]
140141
],
141142
manual_table_publishing?: [
@@ -317,6 +318,7 @@ defmodule Electric.StackSupervisor do
317318

318319
shape_hibernate_after = Keyword.fetch!(config.tweaks, :shape_hibernate_after)
319320
shape_enable_suspend? = Keyword.fetch!(config.tweaks, :shape_enable_suspend?)
321+
process_spawn_opts = Keyword.fetch!(config.tweaks, :process_spawn_opts)
320322

321323
shape_cache_opts = [
322324
stack_id: stack_id
@@ -366,6 +368,7 @@ defmodule Electric.StackSupervisor do
366368
inspector: inspector,
367369
shape_hibernate_after: shape_hibernate_after,
368370
shape_enable_suspend?: shape_enable_suspend?,
371+
process_spawn_opts: process_spawn_opts,
369372
feature_flags: Map.get(config, :feature_flags, [])
370373
]},
371374
{Electric.AsyncDeleter,

packages/sync-service/test/electric/replication/shape_log_collector_test.exs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,22 @@ defmodule Electric.Replication.ShapeLogCollectorTest do
8282
%{server: pid, registry: registry_name, shape_cache: shape_cache_pid}
8383
end
8484

85+
describe "process gc configuration" do
86+
setup :setup_log_collector
87+
88+
@tag process_spawn_opts: %{
89+
shape_log_collector: [priority: :high, min_bin_vheap_size: 1024 * 1024]
90+
}
91+
test "are correctly passed to process", ctx do
92+
pid = ShapeLogCollector.name(ctx.stack_id) |> GenServer.whereis()
93+
94+
info = Process.info(pid)
95+
96+
assert :high == info[:priority]
97+
assert info[:garbage_collection][:min_bin_vheap_size] >= 1024 * 1024
98+
end
99+
end
100+
85101
describe "shape restoration" do
86102
setup :setup_log_collector
87103

packages/sync-service/test/support/component_setup.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,8 @@ defmodule Support.ComponentSetup do
111111
Map.get(ctx, :registry, Electric.StackSupervisor.registry_name(stack_id)),
112112
shape_hibernate_after: Map.get(ctx, :shape_hibernate_after, 1_000),
113113
shape_enable_suspend?: Map.get(ctx, :suspend, false),
114-
feature_flags: Electric.Config.get_env(:feature_flags)
114+
feature_flags: Electric.Config.get_env(:feature_flags),
115+
process_spawn_opts: Map.get(ctx, :process_spawn_opts, %{})
115116
],
116117
seed_config
117118
)}

0 commit comments

Comments
 (0)