Create an Incremental PySpark Transform
This guide shows you how to build an Incremental PySpark Transform that processes only new or changed data since the last run, significantly improving performance and resource usage for large datasets.
PySpark is Apache Spark's Python API that enables distributed data processing with Python, allowing you to work with large datasets across a cluster.
Note that PySpark is only available to Ascend Instances running on Databricks. Check out our Quickstart to set up a Databricks Instance
Prerequisites​
- Ascend Flow
Create a Transform​
You can create a Transform in two ways: through the form UI or directly in the Files panel.
- Using the Component Form
- Using the Files Panel
- Double-click the Flow where you want to add your Transform
- Right-click on an existing component (typically a Read component or another Transform) that will provide input data
- Select Create Downstream → Transform
- Complete the form with these details:
- Select your Flow
- Enter a descriptive name for your Transform (e.g.,
sales_aggregation
) - Choose the appropriate file type for your Transform logic
- Open the files panel in the top left corner
- Navigate to and select your desired Flow
- Right-click on the components directory and choose New file
- Name your file with a descriptive name that reflects its purpose (e.g.,
sales_aggregation
) - Choose the appropriate file extension based on your Transform type:
.py
for Python Transforms.sql
for SQL Transforms
Create your Incremental PySpark Transform​
Follow these steps to create an Incremental PySpark Transform:
-
Import required packages:
- Ascend resources (
pyspark
,ref
) - PySpark objects (
DataFrame
,SparkSession
,functions
)
- Ascend resources (
-
Apply the
@pyspark()
decorator with incremental settings:- Specify your
inputs
using refs - Set
materialized="incremental"
to enable incremental processing - Choose an
incremental_strategy
: either "append" or "merge"- For merge strategy, specify
unique_key
andmerge_update_columns
- For merge strategy, specify
- Specify your
This example uses the merge_update_columns
strategy, where you specify the subset of columns you want to update.
This example uses the merge_exclude_columns
strategy, where you update all columns except for the ones specified.
- Use append when you want to add new records without changing existing ones
- Use merge when you need to both add new records and update existing ones based on a unique key
- Define your transform function:
- Use
context.is_incremental
to check if this is an Incremental run - For incremental runs, query the existing data to determine what's new
- Filter the input data to include only new or changed records
- Process and return the filtered DataFrame
- Use
Examples​
- Merge strategy
- Append strategy
Use the merge strategy when you need to both add new records and update existing ones:
import pyspark.sql.functions as F
from ascend.resources import pyspark, ref
from pyspark.sql import DataFrame, SparkSession
@pyspark(
inputs=[ref("incrementing_data")],
materialized="incremental",
incremental_strategy="merge",
unique_key="key",
merge_update_columns=["string", "ts"],
)
def incremental_transform_pyspark(spark: SparkSession, incrementing_data: DataFrame, context):
"""
Incremental PySpark transform that merges on 'key' updating 'string' and 'ts'.
Filters the data to only new records based on max ts.
"""
df = incrementing_data
if context.is_incremental:
# get max ts from existing output table
row = spark.sql(f"SELECT max(ts) FROM {context.component.name}").collect()[0]
max_ts = row[0]
df = df.filter(F.col("ts") > max_ts)
return df
Use the append strategy when you only need to add new records:
import pyspark.sql.functions as F
from ascend.resources import pyspark, ref
from pyspark.sql import DataFrame, SparkSession
@pyspark(
inputs=[ref("incrementing_data")],
materialized="incremental",
incremental_strategy="append",
)
def incremental_transform_pyspark_append(spark: SparkSession, incrementing_data: DataFrame, context):
"""
Incremental PySpark append transform that appends only new records (ts > max ts)
"""
df = incrementing_data
if context.is_incremental:
# get max ts from existing output table
row = spark.sql(f"SELECT max(ts) FROM {context.component.name}").collect()[0]
max_ts = row[0]
df = df.filter(F.col("ts") > max_ts)
return df
Check out our PySpark Transform reference guide for complete parameter options, advanced configurations, and additional examples.
🎉 Congratulations! You've successfully created an Incremental PySpark Transform in Ascend.