Create an Incremental Python Transform
An Incremental Python Transform Component allows you to process and update data incrementally, rather than reprocessing the entire dataset each time the flow is run. This approach is particularly useful when dealing with large datasets or when you need to efficiently handle frequent updates to your data.
This guide will walk you through the process of creating an Incremental Python Transform in Ascend.
Prerequisites
- An Ascend Project
- A Workspace attached to the Project
- A Read Connector
Adding the Transform to the Flow
Navigate to your Workspace from the Homepage. Open up the Flow you want to add the Transform to.
Right-click on the components directory and select New File.
Name the file and give it a .py extension, e.g. incremental_transform.py.
your_project_name
├── ascend_project.yaml
├── connections
├── flows
│   └── foo_flow
│       └── components
│           ├── incrementing_read_connector.yaml
│           └── (+ New File) incremental_transform.py
├── profiles
└── vaults
Building the Transform
When creating an Incremental Python Transform, you must specify the following parameters in the decorator:
- inputs: A list of upstream components that are inputs to this transform.
- materialized: "incremental"
- incremental_strategy: "merge"
- unique_key: A comma-separated string of column names that define the primary key for the table.
from ibis import ir

from ascend.application.context import ComponentExecutionContext
from ascend.resources import ref, transform


@transform(
    inputs=[ref("incrementing_read_connector")],
    materialized="incremental",
    incremental_strategy="merge",
    unique_key="key",
)
def incremental_transform_python(incrementing_read_connector: ir.Table, context: ComponentExecutionContext):
    output = incrementing_read_connector
    # Add your transform logic here
    return output
The records returned by this transform are then merged with the results of previous runs of this transform, as specified by incremental_strategy.
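To make the merge behavior concrete, here is a minimal plain-Python sketch of upsert semantics keyed on unique_key. It is not Ascend's implementation, and it does not use the Ascend API: the merge_rows helper and the sample rows are hypothetical, and stripping whitespace around the comma-separated column names is an assumption made for this illustration.

```python
from typing import Dict, List, Tuple

Row = Dict[str, object]


def merge_rows(existing: List[Row], incoming: List[Row], unique_key: str) -> List[Row]:
    """Hypothetical helper: upsert incoming rows into existing rows by key columns."""
    # unique_key is a comma-separated string of column names (assumption: spaces are ignored).
    key_cols = [col.strip() for col in unique_key.split(",")]

    # Index the rows from previous runs by their key tuple.
    merged: Dict[Tuple[object, ...], Row] = {
        tuple(row[col] for col in key_cols): dict(row) for row in existing
    }

    # A matching key replaces the stored row; a new key adds a row.
    for row in incoming:
        merged[tuple(row[col] for col in key_cols)] = dict(row)

    return list(merged.values())


previous_run = [
    {"key": 1, "string": "a", "ts": "2024-01-01"},
    {"key": 2, "string": "b", "ts": "2024-01-01"},
]
current_run = [
    {"key": 2, "string": "b2", "ts": "2024-01-02"},  # existing key: row is updated
    {"key": 3, "string": "c", "ts": "2024-01-02"},   # new key: row is inserted
]

result = merge_rows(previous_run, current_run, unique_key="key")
```

In this sketch, the row with key 1 is carried over unchanged, the row with key 2 is replaced by its newer version, and the row with key 3 is appended.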
Update specific columns
You can optionally specify a list of columns to either include or exclude when updating a row. By default, when a row is updated, all columns are updated.
- merge_update_columns: When a row is updated, only these columns will be updated. Mutually exclusive with merge_exclude_columns.
- merge_exclude_columns: When a row is updated, all columns except these will be updated. Mutually exclusive with merge_update_columns.
from ibis import ir

from ascend.application.context import ComponentExecutionContext
from ascend.resources import ref, transform


@transform(
    inputs=[ref("incrementing_read_connector")],
    materialized="incremental",
    incremental_strategy="merge",
    unique_key="key",
    merge_update_columns=["string", "ts"],
)
def incremental_transform_python(incrementing_read_connector: ir.Table, context: ComponentExecutionContext):
    output = incrementing_read_connector
    # In this example, the incrementing_read_connector component has columns: key, string, ts
    # Add your transform logic here
    return output
In this example, when an existing row is updated with new data, only the string and ts columns will be updated.
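The difference between the two options can be illustrated with a small plain-Python sketch. This is only a model of the behavior described above, not Ascend's implementation: the merge_row helper, its parameter names, and the sample rows are hypothetical.

```python
from typing import Dict, List, Optional

Row = Dict[str, object]


def merge_row(
    existing: Row,
    incoming: Row,
    update_columns: Optional[List[str]] = None,
    exclude_columns: Optional[List[str]] = None,
) -> Row:
    """Hypothetical helper: update one matched row, optionally restricting columns."""
    if update_columns is not None and exclude_columns is not None:
        raise ValueError("update_columns and exclude_columns are mutually exclusive")

    if update_columns is not None:
        # Only the listed columns take the incoming value.
        columns = [col for col in incoming if col in update_columns]
    elif exclude_columns is not None:
        # Every column except the listed ones takes the incoming value.
        columns = [col for col in incoming if col not in exclude_columns]
    else:
        # Default: all columns are updated.
        columns = list(incoming)

    updated = dict(existing)
    for col in columns:
        updated[col] = incoming[col]
    return updated


existing_row = {"key": 1, "string": "old", "ts": "2024-01-01"}
incoming_row = {"key": 1, "string": "new", "ts": "2024-01-02"}

only_ts = merge_row(existing_row, incoming_row, update_columns=["ts"])
all_but_ts = merge_row(existing_row, incoming_row, exclude_columns=["ts"])
everything = merge_row(existing_row, incoming_row)
```

Here only_ts keeps the old string value and takes the new ts, all_but_ts takes the new string value and keeps the old ts, and everything matches the incoming row.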