diff --git a/scripts/scaffold/kafka-connect/Dockerfile b/scripts/scaffold/kafka-connect/Dockerfile index 015bf97f41..1aacf70d27 100644 --- a/scripts/scaffold/kafka-connect/Dockerfile +++ b/scripts/scaffold/kafka-connect/Dockerfile @@ -5,7 +5,10 @@ USER root RUN yum install -y jq findutils unzip RUN confluent-hub install snowflakeinc/snowflake-kafka-connector:2.5.0 --no-prompt +RUN confluent-hub install confluentinc/kafka-connect-jdbc:10.7.4 --no-prompt + COPY tmp/kafka-connect-http/ /usr/share/confluent-hub-components/kafka-connect-http/ +COPY tmp/custom-plugins/tinybird-smt-1.0.1.jar /usr/share/java/tinybird-smt-1.0.1.jar VOLUME /storage diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe new file mode 100644 index 0000000000..03771a9cec --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_per_segments_sink.pipe @@ -0,0 +1,104 @@ +DESCRIPTION > + CDP dashboard metrics per segment with hierarchy aggregation. + Clean approach assuming segments table is properly synced with complete hierarchy data. + +NODE subprojectMetrics +SQL > + SELECT + segments.id AS segmentId, + 'subproject' AS segmentType, + segments.parentId, + segments.grandparentId, + segments.slug AS segmentSlug, + segments.parentSlug, + segments.grandparentSlug, + segments.name AS segmentName, + segments.parentName, + segments.grandparentName, + count() AS activitiesTotal, + countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, + countDistinct(ar.memberId) AS membersTotal, + countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, + countDistinct(ar.organizationId) AS organizationsTotal, + countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days + FROM segments FINAL + LEFT JOIN activityRelations_deduplicated_ds AS ar ON segments.id = ar.segmentId + LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id + LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id + WHERE segments.parentSlug IS NOT NULL AND segments.grandparentSlug IS NOT NULL + AND segments.parentId IS NOT NULL AND segments.grandparentId IS NOT NULL + AND segments.parentId != '' AND segments.grandparentId != '' + GROUP BY segments.id, segments.parentId, segments.grandparentId, segments.slug, segments.parentSlug, segments.grandparentSlug, + segments.name, segments.parentName, segments.grandparentName + +NODE projectMetrics +SQL > + SELECT + s.parentId AS segmentId, + 'project' AS segmentType, + s.parentId, + s.grandparentId, + s.parentSlug AS segmentSlug, + s.parentSlug, + s.grandparentSlug, + s.parentName AS segmentName, + s.parentName, + s.grandparentName, + count() AS activitiesTotal, + countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, + countDistinct(ar.memberId) AS membersTotal, + countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, + countDistinct(ar.organizationId) AS organizationsTotal, + countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days + FROM (SELECT * FROM segments FINAL) AS s + LEFT JOIN activityRelations_deduplicated_ds AS ar ON s.id = ar.segmentId + LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id + LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id + WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL + AND s.parentId IS NOT NULL AND s.grandparentId IS NOT NULL + AND s.parentId != '' AND s.grandparentId != '' + GROUP BY s.parentId, s.grandparentId, s.parentSlug, s.grandparentSlug, s.parentName, s.grandparentName + +NODE projectGroupMetrics +SQL > + SELECT + s.grandparentId AS segmentId, + 'projectGroup' AS segmentType, + s.grandparentId AS parentId, + s.grandparentId, + s.grandparentSlug AS segmentSlug, + s.grandparentSlug AS parentSlug, + s.grandparentSlug AS grandparentSlug, + s.grandparentName AS segmentName, + s.grandparentName AS parentName, + s.grandparentName AS grandparentName, + count() AS activitiesTotal, + countIf(ar.createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days, + countDistinct(ar.memberId) AS membersTotal, + countDistinctIf(ar.memberId, mem.joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days, + countDistinct(ar.organizationId) AS organizationsTotal, + countDistinctIf(ar.organizationId, org.createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days + FROM (SELECT * FROM segments FINAL) AS s + LEFT JOIN activityRelations_deduplicated_ds AS ar ON s.id = ar.segmentId + LEFT JOIN (SELECT * FROM members FINAL) AS mem ON ar.memberId = mem.id + LEFT JOIN (SELECT * FROM organizations FINAL) AS org ON ar.organizationId = org.id + WHERE s.parentSlug IS NOT NULL AND s.grandparentSlug IS NOT NULL + AND s.parentId IS NOT NULL AND s.grandparentId IS NOT NULL + AND s.parentId != '' AND s.grandparentId != '' + GROUP BY s.grandparentId, s.grandparentSlug, s.grandparentName + +NODE cdpDashboardMetricsPerSegment +SQL > + SELECT * FROM subprojectMetrics + UNION ALL + SELECT * FROM projectMetrics + UNION ALL + SELECT * FROM projectGroupMetrics + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging +EXPORT_SCHEDULE 0 9 * * * +EXPORT_FORMAT json +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_per_segment_sink \ No newline at end of file diff --git a/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe new file mode 100644 index 0000000000..c38a151efd --- /dev/null +++ b/services/libs/tinybird/pipes/cdp_dashboard_metrics_total_sink.pipe @@ -0,0 +1,46 @@ +DESCRIPTION > + Global metrics for activities, organizations, and members used for CDP dashboard. We. are referring to the total witouth filtering by any segment + +NODE activityRelationsMetricsTotal +SQL > + SELECT + count() AS activitiesTotal, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS activitiesLast30Days + FROM activityRelations_deduplicated_ds + +NODE organizationsMetricsTotal +SQL > + SELECT + count() AS organizationsTotal, + countIf(createdAt >= now() - INTERVAL 30 DAY) AS organizationsLast30Days + FROM organizations FINAL + +NODE membersMetricsTotal +SQL > + SELECT count() AS membersTotal, countIf(joinedAt >= now() - INTERVAL 30 DAY) AS membersLast30Days + FROM members FINAL + +NODE mergeResults +SQL > + SELECT + -- activity + (SELECT activitiesTotal FROM activityRelationsMetricsTotal) AS activitiesTotal, + (SELECT activitiesLast30Days FROM activityRelationsMetricsTotal) AS activitiesLast30Days, + -- organizations + (SELECT organizationsTotal FROM organizationsMetricsTotal) AS organizationsTotal, + (SELECT organizationsLast30Days FROM organizationsMetricsTotal) AS organizationsLast30Days, + -- members + (SELECT membersTotal FROM membersMetricsTotal) AS membersTotal, + (SELECT membersLast30Days FROM membersMetricsTotal) AS membersLast30Days + +NODE cdpDashboardFullMetricsTotal +SQL > + SELECT * FROM mergeResults + +TYPE SINK +EXPORT_SERVICE kafka +EXPORT_CONNECTION_NAME lfx-oracle-kafka-streaming-staging +EXPORT_SCHEDULE 0 9 * * * +EXPORT_FORMAT json +EXPORT_STRATEGY @new +EXPORT_KAFKA_TOPIC cdp_dashboard_metrics_total_sink