Skip to main content
Version: 3.0.0

Create an Incremental Python Transform

An Incremental Python Transform Component allows you to process and update data incrementally, rather than reprocessing the entire dataset each time the flow is run. This approach is particularly useful when dealing with large datasets or when you need to efficiently handle frequent updates to your data.

This guide will walk you through the process of creating an Incremental Python Transform in Ascend.

Prerequisites

  • An Ascend Project
  • A Workspace attached to the Project
  • A Read Connector

Adding the Transform to the Flow

Navigate to your Workspace from the Homepage. Open up the Flow you want to add the Transform to.

Right click on the components directory and select New File.

Name the file and give it a .py extension, e.g. incremental_transform.py.

your_project_name
├── ascend_project.yaml
├── connections
├── flows
│ └── foo_flow
│ └── components
│ ├── incrementing_read_connector.yaml
│ └── (+ New File) incremental_transform.py
├── profiles
└── vaults

Building the Transform

When creating an incremental python transform, you must specify the following parameters in the decorator:

  • inputs: A list of upstream components that are inputs to this transform.
  • materialized: "incremental"
  • incremental_strategy: "merge"
  • unique_key: A comma separated string of columns names that define the primary key for the table.
flows/foo_flow/components/incremental_transform.py
from ibis import ir

from ascend.application.context import ComponentExecutionContext
from ascend.resources import ref, transform

@transform(
inputs=[ref("incrementing_read_connector")],
materialized="incremental",
incremental_strategy="merge",
unique_key="key"
)
def incremental_transform_python(incrementing_read_connector: ir.Table, context: ComponentExecutionContext):
output = incrementing_read_connector
# Add your transform logic here
return output

The records returned by this transform will then be merged with the results of previous runs of this transform as per incremental_strategy.

Update specific columns

You can optionally specify a list of columns to either include or exclude when updating a row. By default, when a row is updated, all columns are updated.

  • merge_update_columns: When a row is updated, only these columns will be updated. Mutually exclusive with merge_exclude_columns.
  • merge_exclude_columns: When a row is updated, all columns except these will be updated. Mutually exclusive with merge_update_columns.
flows/foo_flow/components/incremental_transform.py
from ibis import ir

from ascend.application.context import ComponentExecutionContext
from ascend.resources import ref, transform

@transform(
inputs=[ref("incrementing_read_connector")],
materialized="incremental",
incremental_strategy="merge",
unique_key="key",
merge_update_columns=["string", "ts"]
)
def incremental_transform_python(incrementing_read_connector: ir.Table, context: ComponentExecutionContext):
output = incrementing_read_connector
# in this example, the incrementing_read_connector component has columns: key, string, ts
# Add your transform logic here
return output

In this example, when an existing row is updated with new data, only the string and ts columns will be updated.