Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/api/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ ts_test_suite(
data = [
":node_modules",
"//packages/@dataform/core:package_tar",
"//test_credentials:bigquery.json",
# "//test_credentials:bigquery.json",
"@nodejs//:node",
"@nodejs//:npm",
] + glob(["goldens/**"]),
Expand Down
121 changes: 96 additions & 25 deletions cli/api/dbadapters/execution_sql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,8 @@ from (${query}) as insertions`;
this.buildIncrementalSchemaChangeTasks(tasks, table);
// Fall through to run the static DML after the procedure alters the schema
case dataform.OnSchemaChange.IGNORE:
default:
tasks.add(
Task.statement(
table.uniqueKey && table.uniqueKey.length > 0
? this.mergeInto(
table.target,
tableMetadata?.fields.map(f => f.name),
this.getIncrementalQuery(table),
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter
)
: this.insertInto(
table.target,
tableMetadata?.fields.map(f => f.name).map(column => `\`${column}\``),
this.getIncrementalQuery(table)
)
)
);
const columns = tableMetadata?.fields.map(f => f.name) || [];
tasks.add(Task.statement(this.getIncrementalDmlStatement(table, columns)));
break;
}
}
Expand Down Expand Up @@ -222,6 +206,14 @@ from (${query}) as insertions`;
return `drop ${this.tableTypeAsSql(type)} if exists ${this.resolveTarget(target)}`;
}

private buildIncrementalPredicatesString(incrementalPredicates: string[]): string {
const validPredicates = incrementalPredicates ? incrementalPredicates.filter(p => p.trim() !== "") : [];
if (validPredicates.length === 0) {
return "";
}
return `and ${validPredicates.map(p => `(${p})`).join(" and ")}`;
}

private buildIncrementalSchemaChangeTasks(tasks: Tasks, table: dataform.ITable) {
const uniqueId = this.uniqueIdGenerator();

Expand Down Expand Up @@ -451,25 +443,104 @@ DROP TABLE IF EXISTS ${emptyTempTableName};
create or replace view ${this.resolveTarget(target)} as ${query}`;
}

private mergeInto(
private mergeInto(
target: dataform.ITarget,
columns: string[],
query: string,
uniqueKey: string[],
updatePartitionFilter: string
updatePartitionFilter: string,
incrementalPredicates: string[]
) {
const incrementalPredicatesString = this.buildIncrementalPredicatesString(incrementalPredicates);
const backtickedColumns = columns.map(column => `\`${column}\``);
return `
merge ${this.resolveTarget(target)} T
merge ${this.resolveTarget(target)} DATAFORM_DEST
using (${query}
) S
on ${uniqueKey.map(uniqueKeyCol => `T.${uniqueKeyCol} = S.${uniqueKeyCol}`).join(` and `)}
${updatePartitionFilter ? `and T.${updatePartitionFilter}` : ""}
) DATAFORM_SOURCE
on ${uniqueKey.map(uniqueKeyCol => `DATAFORM_DEST.${uniqueKeyCol} = DATAFORM_SOURCE.${uniqueKeyCol}`).join(` and `)} ${updatePartitionFilter ? `and DATAFORM_DEST.${updatePartitionFilter}` : ""}
${incrementalPredicatesString ? ` ${incrementalPredicatesString}` : ""}
when matched then
update set ${columns.map(column => `\`${column}\` = S.${column}`).join(",")}
update set ${columns.map(column => `\`${column}\` = DATAFORM_SOURCE.${column}`).join(",")}
when not matched then
insert (${backtickedColumns.join(",")}) values (${backtickedColumns.join(",")})`;
}

private insertOverwrite(
target: dataform.ITarget,
columns: string[],
query: string,
partitionBy: string,
Comment thread
Tuseeq1 marked this conversation as resolved.
updatePartitionFilter: string,
incrementalPredicates: string[]
): string {
const incrementalPredicatesString = this.buildIncrementalPredicatesString(incrementalPredicates);
const uniqueId = this.uniqueIdGenerator();
const stagingTableUnqualified = `staging_table_temp_${uniqueId}`;
const backtickedColumns = columns.map(column => `\`${column}\``);
const resolveTargetTable = this.resolveTarget(target);

return `CREATE OR REPLACE TEMP TABLE \`${stagingTableUnqualified}\` AS (
Comment thread
kolina marked this conversation as resolved.
${query}
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT ${partitionBy}
FROM \`${stagingTableUnqualified}\`
WHERE ${partitionBy} IS NOT NULL
Comment thread
Tuseeq1 marked this conversation as resolved.
)
);

MERGE ${resolveTargetTable} DATAFORM_DEST
USING \`${stagingTableUnqualified}\` DATAFORM_SOURCE
ON FALSE
WHEN NOT MATCHED BY SOURCE AND ${partitionBy} IN UNNEST(partitions_for_replacement) ${updatePartitionFilter ? `and DATAFORM_DEST.${updatePartitionFilter}` : ""}
${incrementalPredicatesString ? ` ${incrementalPredicatesString}` : ""}
THEN
DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (${backtickedColumns.join(",")}) VALUES (${backtickedColumns.join(",")});
END;

DROP TABLE IF EXISTS \`${stagingTableUnqualified}\`;`;
}

private getIncrementalDmlStatement(
table: dataform.ITable,
columns: string[]
): string {
const incrementalQuery = this.getIncrementalQuery(table);

switch (table.incrementalStrategy) {
case dataform.IncrementalStrategy.INSERT_OVERWRITE:
return this.insertOverwrite(
table.target,
columns,
incrementalQuery,
table.bigquery && table.bigquery.partitionBy,
table.bigquery && table.bigquery.updatePartitionFilter,
table.bigquery && table.bigquery.incrementalPredicates
);
case dataform.IncrementalStrategy.MERGE:
default:
if (table.uniqueKey && table.uniqueKey.length > 0) {
return this.mergeInto(
table.target,
columns,
incrementalQuery,
table.uniqueKey,
table.bigquery && table.bigquery.updatePartitionFilter,
table.bigquery && table.bigquery.incrementalPredicates
);
}
return this.insertInto(
table.target,
columns.map(column => `\`${column}\``),
incrementalQuery
);
}
}
}

export function collectEvaluationQueries(
Expand Down
30 changes: 30 additions & 0 deletions cli/api/execution_sql_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,4 +86,34 @@ suite("ExecutionSql with 'onSchemaChange'", () => {
const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_ignore.sql", "utf8");
expect(procedureSql).to.equal(expectedSql.trim());
});

test("generates INSERT_OVERWRITE script for IGNORE strategy", () => {
const table = {
...baseTable,
incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE,
bigquery: {
partitionBy: "DATE(ts)",

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have an example column that is not a function like Date. My concern is that we a re biased toward handling this method. so it its just a normal column name we might not handle it correctly.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added additional test for normal column.

updatePartitionFilter: "ts >= '2024-01-01'"
}
};
const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
const sql = tasks.build().map(t => t.statement).join("\n;\n");
const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_ignore.sql", "utf8");
expect(sql).to.equal(expectedSql.trim());
});

test("generates INSERT_OVERWRITE script for EXTEND strategy", () => {
const table = {
...baseTable,
incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE,
onSchemaChange: dataform.OnSchemaChange.EXTEND,
bigquery: {
partitionBy: "DATE(ts)"
}
};
const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata);
const sql = tasks.build().map(t => t.statement).join("\n;\n");
const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_extend.sql", "utf8");
expect(sql).to.equal(expectedSql.trim());
});
});
99 changes: 99 additions & 0 deletions cli/api/goldens/insert_overwrite_extend.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`()
OPTIONS(strict_mode=false)
BEGIN

-- Create empty table to extract schema of new query.
CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS (
SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0
);


-- Compare schemas
DECLARE dataform_columns ARRAY<STRING>;
DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>;
DECLARE columns_removed ARRAY<STRING>;

SET dataform_columns = (
SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), [])
FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'incremental_on_schema_change'
);

SET temp_table_columns = (
SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), [])
FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty'
);

SET columns_added = (
SELECT IFNULL(ARRAY_AGG(column_info), [])
FROM UNNEST(temp_table_columns) AS column_info
WHERE column_info.column_name NOT IN UNNEST(dataform_columns)
);
SET columns_removed = (
SELECT IFNULL(ARRAY_AGG(column_name), [])
FROM UNNEST(dataform_columns) AS column_name
WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col)
);


-- Apply schema change strategy (EXTEND).
IF ARRAY_LENGTH(columns_removed) > 0 THEN
RAISE USING MESSAGE = FORMAT(
"Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T",
columns_removed
);
END IF;

IF ARRAY_LENGTH(columns_added) > 0 THEN
EXECUTE IMMEDIATE (
"ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " ||
(
SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ")
FROM UNNEST(columns_added) AS column_info
)
);
END IF;



-- Cleanup temporary tables.
DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;

END
;
BEGIN
CALL `project-id.dataset-id.df_osc_test_uuid`();
EXCEPTION WHEN ERROR THEN
DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`;
DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`;
RAISE;
END;
DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
;
CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS (
select 1 as id, 'a' as field1, 'new' as field2
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT DATE(ts)
FROM `staging_table_temp_test_uuid`
WHERE DATE(ts) IS NOT NULL
)
);

MERGE `project-id.dataset-id.incremental_on_schema_change` DATAFORM_DEST
USING `staging_table_temp_test_uuid` DATAFORM_SOURCE
ON FALSE
WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement)

THEN
DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (`id`,`field1`) VALUES (`id`,`field1`);
END;

DROP TABLE IF EXISTS `staging_table_temp_test_uuid`
25 changes: 25 additions & 0 deletions cli/api/goldens/insert_overwrite_ignore.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS (
select 1 as id, 'a' as field1, 'new' as field2
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT DATE(ts)
FROM `staging_table_temp_test_uuid`
WHERE DATE(ts) IS NOT NULL
)
);

MERGE `project-id.dataset-id.incremental_on_schema_change` DATAFORM_DEST
USING `staging_table_temp_test_uuid` DATAFORM_SOURCE
ON FALSE
WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) and DATAFORM_DEST.ts >= '2024-01-01'

THEN
DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (`id`,`field1`) VALUES (`id`,`field1`);
END;

DROP TABLE IF EXISTS `staging_table_temp_test_uuid`
25 changes: 25 additions & 0 deletions cli/api/goldens/insert_overwrite_simple_column.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS (
select 1 as id, 'a' as field1, 'new' as field2
);

BEGIN
DECLARE partitions_for_replacement DEFAULT (
ARRAY(
SELECT DISTINCT date_col
FROM `staging_table_temp_test_uuid`
WHERE date_col IS NOT NULL
)
);

MERGE `project-id.dataset-id.incremental_on_schema_change` DATAFORM_DEST
USING `staging_table_temp_test_uuid` DATAFORM_SOURCE
ON FALSE
WHEN NOT MATCHED BY SOURCE AND date_col IN UNNEST(partitions_for_replacement)

THEN
DELETE
WHEN NOT MATCHED BY TARGET THEN
INSERT (`id`,`field1`) VALUES (`id`,`field1`);
END;

DROP TABLE IF EXISTS `staging_table_temp_test_uuid`;
10 changes: 5 additions & 5 deletions cli/api/goldens/on_schema_change_ignore.sql
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
merge `project-id.dataset-id.incremental_on_schema_change` T
merge `project-id.dataset-id.incremental_on_schema_change` DATAFORM_DEST
using (select 1 as id, 'a' as field1, 'new' as field2
) S
on T.id = S.id
) DATAFORM_SOURCE
on DATAFORM_DEST.id = DATAFORM_SOURCE.id

when matched then
update set `id` = S.id,`field1` = S.field1
update set `id` = DATAFORM_SOURCE.id,`field1` = DATAFORM_SOURCE.field1
when not matched then
insert (`id`,`field1`) values (`id`,`field1`)
10 changes: 5 additions & 5 deletions cli/api/goldens/on_schema_change_synchronize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,12 @@ EXCEPTION WHEN ERROR THEN
END;
DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`
;
merge `project-id.dataset-id.incremental_on_schema_change` T
merge `project-id.dataset-id.incremental_on_schema_change` DATAFORM_DEST
using (select 1 as id, 'a' as field1, 'new' as field2
) S
on T.id = S.id
) DATAFORM_SOURCE
on DATAFORM_DEST.id = DATAFORM_SOURCE.id

when matched then
update set `id` = S.id,`field1` = S.field1
update set `id` = DATAFORM_SOURCE.id,`field1` = DATAFORM_SOURCE.field1
when not matched then
insert (`id`,`field1`) values (`id`,`field1`)
Loading
Loading