Skip to content

Commit

Permalink
Fixed snapshot related bug which is not working with CTE's
Browse files Browse the repository at this point in the history
  • Loading branch information
prdpsvs committed Apr 11, 2024
1 parent f9ef28f commit 57f2aa6
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 61 deletions.
2 changes: 1 addition & 1 deletion dbt/adapters/fabric/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version = "1.8.0"
version = "1.8.1"
7 changes: 7 additions & 0 deletions dbt/adapters/fabric/fabric_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,13 @@ def render_limited(self) -> str:
else:
return f"(select TOP {self.limit} * from {rendered}) _dbt_top_subq"

# TODO: Standardizing quote characters
# def quoted(self, identifier):
# return "[{identifier}]".format(
# quote_char=self.quote_character,
# identifier=identifier,
# )

@available
@classmethod
def render_column_constraint(cls, constraint: ColumnLevelConstraint) -> Optional[str]:
Expand Down
1 change: 1 addition & 0 deletions dbt/include/fabric/macros/adapters/columns.sql
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
{% set tempTableName %}
{{ relation.schema }}.{{ relation.identifier }}_{{ range(1300, 19000) | random }}
{% endset %}
{{ log("Cannot Alter table type, as it is not supported. Using random table as a temp table. - " ~ tempTableName) }}

{% set tempTable %}
CREATE TABLE {{tempTableName}}
Expand Down
2 changes: 1 addition & 1 deletion dbt/include/fabric/macros/adapters/relation.sql
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@
{% set tempTableName %}
{{ relation.include(database=False).identifier.replace("#", "") }}_{{ range(21000, 109000) | random }}
{% endset %}

{{ log("Truncate Statement is not supported, Using random table as a temp table. - " ~ tempTableName) }}
{% call statement('truncate_relation') -%}
CREATE TABLE {{ tempTableName }} AS SELECT * FROM {{ relation }} WHERE 1=2
EXEC('DROP TABLE IF EXISTS {{ relation.include(database=False) }};');
Expand Down
199 changes: 199 additions & 0 deletions dbt/include/fabric/macros/materializations/snapshots/helpers.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
{% macro fabric__post_snapshot(staging_relation) %}
-- Clean up the snapshot temp table
{% do drop_relation(staging_relation) %}
{% endmacro %}

--Due to Alter not being supported, have to rely on this for temporarily
{% macro fabric__create_columns(relation, columns) %}
{# default__ macro uses "add column"
TSQL preferes just "add"
#}

{% set columns %}
{% for column in columns %}
, CAST(NULL AS {{column.data_type}}) AS {{column_name}}
{% endfor %}
{% endset %}

{% set tempTableName %}
[{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}_{{ range(1300, 19000) | random }}]
{% endset %}
{{ log("Creating new columns are not supported without dropping a table. Using random table as a temp table. - " ~ tempTableName) }}

{% set tempTable %}
CREATE TABLE {{tempTableName}}
AS SELECT * {{columns}} FROM [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {{ information_schema_hints() }}
{% endset %}

{% call statement('create_temp_table') -%}
{{ tempTable }}
{%- endcall %}

{% set dropTable %}
DROP TABLE [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}]
{% endset %}

{% call statement('drop_table') -%}
{{ dropTable }}
{%- endcall %}

{% set createTable %}
CREATE TABLE {{ relation }}
AS SELECT * FROM {{tempTableName}} {{ information_schema_hints() }}
{% endset %}

{% call statement('create_Table') -%}
{{ createTable }}
{%- endcall %}

{% set dropTempTable %}
DROP TABLE {{tempTableName}}
{% endset %}

{% call statement('drop_temp_table') -%}
{{ dropTempTable }}
{%- endcall %}
{% endmacro %}

{% macro fabric__get_true_sql() %}
{{ return('1=1') }}
{% endmacro %}


{% macro fabric__build_snapshot_table(strategy, relation) %}

select *,
{{ strategy.scd_id }} as dbt_scd_id,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to
from (
select * from {{ relation }}
) sbq

{% endmacro %}

{% macro fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) -%}

with snapshot_query as (

select * from {{ temp_snapshot_relation }}

),

snapshotted_data as (

select *,
{{ strategy.unique_key }} as dbt_unique_key

from {{ target_relation }}
where dbt_valid_to is null

),

insertions_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
nullif({{ strategy.updated_at }}, {{ strategy.updated_at }}) as dbt_valid_to,
{{ strategy.scd_id }} as dbt_scd_id

from snapshot_query
),

updates_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key,
{{ strategy.updated_at }} as dbt_updated_at,
{{ strategy.updated_at }} as dbt_valid_from,
{{ strategy.updated_at }} as dbt_valid_to

from snapshot_query
),

{%- if strategy.invalidate_hard_deletes %}

deletes_source_data as (

select
*,
{{ strategy.unique_key }} as dbt_unique_key
from snapshot_query
),
{% endif %}

insertions as (

select
'insert' as dbt_change_type,
source_data.*

from insertions_source_data as source_data
left outer join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where snapshotted_data.dbt_unique_key is null
or (
snapshotted_data.dbt_unique_key is not null
and (
{{ strategy.row_changed }}
)
)

),

updates as (

select
'update' as dbt_change_type,
source_data.*,
snapshotted_data.dbt_scd_id

from updates_source_data as source_data
join snapshotted_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where (
{{ strategy.row_changed }}
)
)

{%- if strategy.invalidate_hard_deletes -%}
,

deletes as (

select
'delete' as dbt_change_type,
source_data.*,
{{ snapshot_get_time() }} as dbt_valid_from,
{{ snapshot_get_time() }} as dbt_updated_at,
{{ snapshot_get_time() }} as dbt_valid_to,
snapshotted_data.dbt_scd_id

from snapshotted_data
left join deletes_source_data as source_data on snapshotted_data.dbt_unique_key = source_data.dbt_unique_key
where source_data.dbt_unique_key is null
)
{%- endif %}

select * from insertions
union all
select * from updates
{%- if strategy.invalidate_hard_deletes %}
union all
select * from deletes
{%- endif %}

{%- endmacro %}

{% macro build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
{% set temp_relation = make_temp_relation(target_relation) %}
{% set select = fabric__snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
{% call statement('build_snapshot_staging_relation') %}
{{ create_table_as(True, temp_relation, select) }}
{% endcall %}

{% do return(temp_relation) %}
{% endmacro %}
163 changes: 105 additions & 58 deletions dbt/include/fabric/macros/materializations/snapshots/snapshot.sql
Original file line number Diff line number Diff line change
@@ -1,58 +1,105 @@
{% macro fabric__post_snapshot(staging_relation) %}
-- Clean up the snapshot temp table
{% do drop_relation(staging_relation) %}
{% endmacro %}

{% macro fabric__create_columns(relation, columns) %}
{# default__ macro uses "add column"
TSQL preferes just "add"
#}

{% set columns %}
{% for column in columns %}
, CAST(NULL AS {{column.data_type}}) AS {{column_name}}
{% endfor %}
{% endset %}

{% set tempTableName %}
[{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}_{{ range(1300, 19000) | random }}]
{% endset %}

{% set tempTable %}
CREATE TABLE {{tempTableName}}
AS SELECT * {{columns}} FROM [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}] {{ information_schema_hints() }}
{% endset %}

{% call statement('create_temp_table') -%}
{{ tempTable }}
{%- endcall %}

{% set dropTable %}
DROP TABLE [{{relation.database}}].[{{ relation.schema }}].[{{ relation.identifier }}]
{% endset %}

{% call statement('drop_table') -%}
{{ dropTable }}
{%- endcall %}

{% set createTable %}
CREATE TABLE {{ relation }}
AS SELECT * FROM {{tempTableName}} {{ information_schema_hints() }}
{% endset %}

{% call statement('create_Table') -%}
{{ createTable }}
{%- endcall %}

{% set dropTempTable %}
DROP TABLE {{tempTableName}}
{% endset %}

{% call statement('drop_temp_table') -%}
{{ dropTempTable }}
{%- endcall %}
{% endmacro %}

{% macro fabric__get_true_sql() %}
{{ return('1=1') }}
{% endmacro %}
{% materialization snapshot, adapter='fabric' %}

{%- set config = model['config'] -%}
{%- set target_table = model.get('alias', model.get('name')) -%}
{%- set strategy_name = config.get('strategy') -%}
{%- set unique_key = config.get('unique_key') %}
-- grab current tables grants config for comparision later on
{%- set grant_config = config.get('grants') -%}

{% set target_relation_exists, target_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}

{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}

{% set temp_snapshot_relation_exists, temp_snapshot_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table+"_snapshot_staging_temp_view",
type='view') -%}

-- Create a temporary view to manage if user SQl uses CTE
{% set temp_snapshot_relation_sql = model['compiled_code'].replace("'", "''") %}
{% call statement('create temp_snapshot_relation') %}
EXEC('DROP VIEW IF EXISTS {{ temp_snapshot_relation.include(database=False) }};');
EXEC('create view {{ temp_snapshot_relation.include(database=False) }} as {{ temp_snapshot_relation_sql }};');
{% endcall %}

{% if not target_relation_exists %}

{% set build_sql = build_snapshot_table(strategy, temp_snapshot_relation) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}
{% set staging_table = build_snapshot_staging_table(strategy, temp_snapshot_relation, target_relation) %}
-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}
{% do create_columns(target_relation, missing_columns) %}
{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}
{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}

{% set final_sql = snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}

{% endif %}

{% call statement('main') %}
{{ final_sql }}
{% endcall %}

fabric__drop_relation_script(temp_snapshot_relation)

{% set should_revoke = should_revoke(target_relation_exists, full_refresh_mode=False) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% do persist_docs(target_relation, model) %}

{% if not target_relation_exists %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Loading

0 comments on commit 57f2aa6

Please sign in to comment.