diff --git a/docs/json_schemas/contract/components/field_error_type.schema.json b/docs/json_schemas/contract/components/field_error_type.schema.json new file mode 100644 index 0000000..694948a --- /dev/null +++ b/docs/json_schemas/contract/components/field_error_type.schema.json @@ -0,0 +1,21 @@ +{ + "$schema": "https://json-schema.org/draft-07/schema", + "$id": "data-ingest:contract/components/field_error_type.schema.json", + "title": "field_error_type", + "description": "The error type for a field when a validation error is raised during the data contract phase", + "type": "object", + "properties": { + "error_type": { + "description": "The type of error the details are for", + "type": "string", + "enum": [ + "Blank", + "Bad value", + "Wrong format" + ] + } + }, + "additionalProperties": { + "$ref": "field_error_detail.schema.json" + } +} \ No newline at end of file diff --git a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py index a261f7b..843ee40 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py +++ b/src/dve/core_engine/backends/implementations/duckdb/duckdb_helpers.py @@ -273,3 +273,16 @@ def get_all_registered_udfs(connection: DuckDBPyConnection) -> set[str]: """ connection.sql("CREATE TEMP TABLE IF NOT EXISTS dve_udfs (function_name VARCHAR)") return {rw[0] for rw in connection.sql("SELECT * FROM dve_udfs").fetchall()} + + +def duckdb_rel_to_dictionaries( + entity: DuckDBPyRelation, batch_size: int = 1000 +) -> Iterator[dict[str, Any]]: + """Iterator converting a DuckDBPyRelation to dictionaries, one per row (fetched in batches). + Avoids issues where dates are getting converted to datetimes using polars as intermediate.""" + # TODO - look into float conversion - floats that can't be stored exactly in binary + # TODO - are given to nearest approximation. 
Tried Decimal, causes issues in arrays + # TODO - with templating (as in complex fields, repr used when str called in jinja templating). + cols: tuple[str] = tuple(entity.columns) # type: ignore + while rows := entity.fetchmany(batch_size): + yield from (dict(zip(cols, rw)) for rw in rows) diff --git a/src/dve/core_engine/backends/implementations/duckdb/rules.py b/src/dve/core_engine/backends/implementations/duckdb/rules.py index b14700d..e556c6b 100644 --- a/src/dve/core_engine/backends/implementations/duckdb/rules.py +++ b/src/dve/core_engine/backends/implementations/duckdb/rules.py @@ -23,6 +23,7 @@ from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( DDBStruct, duckdb_read_parquet, + duckdb_rel_to_dictionaries, duckdb_write_parquet, get_all_registered_udfs, get_duckdb_type_from_annotation, @@ -511,7 +512,7 @@ def notify(self, entities: DuckDBEntities, *, config: Notification) -> Messages: if config.excluded_columns: matched = matched.select(StarExpression(exclude=config.excluded_columns)) - for record in matched.df().to_dict(orient="records"): + for record in duckdb_rel_to_dictionaries(matched): # NOTE: only templates using values directly accessible in record - nothing nested # more complex extraction done in reporting module messages.append( diff --git a/tests/features/movies.feature b/tests/features/movies.feature index d1dbca4..d737574 100644 --- a/tests/features/movies.feature +++ b/tests/features/movies.feature @@ -31,7 +31,7 @@ Feature: Pipeline tests using the movies dataset Then The rules restrict "movies" to 4 qualifying records And there are errors with the following details and associated error_count from the business_rules phase | ErrorCode | ErrorMessage | error_count | - | LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 | + | LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 | | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | And the latest audit record for the submission is 
marked with processing status error_report When I run the error report phase @@ -67,7 +67,7 @@ Feature: Pipeline tests using the movies dataset Then The rules restrict "movies" to 4 qualifying records And there are errors with the following details and associated error_count from the business_rules phase | ErrorCode | ErrorMessage | error_count | - | LIMITED_RATINGS | Movie has too few ratings ([6.1]) | 1 | + | LIMITED_RATINGS | Movie has too few ratings ([6.5]) | 1 | | RUBBISH_SEQUEL | The movie The Greatest Movie Ever has a rubbish sequel | 1 | And the latest audit record for the submission is marked with processing status error_report When I run the error report phase diff --git a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py index 76d5d07..5c39e36 100644 --- a/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py +++ b/tests/test_core_engine/test_backends/test_implementations/test_duckdb/test_duckdb_helpers.py @@ -1,13 +1,18 @@ """Test Duck DB helpers""" + +import datetime import tempfile from pathlib import Path +from typing import Any import pytest import pyspark.sql.types as pst from duckdb import DuckDBPyRelation, DuckDBPyConnection from pyspark.sql import Row, SparkSession -from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import _ddb_read_parquet +from dve.core_engine.backends.implementations.duckdb.duckdb_helpers import ( + _ddb_read_parquet, + duckdb_rel_to_dictionaries) class TempConnection: @@ -15,6 +20,7 @@ class TempConnection: Full object would be a DataContract object but this simplified down to meet min requirements of the test. 
""" + def __init__(self, connection: DuckDBPyConnection) -> None: self._connection = connection @@ -25,7 +31,7 @@ def __init__(self, connection: DuckDBPyConnection) -> None: ("movie_ratings"), ("movie_ratings/"), ("file://movie_ratings/"), - ] + ], ) def test__ddb_read_parquet_with_hive_format( spark: SparkSession, temp_ddb_conn: DuckDBPyConnection, outpath: str @@ -38,11 +44,13 @@ def test__ddb_read_parquet_with_hive_format( Row(movie_name="Hot Fuzz", avg_user_rating=7.7, avg_critic_rating=6.5), Row(movie_name="Nemo", avg_user_rating=8.8, avg_critic_rating=7.6), ], - pst.StructType([ - pst.StructField("movie_name", pst.StringType()), - pst.StructField("avg_user_rating", pst.FloatType()), - pst.StructField("avg_critic_rating", pst.FloatType()), - ]) + pst.StructType( + [ + pst.StructField("movie_name", pst.StringType()), + pst.StructField("avg_user_rating", pst.FloatType()), + pst.StructField("avg_critic_rating", pst.FloatType()), + ] + ), ) out_path = str(Path(temp_dir_path, outpath)) test_data_df.coalesce(1).write.parquet(out_path) @@ -51,3 +59,39 @@ def test__ddb_read_parquet_with_hive_format( assert isinstance(ddby_relation, DuckDBPyRelation) assert ddby_relation.count("*").fetchone()[0] == 2 # type: ignore + + +@pytest.mark.parametrize( + "data", + ( + + [ + { + "str_field": "hi", + "int_field": 5, + "array_float_field": [6.5, 7.25], + "date_field": datetime.date(2021, 5, 3), + "timestamp_field": datetime.datetime(2022, 6, 7, 1, 2, 3), + }, + { + "str_field": "bye", + "int_field": 3, + "array_float_field": None, + "date_field": datetime.date(2021, 8, 11), + "timestamp_field": datetime.datetime(2022, 4, 3, 1, 2, 3), + }, + ], + + ), +) +def test_duckdb_rel_to_dictionaries(temp_ddb_conn: DuckDBPyConnection, + data: list[dict[str, Any]]): + _, con = temp_ddb_conn + test_rel = con.query("select dta.* from (select unnest($data) as dta)", + params={"data": data}) + res: list = [] + for chunk in duckdb_rel_to_dictionaries(test_rel, 1): + res.append(chunk) + + assert 
res == data + diff --git a/tests/testdata/movies/movies.json b/tests/testdata/movies/movies.json index 87f1149..afa606d 100644 --- a/tests/testdata/movies/movies.json +++ b/tests/testdata/movies/movies.json @@ -36,7 +36,7 @@ "year": 2020, "genre": ["Fantasy", "Family"], "duration_minutes": 110, - "ratings": [6.1], + "ratings": [6.5], "cast": [ { "name": "R. Williams", "role": "Cat", "date_joined": "2016-05-06" }, { "name": "T. Brown", "role": "Dog", "date_joined": "2016-05-07" }