@@ -22,6 +22,7 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
2222 :stack_id ,
2323 :publication_name ,
2424 :publication_refresh_period ,
25+ oid_to_relation: % { } ,
2526 relation_ref_counts: % { } ,
2627 prepared_relation_filters: MapSet . new ( ) ,
2728 submitted_relation_filters: MapSet . new ( ) ,
@@ -35,14 +36,17 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
3536 ]
3637
3738 @ type relation_filters ( ) :: MapSet . t ( Electric . oid_relation ( ) )
38- @ typep publication_filter ( ) :: { Electric . oid_relation ( ) , with_generated_cols :: boolean ( ) }
39+ @ typep internal_relation_filters ( ) :: MapSet . t ( Electric . oid ( ) )
40+ @ typep publication_filter ( ) :: { Electric . oid ( ) , with_generated_cols :: boolean ( ) }
41+ @ typep waiter ( ) :: { GenServer . from ( ) , Electric.ShapeCache . shape_handle ( ) }
3942 @ typep state ( ) :: % __MODULE__ {
4043 stack_id: Electric . stack_id ( ) ,
41- relation_ref_counts: % { Electric . oid_relation ( ) => non_neg_integer ( ) } ,
42- prepared_relation_filters: relation_filters ( ) ,
43- submitted_relation_filters: relation_filters ( ) ,
44- committed_relation_filters: relation_filters ( ) ,
45- waiters: % { Electric . oid_relation ( ) => [ GenServer . from ( ) , ... ] } ,
44+ relation_ref_counts: % { Electric . oid ( ) => non_neg_integer ( ) } ,
45+ oid_to_relation: % { Electric . oid ( ) => Electric . relation ( ) } ,
46+ prepared_relation_filters: internal_relation_filters ( ) ,
47+ submitted_relation_filters: internal_relation_filters ( ) ,
48+ committed_relation_filters: internal_relation_filters ( ) ,
49+ waiters: % { Electric . oid ( ) => [ waiter ( ) , ... ] } ,
4650 tracked_shape_handles: % { shape_handle ( ) => publication_filter ( ) } ,
4751 publication_name: String . t ( ) ,
4852 publishes_generated_columns?: boolean ( ) ,
@@ -140,19 +144,24 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
140144 [ ] ,
141145 state . stack_id ,
142146 fn ->
147+ # Build initial state in an ephemeral Task process so that to avoid
148+ # retaining the data from list_shapes in this process's heap.
143149 state =
144- state . stack_id
145- |> Electric.ShapeCache.ShapeStatus . list_shapes ( )
146- |> Enum . reduce (
147- state ,
148- fn { shape_handle , shape } , state ->
149- add_shape_to_publication_filters (
150- shape_handle ,
151- get_publication_filter_from_shape ( shape ) ,
152- state
153- )
154- end
155- )
150+ Task . async ( fn ->
151+ state . stack_id
152+ |> Electric.ShapeCache.ShapeStatus . list_shapes ( )
153+ |> Enum . reduce (
154+ state ,
155+ fn { shape_handle , shape } , state ->
156+ add_shape_to_publication_filters (
157+ shape_handle ,
158+ get_publication_filter_from_shape ( shape ) ,
159+ state
160+ )
161+ end
162+ )
163+ end )
164+ |> Task . await ( :infinity )
156165
157166 # filters will be pulled by the configurator on startup, so no
158167 # need to explicitly call for an update here
@@ -163,13 +172,13 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
163172
164173 @ impl true
165174 def handle_call ( { :add_shape , shape_handle , publication_filter } , from , state ) do
166- { oid_rel , with_gen_cols } = publication_filter
175+ { { oid , _relation } , with_gen_cols } = publication_filter
167176
168177 # if the relation is already committed AND part of the last made
169178 # update submission, we can consider it ready
170179 relation_ready? =
171- MapSet . member? ( state . submitted_relation_filters , oid_rel ) and
172- MapSet . member? ( state . committed_relation_filters , oid_rel )
180+ MapSet . member? ( state . submitted_relation_filters , oid ) and
181+ MapSet . member? ( state . committed_relation_filters , oid )
173182
174183 state = add_shape_to_publication_filters ( shape_handle , publication_filter , state )
175184 state = update_publication_if_necessary ( state )
@@ -215,7 +224,8 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
215224 end
216225
217226 def handle_call ( :fetch_current_filters , _from , state ) do
218- { :reply , state . prepared_relation_filters , state , state . publication_refresh_period }
227+ { :reply , expand_oids ( state . prepared_relation_filters , state ) , state ,
228+ state . publication_refresh_period }
219229 end
220230
221231 @ impl true
@@ -232,16 +242,16 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
232242 state . publication_refresh_period }
233243 end
234244
235- def handle_cast ( { :relation_configuration_result , oid_rel , { :ok , :dropped } } , state ) do
236- new_committed_filters = MapSet . delete ( state . committed_relation_filters , oid_rel )
245+ def handle_cast ( { :relation_configuration_result , { oid , _rel } , { :ok , :dropped } } , state ) do
246+ new_committed_filters = MapSet . delete ( state . committed_relation_filters , oid )
237247
238248 { :noreply , % { state | committed_relation_filters: new_committed_filters } ,
239249 state . publication_refresh_period }
240250 end
241251
242- def handle_cast ( { :relation_configuration_result , oid_rel , { :ok , :configured } } , state ) do
252+ def handle_cast ( { :relation_configuration_result , { oid , _ } = oid_rel , { :ok , :configured } } , state ) do
243253 state = reply_to_relation_waiters ( oid_rel , :ok , state )
244- new_committed_filters = MapSet . put ( state . committed_relation_filters , oid_rel )
254+ new_committed_filters = MapSet . put ( state . committed_relation_filters , oid )
245255
246256 { :noreply , % { state | committed_relation_filters: new_committed_filters } ,
247257 state . publication_refresh_period }
@@ -295,12 +305,22 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
295305 defp update_publication ( state ) do
296306 Electric.Replication.PublicationManager.Configurator . configure_publication (
297307 state . stack_id ,
298- state . prepared_relation_filters
308+ expand_oids ( state . prepared_relation_filters , state )
299309 )
300310
301311 % { state | submitted_relation_filters: state . prepared_relation_filters }
302312 end
303313
314+ @ spec expand_oids ( MapSet . t ( Electric . oid ( ) ) , state ( ) ) :: MapSet . t ( Electric . oid_relation ( ) )
315+ defp expand_oids ( % MapSet { } = oids , state ) do
316+ MapSet . new ( oids , & expand_oid ( & 1 , state ) )
317+ end
318+
319+ @ spec expand_oid ( Electric . oid ( ) , state ( ) ) :: Electric . oid_relation ( )
320+ defp expand_oid ( oid , % { oid_to_relation: oid_to_relation } ) do
321+ { oid , Map . fetch! ( oid_to_relation , oid ) }
322+ end
323+
304324 defp update_needed? ( % __MODULE__ {
305325 prepared_relation_filters: prepared ,
306326 submitted_relation_filters: submitted ,
@@ -319,7 +339,12 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
319339 state
320340 end
321341
322- defp add_shape_to_publication_filters ( shape_handle , { rel_key , _ } = pub_filter , state ) do
342+ defp add_shape_to_publication_filters (
343+ shape_handle ,
344+ { { oid , relation } = rel_key , _ } = pub_filter ,
345+ state
346+ ) do
347+ state = Map . update! ( state , :oid_to_relation , & Map . put_new ( & 1 , oid , relation ) )
323348 state = track_shape_handle ( shape_handle , pub_filter , state )
324349 do_update_relation_filters ( rel_key , :add , state )
325350 end
@@ -342,12 +367,14 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
342367 :add | :remove ,
343368 state ( )
344369 ) :: state ( )
345- defp do_update_relation_filters (
346- { _oid , _rel } = rel_key ,
347- operation ,
348- % __MODULE__ { prepared_relation_filters: prepared , relation_ref_counts: counts } = state
349- ) do
350- current = Map . get ( counts , rel_key , 0 )
370+ defp do_update_relation_filters ( { oid , _rel } = rel_key , operation , % __MODULE__ { } = state ) do
371+ % {
372+ prepared_relation_filters: prepared ,
373+ relation_ref_counts: counts ,
374+ oid_to_relation: oid_lookup
375+ } = state
376+
377+ current = Map . get ( counts , oid , 0 )
351378
352379 new_count =
353380 case operation do
@@ -356,52 +383,60 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
356383 end
357384
358385 # we could rederive the prepared filters from the keys of the counts map
359- # but since we're keeping both arouond might as well not iterate over the
386+ # but since we're keeping both around might as well not iterate over the
360387 # whole map every time
361- { prepared , counts } =
388+ { prepared , counts , oid_lookup } =
362389 cond do
363390 new_count == 0 and current > 0 ->
364- { MapSet . delete ( prepared , rel_key ) , Map . delete ( counts , rel_key ) }
391+ # if the oid is not referenced then remove from the lookup
392+ # so that if the table name has changed we get the new name
393+ # as shapes are defined on it
394+ { MapSet . delete ( prepared , oid ) , Map . delete ( counts , oid ) , Map . delete ( oid_lookup , oid ) }
365395
366396 current == 0 and new_count > 0 ->
367- { MapSet . put ( prepared , rel_key ) , Map . put ( counts , rel_key , new_count ) }
397+ { MapSet . put ( prepared , oid ) , Map . put ( counts , oid , new_count ) , oid_lookup }
368398
369399 new_count > 0 ->
370- { prepared , Map . put ( counts , rel_key , new_count ) }
400+ { prepared , Map . put ( counts , oid , new_count ) , oid_lookup }
371401
372402 true ->
373- { prepared , counts }
403+ { prepared , counts , oid_lookup }
374404 end
375405
376- if not MapSet . member? ( prepared , rel_key ) do
406+ if not MapSet . member? ( prepared , oid ) do
377407 reply_to_relation_waiters (
378408 rel_key ,
379409 { :error , % RuntimeError { message: "Shape removed before updating publication" } } ,
380410 state
381411 )
382412 end
383413
384- % { state | prepared_relation_filters: prepared , relation_ref_counts: counts }
414+ % {
415+ state
416+ | prepared_relation_filters: prepared ,
417+ relation_ref_counts: counts ,
418+ oid_to_relation: oid_lookup
419+ }
385420 end
386421
387422 @ spec add_waiter ( GenServer . from ( ) , shape_handle ( ) , publication_filter ( ) , state ( ) ) ::
388423 state ( )
389424 defp add_waiter ( from , shape_handle , pub_filter , % __MODULE__ { waiters: waiters } = state ) do
390- { oid_rel , _ } = pub_filter
425+ { { oid , _relaion } , _ } = pub_filter
391426 from_tuple = { from , shape_handle }
392- % { state | waiters: Map . update ( waiters , oid_rel , [ from_tuple ] , & [ from_tuple | & 1 ] ) }
427+ % { state | waiters: Map . update ( waiters , oid , [ from_tuple ] , & [ from_tuple | & 1 ] ) }
393428 end
394429
395430 @ spec reply_to_relation_waiters ( Electric . oid_relation ( ) , any ( ) , state ( ) ) :: state ( )
396- defp reply_to_relation_waiters ( oid_rel , reply , % __MODULE__ { waiters: waiters } = state ) do
397- rel_waiters = Map . get ( waiters , oid_rel , [ ] )
431+ defp reply_to_relation_waiters ( { oid , _rel } , reply , % __MODULE__ { waiters: waiters } = state ) do
432+ rel_waiters = Map . get ( waiters , oid , [ ] )
398433 for { from , _ } <- rel_waiters , do: GenServer . reply ( from , reply )
399- % { state | waiters: Map . delete ( waiters , oid_rel ) }
434+ % { state | waiters: Map . delete ( waiters , oid ) }
400435 end
401436
402437 @ spec reply_to_all_waiters ( any ( ) , state ( ) ) :: state ( )
403438 defp reply_to_all_waiters ( reply , % __MODULE__ { waiters: waiters } = state ) do
404- for { _oid_rel , rel_waiters } <- waiters ,
439+ for { _oid , rel_waiters } <- waiters ,
405440 { from , _ } <- rel_waiters ,
406441 do: GenServer . reply ( from , reply )
407442
@@ -419,31 +454,33 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
419454
420455 to_fail =
421456 state . tracked_shape_handles
422- |> Map . filter ( fn { _handle , { _oid_rel , with_gen_cols } } -> with_gen_cols end )
457+ |> Map . filter ( fn { _handle , { _oid , with_gen_cols } } -> with_gen_cols end )
423458
424459 # scan through and reply to any waiters for shapes that require generated columns
425460 new_waiters =
426461 to_fail
427- |> Enum . group_by ( fn { _handle , { oid_rel , _ } } -> oid_rel end , fn { handle , _ } -> handle end )
428- |> Enum . reduce ( state . waiters , fn { oid_rel , handles_to_fail } , waiters ->
429- if rel_waiters = Map . get ( waiters , oid_rel ) do
462+ |> Enum . group_by ( fn { _handle , { oid , _ } } -> oid end , fn { handle , _ } -> handle end )
463+ |> Enum . reduce ( state . waiters , fn { oid , handles_to_fail } , waiters ->
464+ if rel_waiters = Map . get ( waiters , oid ) do
430465 { to_fail , to_keep } =
431466 rel_waiters |> Enum . split_with ( fn { _from , handle } -> handle in handles_to_fail end )
432467
433468 for { from , _ } <- to_fail ,
434469 do: GenServer . reply ( from , { :error , missing_gen_col_error } )
435470
436471 if to_keep == [ ] ,
437- do: Map . delete ( waiters , oid_rel ) ,
438- else: Map . put ( waiters , oid_rel , to_keep )
472+ do: Map . delete ( waiters , oid ) ,
473+ else: Map . put ( waiters , oid , to_keep )
439474 else
440475 waiters
441476 end
442477 end )
443478
444- # schedule removals for any tracked shapes that require generated columns
445- for { handle , _ } <- to_fail do
446- ShapeCleaner . remove_shape_async ( handle , stack_id: state . stack_id )
479+ if not Enum . empty? ( to_fail ) do
480+ # schedule removals for any tracked shapes that require generated columns
481+ handles = for { handle , _ } <- to_fail , do: handle
482+
483+ ShapeCleaner . remove_shapes_async ( state . stack_id , handles )
447484 end
448485
449486 % { state | waiters: new_waiters }
@@ -457,18 +494,21 @@ defmodule Electric.Replication.PublicationManager.RelationTracker do
457494 } = state
458495 )
459496 when is_tracking_shape_handle? ( shape_handle , state ) do
460- { oid_rel , _ } = Map . fetch! ( tracked_shape_handles , shape_handle )
461- oid_rel
497+ { oid , _ } = Map . fetch! ( tracked_shape_handles , shape_handle )
498+ expand_oid ( oid , state )
462499 end
463500
464501 @ spec track_shape_handle ( shape_handle ( ) , publication_filter ( ) , state ( ) ) :: state ( )
465502 defp track_shape_handle (
466503 shape_handle ,
467- pub_filter ,
504+ { { oid , _relation } , generated? } ,
468505 % __MODULE__ { tracked_shape_handles: tracked_shape_handles } = state
469506 )
470507 when not is_tracking_shape_handle? ( shape_handle , state ) do
471- % { state | tracked_shape_handles: Map . put_new ( tracked_shape_handles , shape_handle , pub_filter ) }
508+ % {
509+ state
510+ | tracked_shape_handles: Map . put_new ( tracked_shape_handles , shape_handle , { oid , generated? } )
511+ }
472512 end
473513
474514 @ spec untrack_shape_handle ( shape_handle ( ) , state ( ) ) :: state ( )
0 commit comments