chore: add script to fix activityRelations with wrong memberId #3686
Open
skwowet wants to merge 19 commits into main from script/fix-activityRelations-memberId
+468
−0
19 commits
4434ffb chore: add script to fix activityRelations with wrong memberId (skwowet)
0b58663 Merge branch 'main' into script/fix-activityRelations-memberId (skwowet)
f3a0d75 Merge branch 'main' into script/fix-activityRelations-memberId (skwowet)
ef00b0b chore(deps): bump js-yaml from 4.1.0 to 4.1.1 (#3635) (dependabot[bot])
9f15ecb Merge branch 'main' into script/fix-activityRelations-memberId (skwowet)
76208af fix: improve error logging and conditional console output in activity… (skwowet)
c494da2 fix: dedup activity relations by memberId, username, and platform in … (skwowet)
c75e2fb refactor: rename func to trigger temporal rebuild (skwowet)
99f3c54 chore: test with normal script instead of temporal workflow for perf (skwowet)
6fc1886 refactor: separate read and write connections (skwowet)
c42d842 refactor: rename and update findMembersWithWrongActivityRelatin (skwowet)
6564992 fix: improve fixActivityRelationsMemberId workflow with buffer manage… (skwowet)
02b87bb refactor: streamline fixActivityRelationsMemberId workflow (skwowet)
be59679 chore: process by identities instead of relations to speed up (skwowet)
42fa067 feat: track total fixed memberIds in fixActivityRelationsMemberId script (skwowet)
13a68d9 feat: add functionality to fix incorrect objectMemberIds (skwowet)
534603a Merge branch 'main' into script/fix-activityRelations-memberId (skwowet)
4a780db Merge branch 'main' into script/fix-activityRelations-memberId (skwowet)
b8b732e Merge branch 'main' into script/fix-activityRelations-memberId (skwowet)
64 changes: 64 additions & 0 deletions
services/apps/script_executor_worker/src/activities/fix-activityRelations-memberId.ts
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,64 @@ | ||
| import { moveActivityRelationsWithIdentityToAnotherMember, pgpQx } from '@crowd/data-access-layer' | ||
| import { IDbActivityRelation } from '@crowd/data-access-layer/src/activityRelations/types' | ||
| import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo' | ||
|
|
||
| import { svc } from '../main' | ||
|
|
||
| export async function findMembersWithWrongActivityRelations( | ||
| platform: string, | ||
| batchSize: number, | ||
| ): Promise<Partial<IDbActivityRelation>[]> { | ||
| try { | ||
| const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log) | ||
| const records = await memberRepo.findMembersWithIncorrectActivityRelations(platform, batchSize) | ||
|
|
||
| // Deduplicate by (memberId, username, platform) | ||
| const seen = new Set<string>() | ||
| const uniqueRecords: Partial<IDbActivityRelation>[] = [] | ||
|
|
||
| for (const record of records) { | ||
| const key = `${record.memberId}:${record.username}:${record.platform}` | ||
|
|
||
| if (!seen.has(key)) { | ||
| seen.add(key) | ||
| uniqueRecords.push(record) | ||
| } | ||
| } | ||
|
|
||
| return uniqueRecords | ||
| } catch (error) { | ||
| svc.log.error(error, 'Error finding activity relations with wrong member id!') | ||
| throw error | ||
| } | ||
| } | ||
|
|
||
| export async function findMemberIdByUsernameAndPlatform( | ||
| username: string, | ||
| platform: string, | ||
| ): Promise<string> { | ||
| try { | ||
| const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log) | ||
| // Await here so a rejected promise is caught and logged by the catch block below | ||
| return await memberRepo.getMemberIdByUsernameAndPlatform(username, platform) | ||
| } catch (error) { | ||
| svc.log.error( | ||
| { error, username, platform }, | ||
| 'Error getting member id by username and platform!', | ||
| ) | ||
| throw error | ||
| } | ||
| } | ||
|
|
||
| export async function moveActivityRelations( | ||
| fromId: string, | ||
| toId: string, | ||
| username: string, | ||
| platform: string, | ||
| ): Promise<void> { | ||
| try { | ||
| const qx = pgpQx(svc.postgres.writer.connection()) | ||
| await moveActivityRelationsWithIdentityToAnotherMember(qx, fromId, toId, username, platform) | ||
| } catch (error) { | ||
| svc.log.error(error, 'Error fixing activity relations!') | ||
| throw error | ||
| } | ||
| } |
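The deduplication step in `findMembersWithWrongActivityRelations` can be looked at in isolation. The following is a minimal sketch, assuming a simplified record shape: `RelationKey` and `dedupeByIdentity` are hypothetical names introduced for illustration and are not part of this PR. As a variation on the colon-joined key used above, it JSON-encodes the tuple, so a value that happens to contain the separator cannot produce a colliding key.

```typescript
// Hypothetical, simplified record shape (assumption, for illustration only).
interface RelationKey {
  memberId?: string
  username?: string
  platform?: string
}

// Keep the first record seen for each (memberId, username, platform) triple.
function dedupeByIdentity<T extends RelationKey>(records: T[]): T[] {
  const seen = new Set<string>()
  const unique: T[] = []

  for (const record of records) {
    // JSON-encoding the tuple keeps keys unambiguous even if a value contains ':'.
    const key = JSON.stringify([
      record.memberId ?? null,
      record.username ?? null,
      record.platform ?? null,
    ])

    if (!seen.has(key)) {
      seen.add(key)
      unique.push(record)
    }
  }

  return unique
}

const sample: RelationKey[] = [
  { memberId: 'm1', username: 'alice', platform: 'github' },
  { memberId: 'm1', username: 'alice', platform: 'github' }, // exact duplicate
  { memberId: 'm2', username: 'alice', platform: 'github' }, // different member
]

const deduped = dedupeByIdentity(sample)
console.log(deduped.length)
```

Order is preserved: the first occurrence of each triple wins, which matches the behaviour of the loop in the activity above.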
262 changes: 262 additions & 0 deletions
services/apps/script_executor_worker/src/bin/fix-activityRelations-memberId.ts
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,262 @@ | ||
| /* eslint-disable @typescript-eslint/no-unused-vars */ | ||
| import { moveActivityRelationsWithIdentityToAnotherMember } from '@crowd/data-access-layer' | ||
| import { | ||
| READ_DB_CONFIG, | ||
| WRITE_DB_CONFIG, | ||
| getDbConnection, | ||
| } from '@crowd/data-access-layer/src/database' | ||
| import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' | ||
| import { getServiceChildLogger } from '@crowd/logging' | ||
|
|
||
| import { chunkArray } from '../utils/common' | ||
|
|
||
| const log = getServiceChildLogger('fix-activity-relations-memberId-script') | ||
|
|
||
| interface IPostgresClient { | ||
| reader: QueryExecutor | ||
| writer: QueryExecutor | ||
| } | ||
|
|
||
| interface IMemberIdentity { | ||
| id: string | ||
| memberId: string | ||
| username: string | ||
| platform: string | ||
| } | ||
|
|
||
| async function initPostgresClient(): Promise<IPostgresClient> { | ||
| const reader = await getDbConnection(READ_DB_CONFIG()) | ||
| const writer = await getDbConnection(WRITE_DB_CONFIG()) | ||
| return { | ||
| reader: pgpQx(reader), | ||
| writer: pgpQx(writer), | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Get the total number of member verified identities. | ||
| */ | ||
| async function getTotalMemberVerifiedIdentities(qx: QueryExecutor): Promise<number> { | ||
| const result = await qx.selectOne( | ||
| ` | ||
| SELECT COUNT(*) FROM "memberIdentities" WHERE type = 'username' AND verified = true; | ||
| `, | ||
| ) | ||
|
|
||
| return Number(result?.count ?? 0) | ||
| } | ||
|
|
||
| /** | ||
| * Get batch of member identities using cursor-based pagination. | ||
| * Uses primary key index - very fast. | ||
| */ | ||
| async function getMemberIdentitiesBatch( | ||
| qx: QueryExecutor, | ||
| lastId: string | null, | ||
| batchSize: number, | ||
| ): Promise<IMemberIdentity[]> { | ||
| return qx.select( | ||
| ` | ||
| SELECT id, "memberId", value as username, platform | ||
| FROM "memberIdentities" | ||
| WHERE type = 'username' | ||
| AND verified = true | ||
| AND id > $(lastId) | ||
| ORDER BY id | ||
| LIMIT $(batchSize); | ||
| `, | ||
| { lastId: lastId || '00000000-0000-0000-0000-000000000000', batchSize }, | ||
| ) | ||
| } | ||
|
|
||
| /** | ||
| * Get the wrong memberId for this identity (to pass to moveActivityRelations). | ||
| * Uses ix_activityrelations_platform_username_memberid index - very fast. | ||
| */ | ||
| async function findIncorrectActivityRelationMemberId( | ||
| qx: QueryExecutor, | ||
| platform: string, | ||
| username: string, | ||
| correctMemberId: string, | ||
| ): Promise<string | null> { | ||
| const result = await qx.selectOneOrNone( | ||
| ` | ||
| SELECT "memberId" | ||
| FROM "activityRelations" | ||
| WHERE platform = $(platform) | ||
| AND username = $(username) | ||
| AND "memberId" != $(correctMemberId) | ||
| LIMIT 1; | ||
| `, | ||
| { platform, username, correctMemberId }, | ||
| ) | ||
| return result?.memberId || null | ||
| } | ||
|
|
||
| async function findIncorrectObjectActivityRelationMemberId( | ||
| qx: QueryExecutor, | ||
| platform: string, | ||
| objectMemberUsername: string, | ||
| correctObjectMemberId: string, | ||
| ): Promise<string | null> { | ||
| const result = await qx.selectOneOrNone( | ||
| ` | ||
| SELECT "objectMemberId" | ||
| FROM "activityRelations" | ||
| WHERE platform = $(platform) | ||
| AND "objectMemberUsername" = $(objectMemberUsername) | ||
| AND "objectMemberId" != $(correctObjectMemberId) | ||
| LIMIT 1; | ||
| `, | ||
| { platform, objectMemberUsername, correctObjectMemberId }, | ||
| ) | ||
|
|
||
| return result?.objectMemberId || null | ||
| } | ||
|
|
||
| async function moveActivityRelations( | ||
| qx: QueryExecutor, | ||
| fromId: string, | ||
| toId: string, | ||
| username: string, | ||
| platform: string, | ||
| ): Promise<void> { | ||
| await moveActivityRelationsWithIdentityToAnotherMember(qx, fromId, toId, username, platform) | ||
| } | ||
|
|
||
| async function main() { | ||
| const args = Object.fromEntries( | ||
| process.argv.slice(2).map((arg) => { | ||
| const [key, val] = arg.split('=') | ||
| return [key.replace(/^--/, ''), val ?? true] | ||
| }), | ||
| ) | ||
|
|
||
| const batchSize = Number(args['batch-size'] ?? 5000) | ||
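| // Treat only a bare --test-run or --test-run=true as enabled; Boolean('false') would be true | ||
| const testRun = [true, 'true'].includes(args['test-run']) | ||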
| let lastId: string | null = (args['start-id'] as string | null) ?? null | ||
|
||
|
|
||
| log.info('Running script with args', { batchSize, testRun, lastId }) | ||
|
|
||
| const { reader: qxReader, writer: qxWriter } = await initPostgresClient() | ||
|
|
||
| let totalProcessed = 0 | ||
| let totalMemberIdFixes = 0 | ||
| let totalObjectMemberIdFixes = 0 | ||
|
|
||
| const totalMemberVerifiedIdentities = await getTotalMemberVerifiedIdentities(qxReader) | ||
|
|
||
| let identities = await getMemberIdentitiesBatch(qxReader, lastId, batchSize) | ||
|
|
||
| while (identities.length > 0) { | ||
| for (const chunk of chunkArray(identities, 50)) { | ||
| const tasks = chunk.map(async (identity) => { | ||
| let memberIdFixCount = 0 | ||
| let objectMemberIdFixCount = 0 | ||
|
|
||
| // Fix all wrong memberIds for this identity | ||
| let wrongMemberId = await findIncorrectActivityRelationMemberId( | ||
| qxReader, | ||
| identity.platform, | ||
| identity.username, | ||
| identity.memberId, | ||
| ) | ||
|
|
||
| while (wrongMemberId) { | ||
| log.info('Moving activity relations (memberId)', { | ||
| fromId: wrongMemberId, | ||
| toId: identity.memberId, | ||
| username: identity.username, | ||
| platform: identity.platform, | ||
| }) | ||
|
|
||
| await moveActivityRelations( | ||
| qxWriter, | ||
| wrongMemberId, | ||
| identity.memberId, | ||
| identity.username, | ||
| identity.platform, | ||
| ) | ||
|
|
||
| memberIdFixCount++ | ||
|
|
||
| // Check again for more wrong memberIds | ||
| wrongMemberId = await findIncorrectActivityRelationMemberId( | ||
| qxReader, | ||
| identity.platform, | ||
| identity.username, | ||
| identity.memberId, | ||
| ) | ||
| } | ||
|
||
|
|
||
| // Fix all wrong objectMemberIds for this identity | ||
| let wrongObjectMemberId = await findIncorrectObjectActivityRelationMemberId( | ||
| qxReader, | ||
| identity.platform, | ||
| identity.username, | ||
| identity.memberId, | ||
| ) | ||
|
|
||
| while (wrongObjectMemberId) { | ||
| log.info('Moving activity relations (objectMemberId)', { | ||
| fromId: wrongObjectMemberId, | ||
| toId: identity.memberId, | ||
| username: identity.username, | ||
| platform: identity.platform, | ||
| }) | ||
|
|
||
| await moveActivityRelations( | ||
| qxWriter, | ||
| wrongObjectMemberId, | ||
| identity.memberId, | ||
| identity.username, | ||
| identity.platform, | ||
| ) | ||
|
|
||
| objectMemberIdFixCount++ | ||
|
|
||
| // Check again for more wrong objectMemberIds | ||
| wrongObjectMemberId = await findIncorrectObjectActivityRelationMemberId( | ||
| qxReader, | ||
| identity.platform, | ||
| identity.username, | ||
| identity.memberId, | ||
| ) | ||
| } | ||
|
|
||
| return { | ||
| memberIdFixes: memberIdFixCount, | ||
| objectMemberIdFixes: objectMemberIdFixCount, | ||
| } | ||
| }) | ||
|
|
||
| const results = await Promise.all(tasks) | ||
| for (const result of results) { | ||
| totalMemberIdFixes += result.memberIdFixes | ||
| totalObjectMemberIdFixes += result.objectMemberIdFixes | ||
| } | ||
| } | ||
|
|
||
| totalProcessed += identities.length | ||
| lastId = identities[identities.length - 1].id | ||
|
|
||
| log.info(`Progress`, { | ||
| progress: ((totalProcessed / totalMemberVerifiedIdentities) * 100).toFixed(2) + '%', | ||
| lastIdentityId: lastId, | ||
| }) | ||
|
|
||
| if (testRun) { | ||
| log.info('Test run - stopping after first batch') | ||
| break | ||
| } | ||
|
|
||
| identities = await getMemberIdentitiesBatch(qxReader, lastId, batchSize) | ||
| } | ||
|
|
||
| log.info('Script completed', { totalMemberIdFixes, totalObjectMemberIdFixes }) | ||
| } | ||
|
|
||
| main().catch((error) => { | ||
| log.error(error, 'Unexpected error') | ||
| process.exit(1) | ||
| }) | ||
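The batching strategy in `main` (cursor-based pagination over `id`, then fixed-size chunks within each batch) can be sketched without a database. This is a minimal illustration under stated assumptions: `Row`, `chunk`, `fetchBatch`, and `processAll` are hypothetical names, `chunk` stands in for the PR's `chunkArray` helper (whose implementation is not shown here), and the in-memory filter and sort stand in for `WHERE id > $(lastId) ORDER BY id LIMIT $(batchSize)`. The real script is asynchronous and compares UUIDs in Postgres; plain string comparison is used here only to keep the example self-contained.

```typescript
// Hypothetical row shape; only `id` matters for the cursor.
interface Row {
  id: string
}

// Split an array into chunks of at most `size` items.
function chunk<T>(items: T[], size: number): T[][] {
  const out: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    out.push(items.slice(i, i + size))
  }
  return out
}

// In-memory stand-in for: WHERE id > lastId ORDER BY id LIMIT batchSize.
function fetchBatch(table: Row[], lastId: string | null, batchSize: number): Row[] {
  return table
    .filter((row) => lastId === null || row.id > lastId)
    .sort((a, b) => (a.id < b.id ? -1 : a.id > b.id ? 1 : 0))
    .slice(0, batchSize)
}

// Walk the whole table batch by batch, optionally resuming after `startId`.
function processAll(table: Row[], batchSize: number, startId: string | null = null): string[] {
  const visited: string[] = []
  let lastId = startId
  let batch = fetchBatch(table, lastId, batchSize)

  while (batch.length > 0) {
    // Within a batch, work is grouped into smaller chunks.
    for (const group of chunk(batch, 2)) {
      for (const row of group) {
        visited.push(row.id)
      }
    }

    // Advance the cursor to the last id of this batch before fetching the next one.
    lastId = batch[batch.length - 1].id
    batch = fetchBatch(table, lastId, batchSize)
  }

  return visited
}

const table: Row[] = ['c', 'a', 'e', 'b', 'd'].map((id) => ({ id }))
const all = processAll(table, 2)
const resumed = processAll(table, 2, 'c')
console.log(all, resumed)
```

Because each batch starts strictly after the last id seen, a run can be resumed from a logged `lastIdentityId` (the `--start-id` argument in the script) without re-reading earlier rows, and the cost of fetching a batch does not grow with how far the scan has progressed, unlike `OFFSET`-based paging.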