Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
4434ffb
chore: add script to fix activityRelations with wrong memberId
skwowet Dec 10, 2025
0b58663
Merge branch 'main' into script/fix-activityRelations-memberId
skwowet Dec 10, 2025
f3a0d75
Merge branch 'main' into script/fix-activityRelations-memberId
skwowet Dec 10, 2025
ef00b0b
chore(deps): bump js-yaml from 4.1.0 to 4.1.1 (#3635)
dependabot[bot] Dec 11, 2025
9f15ecb
Merge branch 'main' into script/fix-activityRelations-memberId
skwowet Dec 11, 2025
76208af
fix: improve error logging and conditional console output in activity…
skwowet Dec 11, 2025
c494da2
fix: dedup activity relations by memberId, username, and platform in …
skwowet Dec 11, 2025
c75e2fb
refactor: rename func to trigger temporal rebuild
skwowet Dec 11, 2025
99f3c54
chore: test with normal script instead of temporal workflow for perf
skwowet Dec 11, 2025
6fc1886
refactor: separate read and write connections
skwowet Dec 12, 2025
c42d842
refactor: rename and update findMembersWithWrongActivityRelations
skwowet Dec 12, 2025
6564992
fix: improve fixActivityRelationsMemberId workflow with buffer manage…
skwowet Dec 12, 2025
02b87bb
refactor: streamline fixActivityRelationsMemberId workflow
skwowet Dec 12, 2025
be59679
chore: process by identities instead of relations to speed up
skwowet Dec 12, 2025
42fa067
feat: track total fixed memberIds in fixActivityRelationsMemberId script
skwowet Dec 12, 2025
13a68d9
feat: add functionality to fix incorrect objectMemberIds
skwowet Dec 14, 2025
534603a
Merge branch 'main' into script/fix-activityRelations-memberId
skwowet Dec 15, 2025
4a780db
Merge branch 'main' into script/fix-activityRelations-memberId
skwowet Dec 17, 2025
b8b732e
Merge branch 'main' into script/fix-activityRelations-memberId
skwowet Dec 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/apps/script_executor_worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"dev:local": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug:local",
"dev": "nodemon --watch src --watch ../../libs --ext ts --exec pnpm run start:debug",
"cleanup-fork-activities": "npx tsx src/bin/cleanup-fork-activities-and-maintainers.ts",
"fix-activity-relations-memberId": "npx tsx src/bin/fix-activityRelations-memberId.ts",
"lint": "npx eslint --ext .ts src --max-warnings=0",
"format": "npx prettier --write \"src/**/*.ts\"",
"format-check": "npx prettier --check .",
Expand Down
8 changes: 8 additions & 0 deletions services/apps/script_executor_worker/src/activities.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ import {
findMemberIdentitiesGroupedByPlatform,
findMemberMergeActions,
} from './activities/dissect-member'
import {
findMemberIdByUsernameAndPlatform,
findMembersWithWrongActivityRelations,
moveActivityRelations,
} from './activities/fix-activityRelations-memberId'
import {
getBotMembersWithOrgAffiliation,
removeBotMemberOrganization,
Expand Down Expand Up @@ -86,4 +91,7 @@ export {
blockMemberOrganizationAffiliation,
getOrganizationMembers,
calculateMemberAffiliations,
findMembersWithWrongActivityRelations,
findMemberIdByUsernameAndPlatform,
moveActivityRelations,
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { moveActivityRelationsWithIdentityToAnotherMember, pgpQx } from '@crowd/data-access-layer'
import { IDbActivityRelation } from '@crowd/data-access-layer/src/activityRelations/types'
import MemberRepository from '@crowd/data-access-layer/src/old/apps/script_executor_worker/member.repo'

import { svc } from '../main'

/**
 * Fetches up to `batchSize` records flagged by the member repository as having
 * incorrect activity relations on `platform`, and collapses duplicates.
 *
 * Two records are considered duplicates when they share the same
 * (memberId, username, platform) triple; the first occurrence is kept and the
 * original ordering is preserved.
 *
 * @param platform  Platform passed through to the repository query.
 * @param batchSize Batch size passed through to the repository query.
 * @returns The de-duplicated records.
 * @throws Rethrows any error from the repository after logging it.
 */
export async function findMembersWithWrongActivityRelations(
  platform: string,
  batchSize: number,
): Promise<Partial<IDbActivityRelation>[]> {
  try {
    const repo = new MemberRepository(svc.postgres.reader.connection(), svc.log)
    const rows = await repo.findMembersWithIncorrectActivityRelations(platform, batchSize)

    // A Map keeps insertion order, so the first record seen for each key wins
    // and the output order matches the input order.
    const firstByKey = new Map<string, Partial<IDbActivityRelation>>()

    for (const row of rows) {
      const dedupKey = `${row.memberId}:${row.username}:${row.platform}`

      if (firstByKey.has(dedupKey)) {
        continue
      }

      firstByKey.set(dedupKey, row)
    }

    return Array.from(firstByKey.values())
  } catch (err) {
    svc.log.error(err, 'Error finding activity relations with wrong member id!')
    throw err
  }
}

/**
 * Resolves the member id for a username on a platform using the reader
 * connection.
 *
 * The repository call is awaited inside the `try` block on purpose: returning
 * the bare promise would let a rejection escape the `catch`, so the failure
 * would never be logged together with its username/platform context.
 *
 * @param username Identity username to look up.
 * @param platform Platform the username belongs to.
 * @returns The value produced by `getMemberIdByUsernameAndPlatform`.
 * @throws Rethrows any lookup error after logging it.
 */
export async function findMemberIdByUsernameAndPlatform(
  username: string,
  platform: string,
): Promise<string> {
  try {
    const memberRepo = new MemberRepository(svc.postgres.reader.connection(), svc.log)
    return await memberRepo.getMemberIdByUsernameAndPlatform(username, platform)
  } catch (error) {
    svc.log.error(
      { error, username, platform },
      'Error getting member id by username and platform!',
    )
    throw error
  }
}

/**
 * Moves the activity relations tied to the given identity (username +
 * platform) from one member to another, using the writer connection.
 *
 * @param fromId   Member id the relations are currently attached to.
 * @param toId     Member id the relations should be attached to.
 * @param username Identity username used to select the relations.
 * @param platform Identity platform used to select the relations.
 * @throws Rethrows any error from the data-access layer after logging it.
 */
export async function moveActivityRelations(
  fromId: string,
  toId: string,
  username: string,
  platform: string,
): Promise<void> {
  try {
    await moveActivityRelationsWithIdentityToAnotherMember(
      pgpQx(svc.postgres.writer.connection()),
      fromId,
      toId,
      username,
      platform,
    )
  } catch (err) {
    svc.log.error(err, 'Error fixing activity relations!')
    throw err
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,262 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
import { moveActivityRelationsWithIdentityToAnotherMember } from '@crowd/data-access-layer'
import {
READ_DB_CONFIG,
WRITE_DB_CONFIG,
getDbConnection,
} from '@crowd/data-access-layer/src/database'
import { QueryExecutor, pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
import { getServiceChildLogger } from '@crowd/logging'

import { chunkArray } from '../utils/common'

const log = getServiceChildLogger('fix-activity-relations-memberId-script')

/** Pair of query executors used by this script, one per database connection. */
interface IPostgresClient {
// Executor wrapping the connection opened with READ_DB_CONFIG (see initPostgresClient).
reader: QueryExecutor
// Executor wrapping the connection opened with WRITE_DB_CONFIG (see initPostgresClient).
writer: QueryExecutor
}

/** Row shape selected from "memberIdentities" by getMemberIdentitiesBatch. */
interface IMemberIdentity {
// Identity row id; also used as the pagination cursor.
id: string
// Member that owns this identity.
memberId: string
// The identity's `value` column, aliased to `username` by the query.
username: string
platform: string
}

/**
 * Opens the read and write database connections (in that order) and wraps
 * each in a query executor.
 *
 * @returns The reader/writer executor pair.
 */
async function initPostgresClient(): Promise<IPostgresClient> {
  const readConnection = await getDbConnection(READ_DB_CONFIG())
  const writeConnection = await getDbConnection(WRITE_DB_CONFIG())

  const client: IPostgresClient = {
    reader: pgpQx(readConnection),
    writer: pgpQx(writeConnection),
  }

  return client
}

/**
 * Counts the verified identities of type `username` in "memberIdentities".
 *
 * @param qx Query executor to run the count on.
 * @returns The count as a number; 0 when no row or no `count` value comes back.
 */
async function getTotalMemberVerifiedIdentities(qx: QueryExecutor): Promise<number> {
  const countQuery = `
SELECT COUNT(*) FROM "memberIdentities" WHERE type = 'username' AND verified = true;
`
  const row = await qx.selectOne(countQuery)
  const rawCount = row?.count

  // The driver may hand the count back as a string, hence the explicit conversion.
  return rawCount == null ? 0 : Number(rawCount)
}

/**
 * Loads the next page of verified `username` identities, ordered by id.
 *
 * Pagination is cursor-based: only rows whose id is greater than `lastId` are
 * returned. When `lastId` is null (or empty) the all-zero UUID is used as the
 * starting cursor.
 *
 * @param qx        Query executor to read from.
 * @param lastId    Id of the last identity already processed, or null to start from the beginning.
 * @param batchSize Maximum number of rows to return.
 * @returns The identities in this page.
 */
async function getMemberIdentitiesBatch(
  qx: QueryExecutor,
  lastId: string | null,
  batchSize: number,
): Promise<IMemberIdentity[]> {
  const cursor = lastId ? lastId : '00000000-0000-0000-0000-000000000000'
  const pageQuery = `
SELECT id, "memberId", value as username, platform
FROM "memberIdentities"
WHERE type = 'username'
AND verified = true
AND id > $(lastId)
ORDER BY id
LIMIT $(batchSize);
`

  return qx.select(pageQuery, { lastId: cursor, batchSize })
}

/**
 * Looks for one activity relation with this platform/username whose
 * "memberId" differs from the identity's owner.
 *
 * @param qx              Query executor to read from.
 * @param platform        Platform of the identity.
 * @param username        Username of the identity.
 * @param correctMemberId Member id that owns the identity.
 * @returns A mismatching memberId (any one of them), or null when none is found.
 */
async function findIncorrectActivityRelationMemberId(
  qx: QueryExecutor,
  platform: string,
  username: string,
  correctMemberId: string,
): Promise<string | null> {
  const mismatchQuery = `
SELECT "memberId"
FROM "activityRelations"
WHERE platform = $(platform)
AND username = $(username)
AND "memberId" != $(correctMemberId)
LIMIT 1;
`
  const match = await qx.selectOneOrNone(mismatchQuery, { platform, username, correctMemberId })

  if (!match?.memberId) {
    return null
  }

  return match.memberId
}

/**
 * Looks for one activity relation on this platform whose
 * "objectMemberUsername" matches the identity but whose "objectMemberId"
 * differs from the identity's owner.
 *
 * @param qx                    Query executor to read from.
 * @param platform              Platform of the identity.
 * @param objectMemberUsername  Username of the identity, matched against "objectMemberUsername".
 * @param correctObjectMemberId Member id that owns the identity.
 * @returns A mismatching objectMemberId (any one of them), or null when none is found.
 */
async function findIncorrectObjectActivityRelationMemberId(
  qx: QueryExecutor,
  platform: string,
  objectMemberUsername: string,
  correctObjectMemberId: string,
): Promise<string | null> {
  const mismatchQuery = `
SELECT "objectMemberId"
FROM "activityRelations"
WHERE platform = $(platform)
AND "objectMemberUsername" = $(objectMemberUsername)
AND "objectMemberId" != $(correctObjectMemberId)
LIMIT 1;
`
  const match = await qx.selectOneOrNone(mismatchQuery, {
    platform,
    objectMemberUsername,
    correctObjectMemberId,
  })

  if (!match?.objectMemberId) {
    return null
  }

  return match.objectMemberId
}

/**
 * Script-local wrapper that delegates to
 * moveActivityRelationsWithIdentityToAnotherMember and discards its result.
 *
 * @param qx       Query executor the move is performed with.
 * @param fromId   Member id the relations are currently attached to.
 * @param toId     Member id the relations should be attached to.
 * @param username Identity username used to select the relations.
 * @param platform Identity platform used to select the relations.
 */
async function moveActivityRelations(
  qx: QueryExecutor,
  fromId: string,
  toId: string,
  username: string,
  platform: string,
): Promise<void> {
  const pendingMove = moveActivityRelationsWithIdentityToAnotherMember(
    qx,
    fromId,
    toId,
    username,
    platform,
  )

  await pendingMove
}

/** Signature shared by the two lookups that return a wrong member id for an identity. */
type WrongIdFinder = (
  qx: QueryExecutor,
  platform: string,
  username: string,
  correctMemberId: string,
) => Promise<string | null>

/**
 * Parses `--key=value` and bare `--flag` arguments into a key/value map.
 * A bare flag maps to `true`. Only the first `=` splits key from value, so
 * values that themselves contain `=` are kept intact.
 */
function parseCliArgs(argv: readonly string[]): Record<string, string | true> {
  const parsed: Record<string, string | true> = {}

  for (const arg of argv) {
    const eq = arg.indexOf('=')
    const rawKey = eq === -1 ? arg : arg.slice(0, eq)
    parsed[rawKey.replace(/^--/, '')] = eq === -1 ? true : arg.slice(eq + 1)
  }

  return parsed
}

/**
 * Interprets a boolean CLI flag. Absent -> false, bare flag -> true, and an
 * explicit value is true unless it is empty, `false`, `0` or `no`
 * (previously `--test-run=false` was treated as true).
 */
function readBooleanFlag(value: string | true | undefined): boolean {
  if (value === undefined) return false
  if (value === true) return true
  return !['', 'false', '0', 'no'].includes(value.trim().toLowerCase())
}

/** Reads `--batch-size`, failing fast on anything that is not a positive integer. */
function readBatchSize(value: string | true | undefined, fallback: number): number {
  if (value === undefined) return fallback

  const parsed = value === true ? NaN : Number(value)
  if (!Number.isInteger(parsed) || parsed <= 0) {
    throw new Error(`Invalid --batch-size: expected a positive integer, got "${String(value)}"`)
  }

  return parsed
}

/** Reads `--start-id`; absent or empty means "start from the beginning". */
function readStartId(value: string | true | undefined): string | null {
  if (value === undefined || value === '') return null
  if (value === true) {
    throw new Error('Invalid --start-id: expected --start-id=<identity id>')
  }
  return value
}

/**
 * Repeatedly looks up a wrong member id for `identity` and moves the matching
 * activity relations to the identity's owner until the lookup returns nothing.
 *
 * NOTE(review): termination relies on each move changing the rows the finder
 * matches (and on the reader eventually observing the write). In particular
 * the objectMemberId finder only stops returning rows if
 * moveActivityRelationsWithIdentityToAnotherMember also rewrites
 * "objectMemberId" — confirm against the data-access layer.
 *
 * @returns How many move operations were issued.
 */
async function moveUntilClean(
  qxReader: QueryExecutor,
  qxWriter: QueryExecutor,
  identity: IMemberIdentity,
  findWrongId: WrongIdFinder,
  logMessage: string,
): Promise<number> {
  let moves = 0
  let wrongId = await findWrongId(
    qxReader,
    identity.platform,
    identity.username,
    identity.memberId,
  )

  while (wrongId) {
    log.info(logMessage, {
      fromId: wrongId,
      toId: identity.memberId,
      username: identity.username,
      platform: identity.platform,
    })

    await moveActivityRelations(
      qxWriter,
      wrongId,
      identity.memberId,
      identity.username,
      identity.platform,
    )

    moves++

    // Check again: several different wrong ids may exist for the same identity.
    wrongId = await findWrongId(
      qxReader,
      identity.platform,
      identity.username,
      identity.memberId,
    )
  }

  return moves
}

/**
 * Script entry point. Walks all verified `username` identities in id order and,
 * for each one, moves activity relations that point at a different member
 * (both "memberId" and "objectMemberId") to the identity's owner.
 *
 * CLI arguments:
 *   --batch-size=<n>  identities fetched per page (default 5000, positive integer)
 *   --start-id=<id>   resume after this identity id
 *   --test-run        stop after the first page. This is NOT a dry run: the
 *                     first page is still written.
 */
async function main(): Promise<void> {
  const args = parseCliArgs(process.argv.slice(2))

  const batchSize = readBatchSize(args['batch-size'], 5000)
  const testRun = readBooleanFlag(args['test-run'])
  let lastId: string | null = readStartId(args['start-id'])

  log.info('Running script with args', { batchSize, testRun, lastId })

  const { reader: qxReader, writer: qxWriter } = await initPostgresClient()

  let totalProcessed = 0
  let totalMemberIdFixes = 0
  let totalObjectMemberIdFixes = 0

  const totalMemberVerifiedIdentities = await getTotalMemberVerifiedIdentities(qxReader)

  let identities = await getMemberIdentitiesBatch(qxReader, lastId, batchSize)

  while (identities.length > 0) {
    // Repair identities in chunks of 50 so at most 50 run concurrently.
    for (const chunk of chunkArray(identities, 50)) {
      const results = await Promise.all(
        chunk.map(async (identity) => ({
          memberIdFixes: await moveUntilClean(
            qxReader,
            qxWriter,
            identity,
            findIncorrectActivityRelationMemberId,
            'Moving activity relations (memberId)',
          ),
          objectMemberIdFixes: await moveUntilClean(
            qxReader,
            qxWriter,
            identity,
            findIncorrectObjectActivityRelationMemberId,
            'Moving activity relations (objectMemberId)',
          ),
        })),
      )

      for (const result of results) {
        totalMemberIdFixes += result.memberIdFixes
        totalObjectMemberIdFixes += result.objectMemberIdFixes
      }
    }

    totalProcessed += identities.length
    lastId = identities[identities.length - 1].id

    // The percentage is relative to ALL verified identities, so it under-reports
    // when the run was resumed with --start-id.
    log.info(`Progress`, {
      progress:
        totalMemberVerifiedIdentities > 0
          ? ((totalProcessed / totalMemberVerifiedIdentities) * 100).toFixed(2) + '%'
          : 'n/a',
      lastIdentityId: lastId,
    })

    if (testRun) {
      log.info('Test run - stopping after first batch')
      break
    }

    identities = await getMemberIdentitiesBatch(qxReader, lastId, batchSize)
  }

  log.info('Script completed', { totalMemberIdFixes, totalObjectMemberIdFixes })
}

// Run the script; any failure that escapes main() is logged and turned into a
// non-zero exit code.
main().catch((failure) => {
  log.error('Unexpected error:', failure)
  process.exit(1)
})
4 changes: 4 additions & 0 deletions services/apps/script_executor_worker/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,7 @@ export interface IBlockOrganizationAffiliationArgs {
organizationId: string
offset?: number
}

/**
 * Arguments for the fixActivityRelationsMemberId workflow; adds an optional
 * platform index on top of IScriptBatchTestArgs.
 */
export interface IFixActivityRelationsMemberIdArgs extends IScriptBatchTestArgs {
// NOTE(review): presumably the index of the platform currently being processed,
// so a continued run can resume from it — confirm against the workflow.
currentPlatformIndex?: number
}
2 changes: 2 additions & 0 deletions services/apps/script_executor_worker/src/workflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { cleanupOrganizations } from './workflows/cleanup/organizations'
import { dissectMember } from './workflows/dissectMember'
import { findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization } from './workflows/findAndMergeMembersWithSamePlatformIdentitiesDifferentCapitalization'
import { findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms } from './workflows/findAndMergeMembersWithSameVerifiedEmailsInDifferentPlatforms'
import { fixActivityRelationsMemberId } from './workflows/fix-activityRelations-memberId'
import { fixBotMembersAffiliation } from './workflows/fix-bot-members-affiliation'
import { fixOrgIdentitiesWithWrongUrls } from './workflows/fixOrgIdentitiesWithWrongUrls'
import { processLLMVerifiedMerges } from './workflows/processLLMVerifiedMerges'
Expand All @@ -24,4 +25,5 @@ export {
cleanupDuplicateMembers,
fixBotMembersAffiliation,
blockOrganizationAffiliation,
fixActivityRelationsMemberId,
}
Loading