Skip to content

Commit 3efbbbe

Browse files
authored
fix: handle organizationUpdate duplicate start during unmerge (CM-852) (#3700)
1 parent bcfb5b4 commit 3efbbbe

File tree

4 files changed

+54
-28
lines changed

4 files changed

+54
-28
lines changed

services/apps/entity_merging_worker/src/activities.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export {
1515
notifyFrontendOrganizationMergeSuccessful,
1616
notifyFrontendOrganizationUnmergeSuccessful,
1717
syncOrganization,
18-
recalculateActivityAffiliationsOfOrganizationSynchronous,
18+
recalculateActivityAffiliationsOfOrganizationAsync,
1919
} from './activities/organizations'
2020

2121
export { setMergeAction } from './activities/common'

services/apps/entity_merging_worker/src/activities/members.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { WorkflowIdReusePolicy } from '@temporalio/workflow'
1+
import { WorkflowIdConflictPolicy, WorkflowIdReusePolicy } from '@temporalio/workflow'
22

33
import { DEFAULT_TENANT_ID } from '@crowd/common'
44
import {
@@ -47,8 +47,8 @@ export async function recalculateActivityAffiliationsOfMemberAsync(
4747
await svc.temporal.workflow.start('memberUpdate', {
4848
taskQueue: 'profiles',
4949
workflowId,
50-
// if the workflow is already running, this policy will throw an error
51-
workflowIdReusePolicy: WorkflowIdReusePolicy.REJECT_DUPLICATE,
50+
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
51+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.FAIL,
5252
retry: {
5353
maximumAttempts: 10,
5454
},

services/apps/entity_merging_worker/src/activities/organizations.ts

Lines changed: 47 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import { WorkflowIdConflictPolicy, WorkflowIdReusePolicy } from '@temporalio/workflow'
2+
13
import { DEFAULT_TENANT_ID } from '@crowd/common'
24
import { moveActivityRelationsToAnotherOrganization } from '@crowd/data-access-layer/src/activityRelations'
35
import {
@@ -35,31 +37,55 @@ export async function finishOrganizationMergingUpdateActivities(
3537
await moveActivityRelationsToAnotherOrganization(qx, secondaryId, primaryId)
3638
}
3739

38-
export async function recalculateActivityAffiliationsOfOrganizationSynchronous(
40+
export async function recalculateActivityAffiliationsOfOrganizationAsync(
3941
organizationId: string,
4042
): Promise<void> {
41-
await svc.temporal.workflow.start('organizationUpdate', {
42-
taskQueue: 'profiles',
43-
workflowId: `${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}`,
44-
followRuns: true,
45-
retry: {
46-
maximumAttempts: 10,
47-
},
48-
args: [
49-
{
50-
organization: {
51-
id: organizationId,
52-
},
53-
recalculateAffiliations: true,
54-
syncOptions: {
55-
doSync: false,
56-
withAggs: false,
57-
},
43+
const workflowId = `${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}`
44+
45+
try {
46+
const handle = svc.temporal.workflow.getHandle(workflowId)
47+
const { status } = await handle.describe()
48+
49+
if (status.name === 'RUNNING') {
50+
await handle.result()
51+
}
52+
} catch (err) {
53+
if (err.name !== 'WorkflowNotFoundError') {
54+
svc.log.error({ err }, 'Failed to check workflow state')
55+
throw err
56+
}
57+
}
58+
59+
try {
60+
await svc.temporal.workflow.start('organizationUpdate', {
61+
taskQueue: 'profiles',
62+
workflowId,
63+
workflowIdReusePolicy: WorkflowIdReusePolicy.ALLOW_DUPLICATE,
64+
workflowIdConflictPolicy: WorkflowIdConflictPolicy.FAIL,
65+
retry: {
66+
maximumAttempts: 10,
5867
},
59-
],
60-
})
68+
args: [
69+
{
70+
organization: {
71+
id: organizationId,
72+
},
73+
recalculateAffiliations: true,
74+
syncOptions: {
75+
doSync: false,
76+
withAggs: false,
77+
},
78+
},
79+
],
80+
})
81+
} catch (err) {
82+
if (err.name === 'WorkflowExecutionAlreadyStartedError') {
83+
svc.log.info({ workflowId }, 'Workflow already started, skipping')
84+
return
85+
}
6186

62-
await svc.temporal.workflow.result(`${TemporalWorkflowId.ORGANIZATION_UPDATE}/${organizationId}`)
87+
throw err
88+
}
6389
}
6490

6591
export async function syncOrganization(organizationId: string, syncStart: Date): Promise<void> {

services/apps/entity_merging_worker/src/workflows/all.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ const {
1111
notifyFrontendOrganizationMergeSuccessful,
1212
notifyFrontendOrganizationUnmergeSuccessful,
1313
recalculateActivityAffiliationsOfMemberAsync,
14-
recalculateActivityAffiliationsOfOrganizationSynchronous,
14+
recalculateActivityAffiliationsOfOrganizationAsync,
1515
setMergeAction,
1616
syncMember,
1717
syncOrganization,
@@ -118,8 +118,8 @@ export async function finishOrganizationUnmerging(
118118
await setMergeAction(primaryId, secondaryId, {
119119
step: MergeActionStep.UNMERGE_ASYNC_STARTED,
120120
})
121-
await recalculateActivityAffiliationsOfOrganizationSynchronous(primaryId)
122-
await recalculateActivityAffiliationsOfOrganizationSynchronous(secondaryId)
121+
await recalculateActivityAffiliationsOfOrganizationAsync(primaryId)
122+
await recalculateActivityAffiliationsOfOrganizationAsync(secondaryId)
123123
const syncStart = new Date()
124124
await syncOrganization(primaryId, syncStart)
125125
await syncOrganization(secondaryId, syncStart)

0 commit comments

Comments
 (0)