Advanced Python patterns

This guide covers advanced Python Transform patterns, including alternative input data formats, the context object, vault access, logging utilities, smart partitioned readers, and schema evolution.

Input data formats

The @transform decorator supports multiple input data formats via the input_data_format parameter. The default is Ibis.
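
Ibis (default)

Because Ibis table expressions are the default, input_data_format can be omitted entirely. The following is a minimal sketch, assuming a hypothetical upstream component named upstream_component that exposes a numeric value column:

from ascend.application.context import ComponentExecutionContext
from ascend.resources import ref, transform

@transform(inputs=[ref("upstream_component")])
def ibis_transform(data, context: ComponentExecutionContext):
    """Transform data using the default Ibis table expression."""
    # Ibis expressions are built lazily and executed by the underlying backend
    return data.mutate(doubled_value=data.value * 2)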

DuckDB PyRelation

Use DuckDB relations for SQL-based transformations with excellent performance:

import duckdb

from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import ref, test, transform

@transform(
    inputs=[ref("upstream_component")],
    input_data_format="duckdb",
    tests=[test("not_null", column="calculated_value")],
)
def duckdb_transform(
    data: duckdb.DuckDBPyRelation,
    context: ComponentExecutionContext
) -> duckdb.DuckDBPyRelation:
    """Transform data using DuckDB relations."""
    log("Processing with DuckDB")

    # Use SQL syntax directly on the relation
    result = duckdb.sql("""
        SELECT
            *,
            original_value * 2 AS calculated_value,
            CASE
                WHEN category = 'A' THEN 'Premium'
                ELSE 'Standard'
            END AS tier
        FROM data
        WHERE status = 'active'
    """)

    return result

PyArrow

Use PyArrow for columnar data processing and efficient serialization:

import pyarrow as pa
import pyarrow.compute as pc

from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import ref, test, transform

@transform(
    inputs=[ref("upstream_component")],
    input_data_format="pyarrow",
    tests=[
        test("count_greater_than", count=0),
        test("not_null", column="doubled_value"),
    ],
)
def pyarrow_transform(
    data: pa.Table,
    context: ComponentExecutionContext
) -> pa.Table:
    """Transform data using PyArrow."""
    log(f"Processing PyArrow table with schema: {data.schema}")

    # Use PyArrow compute functions
    doubled = pc.multiply(data.column("original_value"), 2)

    # Add new column
    result = data.append_column("doubled_value", doubled)

    # Filter using expressions
    mask = pc.greater(data.column("amount"), 100)
    result = result.filter(mask)

    return result

Dictionary format

Use dictionaries for simple column-based operations:

from typing import Any

from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import ref, test, transform

@transform(
    inputs=[ref("upstream_component")],
    input_data_format="dict",
    tests=[test("not_null", column="calculated_field")],
)
def dict_transform(
    data: dict[str, list[Any]],
    context: ComponentExecutionContext
) -> dict[str, list[Any]]:
    """Transform data using dictionary manipulation."""
    log(f"Processing data with columns: {list(data.keys())}")

    # Create new columns by transforming existing ones
    result = data.copy()
    result["calculated_field"] = [x * 2 for x in data["value"]]
    result["upper_name"] = [s.upper() for s in data["name"]]

    # Access parameters if configured
    multiplier = context.parameters.get("multiplier", 1)
    result["adjusted"] = [x * multiplier for x in data["value"]]

    return result

pandas

Use pandas for familiar DataFrame operations:

from datetime import datetime

import pandas as pd

from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import ref, test, transform

@transform(
    inputs=[ref("upstream_component")],
    input_data_format="pandas",
    tests=[test("not_null", column="processed_value")],
)
def pandas_transform(
    data: pd.DataFrame,
    context: ComponentExecutionContext
) -> pd.DataFrame:
    """Transform data using pandas."""
    result = data.copy()

    # Apply transformations
    result["processed_value"] = result["value"].apply(lambda x: x * 2)
    result["category_upper"] = result["category"].str.upper()
    result["processed_at"] = datetime.now()

    log(f"Processed {len(result)} rows")
    return result

Context object reference

The context parameter provides access to runtime information, parameters, and utilities.

Runtime information

@transform(inputs=[ref("source")])
def my_transform(data, context):
    # Component and Flow information
    component_name = context.component_name
    flow_name = context.flow_name
    profile_name = context.profile_name

    # Run information
    run_id = context.run_id
    flow_run = context.flow_run
    created_at = flow_run.created_at

    # Data plane information
    data_plane_type = context.data_plane_type  # "snowflake", "bigquery", etc.

    return data

Parameter access

@transform(inputs=[ref("source")])
def my_transform(data, context):
    # Access component parameters
    threshold = context.parameters.get("threshold", 100)
    config = context.parameters.get("config", {})

    # Use in transformation
    return data.filter(data["value"] > threshold)

Incremental processing

@transform(
    inputs=[ref("source")],
    materialized="incremental",
    incremental_strategy="merge",
    unique_key="id",
)
def incremental_transform(data, context):
    # Check if running incrementally
    if context.is_incremental:
        # Get current state of the data
        current = context.current_data()
        if current is not None:
            last_ts = current["updated_at"].max()
            log(f"Processing records after {last_ts}")
    else:
        log("Full refresh mode")

    return data.mutate(processed_at=datetime.now())
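
The example above only logs the high-water mark. A minimal sketch of also applying it, so that only rows newer than the last materialization are merged (assuming the input arrives as an Ibis table with an updated_at column; the function name is new):

@transform(
    inputs=[ref("source")],
    materialized="incremental",
    incremental_strategy="merge",
    unique_key="id",
)
def incremental_filter_transform(data, context):
    if context.is_incremental:
        current = context.current_data()
        if current is not None:
            # Keep only rows newer than what has already been materialized
            last_ts = current["updated_at"].max()
            data = data.filter(data.updated_at > last_ts)

    return data.mutate(processed_at=datetime.now())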

Partition information

@transform(
    inputs=[ref("source", reshape="map")],
)
def partitioned_transform(data, context):
    # Get current partition values
    partition_id = context.partition_values.get("_ascend_partition_uuid")
    date_partition = context.partition_values.get("date")

    log(f"Processing partition: {partition_id}, date: {date_partition}")

    return data.mutate(partition_processed=partition_id)

Temporary storage

@transform(inputs=[ref("source")])
def transform_with_temp(data, context):
    # Access temporary directory for intermediate files
    tmp_dir = context.tmp_dir

    # Write intermediate data
    intermediate_path = f"{tmp_dir}/intermediate.parquet"
    data.to_parquet(intermediate_path)

    # Process and return
    return data

Cache storage

@transform(inputs=[ref("source")])
def transform_with_cache(data, context):
    # Store values in cache
    context.set_cache("row_count", data.count().execute())
    context.set_cache("processed_columns", list(data.columns))

    # Later retrieval
    cached_count = context.get_cache("row_count")

    return data

Vault access

Access secrets securely through the context:

from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import ref, task

@task(dependencies=[ref("data_component")])
def secure_api_task(data_component, context: ComponentExecutionContext):
    """Access secrets from vault."""
    # Get vault reference
    vault = context.vaults.get("api_credentials")

    # Retrieve secrets
    api_key = vault.get("api_key")
    api_secret = vault.get("api_secret")

    # Use credentials (example)
    response = call_external_api(api_key, api_secret, data_component)

    log(f"API call completed with status: {response.status}")

Logging utilities

Ascend provides logging utilities through ascend.common.events.

Basic logging

from ascend.common.events import log

@transform(inputs=[ref("source")])
def my_transform(data, context):
    log("Starting transformation")

    # Process data
    result = data.mutate(value=data.value * 2)

    log(f"Processed {result.count().execute()} rows")
    return result

Debug logging with stacktrace

from ascend.common.events import log

@transform(inputs=[ref("source")])
def debug_transform(data, context):
    # Include stacktrace for debugging
    log("Debug checkpoint reached", stacktrace=True)
    return data

Performance timing

from ascend.common.events import timer

@transform(inputs=[ref("source")])
def timed_transform(data, context):
    with timer("data_validation"):
        # Validate data
        validate(data)

    with timer("transformation"):
        # Transform data
        result = apply_transformations(data)

    with timer("aggregation"):
        # Aggregate results
        final = aggregate(result)

    return final

Exception logging

from ascend.common.events import log_on_exception

@transform(inputs=[ref("source")])
def safe_transform(data, context):
    with log_on_exception("Error during data processing"):
        # Code that might raise an exception
        result = risky_operation(data)

    return result

Conditional exception handling

import contextlib
from ascend.common.events import log_on_exception

@transform(inputs=[ref("source")])
def selective_transform(data, context):
    # Log all exceptions except ValueError
    with contextlib.suppress(ValueError):
        with log_on_exception(
            "Processing error",
            where=lambda e: not isinstance(e, ValueError),
        ):
            result = validate_and_process(data)

    return data

Smart partitioned readers

Create custom partitioned readers for efficient data ingestion:

from datetime import datetime
from typing import Generator

import pandas as pd

from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.common.filters import ListItem
from ascend.resources import CustomPythonPartitionedReader, test

reader = CustomPythonPartitionedReader(
    name="partitioned_source",
    tests=[
        test("count_greater_than", count=0),
        test("not_null", column="id"),
    ],
)

@reader.list()
def list_partitions(context: ComponentExecutionContext) -> Generator[ListItem, None, None]:
    """Generate partitions to process."""
    # Could query an API, list S3 keys, etc.
    partitions = ["2024-01", "2024-02", "2024-03"]

    for partition in partitions:
        yield ListItem(
            name=partition,
            # A fingerprint typically signals whether a partition has changed;
            # including the current time forces every partition to be re-read
            # on each run, while a stable, data-derived value lets unchanged
            # partitions be skipped.
            fingerprint=f"fp_{partition}_{datetime.now().isoformat()}",
        )

@reader.read()
def read_partition(
    context: ComponentExecutionContext,
    item: ListItem
) -> pd.DataFrame:
    """Read data for a specific partition."""
    partition_id = item.name
    log(f"Reading partition: {partition_id}")

    # Fetch data for this partition (from API, file, etc.)
    data = fetch_data_for_partition(partition_id)

    return pd.DataFrame(data)

Schema evolution

Control how schema changes are handled:

@transform(
    inputs=[ref("source")],
    on_schema_change="sync_all_columns",  # Options: "sync_all_columns", "fail", "ignore"
)
def evolving_schema_transform(data, context):
    """Handle schema changes gracefully."""
    return data

Options:

  • "sync_all_columns": Add new columns, update existing (recommended)
  • "fail": Fail on any schema change
  • "ignore": Ignore schema changes

Best practices

  1. Choose the right input format: Use Ibis (default) for most cases; pandas for complex transformations; DuckDB for SQL-heavy operations
  2. Use type hints: Always include type hints for better documentation and IDE support
  3. Log meaningful information: Use structured logging for debugging and monitoring
  4. Handle errors gracefully: Use log_on_exception for better error visibility
  5. Access secrets via vaults: Never hardcode credentials; always use vault references
  6. Leverage caching: Use context.set_cache() for expensive computations that may be reused

Next steps