Skip to main content

Incremental Python Transform

In this guide, we'll build an Incremental Python Transform that processes only new or changed data to improve pipeline performance.

Check out our concept guides to learn about incremental processing strategies and supported input formats.

Prerequisites

Create a Transform

You can create a Transform in two ways: through the form UI or directly in the Files panel.

  1. Double-click the Flow where you want to add your Transform
  2. Right-click on an existing component (typically a Read component or another Transform) that will provide input data
  3. Select Create DownstreamTransform Creating a Transform from the context menu
  4. Complete the form with these details:
    • Select your Flow
    • Enter a descriptive name for your Transform (e.g., sales_aggregation)
    • Choose the appropriate file type for your Transform logic Transform creation form

Create your Python Transform

Structure your Python Transform following this pattern:

  1. Import necessary packages: Import Ascend resources like transform and ref from the ascend.resources module

  2. Apply the @transform() decorator: Configure it with:

    • inputs: List of input datasets using ref()
    • materialized: Set to "incremental" to enable incremental processing
    • incremental_strategy: Use "merge" for updating existing records
    • unique_key: Specify a column that uniquely identifies records
    • merge_update_columns: List columns that should be updated during merges
  3. Define your transform function: Create a function that takes input data and context parameters

  4. Use incremental context: Check context.is_incremental and use context.current_data() to access existing data

  5. Filter for new data: Use incremental state to process only new or changed records

    • Compare current data with previous state using timestamps or IDs
    # Example of filtering for new data
    if context.is_incremental:
    current_data = context.current_data()
    # Only process records with newer timestamps
    output = output[output["timestamp"] > current_data["timestamp"].max()]
  6. Return the processed data: Send your transformed data back to Ascend

    • Return a properly structured dataframe or table object

Example

This example demonstrates how to create an incremental transform that only processes new or updated records:

incremental_transform.py
"""
Example of an Incremental Python Transform Component.

This file demonstrates how to create a transform that processes only new or
changed data to improve pipeline performance using the incremental strategy.
"""

from ascend.resources import ref, transform


@transform(
inputs=[ref("incrementing_data")],
materialized="incremental",
incremental_strategy="merge",
unique_key="key",

🎉 Congratulations! You've successfully created an Incremental Python Transform in Ascend.