Skip to content

Commit aad50ae

Browse files
authored
fix: nango repos to delete logic (#3696)
Signed-off-by: Joana Maia <[email protected]>
1 parent d09c0ae commit aad50ae

File tree

4 files changed

+140
-55
lines changed

4 files changed

+140
-55
lines changed

services/apps/cron_service/src/jobs/nangoConnectionCleanup.job.ts

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { IS_DEV_ENV, IS_STAGING_ENV, singleOrDefault } from '@crowd/common'
44
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
55
import {
66
INangoIntegrationData,
7+
fetchNangoDeletedIntegrationData,
78
fetchNangoIntegrationData,
89
} from '@crowd/data-access-layer/src/integrations'
910
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
@@ -15,6 +16,7 @@ import {
1516
initNangoCloudClient,
1617
nangoIntegrationToPlatform,
1718
} from '@crowd/nango'
19+
import { PlatformType } from '@crowd/types'
1820

1921
import { IJobDefinition } from '../types'
2022

@@ -28,11 +30,24 @@ const job: IJobDefinition = {
2830

2931
await initNangoCloudClient()
3032
const dbConnection = await getDbConnection(READ_DB_CONFIG(), 3, 0)
33+
const qx = pgpQx(dbConnection)
3134

32-
const allIntegrations = await fetchNangoIntegrationData(pgpQx(dbConnection), [
35+
// Fetch all active integrations
36+
const activeIntegrations = await fetchNangoIntegrationData(qx, [
3337
...new Set(ALL_NANGO_INTEGRATIONS.map(nangoIntegrationToPlatform)),
3438
])
3539

40+
// Fetch deleted integrations to clean up their connections
41+
const deletedIntegrations = await fetchNangoDeletedIntegrationData(qx, [
42+
...new Set(ALL_NANGO_INTEGRATIONS.map(nangoIntegrationToPlatform)),
43+
])
44+
45+
// Build a set of deleted integration IDs for quick lookup
46+
const deletedIntegrationIds = new Set(deletedIntegrations.map((i) => i.id))
47+
48+
// Combine for lookup purposes
49+
const allIntegrations = [...activeIntegrations, ...deletedIntegrations]
50+
3651
const nangoConnections = await getNangoConnections()
3752

3853
for (const connection of nangoConnections.filter((c) =>
@@ -48,7 +63,7 @@ const job: IJobDefinition = {
4863
// github integrations have connection ids per repo in the settings nangoMapping object
4964
if (connection.provider_config_key === NangoIntegration.GITHUB) {
5065
integration = singleOrDefault(allIntegrations, (i) => {
51-
if (i.platform !== NangoIntegration.GITHUB) {
66+
if (i.platform !== PlatformType.GITHUB_NANGO) {
5267
return false
5368
}
5469

@@ -58,21 +73,27 @@ const job: IJobDefinition = {
5873

5974
return false
6075
})
76+
// for other integrations, check if the connection belongs to an integration
6177
} else {
6278
integration = singleOrDefault(allIntegrations, (i) => i.id === connection.connection_id)
6379
}
6480

65-
if (!integration) {
66-
// check if connection.created is older than 30 days
67-
const created = new Date(connection.created)
68-
69-
if (created < new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)) {
70-
ctx.log.info(`Deleting stale connection ${connection.connection_id}`)
71-
await deleteNangoConnection(
72-
connection.provider_config_key as NangoIntegration,
73-
connection.connection_id,
74-
)
75-
}
81+
const integrationCreatedAt = new Date(connection.created)
82+
const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000)
83+
84+
const shouldDelete =
85+
integrationCreatedAt < thirtyDaysAgo &&
86+
// If connection doesn't belong to any integration, delete after 30 days
87+
(!integration ||
88+
// If connection belongs to a deleted integration, delete after 30 days
89+
(integration && deletedIntegrationIds.has(integration?.id)))
90+
91+
if (shouldDelete) {
92+
ctx.log.info(`Deleting stale connection ${connection.connection_id}`)
93+
await deleteNangoConnection(
94+
connection.provider_config_key as NangoIntegration,
95+
connection.connection_id,
96+
)
7697
}
7798
}
7899
},

services/apps/nango_worker/src/activities/nangoActivities.ts

Lines changed: 1 addition & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import {
44
addGitHubRepoMapping,
55
addGithubNangoConnection,
66
addRepoToGitIntegration,
7-
fetchDeletedIntegrationById,
87
fetchIntegrationById,
98
findIntegrationDataForNangoWebhookProcessing,
109
removeGitHubRepoMapping,
@@ -342,31 +341,7 @@ export async function analyzeGithubIntegration(
342341
svc.log.warn(`Integration ${integrationId} is not a Github Nango integration!`)
343342
}
344343
} else {
345-
const deletedIntegration = await fetchDeletedIntegrationById(
346-
dbStoreQx(svc.postgres.writer),
347-
integrationId,
348-
)
349-
350-
if (deletedIntegration && deletedIntegration.platform === PlatformType.GITHUB_NANGO) {
351-
const settings = deletedIntegration.settings
352-
353-
if (settings.nangoMapping) {
354-
const nangoMapping = settings.nangoMapping as Record<string, IGithubRepoData>
355-
const connectionIds = Object.keys(nangoMapping)
356-
357-
for (const connectionId of connectionIds) {
358-
reposToDelete.push({
359-
repo: nangoMapping[connectionId],
360-
connectionId,
361-
})
362-
}
363-
}
364-
svc.log.info(
365-
`For deleted integration ${integrationId} found ${reposToDelete.length} connections to delete!`,
366-
)
367-
} else {
368-
svc.log.warn(`Integration ${integrationId} not found!`)
369-
}
344+
svc.log.warn(`Integration ${integrationId} not found!`)
370345
}
371346

372347
return {
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { READ_DB_CONFIG, getDbConnection } from '@crowd/data-access-layer/src/database'
2+
import { fetchNangoDeletedIntegrationData } from '@crowd/data-access-layer/src/integrations'
3+
import { pgpQx } from '@crowd/data-access-layer/src/queryExecutor'
4+
import { getServiceLogger } from '@crowd/logging'
5+
import {
6+
ALL_NANGO_INTEGRATIONS,
7+
NangoIntegration,
8+
deleteNangoConnection,
9+
getNangoConnections,
10+
initNangoCloudClient,
11+
nangoIntegrationToPlatform,
12+
} from '@crowd/nango'
13+
import { PlatformType } from '@crowd/types'
14+
15+
const log = getServiceLogger()
16+
17+
const processArguments = process.argv.slice(2)
18+
19+
let deleteDisconnectedIntegrations = false
20+
if (processArguments.includes('--delete')) {
21+
deleteDisconnectedIntegrations = true
22+
}
23+
24+
setImmediate(async () => {
25+
const db = await getDbConnection(READ_DB_CONFIG())
26+
27+
await initNangoCloudClient()
28+
29+
const deletedNangoIntegrations = await fetchNangoDeletedIntegrationData(pgpQx(db), [
30+
...new Set(
31+
ALL_NANGO_INTEGRATIONS.map(nangoIntegrationToPlatform).filter(
32+
(platform) => platform !== null,
33+
),
34+
),
35+
])
36+
37+
const nangoConnections = await getNangoConnections()
38+
39+
const connectionIds: string[] = []
40+
// Map connectionId -> integrationId to track which integration each connection belongs to
41+
const connectionToIntegrationMap = new Map<string, string>()
42+
43+
for (const int of deletedNangoIntegrations) {
44+
if (int.platform === PlatformType.GITHUB_NANGO) {
45+
if (int.settings?.nangoMapping) {
46+
const connectionIdsForIntegration = Object.keys(int.settings.nangoMapping)
47+
connectionIds.push(...connectionIdsForIntegration)
48+
// Map each connection ID to this integration ID
49+
for (const connectionId of connectionIdsForIntegration) {
50+
connectionToIntegrationMap.set(connectionId, int.id)
51+
}
52+
} else {
53+
log.info(
54+
{ integrationId: int.id, platform: int.platform },
55+
'Integration has no nangoMapping in settings, skipping',
56+
)
57+
}
58+
} else {
59+
connectionIds.push(int.id)
60+
// For non-GitHub integrations, the connection_id is the integration ID
61+
connectionToIntegrationMap.set(int.id, int.id)
62+
}
63+
}
64+
65+
for (const nangoConnection of nangoConnections.filter(
66+
(c) => !c.connection_id.startsWith('github-token-'),
67+
)) {
68+
if (connectionIds.includes(nangoConnection.connection_id)) {
69+
const integrationId = connectionToIntegrationMap.get(nangoConnection.connection_id)
70+
if (deleteDisconnectedIntegrations) {
71+
log.warn(
72+
`Connection ${nangoConnection.connection_id} (${nangoConnection.provider_config_key}) is connected to a deleted integration ${integrationId} - deleting it...`,
73+
)
74+
75+
await deleteNangoConnection(
76+
nangoConnection.provider_config_key as NangoIntegration,
77+
nangoConnection.connection_id,
78+
)
79+
} else {
80+
log.info(
81+
`Connection ${nangoConnection.connection_id} (${nangoConnection.provider_config_key}) is connected to a deleted integration ${integrationId} and should be deleted!`,
82+
)
83+
}
84+
}
85+
}
86+
87+
process.exit(0)
88+
})

services/libs/data-access-layer/src/integrations/index.ts

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -233,22 +233,6 @@ export async function fetchIntegrationById(
233233
)
234234
}
235235

236-
export async function fetchDeletedIntegrationById(
237-
qx: QueryExecutor,
238-
id: string,
239-
): Promise<INangoIntegrationData | null> {
240-
return qx.selectOneOrNone(
241-
`
242-
select id, platform, settings, "segmentId"
243-
from integrations
244-
where "deletedAt" is not null and id = $(id)
245-
`,
246-
{
247-
id,
248-
},
249-
)
250-
}
251-
252236
export async function setGithubIntegrationSettingsOrgs(
253237
qx: QueryExecutor,
254238
integrationId: string,
@@ -301,6 +285,23 @@ export async function fetchNangoIntegrationData(
301285
)
302286
}
303287

288+
export async function fetchNangoDeletedIntegrationData(
289+
qx: QueryExecutor,
290+
platforms: string[],
291+
): Promise<INangoIntegrationData[]> {
292+
return qx.select(
293+
`
294+
select id, platform, settings
295+
from integrations
296+
where platform in ($(platforms:csv)) and "deletedAt" is not null
297+
order by "updatedAt" asc
298+
`,
299+
{
300+
platforms,
301+
},
302+
)
303+
}
304+
304305
export async function findIntegrationDataForNangoWebhookProcessing(
305306
qx: QueryExecutor,
306307
id: string,

0 commit comments

Comments
 (0)