Schema change
This guide shows you how to implement schema change strategies in your Ascend pipelines to handle evolving data schemas gracefully and maintain robust, resilient Flows.
Refer to our concept guide to choose the optimal schema change strategy for your use case.
Full refresh
To perform a full refresh on an entire Flow:
- Navigate to the Build Info panel and click Run Flow
- In the Run Flow dialog, scroll down to the Advanced Actions section and check the box for the Full Refresh option
- Click Run to execute the entire Flow in full-refresh mode
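A full refresh rebuilds every dataset in the Flow from scratch (see the summary table below), so it also applies any accumulated schema changes in a single pass; expect it to take noticeably longer than a routine incremental run.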
Examples
This section demonstrates how to specify a schema change strategy in different types of Components. By configuring the on_schema_change parameter correctly, you can ensure your data pipelines remain resilient when source schemas evolve.
Incremental Python Read
In this example, an Incremental Read Component specifies the "sync_all_columns" schema change strategy in the @read decorator:
import polars as pl
import pyarrow as pa
from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import read
@read(
strategy="incremental",
incremental_strategy="merge",
unique_key="id",
on_schema_change="sync_all_columns",
)
def read_metabook(context: ComponentExecutionContext) -> pa.Table:
    # Read all partitions of the source parquet dataset
    df = pl.read_parquet("gs://ascend-io-gcs-public/ottos-expeditions/lakev0/generated/events/metabook.parquet/year=*/month=*/day=*/*.parquet")
    current_data = context.current_data()
    if current_data is not None:
        # Incremental run: keep only rows newer than the latest timestamp already materialized
        current_data = current_data.to_polars()
        max_ts = current_data["timestamp"].max()
        log(f"Reading data after {max_ts}")
        df = df.filter(df["timestamp"] > max_ts)
    else:
        # First run (or full refresh): ingest everything
        log("No current data found, reading all data")
    log(f"Returning {df.height} rows")
    return df.to_arrow()
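To illustrate the kind of drift this strategy reacts to, here is a minimal, self-contained sketch (plain polars with made-up data; the batch names and the channel column are hypothetical and not part of the Component above):
import polars as pl
from datetime import datetime

# Hypothetical first batch: the schema the target table was originally built from
batch_v1 = pl.DataFrame({"id": [1, 2], "timestamp": [datetime(2024, 1, 1), datetime(2024, 1, 2)]})

# Hypothetical later batch: the source has grown a new `channel` column
batch_v2 = pl.DataFrame({"id": [3], "timestamp": [datetime(2024, 1, 3)], "channel": ["web"]})

# sync_all_columns would add `channel` to the target on the next incremental run;
# fail would abort the run instead, and ignore would load rows against the
# existing target schema without the new column.
print(set(batch_v2.columns) - set(batch_v1.columns))  # {'channel'}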
Local files Read Component
This example demonstrates how to configure a local file system Read Component with the ignore schema change strategy:
component:
read:
connection: local_fs
local_file:
path: rc_schema_evolution/
include:
- regex: .*\.csv
parser:
csv:
has_header: true
strategy:
partitioned:
enable_substitution_by_partition_name: false
on_schema_change: ignore
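With ignore, rows continue to load against the existing target schema: columns that later appear in the CSVs are simply not added to the target. This suits pipelines whose downstream consumers depend on a fixed column set.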
Incremental MySQL Read Component
This example shows an incremental MySQL Read Component configured with the sync_all_columns strategy and merge-based incremental loading:
component:
skip: false
read:
connection: mysql_db
mysql:
table:
name: mysql_incremental_schema_evolution_test
strategy:
on_schema_change: sync_all_columns
replication:
incremental:
column_name: updated_at
incremental:
merge:
unique_key: id
deletion_column: deleted_at
data_plane:
databricks:
table_properties:
delta.columnMapping.mode: name
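The delta.columnMapping.mode: name table property is worth noting: on a Databricks Data Plane, enabling Delta column mapping is what allows columns to be renamed or dropped without rewriting the underlying data, so sync_all_columns can remove obsolete columns cleanly.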
Incremental Python Transform
The following Python Transform Component uses the on_schema_change parameter to specify how to handle schema changes when running incrementally:
from ascend.resources import ref, transform
@transform(
inputs=[ref("schema_shifting_data")],
materialized="incremental",
incremental_strategy="merge",
unique_key="key",
merge_update_columns=["string", "ts"],
on_schema_change="sync_all_columns",
)
def incremental_transform_schema_evol_python_sync(schema_shifting_data, context):
    # The schema-shifting test data can switch its column names to uppercase,
    # so resolve each name against whichever casing is currently present
    def _n(x):
        return x if "string" in schema_shifting_data else x.upper()

    output = schema_shifting_data
    if context.is_incremental:
        # Incremental run: keep only rows newer than the latest ts already materialized
        current_data = context.current_data()
        output = output[output[_n("ts")] > current_data[_n("ts")].max()]
    return output
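Because merge_update_columns is set, a merge only overwrites the string and ts columns for rows whose key already exists; any other columns retain their previously materialized values.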
Incremental SQL Transform
This SQL Transform example shows how to configure schema change handling in SQL using the config block:
{{
  config(
    materialized="incremental",
    incremental_strategy="merge",
    unique_key="key",
    merge_update_columns=["string", "ts"],
    on_schema_change="sync_all_columns",
  )
}}
{# Other on_schema_change options: append_new_columns, ignore, fail #}
SELECT * FROM {{ ref("schema_shifting_data") }}
{% if is_incremental() %}
WHERE ts > (SELECT ts FROM {{ this }} ORDER BY ts DESC LIMIT 1)
{% endif %}
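The subquery fetches the most recent ts already materialized in {{ this }}; assuming ts is never NULL, SELECT max(ts) FROM {{ this }} expresses the same watermark.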
Summary
| Strategy | When to Use | Description |
|---|---|---|
| sync_all_columns | Ensure target tables always reflect the current source structure | Synchronizes the target schema to exactly match the source, adding new columns and removing obsolete ones |
| append_new_columns | Preserve historical columns while adding new ones | Adds new columns from the source but keeps all existing target columns unchanged |
| ignore | Maintain the existing schema without modifications | Makes no schema changes; uses the existing target schema regardless of source changes |
| fail | Require strict control over schema changes | Raises an error on any schema mismatch, forcing manual intervention |
| full refresh | Make significant schema changes while guaranteeing data consistency | Rebuilds the entire dataset from scratch with the updated schema |
Best Practices
- Test schema change strategies in development environments before applying to production
- Document your schema change strategy choices for each Component to aid in maintenance