Schema change

This guide shows you how to implement schema change strategies in your Ascend pipelines so that your Flows keep running correctly as source schemas evolve.

Refer to our concept guide to choose the optimal schema change strategy for your use case.

Full refresh

To perform a full refresh on an entire Flow:

  1. Navigate to the Build Info panel and click Run Flow
  2. In the Run Flow dialog, scroll down to the Advanced Actions section and check the box for the Full Refresh option
  3. Click Run to execute the entire Flow in full-refresh mode

Examples

This section demonstrates how to specify a schema change strategy in different types of Components. By configuring the on_schema_change parameter correctly, you can ensure your data pipelines remain resilient when source schemas evolve.

Incremental Python Read

In this example, an Incremental Read Component specifies the "sync_all_columns" schema change strategy in the @read decorator:

read_metabook.py
import polars as pl
import pyarrow as pa
from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import read


@read(
    strategy="incremental",
    incremental_strategy="merge",
    unique_key="id",
    on_schema_change="sync_all_columns",
)
def read_metabook(context: ComponentExecutionContext) -> pa.Table:
    df = pl.read_parquet("gs://ascend-io-gcs-public/ottos-expeditions/lakev0/generated/events/metabook.parquet/year=*/month=*/day=*/*.parquet")
    # Data already loaded by previous runs; None when nothing has been loaded yet.
    current_data = context.current_data()
    if current_data is not None:
        current_data = current_data.to_polars()
        max_ts = current_data["timestamp"].max()
        log(f"Reading data after {max_ts}")
        # Keep only rows newer than the latest timestamp already loaded.
        df = df.filter(df["timestamp"] > max_ts)
    else:
        log("No current data found, reading all data")

    log(f"Returning {df.height} rows")
    return df.to_arrow()
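
The filter in this Component follows a high-water-mark pattern: find the newest timestamp already loaded, then keep only source rows newer than it. The following is a minimal sketch of that pattern in plain Python, independent of Ascend and Polars. The function name `rows_after_high_water_mark` and the sample rows are hypothetical and exist only for illustration.

```python
from datetime import datetime
from typing import Optional


def rows_after_high_water_mark(source_rows, current_rows: Optional[list], ts_key="timestamp"):
    """Return only the source rows newer than the newest row already loaded."""
    # No prior data: read everything.
    if not current_rows:
        return list(source_rows)
    max_ts = max(row[ts_key] for row in current_rows)
    # Strictly greater-than, mirroring the `>` filter in the Component above.
    return [row for row in source_rows if row[ts_key] > max_ts]


# Hypothetical sample data.
source = [
    {"id": 1, "timestamp": datetime(2024, 1, 1)},
    {"id": 2, "timestamp": datetime(2024, 1, 2)},
    {"id": 3, "timestamp": datetime(2024, 1, 3)},
]
loaded = source[:2]

first_run = rows_after_high_water_mark(source, None)
next_run = rows_after_high_water_mark(source, loaded)
```

Because the comparison is strict, a row that arrives later with a timestamp equal to the current maximum would be skipped. Whether that matters depends on how your source assigns timestamps.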

Local files Read Component

This example demonstrates how to configure a local file system Read Component with the ignore schema change strategy:

read_local_files_ignore.yaml
component:
  read:
    connection: local_fs
    local_file:
      path: rc_schema_evolution/
      include:
        - regex: .*\.csv
      parser:
        csv:
          has_header: true
    strategy:
      partitioned:
        enable_substitution_by_partition_name: false
      on_schema_change: ignore

Incremental MySQL Read Component

This example shows an incremental MySQL Read Component configured with the sync_all_columns strategy and merge-based incremental loading:

incremental_mysql_sync.yaml
component:
  skip: false
  read:
    connection: mysql_db
    mysql:
      table:
        name: mysql_incremental_schema_evolution_test
    strategy:
      on_schema_change: sync_all_columns
      replication:
        incremental:
          column_name: updated_at
      incremental:
        merge:
          unique_key: id
          deletion_column: deleted_at
  data_plane:
    databricks:
      table_properties:
        delta.columnMapping.mode: name

Incremental Python Transform

The following Python Transform Component uses the on_schema_change parameter to specify how to handle schema changes when running incrementally:

incremental_transform.py
from ascend.resources import ref, transform


@transform(
    inputs=[ref("schema_shifting_data")],
    materialized="incremental",
    incremental_strategy="merge",
    unique_key="key",
    merge_update_columns=["string", "ts"],
    on_schema_change="sync_all_columns",
)
def incremental_transform_schema_evol_python_sync(schema_shifting_data, context):
    def _n(x):
        # Return the name unchanged if the input contains a lowercase "string"
        # column; otherwise return it uppercased.
        return x if "string" in schema_shifting_data else x.upper()

    output = schema_shifting_data
    if context.is_incremental:
        current_data = context.current_data()
        # Keep only rows newer than the latest timestamp already in the target.
        output = output[output[_n("ts")] > current_data[_n("ts")].max()]

    return output

Incremental SQL Transform

This SQL Transform example shows how to configure schema change handling in SQL using the config block:

{{
  config(
    materialized="incremental",
    incremental_strategy="merge",
    unique_key="key",
    merge_update_columns=["string", "ts"],
    on_schema_change="sync_all_columns", # Other options: append_new_columns, ignore, fail
  )
}}
SELECT * FROM {{ ref("schema_shifting_data") }}

{% if is_incremental() %}
WHERE ts > (SELECT ts FROM {{ this }} ORDER BY ts DESC LIMIT 1)
{% endif %}
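
To make the merge settings in the Transform examples more concrete, here is a rough sketch of what a merge keyed on `unique_key` with `merge_update_columns` conceptually does. It assumes the common merge semantics in which matched rows have only the listed columns updated and unmatched rows are inserted; it is not Ascend's implementation, and `merge_rows` is a hypothetical helper operating on plain dictionaries.

```python
def merge_rows(target, incoming, unique_key, merge_update_columns):
    """Merge incoming rows into target rows, matching on unique_key."""
    # Index existing rows by key (copying so the inputs are not mutated).
    merged = {row[unique_key]: dict(row) for row in target}
    for row in incoming:
        key = row[unique_key]
        if key in merged:
            # Matched row: overwrite only the listed columns (assumed semantics).
            for col in merge_update_columns:
                if col in row:
                    merged[key][col] = row[col]
        else:
            # Unmatched row: insert it as-is.
            merged[key] = dict(row)
    return list(merged.values())


# Hypothetical sample data.
target = [{"key": 1, "string": "a", "ts": 1, "other": "keep"}]
incoming = [
    {"key": 1, "string": "b", "ts": 2, "other": "ignored"},
    {"key": 2, "string": "c", "ts": 3, "other": "new"},
]
result = merge_rows(target, incoming, "key", ["string", "ts"])
```

In this sketch the existing row keeps its `other` value because `other` is not listed in `merge_update_columns`, while the row with the new key is inserted whole.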

Summary

| Strategy | When to Use | Description |
| --- | --- | --- |
| sync_all_columns | Ensure target tables always reflect current source structure | Synchronizes target schema to exactly match source, adding new columns and removing obsolete ones |
| append_new_columns | Preserve historical columns while adding new ones | Adds new columns from source but keeps all existing target columns unchanged |
| ignore | Maintain existing schema without modifications | No schema changes are made; uses existing target schema regardless of source changes |
| fail | Need strict control over schema changes | Raises an error on any schema mismatch, forcing manual intervention |
| full-refresh | Making significant schema changes to ensure data consistency | Rebuilds entire dataset from scratch with updated schema |
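
The effect of each on_schema_change value on a target's column list can be sketched in a few lines of Python. This is an illustration of the descriptions in the summary above, not Ascend's implementation: it considers column names only (no types or data), and `resolve_target_columns` is a hypothetical helper.

```python
def resolve_target_columns(target_cols, source_cols, strategy):
    """Return the target column list after applying a schema change strategy."""
    if strategy == "ignore":
        # Keep the existing target schema regardless of source changes.
        return list(target_cols)
    if strategy == "fail":
        # Any mismatch in column names is an error.
        if set(target_cols) != set(source_cols):
            raise ValueError(f"Schema mismatch: target={target_cols}, source={source_cols}")
        return list(target_cols)
    if strategy == "append_new_columns":
        # Keep every existing column and add the ones that are new in the source.
        new_cols = [c for c in source_cols if c not in target_cols]
        return list(target_cols) + new_cols
    if strategy == "sync_all_columns":
        # Match the source exactly: new columns added, obsolete ones removed.
        return list(source_cols)
    raise ValueError(f"Unknown strategy: {strategy}")


# Hypothetical schemas: the source dropped "legacy_flag" and added "email".
target = ["id", "name", "legacy_flag"]
source = ["id", "name", "email"]

synced = resolve_target_columns(target, source, "sync_all_columns")
appended = resolve_target_columns(target, source, "append_new_columns")
ignored = resolve_target_columns(target, source, "ignore")
```

With these inputs, `synced` drops `legacy_flag` and gains `email`, `appended` keeps `legacy_flag` and gains `email`, `ignored` is unchanged, and the `fail` strategy would raise an error.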

Best Practices

  • Test schema change strategies in development environments before applying to production
  • Document your schema change strategy choices for each Component to aid in maintenance