11import { continueAsNew , proxyActivities } from '@temporalio/workflow'
22
3+ import { PlatformType } from '@crowd/types'
4+
35import * as activities from '../activities'
46import { IFixActivityRelationsMemberIdArgs } from '../types'
5- import { chunkArray } from '../utils/common'
67
78const {
89 findMembersWithWrongActivityRelations,
@@ -17,21 +18,43 @@ export async function fixActivityRelationsMemberId(
1718 args : IFixActivityRelationsMemberIdArgs ,
1819) : Promise < void > {
1920 const BATCH_SIZE = args . batchSize ?? 500
20- const platform = args . platform
21+ const BUFFER_TARGET = args . bufferTargetSize ?? 50
22+
23+ // Always use all platforms every run
24+ const platforms = Object . values ( PlatformType )
25+ let currentPlatformIndex = args . currentPlatformIndex ?? 0
26+
27+ const recordsBuffer = [ ]
28+
29+ while ( recordsBuffer . length < BUFFER_TARGET && currentPlatformIndex < platforms . length ) {
30+ const platform = platforms [ currentPlatformIndex ]
31+
32+ if ( args . testRun ) {
33+ console . log ( 'Current platform:' , platform )
34+ }
35+
36+ const records = await findMembersWithWrongActivityRelations ( platform , BATCH_SIZE )
2137
22- // get wrong memberId, username, platform from activity relations
23- const records = await findMembersWithWrongActivityRelations ( platform , BATCH_SIZE )
38+ if ( records . length === 0 ) {
39+ console . log ( `Platform ${ platform } returned 0 results. Skipping.` )
40+ currentPlatformIndex ++ // Move to next platform
41+ continue
42+ }
2443
25- if ( records . length === 0 ) {
44+ // Fill buffer from this platform
45+ const slotsLeft = BUFFER_TARGET - recordsBuffer . length
46+ recordsBuffer . push ( ...records . slice ( 0 , slotsLeft ) )
47+ }
48+
49+ // No more platforms and empty buffer → stop workflow
50+ if ( recordsBuffer . length === 0 ) {
2651 console . log ( 'No more activity relations to fix!' )
2752 return
2853 }
2954
30- for ( const chunk of chunkArray ( records , 50 ) ) {
31- if ( args . testRun ) console . log ( 'Processing chunk' , chunk )
32-
33- const tasks = chunk . map ( async ( record ) => {
34- // find the correct member id by username and platform
55+ // Process the batch (even if < BUFFER_TARGET)
56+ await Promise . all (
57+ recordsBuffer . map ( async ( record ) => {
3558 const correctMemberId = await findMemberIdByUsernameAndPlatform (
3659 record . username ,
3760 record . platform ,
@@ -46,24 +69,23 @@ export async function fixActivityRelationsMemberId(
4669 } )
4770 }
4871
49- // move activity relations to the correct member
5072 await moveActivityRelations (
5173 record . memberId ,
5274 correctMemberId ,
5375 record . username ,
5476 record . platform ,
5577 )
56- } )
57-
58- // parallel process the updates
59- await Promise . all ( tasks )
60- }
78+ } ) ,
79+ )
6180
6281 if ( args . testRun ) {
6382 console . log ( 'Test run completed - stopping after first batch!' )
6483 return
6584 }
6685
67- // Continue as new for the next batch
68- await continueAsNew < typeof fixActivityRelationsMemberId > ( args )
86+ // Continue workflow at the next platform
87+ await continueAsNew < typeof fixActivityRelationsMemberId > ( {
88+ ...args ,
89+ currentPlatformIndex,
90+ } )
6991}
0 commit comments