-
Notifications
You must be signed in to change notification settings - Fork 203
Support incremental_strategy parameter and new insert_overwrite strategy #2195
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -86,4 +86,34 @@ suite("ExecutionSql with 'onSchemaChange'", () => { | |
| const expectedSql = fs.readFileSync("cli/api/goldens/on_schema_change_ignore.sql", "utf8"); | ||
| expect(procedureSql).to.equal(expectedSql.trim()); | ||
| }); | ||
|
|
||
| test("generates INSERT_OVERWRITE script for IGNORE strategy", () => { | ||
| const table = { | ||
| ...baseTable, | ||
| incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE, | ||
| bigquery: { | ||
| partitionBy: "DATE(ts)", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Do we have an example where the partition column is a plain column rather than a function call like DATE(ts)? My concern is that we are biased toward handling the function form, so if it is just a normal column name we might not handle it correctly.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I added an additional test for a normal column. |
||
| updatePartitionFilter: "ts >= '2024-01-01'" | ||
| } | ||
| }; | ||
| const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); | ||
| const sql = tasks.build().map(t => t.statement).join("\n;\n"); | ||
| const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_ignore.sql", "utf8"); | ||
| expect(sql).to.equal(expectedSql.trim()); | ||
| }); | ||
|
|
||
| test("generates INSERT_OVERWRITE script for EXTEND strategy", () => { | ||
| const table = { | ||
| ...baseTable, | ||
| incrementalStrategy: dataform.IncrementalStrategy.INSERT_OVERWRITE, | ||
| onSchemaChange: dataform.OnSchemaChange.EXTEND, | ||
| bigquery: { | ||
| partitionBy: "DATE(ts)" | ||
| } | ||
| }; | ||
| const tasks = executionSql.publishTasks(table, { fullRefresh: false }, tableMetadata); | ||
| const sql = tasks.build().map(t => t.statement).join("\n;\n"); | ||
| const expectedSql = fs.readFileSync("cli/api/goldens/insert_overwrite_extend.sql", "utf8"); | ||
| expect(sql).to.equal(expectedSql.trim()); | ||
| }); | ||
| }); | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,99 @@ | ||
| CREATE OR REPLACE PROCEDURE `project-id.dataset-id.df_osc_test_uuid`() | ||
| OPTIONS(strict_mode=false) | ||
| BEGIN | ||
|
|
||
| -- Create empty table to extract schema of new query. | ||
| CREATE OR REPLACE TABLE `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty` AS ( | ||
| SELECT * FROM (select 1 as id, 'a' as field1, 'new' as field2) AS insertions LIMIT 0 | ||
| ); | ||
|
|
||
|
|
||
| -- Compare schemas | ||
| DECLARE dataform_columns ARRAY<STRING>; | ||
| DECLARE temp_table_columns ARRAY<STRUCT<column_name STRING, data_type STRING>>; | ||
| DECLARE columns_added ARRAY<STRUCT<column_name STRING, data_type STRING>>; | ||
| DECLARE columns_removed ARRAY<STRING>; | ||
|
|
||
| SET dataform_columns = ( | ||
| SELECT IFNULL(ARRAY_AGG(DISTINCT column_name), []) | ||
| FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS` | ||
| WHERE table_name = 'incremental_on_schema_change' | ||
| ); | ||
|
|
||
| SET temp_table_columns = ( | ||
| SELECT IFNULL(ARRAY_AGG(STRUCT(column_name, data_type)), []) | ||
| FROM `project-id.dataset-id.INFORMATION_SCHEMA.COLUMNS` | ||
| WHERE table_name = 'incremental_on_schema_change_df_temp_test_uuid_empty' | ||
| ); | ||
|
|
||
| SET columns_added = ( | ||
| SELECT IFNULL(ARRAY_AGG(column_info), []) | ||
| FROM UNNEST(temp_table_columns) AS column_info | ||
| WHERE column_info.column_name NOT IN UNNEST(dataform_columns) | ||
| ); | ||
| SET columns_removed = ( | ||
| SELECT IFNULL(ARRAY_AGG(column_name), []) | ||
| FROM UNNEST(dataform_columns) AS column_name | ||
| WHERE column_name NOT IN (SELECT col.column_name FROM UNNEST(temp_table_columns) AS col) | ||
| ); | ||
|
|
||
|
|
||
| -- Apply schema change strategy (EXTEND). | ||
| IF ARRAY_LENGTH(columns_removed) > 0 THEN | ||
| RAISE USING MESSAGE = FORMAT( | ||
| "Column removals are not allowed when on_schema_change = 'EXTEND'. Removed columns: %T", | ||
| columns_removed | ||
| ); | ||
| END IF; | ||
|
|
||
| IF ARRAY_LENGTH(columns_added) > 0 THEN | ||
| EXECUTE IMMEDIATE ( | ||
| "ALTER TABLE `project-id.dataset-id.incremental_on_schema_change` " || | ||
| ( | ||
| SELECT STRING_AGG(FORMAT("ADD COLUMN IF NOT EXISTS %s %s", column_info.column_name, column_info.data_type), ", ") | ||
| FROM UNNEST(columns_added) AS column_info | ||
| ) | ||
| ); | ||
| END IF; | ||
|
|
||
|
|
||
|
|
||
| -- Cleanup temporary tables. | ||
| DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`; | ||
|
|
||
| END | ||
| ; | ||
| BEGIN | ||
| CALL `project-id.dataset-id.df_osc_test_uuid`(); | ||
| EXCEPTION WHEN ERROR THEN | ||
| DROP TABLE IF EXISTS `project-id.dataset-id.incremental_on_schema_change_df_temp_test_uuid_empty`; | ||
| DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid`; | ||
| RAISE; | ||
| END; | ||
| DROP PROCEDURE IF EXISTS `project-id.dataset-id.df_osc_test_uuid` | ||
| ; | ||
| CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS ( | ||
| select 1 as id, 'a' as field1, 'new' as field2 | ||
| ); | ||
|
|
||
| BEGIN | ||
| DECLARE partitions_for_replacement DEFAULT ( | ||
| ARRAY( | ||
| SELECT DISTINCT DATE(ts) | ||
| FROM `staging_table_temp_test_uuid` | ||
| WHERE DATE(ts) IS NOT NULL | ||
| ) | ||
| ); | ||
|
|
||
| MERGE `project-id.dataset-id.incremental_on_schema_change` DATAFORM_DEST | ||
| USING `staging_table_temp_test_uuid` DATAFORM_SOURCE | ||
| ON FALSE | ||
| WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) | ||
|
|
||
| THEN | ||
| DELETE | ||
| WHEN NOT MATCHED BY TARGET THEN | ||
| INSERT (`id`,`field1`) VALUES (`id`,`field1`); | ||
| END; | ||
|
|
||
| DROP TABLE IF EXISTS `staging_table_temp_test_uuid` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS ( | ||
| select 1 as id, 'a' as field1, 'new' as field2 | ||
| ); | ||
|
|
||
| BEGIN | ||
| DECLARE partitions_for_replacement DEFAULT ( | ||
| ARRAY( | ||
| SELECT DISTINCT DATE(ts) | ||
| FROM `staging_table_temp_test_uuid` | ||
| WHERE DATE(ts) IS NOT NULL | ||
| ) | ||
| ); | ||
|
|
||
| MERGE `project-id.dataset-id.incremental_on_schema_change` DATAFORM_DEST | ||
| USING `staging_table_temp_test_uuid` DATAFORM_SOURCE | ||
| ON FALSE | ||
| WHEN NOT MATCHED BY SOURCE AND DATE(ts) IN UNNEST(partitions_for_replacement) and DATAFORM_DEST.ts >= '2024-01-01' | ||
|
|
||
| THEN | ||
| DELETE | ||
| WHEN NOT MATCHED BY TARGET THEN | ||
| INSERT (`id`,`field1`) VALUES (`id`,`field1`); | ||
| END; | ||
|
|
||
| DROP TABLE IF EXISTS `staging_table_temp_test_uuid` |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,25 @@ | ||
| CREATE OR REPLACE TEMP TABLE `staging_table_temp_test_uuid` AS ( | ||
| select 1 as id, 'a' as field1, 'new' as field2 | ||
| ); | ||
|
|
||
| BEGIN | ||
| DECLARE partitions_for_replacement DEFAULT ( | ||
| ARRAY( | ||
| SELECT DISTINCT date_col | ||
| FROM `staging_table_temp_test_uuid` | ||
| WHERE date_col IS NOT NULL | ||
| ) | ||
| ); | ||
|
|
||
| MERGE `project-id.dataset-id.incremental_on_schema_change` DATAFORM_DEST | ||
| USING `staging_table_temp_test_uuid` DATAFORM_SOURCE | ||
| ON FALSE | ||
| WHEN NOT MATCHED BY SOURCE AND date_col IN UNNEST(partitions_for_replacement) | ||
|
|
||
| THEN | ||
| DELETE | ||
| WHEN NOT MATCHED BY TARGET THEN | ||
| INSERT (`id`,`field1`) VALUES (`id`,`field1`); | ||
| END; | ||
|
|
||
| DROP TABLE IF EXISTS `staging_table_temp_test_uuid`; |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,9 +1,9 @@ | ||
| merge `project-id.dataset-id.incremental_on_schema_change` T | ||
| merge `project-id.dataset-id.incremental_on_schema_change` DATAFORM_DEST | ||
| using (select 1 as id, 'a' as field1, 'new' as field2 | ||
| ) S | ||
| on T.id = S.id | ||
| ) DATAFORM_SOURCE | ||
| on DATAFORM_DEST.id = DATAFORM_SOURCE.id | ||
|
|
||
| when matched then | ||
| update set `id` = S.id,`field1` = S.field1 | ||
| update set `id` = DATAFORM_SOURCE.id,`field1` = DATAFORM_SOURCE.field1 | ||
| when not matched then | ||
| insert (`id`,`field1`) values (`id`,`field1`) |
Uh oh!
There was an error while loading. Please reload this page.