Version: 3.0.0

Create an Incremental Snowpark Transform

This guide shows you how to build an Incremental Snowpark Transform that processes only new or changed data since the last run, significantly improving performance and resource usage for large datasets.

Snowpark is Snowflake's developer framework that enables processing data directly where it's stored using familiar programming languages like Python.

Snowflake only

Note that Snowpark is only available to Ascend Instances running on Snowflake. Check out our Quickstart to set up a Snowflake instance.

Prerequisites

Create a Transform

You can create a Transform in two ways: through the form UI or directly in the Files panel.

  1. Double-click the Flow where you want to add your Transform
  2. Right-click on an existing component (typically a Read component or another Transform) that will provide input data
  3. Select Create Downstream → Transform
  4. Complete the form with these details:
    • Select your Flow
    • Enter a descriptive name for your Transform (e.g., sales_aggregation)
    • Choose the appropriate file type for your Transform logic

Create your Incremental Snowpark Transform

Follow these steps to create an Incremental Snowpark Transform:

  1. Import required packages:

    • Ascend resources (Snowpark, ref)
    • Snowpark objects (DataFrame, Session, functions, SnowparkTransformExecutionContext)
  2. Apply the @snowpark() decorator with incremental settings:

    • Specify your inputs using refs
    • Set materialized="incremental" to enable incremental processing
    • Choose an incremental_strategy: either "append" or "merge"
      • For merge strategy, specify unique_key and merge_update_columns
Choosing an Incremental strategy
  • Use append when you want to add new records without changing existing ones
  • Use merge when you need to both add new records and update existing ones based on a unique key
  3. Define your transform function:
    • Use context.is_incremental to check if this is an Incremental run
    • For incremental runs, query the existing data to determine what's new
    • Filter the input data to include only new or changed records
    • Process and return the filtered DataFrame
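Stripped of Snowpark specifics, steps 2 and 3 above reduce to watermark filtering plus a key-based upsert. The following plain-Python illustration sketches that logic with hypothetical helper names (`filter_new_rows`, `merge_rows`) that are not part of the Ascend or Snowpark APIs:

```python
def filter_new_rows(rows, max_ts):
    """Keep only rows newer than the watermark (the max ts already materialized)."""
    if max_ts is None:  # first (non-incremental) run: take everything
        return list(rows)
    return [r for r in rows if r["ts"] > max_ts]


def merge_rows(existing, incoming, unique_key="key"):
    """Upsert: overwrite rows with matching keys, append the rest (the merge strategy)."""
    by_key = {r[unique_key]: dict(r) for r in existing}
    for r in incoming:
        by_key[r[unique_key]] = dict(r)  # insert, or overwrite on key collision
    return list(by_key.values())


existing = [{"key": 1, "string": "a", "ts": 10}, {"key": 2, "string": "b", "ts": 20}]
incoming = [{"key": 2, "string": "b2", "ts": 30}, {"key": 3, "string": "c", "ts": 40}]

new_rows = filter_new_rows(incoming, max_ts=20)  # both incoming rows have ts > 20
merged = merge_rows(existing, new_rows)          # key 2 updated, key 3 appended
```

With the append strategy, the last line would simply be `existing + new_rows`; merge is what deduplicates on the unique key.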

Examples

Use the merge strategy when you need to both add new records and update existing ones:

incremental-merge.py

```python
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from snowflake.snowpark import Session as SnowparkSession

from ascend.resources import ref, snowpark


@snowpark(
    inputs=[ref("incrementing_data")],
    materialized="incremental",
    incremental_strategy="merge",
    unique_key="key",
    merge_update_columns=["string", "ts"],
)
def incremental_transform_snowpark(incrementing_data: SnowparkDataFrame, context) -> SnowparkDataFrame:
    """
    Incremental Snowpark transform that merges on 'key', updating 'string' and 'ts'.
    Filters the input to only new records based on the max ts in the existing output.
    """
    assert isinstance(context.session, SnowparkSession)
    # inputs should be Snowpark DataFrames
    assert isinstance(incrementing_data, SnowparkDataFrame)

    df = incrementing_data
    if context.is_incremental:
        # get the max ts from the existing output table
        row = context.session.sql(f"SELECT max(ts) FROM {context.component.name}").collect()[0]
        max_ts = row[0]
        df = df.filter(df["ts"] > max_ts)
    return df
```
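The append strategy drops the `unique_key` and `merge_update_columns` parameters, since existing rows are never rewritten. A hedged sketch of the same transform under that strategy (reusing the `incrementing_data` input from the example above; this is an illustrative configuration, not verified output, so check parameter names against the reference guide):

```python
from snowflake.snowpark import DataFrame as SnowparkDataFrame
from ascend.resources import ref, snowpark


@snowpark(
    inputs=[ref("incrementing_data")],
    materialized="incremental",
    incremental_strategy="append",  # insert-only: never updates existing rows
)
def incremental_append_snowpark(incrementing_data: SnowparkDataFrame, context) -> SnowparkDataFrame:
    df = incrementing_data
    if context.is_incremental:
        # append would duplicate rows already materialized, so filter on the watermark
        row = context.session.sql(f"SELECT max(ts) FROM {context.component.name}").collect()[0]
        if row[0] is not None:
            df = df.filter(df["ts"] > row[0])
    return df
```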

Check out our reference guide for complete parameter options, advanced configurations, and additional examples.

🎉 Congratulations! You've successfully created an Incremental Snowpark Transform in Ascend.