Create an Incremental SQL Transform
An Incremental SQL Transform Component allows you to process and update data incrementally, rather than reprocessing the entire dataset each time the flow is run. This approach is particularly useful when dealing with large datasets or when you need to efficiently handle frequent updates to your data.
This guide will walk you through the process of creating an Incremental SQL Transform in Ascend.
Prerequisites
- An Ascend Project
- A Workspace attached to the Project
- A Read Connector
Adding the Transform to the Flow
Navigate to your workspace from the Homepage. Open up the flow you want to add the transform to.
Right click on the components directory and select New File.
Name the file and give it a .sql.jinja extension, e.g. incremental_transform.sql.jinja.
your_project_name
├── ascend_project.yaml
├── connections
├── flows
│   └── foo_flow
│       └── components
│           ├── incrementing_read_connector.yaml
│           └── (+ New File) incremental_transform.sql.jinja
├── profiles
└── vaults
Building the Transform
When creating an Incremental SQL Transform, you must specify the following parameters in the Jinja config block above the transform logic:
- materialized: "incremental"
- incremental_strategy: "merge"
- unique_key: a comma-separated string of column names that define the primary key for the table.
An example transform is shown below:
{{
    config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key="key"
    )
}}
SELECT * FROM {{ ref("incrementing_read_connector") }}
Whenever this transform runs, Ascend takes the output of this SQL query and upserts it into this component's table in the Data Plane: a row whose unique_key matches an existing row updates that row, and any other row is inserted.
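When no single column identifies a row, unique_key can list several columns as one comma-separated string. The sketch below assumes a hypothetical upstream table in which customer_id and order_id together form the primary key. Those column names are illustrative and do not come from this guide. Whether whitespace after the comma is accepted is not stated here, so the example avoids it.

```sql
{# Hypothetical composite key: customer_id and order_id are assumed column names. #}
{{
    config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key="customer_id,order_id"
    )
}}
SELECT * FROM {{ ref("incrementing_read_connector") }}
```

With this config, an incoming row would update an existing row only when both key columns match. Otherwise it would be inserted as a new row.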
Update specific columns
You can optionally specify a list of columns to either include or exclude when updating a row. By default, when a row is updated, all columns are updated.
- merge_update_columns: When a row is updated, only these columns will be updated. Mutually exclusive with merge_exclude_columns.
- merge_exclude_columns: When a row is updated, all columns except these will be updated. Mutually exclusive with merge_update_columns.
{{
    config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key="key",
        merge_exclude_columns=["record_first_inserted_ts"]
    )
}}
SELECT *, CURRENT_TIMESTAMP() AS record_first_inserted_ts FROM {{ ref("incrementing_read_connector") }}
In this example, when an existing row is updated with new data, the record_first_inserted_ts column will not be updated, so it keeps the timestamp from when the row was first inserted.
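The opposite option, merge_update_columns, can be sketched the same way. This example assumes the parameter takes a list of column names in the same form as merge_exclude_columns. The columns status and updated_at are hypothetical and stand in for whichever columns you want refreshed on update.

```sql
{# Hypothetical columns: status and updated_at are assumed to exist upstream. #}
{{
    config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key="key",
        merge_update_columns=["status", "updated_at"]
    )
}}
SELECT * FROM {{ ref("incrementing_read_connector") }}
```

Under these assumptions, an update to an existing row would overwrite only status and updated_at and leave every other column as first written. Because the two options are mutually exclusive, this config does not also set merge_exclude_columns.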