diff --git a/services/apps/script_executor_worker/package.json b/services/apps/script_executor_worker/package.json index c020c48e42..fec64ac062 100644 --- a/services/apps/script_executor_worker/package.json +++ b/services/apps/script_executor_worker/package.json @@ -7,6 +7,7 @@ "dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local", "dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug", "cleanup-fork-activities": "npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts", + "fix-activity-relations-memberId": "npx tsx src/bin/fix-activityRelations-memberId.ts", "lint": "npx eslint --ext .ts src --max-warnings=0", "format": "npx prettier --write \"src/**/*.ts\"", "format-check": "npx prettier --check .", diff --git a/services/apps/script_executor_worker/src/activities.ts b/services/apps/script_executor_worker/src/activities.ts index f0b9f96810..f43ebb75e7 100644 --- a/services/apps/script_executor_worker/src/activities.ts +++ b/services/apps/script_executor_worker/src/activities.ts @@ -27,6 +27,11 @@ import { findMemberIdentitiesGroupedByPlatform, findMemberMergeActions, } from './activities/dissect-member' +import { + findMemberIdByUsernameAndPlatform, + findMembersWithWrongActivityRelations, + moveActivityRelations, +} from './activities/fix-activityRelations-memberId' import { getBotMembersWithOrgAffiliation, removeBotMemberOrganization, @@ -86,4 +91,7 @@ export { blockMemberOrganizationAffiliation, getOrganizationMembers, calculateMemberAffiliations, + findMembersWithWrongActivityRelations, + findMemberIdByUsernameAndPlatform, + moveActivityRelations, } diff --git a/services/apps/script_executor_worker/src/activities/fix-activityRelations-memberId.ts b/services/apps/script_executor_worker/src/activities/fix-activityRelations-memberId.ts new file mode 100644 index 0000000000..c8aa87fcc2 --- /dev/null +++ b/services/apps/script_executor_worker/src/activities/fix-activityRelations-memberId.ts @@ -0,0 +1,64 @@ +import { moveActivityRelationsWithIdentityToAnotherMember, pgpQx } from '@crowd/data-access-layer' +import { IDbActivityRelation } from '@crowd/data-access-layer/src/activityRelations/types' +import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo' + +import { svc } from '../main' + +export async function findMembersWithWrongActivityRelations( + platform: string, + batchSize: number, +): Promise[]> { + try { + const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log) + const records = await memberRepo.findMembersWithIncorrectActivityRelations(platform, batchSize) + + // Deduplicate by (memberId, username, platform) + const seen = new Set() + const uniqueRecords: Partial[] = [] + + for (const record of records) { + const key = `${record.memberId}:${record.username}:${record.platform}` + + if (!seen.has(key)) { + seen.add(key) + uniqueRecords.push(record) + } + } + + return uniqueRecords + } catch (error) { + svc.log.error(error, 'Error finding activity relations with wrong member id!') + throw error + } +} + +export async function findMemberIdByUsernameAndPlatform( + username: string, + platform: string, +): Promise { + try { + const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log) + return memberRepo.getMemberIdByUsernameAndPlatform(username, platform) + } catch (error) { + svc.log.error( + { error, username, platform }, + 'Error getting member id by username and platform!', + ) + throw error + } +} + +export async function moveActivityRelations( + fromId: string, + toId: string, + username: string, + platform: string, +): Promise { + try { + const qx = pgpQx(svc.postgres.writer.connection()) + await moveActivityRelationsWithIdentityToAnotherMember(qx, fromId, toId, username, platform) + } catch (error) { + svc.log.error(error, 'Error fixing activity relations!') + throw error + } +} diff --git a/services/apps/script_executor_worker/src/bin/fix-activityRelations-memberId.ts b/services/apps/script_executor_worker/src/bin/fix-activityRelations-memberId.ts new file mode 100644 index 0000000000..d3e111ca60 --- /dev/null +++ b/services/apps/script_executor_worker/src/bin/fix-activityRelations-memberId.ts @@ -0,0 +1,262 @@ +/* eslint-disable @typescript-eslint/no-unused-vars */ +import { moveActivityRelationsWithIdentityToAnotherMember } from '@crowd/data-access-layer' +import { + READ_DB_CONFIG, + WRITE_DB_CONFIG, + getDbConnection, +} from '@crowd/data-access-layer/src/database' +import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor' +import { getServiceChildLogger } from '@crowd/logging' + +import { chunkArray } from '../utils/common' + +const log = getServiceChildLogger('fix-activity-relations-memberId-script') + +interface IPostgresClient { + reader: QueryExecutor + writer: QueryExecutor +} + +interface IMemberIdentity { + id: string + memberId: string + username: string + platform: string +} + +async function initPostgresClient(): Promise { + const reader = await getDbConnection(READ_DB_CONFIG()) + const writer = await getDbConnection(WRITE_DB_CONFIG()) + return { + reader: pgpQx(reader), + writer: pgpQx(writer), + } +} + +/** + * Get the total number of member verified identities. + */ +async function getTotalMemberVerifiedIdentities(qx: QueryExecutor): Promise { + const result = await qx.selectOne( + ` + SELECT COUNT(*) FROM "memberIdentities" WHERE type = 'username' AND verified = true; + `, + ) + + return Number(result?.count ?? 0) +} + +/** + * Get batch of member identities using cursor-based pagination. + * Uses primary key index - very fast. + */ +async function getMemberIdentitiesBatch( + qx: QueryExecutor, + lastId: string | null, + batchSize: number, +): Promise { + return qx.select( + ` + SELECT id, "memberId", value as username, platform + FROM "memberIdentities" + WHERE type = 'username' + AND verified = true + AND id > $(lastId) + ORDER BY id + LIMIT $(batchSize); + `, + { lastId: lastId || '00000000-0000-0000-0000-000000000000', batchSize }, + ) +} + +/** + * Get the wrong memberId for this identity (to pass to moveActivityRelations). + * Uses ix_activityrelations_platform_username_memberid index - very fast. + */ +async function findIncorrectActivityRelationMemberId( + qx: QueryExecutor, + platform: string, + username: string, + correctMemberId: string, +): Promise { + const result = await qx.selectOneOrNone( + ` + SELECT "memberId" + FROM "activityRelations" + WHERE platform = $(platform) + AND username = $(username) + AND "memberId" != $(correctMemberId) + LIMIT 1; + `, + { platform, username, correctMemberId }, + ) + return result?.memberId || null +} + +async function findIncorrectObjectActivityRelationMemberId( + qx: QueryExecutor, + platform: string, + objectMemberUsername: string, + correctObjectMemberId: string, +): Promise { + const result = await qx.selectOneOrNone( + ` + SELECT "objectMemberId" + FROM "activityRelations" + WHERE platform = $(platform) + AND "objectMemberUsername" = $(objectMemberUsername) + AND "objectMemberId" != $(correctObjectMemberId) + LIMIT 1; + `, + { platform, objectMemberUsername, correctObjectMemberId }, + ) + + return result?.objectMemberId || null +} + +async function moveActivityRelations( + qx: QueryExecutor, + fromId: string, + toId: string, + username: string, + platform: string, +): Promise { + await moveActivityRelationsWithIdentityToAnotherMember(qx, fromId, toId, username, platform) +} + +async function main() { + const args = Object.fromEntries( + process.argv.slice(2).map((arg) => { + const [key, val] = arg.split('=') + return [key.replace(/^--/, ''), val ?? true] + }), + ) + + const batchSize = Number(args['batch-size'] ?? 5000) + const testRun = Boolean(args['test-run']) + let lastId: string | null = (args['start-id'] as string | null) ?? null + + log.info('Running script with args', { batchSize, testRun, lastId }) + + const { reader: qxReader, writer: qxWriter } = await initPostgresClient() + + let totalProcessed = 0 + let totalMemberIdFixes = 0 + let totalObjectMemberIdFixes = 0 + + const totalMemberVerifiedIdentities = await getTotalMemberVerifiedIdentities(qxReader) + + let identities = await getMemberIdentitiesBatch(qxReader, lastId, batchSize) + + while (identities.length > 0) { + for (const chunk of chunkArray(identities, 50)) { + const tasks = chunk.map(async (identity) => { + let memberIdFixCount = 0 + let objectMemberIdFixCount = 0 + + // Fix all wrong memberIds for this identity + let wrongMemberId = await findIncorrectActivityRelationMemberId( + qxReader, + identity.platform, + identity.username, + identity.memberId, + ) + + while (wrongMemberId) { + log.info('Moving activity relations (memberId)', { + fromId: wrongMemberId, + toId: identity.memberId, + username: identity.username, + platform: identity.platform, + }) + + await moveActivityRelations( + qxWriter, + wrongMemberId, + identity.memberId, + identity.username, + identity.platform, + ) + + memberIdFixCount++ + + // Check again for more wrong memberIds + wrongMemberId = await findIncorrectActivityRelationMemberId( + qxReader, + identity.platform, + identity.username, + identity.memberId, + ) + } + + // Fix all wrong objectMemberIds for this identity + let wrongObjectMemberId = await findIncorrectObjectActivityRelationMemberId( + qxReader, + identity.platform, + identity.username, + identity.memberId, + ) + + while (wrongObjectMemberId) { + log.info('Moving activity relations (objectMemberId)', { + fromId: wrongObjectMemberId, + toId: identity.memberId, + username: identity.username, + platform: identity.platform, + }) + + await moveActivityRelations( + qxWriter, + wrongObjectMemberId, + identity.memberId, + identity.username, + identity.platform, + ) + + objectMemberIdFixCount++ + + // Check again for more wrong objectMemberIds + wrongObjectMemberId = await findIncorrectObjectActivityRelationMemberId( + qxReader, + identity.platform, + identity.username, + identity.memberId, + ) + } + + return { + memberIdFixes: memberIdFixCount, + objectMemberIdFixes: objectMemberIdFixCount, + } + }) + + const results = await Promise.all(tasks) + for (const result of results) { + totalMemberIdFixes += result.memberIdFixes + totalObjectMemberIdFixes += result.objectMemberIdFixes + } + } + + totalProcessed += identities.length + lastId = identities[identities.length - 1].id + + log.info(`Progress`, { + progress: ((totalProcessed / totalMemberVerifiedIdentities) * 100).toFixed(2) + '%', + lastIdentityId: lastId, + }) + + if (testRun) { + log.info('Test run - stopping after first batch') + break + } + + identities = await getMemberIdentitiesBatch(qxReader, lastId, batchSize) + } + + log.info('Script completed', { totalMemberIdFixes, totalObjectMemberIdFixes }) +} + +main().catch((error) => { + log.error('Unexpected error:', error) + process.exit(1) +}) diff --git a/services/apps/script_executor_worker/src/types.ts b/services/apps/script_executor_worker/src/types.ts index 8c94682e0c..0783b685a3 100644 --- a/services/apps/script_executor_worker/src/types.ts +++ b/services/apps/script_executor_worker/src/types.ts @@ -61,3 +61,7 @@ export interface IBlockOrganizationAffiliationArgs { organizationId: string offset?: number } + +export interface IFixActivityRelationsMemberIdArgs extends IScriptBatchTestArgs { + currentPlatformIndex?: number +} diff --git a/services/apps/script_executor_worker/src/workflows.ts b/services/apps/script_executor_worker/src/workflows.ts index 5a1cd5468b..02da6cb65d 100644 --- a/services/apps/script_executor_worker/src/workflows.ts +++ b/services/apps/script_executor_worker/src/workflows.ts @@ -5,6 +5,7 @@ import { cleanupOrganizations } from './workflows/cleanup/organizations' import { dissectMember } from './workflows/dissectMember' import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization' import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms' +import { fixActivityRelationsMemberId } from './workflows/fix-activityRelations-memberId' import { fixBotMembersAffiliation } from './workflows/fix-bot-members-affiliation' import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls' import { processLLMVerifiedMerges } from './workflows/processLLMVerifiedMerges' @@ -24,4 +25,5 @@ export { cleanupDuplicateMembers, fixBotMembersAffiliation, blockOrganizationAffiliation, + fixActivityRelationsMemberId, } diff --git a/services/apps/script_executor_worker/src/workflows/fix-activityRelations-memberId.ts b/services/apps/script_executor_worker/src/workflows/fix-activityRelations-memberId.ts new file mode 100644 index 0000000000..95e851259a --- /dev/null +++ b/services/apps/script_executor_worker/src/workflows/fix-activityRelations-memberId.ts @@ -0,0 +1,78 @@ +import { continueAsNew, proxyActivities } from '@temporalio/workflow' + +import { PlatformType } from '@crowd/types' + +import * as activities from '../activities' +import { IFixActivityRelationsMemberIdArgs } from '../types' + +const { + findMembersWithWrongActivityRelations, + findMemberIdByUsernameAndPlatform, + moveActivityRelations, +} = proxyActivities({ + startToCloseTimeout: '30 minutes', + retry: { maximumAttempts: 3, backoffCoefficient: 3 }, +}) + +export async function fixActivityRelationsMemberId( + args: IFixActivityRelationsMemberIdArgs, +): Promise { + const BATCH_SIZE = args.batchSize ?? 500 + + const platforms = Object.values(PlatformType) + let currentPlatformIndex = args.currentPlatformIndex ?? 0 + + if (currentPlatformIndex >= platforms.length) { + console.log('All platforms exhausted. Workflow complete!') + return + } + + const platform = platforms[currentPlatformIndex] + + if (args.testRun) console.log('Processing platform:', platform) + + // Take a batch from current platform + const records = await findMembersWithWrongActivityRelations(platform, BATCH_SIZE) + + if (records.length === 0) { + console.log(`Platform ${platform} has no more records. Moving to next platform.`) + currentPlatformIndex++ + } else { + // Process the batch immediately + await Promise.all( + records.map(async (record) => { + const correctMemberId = await findMemberIdByUsernameAndPlatform( + record.username, + record.platform, + ) + + if (args.testRun) { + console.log('Moving activity relations!', { + fromId: record.memberId, + toId: correctMemberId, + username: record.username, + platform: record.platform, + }) + } + + await moveActivityRelations( + record.memberId, + correctMemberId, + record.username, + record.platform, + ) + }), + ) + } + + if (args.testRun) { + console.log('Test run completed - stopping after first batch!') + return + } + + // Continue workflow for next batch (same platform or next platform) + await continueAsNew({ + ...args, + currentPlatformIndex, + }) +} diff --git a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts index adc4300b8d..f14d1a3332 100644 --- a/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts +++ b/services/libs/data-access-layer/src/old/apps/script_executor_worker/member.repo.ts @@ -3,6 +3,8 @@ import { Logger } from '@crowd/logging' import { IndexedEntityType } from '@crowd/opensearch/src/repo/indexing.data' import { IMember } from '@crowd/types' +import { IDbActivityRelation } from '../../../activityRelations/types' + import { IDuplicateMembersToCleanup, IFindMemberIdentitiesGroupedByPlatformResult, @@ -382,6 +384,53 @@ class MemberRepository { return results.map((r) => r.memberId) } + + public async findMembersWithIncorrectActivityRelations( + platform: string, + batchSize: number, + ): Promise[]> { + return this.connection.query( + ` + SELECT ar."memberId", ar."username", ar."platform" + FROM "activityRelations" ar + WHERE ar.platform = $(platform) + AND EXISTS ( + SELECT 1 + FROM "memberIdentities" mi + WHERE mi.platform = ar.platform + AND mi.value = ar.username + AND mi.type = 'username' + AND mi.verified = true + AND ar."memberId" != mi."memberId" + ) + LIMIT $(batchSize); + `, + { platform, batchSize }, + ) + } + + public async getMemberIdByUsernameAndPlatform( + username: string, + platform: string, + ): Promise { + const result = await this.connection.oneOrNone( + ` + SELECT "memberId" as "memberId" + FROM "memberIdentities" + WHERE value = $(username) + AND platform = $(platform) + AND verified = TRUE + AND type = 'username' + `, + { username, platform }, + ) + + if (!result) { + throw new Error(`No verified member found!`) + } + + return result.memberId + } } export default MemberRepository