feat(sync-service): Send change batches to Consumer #3541
Conversation
Codecov Report: ✅ All modified and coverable lines are covered by tests.

```
@@           Coverage Diff            @@
##             main    #3541    +/-  ##
========================================
+ Coverage   75.21%   75.36%   +0.14%
========================================
  Files          51       51
  Lines        2744     2744
  Branches      409      405       -4
========================================
+ Hits         2064     2068       +4
+ Misses        678      674       -4
  Partials        2        2
```
msfstef left a comment:
Very nice work! Would love to see benchmarks as well!
packages/sync-service/lib/electric/postgres/replication_client/message_converter.ex (outdated, resolved)
> Received TransactionFragment that has already been partially processed.
> This scenario is not currently supported. It could occur if the
> batch size was changed while restarting the replication client.
If the consumer (I suppose in the TransactionBuilder) simply discards parts of the transaction fragment it receives that it has already seen based on the changes' log offsets, wouldn't this not be a problem anymore?
A little while back I had modified the consumer to idempotently handle transactions so the collector can safely replay them and know that consumers can discard them - shouldn't consumers also idempotently handle transaction fragments regardless of how they are batched?
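The idempotent discard being suggested could look roughly like this. This is a minimal sketch only, assuming each change carries a comparable `log_offset` and that the changes in a fragment are ordered by ascending offset; the function and field names are illustrative, not the actual Electric API:

```elixir
# Illustrative sketch: drop the prefix of a fragment's changes that has
# already been collected, based on log offsets. Assumes `changes` is
# ordered by ascending `log_offset`; names here are hypothetical.
defp discard_seen_changes(%TransactionFragment{changes: changes} = fragment, last_offset) do
  remaining = Enum.drop_while(changes, fn change -> change.log_offset <= last_offset end)
  %{fragment | changes: remaining}
end
```

With a guard like this at the point of ingestion, re-delivered or differently-batched fragments would be safe to replay.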
Yeah, this PR was dragging on so this was one of the things I deferred.
```elixir
defp add_changes(%{transaction: txn} = state, %TransactionFragment{} = fragment) do
  txn = %{
    txn
    | changes: Enum.reverse(fragment.changes) ++ txn.changes,
```
Adding to my comment above: couldn't we keep track of the last collected change's log offset and only append changes that are later than it? That would make transaction building resilient to transactions being fragmented in different ways.
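One way to sketch that idea, assuming the builder state gains a hypothetical `last_offset` field and each change exposes a comparable `log_offset` (neither is confirmed by this PR):

```elixir
# Sketch: only append changes newer than the last collected offset, so the
# builder tolerates re-delivered or differently-batched fragments.
# `last_offset` and the comparison field are assumptions for illustration.
defp add_changes(%{transaction: txn, last_offset: last_offset} = state, %TransactionFragment{} = fragment) do
  new_changes = Enum.filter(fragment.changes, &(&1.log_offset > last_offset))

  txn = %{txn | changes: Enum.reverse(new_changes) ++ txn.changes}

  %{state | transaction: txn, last_offset: max_offset(new_changes, last_offset)}
end

# Advance the high-water mark; keep the previous offset for an empty batch.
defp max_offset([], last_offset), do: last_offset
defp max_offset(changes, _last), do: changes |> List.last() |> Map.fetch!(:log_offset)
```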
It's a bit late to do it here; you need to guard against it at the beginning of ShapeLogCollector, so that the EventRouter doesn't get messed up, and at the beginning of Consumer (potentially in the builder, but since the idea is to remove the builder wherever possible, it would be better to keep this decoupled).
benchmark this
Fix for #3414.
This PR reduces the memory footprint of a consumer by sending it only the changes that affect it. It does this by sending TransactionFragments to the Consumer, which also lays the groundwork for reducing the memory footprint even further (e.g. #3415).
Performance is comparable to the current implementation (though there's a fair amount of variation in the benchmarks, as shown by these two different runs!).

