Build a templatized data pipeline
This guide shows you how to create reusable data transformations using Ascend's Simple Application. The Simple Application is a pre-built Ascend solution that enables you to define parameterized Transforms with Jinja templating, creating flexible and maintainable data pipelines.
Want to learn more about Ascend-native Applications? Check out this concept guide.
Build a Simple Application
The Simple Application uses SQL or Python templates (or both) with Jinja to create parameterized Components. This guide demonstrates both approaches.
SQL templates
This implementation uses SQL templates to build a straightforward filtering Application.
Set up your project structure
Create a template directory structure in your project to organize your SQL templates:
my_project/
├── templates/
│   └── flows/
│       └── basic_filter/
│           └── filter.sql
Create your SQL template
Add a .sql file with placeholders for the parameters:
SELECT *
FROM {{ ref(input_table) }}
WHERE {{ filter_column }} > {{ filter_value }}
LIMIT {{ row_limit }}
Configure the Application
Create a YAML configuration file that references the SQL template and defines parameter values:
component:
  application:
    application_id: simple # Simple Application
    config:
      template_path: templates/flows/basic_filter # Directory with your templates
      parameters:
        input_table: YOUR_INPUT_TABLE
        filter_column: YOUR_COLUMN_NAME
        filter_value: 100
        row_limit: 1000
When Ascend processes this configuration, it generates a SQL Transform Component that filters your data based on the specified parameters.
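To make the templating concrete, here's a minimal rendering sketch that uses the jinja2 library directly. The ref stub and the events table name below are assumptions for illustration only; inside Ascend, ref() resolves Component references for you:

from jinja2 import Template

# Hypothetical stand-in for Ascend's ref(), used only to make this sketch runnable
def ref(name: str) -> str:
    return name

template = Template(
    """SELECT *
FROM {{ ref(input_table) }}
WHERE {{ filter_column }} > {{ filter_value }}
LIMIT {{ row_limit }}"""
)

sql = template.render(ref=ref, input_table="events", filter_column="event_count", filter_value=100, row_limit=1000)
print(sql)
# SELECT *
# FROM events
# WHERE event_count > 100
# LIMIT 1000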
SQL and Python templates
This implementation uses both SQL and Python templates to build a data diff Application that identifies differences between two tables.
Set up your project structure
Create a template directory structure in your project to organize your templates:
my_project/
├── templates/
│   └── flows/
│       └── data_diff/
│           ├── a.sql
│           ├── b.sql
│           ├── diff.py
│           └── flagged.py
Create your templates
Add each template file with the following content:
First table query template (a.sql)
This template retrieves data from the first table, applying any configured filter before comparison:
SELECT
  *
FROM
  {{ ref(a.get("name"), flow=a.get("flow")) }}
{%- if where %}
WHERE
  {{ where }}
{%- endif %}
Second table query template (b.sql)
This template retrieves data from the second table, applying any configured filter before comparison:
SELECT
  *
FROM
  {{ ref(b.get("name"), flow=b.get("flow")) }}
{%- if where %}
WHERE
  {{ where }}
{%- endif %}
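To see how the {%- if where %} guard behaves, here is a minimal sketch that renders the template with and without a filter. The ref stub and the parameter values are assumptions for illustration only:

from jinja2 import Template

# Hypothetical stand-in for Ascend's ref(), used only to make this sketch runnable
def ref(name, flow=None):
    return f"{flow}.{name}" if flow else name

template = Template(
    """SELECT
  *
FROM
  {{ ref(a.get("name"), flow=a.get("flow")) }}
{%- if where %}
WHERE
  {{ where }}
{%- endif %}"""
)

a = {"name": "builder_sessions", "flow": "data_quality"}
print(template.render(ref=ref, a=a, where="TO_DATE(session_start_time) = '2022-10-15'"))
print(template.render(ref=ref, a=a, where=None))  # the WHERE clause is omitted entirely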
Data comparison logic (diff.py)
This template contains the core comparison logic that:
- Dynamically compares the schemas of the two input tables
- Raises an error if the schemas don't match
- Joins on all string/timestamp columns and selects all columns from both sides
import ibis
from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import application_subcomponent_name, application_subcomponent_ref, transform
from ibis import ir


@transform(
    name=application_subcomponent_name("diff"),
    inputs=[
        application_subcomponent_ref("a", alias="a"),
        application_subcomponent_ref("b", alias="b"),
    ],
)
def diff(a: ir.Table, b: ir.Table, context: ComponentExecutionContext) -> ir.Table:
    """
    Dynamically compares schemas of two tables, joins on all string/timestamp columns,
    and selects all columns from both sides.
    Raises an error if schemas do not match.
    Uses alias names for inputs if provided, otherwise defaults to 'a' and 'b'.
    """
    # Get aliases from parameters (default to 'a' and 'b')
    app_context = context.application_component_context
    params = app_context.config["parameters"]
    a_alias = params["a"].get("alias", "a")
    b_alias = params["b"].get("alias", "b")
    threshold = params.get("value_threshold", 0)
    exclude_columns = set(params.get("exclude_columns", []))
    numeric_types = {"int", "integer", "bigint", "smallint", "tinyint", "float", "double", "real", "decimal", "numeric"}

    # Get schemas as dicts: {colname: dtype}, excluding columns in exclude_columns
    a_schema = {col: str(dtype) for col, dtype in zip(a.schema().names, a.schema().types) if col not in exclude_columns}
    b_schema = {col: str(dtype) for col, dtype in zip(b.schema().names, b.schema().types) if col not in exclude_columns}
    log(f"Schema {a_alias} (filtered): {a_schema}")
    log(f"Schema {b_alias} (filtered): {b_schema}")
    if a_schema != b_schema:
        raise ValueError(f"Schemas do not match: {a_alias}={a_schema}, {b_alias}={b_schema}")

    # Identify join keys: all columns that are string or timestamp, and not excluded
    join_types = {"string", "timestamp", "date", "datetime"}
    id_cols = [col for col, dtype in a_schema.items() if any(t in dtype.lower() for t in join_types)]
    if not id_cols:
        raise ValueError("No string or timestamp columns found for join keys after excluding columns.")
    log(f"Join keys: {id_cols}")

    # Build the join condition across all join keys
    join_condition = None
    for col in id_cols:
        cond = a[col] == b[col]
        join_condition = cond if join_condition is None else (join_condition & cond)

    # Perform full outer join (no suffixes argument in Ibis)
    joined = a.join(b, join_condition, how="outer")
    select_exprs = [joined[col] for col in id_cols]

    # Add EXISTS column: struct with a_alias and b_alias fields, true if the first join key is not null in each input
    presence_col = id_cols[0]
    exists_struct = ibis.struct(dict(**{a_alias: a[presence_col].notnull(), b_alias: b[presence_col].notnull()})).name("EXISTS")
    select_exprs.append(exists_struct)

    # Add MISSING column: indicates which dataset is missing the row, or if present in both
    missing_expr = (
        ibis.case()
        .when(~exists_struct[a_alias], ibis.literal(f"Dataset {a_alias} Missing Row"))
        .when(~exists_struct[b_alias], ibis.literal(f"Dataset {b_alias} Missing Row"))
        .else_(ibis.literal("Row Not Missing"))
        .end()
        .name("MISSING")
    )
    select_exprs.append(missing_expr)

    # Add DIFF_COLUMNS: array of {column, diff} for numeric columns where the diff is not null and nonzero
    diff_columns_array = []
    for col, dtype in a_schema.items():
        if col not in id_cols and col not in exclude_columns:
            is_numeric = any(t in dtype.lower() for t in numeric_types)
            if is_numeric:
                left_expr = a[col]
                right_expr = b[col]
                diff_value = (left_expr - right_expr).abs()
                include_expr = diff_value.notnull() & (diff_value != 0)
                diff_struct = ibis.ifelse(include_expr, ibis.struct({"column": ibis.literal(col), "diff": diff_value}), None)
                diff_columns_array.append(diff_struct)
    if diff_columns_array:
        diff_columns_array_expr = ibis.array(diff_columns_array).name("DIFF_COLUMNS")
    else:
        diff_columns_array_expr = ibis.literal([], type="array<struct<column: string, diff: double>>").name("DIFF_COLUMNS")
    select_exprs.append(diff_columns_array_expr)

    # For non-key columns, output a struct with a_alias, b_alias, diff, diff_abs, and diff_pct fields
    diff_pct_abs_exprs = []
    for col, dtype in a_schema.items():
        if col not in id_cols and col not in exclude_columns:
            is_numeric = any(t in dtype.lower() for t in numeric_types)
            left_expr = a[col]
            right_expr = b[col]
            if is_numeric:
                # Calculate absolute percent difference: abs(a-b) / ((abs(a)+abs(b)) / 2)
                numerator = (left_expr - right_expr).abs()
                denominator = ((left_expr.abs() + right_expr.abs()) / 2).nullif(0)
                abs_pct_diff_expr = (numerator / denominator).coalesce(0.0)
                # Only include if the absolute difference is at or above value_threshold
                include_expr = numerator >= threshold
                filtered_pct_diff_expr = ibis.ifelse(include_expr, abs_pct_diff_expr, 0.0)
                diff_pct_abs_exprs.append(filtered_pct_diff_expr)
                # Only use zero for a missing side if the other side has a value; otherwise, keep null
                both_null = left_expr.isnull() & right_expr.isnull()
                diff_expr = ibis.ifelse(both_null, ibis.literal(None), left_expr.coalesce(0) - right_expr.coalesce(0))
                diff_abs_expr = diff_expr.abs()
                diff_pct_expr = abs_pct_diff_expr
                left_struct_expr = ibis.ifelse(left_expr.isnull() & right_expr.notnull(), 0, left_expr)
                right_struct_expr = ibis.ifelse(right_expr.isnull() & left_expr.notnull(), 0, right_expr)
            else:
                diff_expr = ibis.literal(None)
                diff_abs_expr = ibis.literal(None)
                diff_pct_expr = ibis.literal(None)
                left_struct_expr = left_expr
                right_struct_expr = right_expr
            struct_expr = ibis.struct(
                dict(
                    **{a_alias: left_struct_expr, b_alias: right_struct_expr},
                    diff=diff_expr,
                    diff_abs=diff_abs_expr,
                    diff_pct=diff_pct_expr,
                )
            ).name(col)
            select_exprs.append(struct_expr)

    # Add DIFF_SCORE column: 1 if either input does not exist, else sum of abs(diff_pct) for all numeric columns
    exists_a = exists_struct[a_alias]
    exists_b = exists_struct[b_alias]
    if diff_pct_abs_exprs:
        sum_diff_pct_abs = diff_pct_abs_exprs[0].coalesce(0.0)
        for expr in diff_pct_abs_exprs[1:]:
            sum_diff_pct_abs = sum_diff_pct_abs + expr
        sum_diff_pct_abs = sum_diff_pct_abs.coalesce(0.0)
    else:
        sum_diff_pct_abs = ibis.literal(0.0)
    diff_score_expr = ibis.case().when(~exists_a | ~exists_b, 1.0).else_(sum_diff_pct_abs).end().coalesce(0.0).name("DIFF_SCORE")
    select_exprs.insert(len(id_cols), diff_score_expr)

    log(f"Output columns: {[str(e.get_name()) for e in select_exprs]}")
    log(f"Using aliases: {a_alias} (for 'a'), {b_alias} (for 'b')")
    result = joined.select(select_exprs)
    return result
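As a quick sanity check on the scoring logic, here is a small standalone sketch of the symmetric percent difference that diff.py computes per numeric column (the sample values are hypothetical):

def symmetric_pct_diff(a: float, b: float) -> float:
    # Mirrors diff.py: abs(a - b) / ((abs(a) + abs(b)) / 2), treating a zero denominator as 0.0
    denominator = (abs(a) + abs(b)) / 2
    return abs(a - b) / denominator if denominator else 0.0

# DIFF_SCORE for a row present in both tables is the sum across numeric columns
# (differences below value_threshold contribute 0)
row_score = symmetric_pct_diff(100.0, 104.0) + symmetric_pct_diff(7.5, 7.5)
print(round(row_score, 4))  # 0.0392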
Filtered results (flagged.py)
This template filters the output from diff.py, keeping only rows where the DIFF_SCORE (representing the degree of difference for each row) exceeds a configurable threshold:
from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import (
    application_subcomponent_name,
    application_subcomponent_ref,
    transform,
)
from ibis import ir


@transform(
    name=application_subcomponent_name("flagged"),
    inputs=[
        application_subcomponent_ref("diff"),
    ],
)
def flagged(diff: ir.Table, context: ComponentExecutionContext) -> ir.Table:
    """
    Filters by DIFF_SCORE threshold.
    """
    app_context = context.application_component_context
    threshold = app_context.config["parameters"]["percent_threshold"]
    log(f"Filtering rows with DIFF_SCORE > {threshold}")
    # Filter rows using Ibis expressions
    filtered = diff.filter(diff.DIFF_SCORE > threshold)
    log("Ibis filtering complete")
    return filtered
Configure the Application
Create a YAML configuration file that references the templates and defines parameter values:
component:
  application:
    application_id: simple # Simple Application
    config:
      template_path: templates/flows/data_diff # Directory containing all templates
      parameters:
        a:
          flow: data_quality
          name: builder_sessions
          alias: table_1
        b:
          name: simple__builder_sessions
          alias: table_2
        where: | # Date filter
          TO_DATE(session_start_time) IN ('2022-10-15', '2025-03-01')
        percent_threshold: 0.001 # Relative/percent difference threshold
        value_threshold: 0.001 # Absolute difference threshold
        exclude_columns:
          - SESSION_ID
When Ascend processes this configuration, it generates SQL and Python Transform Components that compare your tables based on the specified parameters.
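For orientation, here is a hypothetical sketch of a single row the diff Transform could emit for this configuration, assuming a join key SESSION_START_TIME and one numeric column AMOUNT (both names are illustrative):

example_row = {
    "SESSION_START_TIME": "2022-10-15 09:30:00",
    "DIFF_SCORE": 0.0392,  # sum of diff_pct across numeric columns
    "EXISTS": {"table_1": True, "table_2": True},
    "MISSING": "Row Not Missing",
    "DIFF_COLUMNS": [{"column": "AMOUNT", "diff": 4.0}],
    "AMOUNT": {"table_1": 100.0, "table_2": 104.0, "diff": -4.0, "diff_abs": 4.0, "diff_pct": 0.0392},
}

Because this row's DIFF_SCORE (0.0392) exceeds the percent_threshold (0.001), it would also appear in the flagged output.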
🎉 Congratulations! You've successfully created a Simple Application that can be reused across different datasets and scenarios.