Context objects
This concept guide provides a deeper understanding of the context objects used throughout Ascend, including the general ApplicationContext used by the core framework and the specialized ComponentExecutionContext used during component execution.
What are context objects in Ascend?
In Ascend, context objects encapsulate the runtime environment, configuration, and services needed at different stages of application execution. They provide:
- Access to runtime configuration and flow metadata
- Resource acquisition and management (connections, vaults, data plane)
- DAG and component orchestration
- Temporary file and disposable resource management
- Incremental processing support
- Query tagging and metadata resolution
There are two primary context classes:
- ApplicationContext: A general-purpose context for flow-level and cross-component operations.
- ComponentExecutionContext: Extends ApplicationContext with component-specific APIs during execution.
Why are context objects necessary?
Context objects are a foundational design pattern in Ascend that solve several critical challenges in data processing frameworks:
- Dependency injection: Context objects provide components with access to resources, configurations, and services without tight coupling. Components don't need to know how to create or find these resources; the context injects them as needed.
- Consistent state management: By passing context objects between framework layers, Ascend ensures that all components have access to the same runtime state, configurations, and resources during execution.
- Resource lifecycle management: Contexts manage the acquisition, sharing, and release of resources like database connections, temporary files, and caches, ensuring proper cleanup even in error scenarios.
- Runtime metadata propagation: They carry execution metadata like run IDs, component names, and tags that are essential for tracking, monitoring, and debugging data pipelines.
- Cross-component communication: Components can safely access data and state from other components through controlled interfaces provided by context objects, without direct dependencies.
- Platform abstraction: Context objects hide platform-specific details, allowing components to operate consistently across different execution environments (cloud, on-premise, different data planes).
- Security boundaries: They implement access controls that restrict components to only the resources and data they're authorized to use.
By centralizing these functions in context objects, Ascend simplifies component implementation while maintaining a robust, scalable, and secure execution environment.
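To make the dependency-injection point concrete, here is a minimal sketch of a component-style function that depends only on the context it is handed. StubContext and load_orders are hypothetical names written for this illustration and are not part of Ascend; only get_connection(name), run_id, and flow_name mirror names documented on this page, and the dictionary lookup behind get_connection is an assumption.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class StubContext:
    """Hypothetical stand-in for a context object (not an Ascend class)."""

    run_id: str
    flow_name: str
    connections: Dict[str, Any] = field(default_factory=dict)

    def get_connection(self, name: str) -> Any:
        # Mirrors the documented method name; the lookup itself is an assumption.
        return self.connections[name]


def load_orders(context: StubContext) -> Dict[str, Any]:
    """Hypothetical component: it asks the context for everything it needs."""
    conn = context.get_connection("warehouse")
    return {"run_id": context.run_id, "flow": context.flow_name, "source": conn}


# The component never builds its own connection, so a test can inject a fake one.
ctx = StubContext(
    run_id="run-001",
    flow_name="demo_flow",
    connections={"warehouse": "fake-conn"},
)
result = load_orders(ctx)
```

Because the function only touches the context's interface, the same code could run against a real context in a flow or against a stub like this one in a unit test.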
Application context
The ApplicationContext represents the runtime context for a flow run and orchestration logic. It is created by the Ascend engine and provides core information and services to the framework and helpers.
Key properties
- run_id: Unique identifier of the current run (flow).
- flow_name: Name of the current flow.
- profile_name: Profile being used.
- project_path: Path to the project root.
- working_directory: Local working directory for scratch files.
- data_plane: Instance of the data plane for storage operations.
- resource_manager: Manages flow and connection resources.
- parameters: Flow parameters provided at runtime.
- vaults: Access to configured vaults.
- dag: Directed acyclic graph of the flow execution plan.
- labels: Optional labels attached to the run.
- in_time_series_mode: Flag indicating time-series mode for event-based flows.
Key methods
- get_connection(name): Retrieve a named connection from configured resources.
- get_query_tag(): Return tags (flow, profile, run, etc.) for query attribution.
- get_app_context_for_flow(flow): Obtain an ApplicationContext for another flow in the same profile.
- get_component_context(component, flow): Obtain a ComponentExecutionContext for a specific component.
- get_component_store(name, flow=None): Access a component's data store for metadata or data operations.
- metadata_storage_location_prefix: Prefix for metadata storage, as defined by the data plane.
Component execution context
The ComponentExecutionContext is the core context object provided to components during execution. It extends the ApplicationContext and adds component-specific functionality.
Key properties
- component_name: Name of the current component
- flow_name: Name of the current flow
- run_id: Unique identifier for the current run
- profile_name: Name of the profile being used
- tmp_dir: Temporary directory for component use (automatically cleaned up)
- data_plane: Access to the data plane for storage operations
- vaults: Access to configured vaults
- parameters: Flow parameters
- meta_column_name_resolver: Resolver for metadata column names
Key methods
- get_connection(name): Retrieve a connection by name
- get_component_store(name, flow=None): Get a component's data store
- current_data(data_format=None): Get the component's current data
- set_cache(key, value) / get_cache(key): Store/retrieve temporary data
- get_meta_column_name(col): Resolve metadata column names
- get_query_tag(): Get tags for query attribution
- to_write_context(): Convert to a write context
Incremental component execution context
The IncrementalComponentExecutionContext is a specialized context that enables efficient processing of new or changed data without reprocessing existing data. It wraps the ComponentExecutionContext and adds an is_incremental flag that signals when a component is running in incremental mode.
This context is particularly useful for implementing:
- Time-based incremental processing (processing only data newer than previously processed data)
- Partition-based strategies (processing only new partitions)
- Change data capture scenarios (handling inserts, updates, and deletes)
Example: Using incremental context for append strategy
The following example demonstrates a typical timestamp-based incremental processing pattern. It shows how to:
- Check if the component is running incrementally
- Access the current data to determine what's already been processed
- Filter incoming data based on timestamps
- Accommodate platform-specific requirements (like Snowflake's uppercase column names)
from datetime import datetime

from ascend.resources import read
from ascend.application.context import IncrementalComponentExecutionContext

# Example data for timestamped incremental append
initial_data = {
    "key": [1, 2],
    "ts": [datetime(2020, 1, 1, 12, 0), datetime(2020, 1, 2, 12, 30)],
    "string": ["a", "b"],
    "integer": [1, 2],
}

updated_data = {
    "key": [1, 2, 3],
    "ts": [
        datetime(2021, 1, 1, 12, 0),
        datetime(2020, 1, 2, 12, 30),
        datetime(2021, 2, 1, 12, 0),
    ],
    "string": ["a", "bb", "c"],
    "integer": [1, 2, 3],
}


@read(strategy="incremental", incremental_strategy="append")
def incremental_custom_python_read_append(context: IncrementalComponentExecutionContext):
    """Demonstrates the append incremental strategy with timestamp-based filtering."""
    current_data = context.current_data("pandas")
    data = initial_data
    if context.is_incremental:
        # Row 1 has an updated timestamp, so it is included in the delta.
        # Row 3 is new and has a timestamp greater than the max in the current output.
        ts_key = "ts"
        if context.data_plane_type == "snowflake":
            ts_key = ts_key.upper()
        max_ts = current_data[ts_key].max()
        # Keep only the rows whose timestamp is strictly newer than max_ts.
        filtered_data = {
            key: [val for i, val in enumerate(updated_data[key]) if updated_data["ts"][i] > max_ts]
            for key in updated_data
        }
        # Since we are using the append strategy, the updated row 1 will get appended to the existing output.
        data = filtered_data
    if context.data_plane_type == "snowflake":
        # Make keys uppercase for Snowflake compatibility.
        data = {key.upper(): val for key, val in data.items()}
    return data
In this example:
- The IncrementalComponentExecutionContext provides the is_incremental flag to determine if the component is running incrementally
- current_data() is used to access the existing data for comparison
- data_plane_type is used to handle platform-specific variations
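The filtering step can be checked without a running flow. The sketch below reproduces only the timestamp comparison in plain Python, using the sample values from the example. filter_newer_than is a hypothetical helper name, and max_ts is computed directly from the initial timestamps rather than from context.current_data(), so this is an approximation of the logic, not the component itself.

```python
from datetime import datetime

# Timestamps already present in the output after the first (non-incremental) run.
initial_ts = [datetime(2020, 1, 1, 12, 0), datetime(2020, 1, 2, 12, 30)]

updated_data = {
    "key": [1, 2, 3],
    "ts": [
        datetime(2021, 1, 1, 12, 0),
        datetime(2020, 1, 2, 12, 30),
        datetime(2021, 2, 1, 12, 0),
    ],
    "string": ["a", "bb", "c"],
    "integer": [1, 2, 3],
}


def filter_newer_than(data, ts_key, max_ts):
    """Keep only the rows whose timestamp is strictly greater than max_ts."""
    keep = [i for i, ts in enumerate(data[ts_key]) if ts > max_ts]
    return {key: [values[i] for i in keep] for key, values in data.items()}


max_ts = max(initial_ts)
delta = filter_newer_than(updated_data, "ts", max_ts)
print(delta["key"])     # [1, 3]
print(delta["string"])  # ['a', 'c']
```

Note that key 2 is left out of the delta: its string value changed from "b" to "bb", but its timestamp did not, so a strict timestamp comparison cannot see that change.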
Working with context objects
Accessing runtime information
# Basic runtime information
component_name = context.component_name
flow_name = context.flow_name
run_id = context.run_id
profile_name = context.profile_name
# Query tagging for database operations
query_tags = context.get_query_tag() # Returns tags like flow, profile, component
Resource management
# Accessing connections
snowflake_conn = context.get_connection("snowflake_connection")
# Accessing vaults
secrets = context.vaults.get("my_secrets_vault")
api_key = secrets.get("api_key")
# Working with temporary files
import pandas as pd
df = pd.DataFrame({"key": [1, 2], "value": ["a", "b"]})  # illustrative data
temp_file_path = context.tmp_dir / "temporary_data.csv"
df.to_csv(temp_file_path)
# Files in tmp_dir are automatically cleaned up after component execution
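The temporary-file pattern can be tried locally with the standard library. In this sketch, tempfile.TemporaryDirectory stands in for context.tmp_dir, on the assumption that tmp_dir behaves like a pathlib.Path (as the "/" operator in the snippet above suggests). write_rows is a hypothetical helper, and the cleanup shown is Python's own, not Ascend's.

```python
import csv
import tempfile
from pathlib import Path


def write_rows(tmp_dir: Path, rows):
    """Write rows to a CSV file inside tmp_dir and return the file path."""
    path = tmp_dir / "temporary_data.csv"
    with path.open("w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["key", "value"])
        writer.writerows(rows)
    return path


# TemporaryDirectory is only a stand-in for the directory the context would supply.
with tempfile.TemporaryDirectory() as scratch:
    csv_path = write_rows(Path(scratch), [(1, "a"), (2, "b")])
    line_count = len(csv_path.read_text().splitlines())
    existed_inside = csv_path.exists()

# Once the directory is cleaned up, the file is gone as well.
existed_after = csv_path.exists()
```

The practical consequence is the same in both settings: anything that must outlive the component run should be written somewhere other than the temporary directory.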
Component interaction
# Get data from another component
upstream_store = context.get_component_store("upstream_component")
upstream_data = upstream_store.get_current_data("pandas")
# Get data from a component in another flow
cross_flow_store = context.get_component_store("other_component", "other_flow")
Caching data
# Store calculated values for later use in the same run
context.set_cache("expensive_calculation", result)
# Retrieve cached values
if cached_result := context.get_cache("expensive_calculation"):
    # Use cached result
    pass
else:
    # Calculate and cache
    result = perform_expensive_calculation()
    context.set_cache("expensive_calculation", result)
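The retrieve-or-calculate pattern above can be wrapped in a small helper. The sketch below is an illustration under stated assumptions: DictCacheContext is a hypothetical stand-in backed by a dictionary, get_or_compute is a hypothetical helper, and get_cache is assumed to return None for a missing key, which this page does not confirm. It tests for None explicitly, because a truthiness test would treat legitimately cached falsy values such as 0 or an empty list as cache misses.

```python
from typing import Any, Callable, Dict


class DictCacheContext:
    """Hypothetical stand-in exposing set_cache/get_cache (not an Ascend class)."""

    def __init__(self) -> None:
        self._cache: Dict[str, Any] = {}

    def set_cache(self, key: str, value: Any) -> None:
        self._cache[key] = value

    def get_cache(self, key: str) -> Any:
        # Assumption: a missing key yields None.
        return self._cache.get(key)


def get_or_compute(context: Any, key: str, compute: Callable[[], Any]) -> Any:
    """Return the cached value for key, computing and caching it on a miss."""
    cached = context.get_cache(key)
    if cached is not None:
        return cached
    value = compute()
    context.set_cache(key, value)
    return value


calls = []


def expensive_calculation() -> int:
    calls.append("called")
    return 0  # deliberately falsy, to show the None check matters


ctx = DictCacheContext()
first = get_or_compute(ctx, "expensive_calculation", expensive_calculation)
second = get_or_compute(ctx, "expensive_calculation", expensive_calculation)
```

Here the calculation runs once even though its result is 0. With an "if cached_result := ..." truthiness test it would run on every call.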
Conclusion
Context objects are fundamental to Ascend's component model, providing components with the information and capabilities they need at runtime while maintaining isolation and security. By understanding and effectively using the context objects, you can build components that seamlessly integrate with Ascend's data processing capabilities, including powerful incremental processing strategies.