Skip to content

Commit 7e02326

Browse files
author
Uros Marolt
authored
Bugfix/sync script optimization (#1390)
1 parent c4f1ba1 commit 7e02326

26 files changed

+369
-91
lines changed

backend/src/database/migrations/U1693124397__searchSyncedAt-index-fix.sql

Whitespace-only changes.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
drop index if exists "ix_activities_searchSyncedAt";
2+
drop index if exists "ix_members_searchSyncedAt";
3+
create index if not exists "ix_organizations_tenantId_searchSyncedAt" on organizations ("tenantId", "searchSyncedAt");
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
import { DB_CONFIG, SQS_CONFIG } from '@/conf'
2+
import IntegrationRunRepository from '@/repo/integrationRun.repo'
3+
import { singleOrDefault, timeout } from '@crowd/common'
4+
import { DbStore, getDbConnection } from '@crowd/database'
5+
import { INTEGRATION_SERVICES } from '@crowd/integrations'
6+
import { getServiceLogger } from '@crowd/logging'
7+
import { IntegrationRunWorkerEmitter, getSqsClient } from '@crowd/sqs'
8+
9+
const log = getServiceLogger()
10+
11+
setImmediate(async () => {
12+
const sqsClient = getSqsClient(SQS_CONFIG())
13+
const emitter = new IntegrationRunWorkerEmitter(sqsClient, log)
14+
await emitter.init()
15+
16+
const dbConnection = getDbConnection(DB_CONFIG(), 1)
17+
const store = new DbStore(log, dbConnection)
18+
19+
const repo = new IntegrationRunRepository(store, log)
20+
21+
const tenantIds = await repo.getTenantsWithIntegrations()
22+
23+
for (const tenantId of tenantIds) {
24+
const integrations = await repo.getTenantIntegrations(tenantId)
25+
26+
for (const integration of integrations) {
27+
if (singleOrDefault(INTEGRATION_SERVICES, (s) => s.type === integration.type)) {
28+
log.info(
29+
{ integrationId: integration.id },
30+
'Integration found in new framework - triggering onboarding!',
31+
)
32+
await emitter.triggerIntegrationRun(
33+
integration.tenantId,
34+
integration.type,
35+
integration.id,
36+
true,
37+
)
38+
39+
log.info('Sleeping for 5 minutes between starts of integration onboarding!')
40+
await timeout(5 * 60 * 1000)
41+
}
42+
}
43+
44+
log.info('Sleeping for 15 minutes between starts of tenant onboarding!')
45+
await timeout(15 * 60 * 1000)
46+
}
47+
48+
log.info('Done!')
49+
process.exit(0)
50+
})

services/apps/integration_run_worker/src/repo/integrationRun.repo.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,34 @@ export default class IntegrationRunRepository extends RepositoryBase<Integration
7272
return result.id
7373
}
7474

75+
public async getTenantsWithIntegrations(): Promise<string[]> {
76+
const results = await this.db().any(
77+
`
78+
select distinct "tenantId" from integrations where "deletedAt" is null;
79+
`,
80+
)
81+
82+
return results.map((r) => r.tenantId)
83+
}
84+
85+
public async getTenantIntegrations(tenantId: string): Promise<IStartIntegrationRunData[]> {
86+
const results = await this.db().any(
87+
`
88+
select id,
89+
platform as type,
90+
status as state,
91+
"integrationIdentifier" as identifier,
92+
"tenantId"
93+
from integrations where "tenantId" = $(tenantId) and "deletedAt" is null
94+
`,
95+
{
96+
tenantId,
97+
},
98+
)
99+
100+
return results
101+
}
102+
75103
public async getIntegrationData(integrationId: string): Promise<IStartIntegrationRunData | null> {
76104
const results = await this.db().oneOrNone(
77105
`

services/apps/integration_run_worker/src/service/integrationRunService.ts

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,11 +66,6 @@ export default class IntegrationRunService extends LoggerBase {
6666
}
6767
}
6868

69-
if (count === 0) {
70-
this.log.error('This run has no streams!')
71-
return
72-
}
73-
7469
if (count === finishedCount) {
7570
const runInfo = await this.repo.getGenerateStreamData(runId)
7671

@@ -208,6 +203,7 @@ export default class IntegrationRunService extends LoggerBase {
208203
this.log.error({ err }, 'Error while starting integration sync remote!')
209204
}
210205

206+
this.log.info('Marking run and integration as successfully processed!')
211207
await this.repo.markRunProcessed(runId)
212208
await this.repo.markIntegration(runId, 'done')
213209

services/apps/integration_stream_worker/src/repo/integrationStream.repo.ts

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -147,16 +147,11 @@ export default class IntegrationStreamRepository extends RepositoryBase<Integrat
147147
this.checkUpdateRowCount(result.rowCount, 1)
148148
}
149149

150-
public async markStreamProcessed(streamId: string): Promise<void> {
150+
public async deleteStream(streamId: string): Promise<void> {
151151
const result = await this.db().result(
152-
`update integration.streams
153-
set state = $(state),
154-
"processedAt" = now(),
155-
"updatedAt" = now()
156-
where id = $(streamId)`,
152+
`delete from integration.streams where id = $(streamId)`,
157153
{
158154
streamId,
159-
state: IntegrationStreamState.PROCESSED,
160155
},
161156
)
162157

services/apps/integration_stream_worker/src/service/integrationStreamService.ts

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -335,14 +335,11 @@ export default class IntegrationStreamService extends LoggerBase {
335335
},
336336
}
337337

338-
this.log.debug('Marking webhook stream as in progress!')
339-
await this.repo.markStreamInProgress(streamId)
340-
341338
this.log.debug('Processing webhook stream!')
342339
try {
343340
await integrationService.processWebhookStream(context)
344341
this.log.debug('Finished processing webhook stream!')
345-
await this.repo.markStreamProcessed(streamId)
342+
await this.repo.deleteStream(streamId)
346343
await this.webhookRepo.markWebhookProcessed(webhookId)
347344
} catch (err) {
348345
this.log.error(err, 'Error while processing webhook stream!')
@@ -486,16 +483,11 @@ export default class IntegrationStreamService extends LoggerBase {
486483
},
487484
}
488485

489-
this.log.debug('Marking stream as in progress!')
490-
await this.repo.markStreamInProgress(streamId)
491-
// TODO we might need that later to check for stuck runs
492-
// await this.repo.touchRun(streamInfo.runId)
493-
494486
this.log.debug('Processing stream!')
495487
try {
496488
await integrationService.processStream(context)
497489
this.log.debug('Finished processing stream!')
498-
await this.repo.markStreamProcessed(streamId)
490+
await this.repo.deleteStream(streamId)
499491
} catch (err) {
500492
this.log.error(err, 'Error while processing stream!')
501493
await this.triggerStreamError(
@@ -506,8 +498,6 @@ export default class IntegrationStreamService extends LoggerBase {
506498
err,
507499
)
508500
} finally {
509-
// TODO we might need that later to check for stuck runs
510-
// await this.repo.touchRun(streamInfo.runId)
511501
await this.runWorkerEmitter.streamProcessed(
512502
streamInfo.tenantId,
513503
streamInfo.integrationType,

services/apps/search_sync_worker/src/bin/cleanup-all-activities.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ const log = getServiceLogger()
99

1010
setImmediate(async () => {
1111
const openSearchService = new OpenSearchService(log)
12-
await openSearchService.initialize()
1312

1413
const dbConnection = getDbConnection(DB_CONFIG())
1514
const store = new DbStore(log, dbConnection)

services/apps/search_sync_worker/src/bin/cleanup-all-members.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ const log = getServiceLogger()
1010

1111
setImmediate(async () => {
1212
const openSearchService = new OpenSearchService(log)
13-
await openSearchService.initialize()
1413

1514
const redis = await getRedisClient(REDIS_CONFIG(), true)
1615

services/apps/search_sync_worker/src/bin/cleanup-all-organizations.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ const log = getServiceLogger()
99

1010
setImmediate(async () => {
1111
const openSearchService = new OpenSearchService(log)
12-
await openSearchService.initialize()
1312

1413
const dbConnection = getDbConnection(DB_CONFIG())
1514
const store = new DbStore(log, dbConnection)

0 commit comments

Comments
 (0)