Incremental Python Read
In this guide, we'll build an Incremental Python Read Component that ingests only new or updated records by leveraging Ascend's Incremental strategy.
Prerequisites
- Ascend Flow
Create a new Component
From your Workspace Super Graph view, follow these steps to create your Component:
Using the form:
- Double-click the Flow where you want to create your Component
- Right-click anywhere in the Flow Graph
- Hover over Create Component, then over Read in the expanded menu, and click From Scratch
- Complete the form with these details:
  - Select your Flow
  - Enter a descriptive Component Name like read_sales
  - Select Python as your file type
Using the Files panel:
- Open the Files panel in the top left corner
- Navigate to and select your desired Flow
- Right-click on the components directory and choose New file
- Name your file with a descriptive name like read_sales.py and press Enter
Create your Incremental Python Read Component
Structure your Incremental Python Read Component following this pattern, based on our Otto's Expeditions Project:
- Import necessary packages: Include Ascend resources (read), context handlers (ComponentExecutionContext), data processing libraries (like Polars or pandas), and logging utilities (log)
- Apply the @read() decorator with incremental configuration:
  - Set strategy="incremental" to enable incremental processing
  - Choose incremental_strategy="merge" or "append" based on your data needs
  - Specify unique_key for merge operations
  - Set on_schema_change="sync_all_columns" to handle schema evolution
- Define your incremental read function: Implement logic that filters data based on previous state
- Return structured data: Return the processed data as a dataframe or table
The @read() decorator integrates your function into Ascend's stateful execution framework, automatically managing incremental state and schema updates.
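The parameter rules listed above can be summarized as a small check. This is a minimal sketch in plain Python, not part of Ascend: the helper name validate_incremental_config is hypothetical, and it only encodes what this guide states (incremental_strategy is "merge" or "append", and merge needs a unique_key).

```python
from typing import Optional

# The two incremental strategies named in this guide.
VALID_INCREMENTAL_STRATEGIES = {"merge", "append"}


def validate_incremental_config(
    strategy: str,
    incremental_strategy: str,
    unique_key: Optional[str] = None,
) -> dict:
    """Hypothetical helper (not an Ascend API): sanity-check decorator-style options."""
    if strategy != "incremental":
        raise ValueError("this sketch only covers strategy='incremental'")
    if incremental_strategy not in VALID_INCREMENTAL_STRATEGIES:
        raise ValueError(f"unknown incremental_strategy: {incremental_strategy!r}")
    # Merge matches incoming records against existing ones, so it needs a key.
    if incremental_strategy == "merge" and not unique_key:
        raise ValueError("incremental_strategy='merge' requires unique_key")
    return {
        "strategy": strategy,
        "incremental_strategy": incremental_strategy,
        "unique_key": unique_key,
    }


merge_config = validate_incremental_config("incremental", "merge", unique_key="id")
append_config = validate_incremental_config("incremental", "append")
```

Running a check like this before wiring options into the decorator can surface a missing unique_key early, instead of at execution time.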
Choose an Incremental Strategy
Ascend offers two incremental strategies: merge and append. Choose based on your data requirements:
Strategy | Description | When to Use | Required parameters |
---|---|---|---|
Merge | Updates existing records based on a key and inserts new ones | For data that can be updated (user profiles, product info) | unique_key |
Append | Adds new data to the existing dataset without updating prior records | For immutable data (logs, events) | None |
For detailed explanations, see our incremental processing reference guide.
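To make the difference between the two strategies concrete, here is a minimal sketch of their semantics in plain Python. It is an illustration under stated assumptions, not Ascend's implementation: the functions apply_append and apply_merge and the sample rows are hypothetical, and rows are modeled as simple dictionaries.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def apply_append(existing: List[Row], incoming: List[Row]) -> List[Row]:
    """Append semantics: keep every existing row and add every incoming row."""
    return existing + incoming


def apply_merge(existing: List[Row], incoming: List[Row], unique_key: str) -> List[Row]:
    """Merge semantics: update rows whose key already exists, insert the rest."""
    merged = {row[unique_key]: row for row in existing}
    for row in incoming:
        # An incoming row replaces any existing row with the same key.
        merged[row[unique_key]] = row
    return list(merged.values())


# Hypothetical sample data: id 2 is updated, id 3 is new.
existing = [{"id": 1, "name": "Ada"}, {"id": 2, "name": "Bo"}]
incoming = [{"id": 2, "name": "Bob"}, {"id": 3, "name": "Cy"}]

appended = apply_append(existing, incoming)                 # 4 rows, id 2 appears twice
merged = apply_merge(existing, incoming, unique_key="id")   # 3 rows, id 2 updated
```

With append, the updated record for id 2 sits alongside the old one, which is why append suits immutable data. With merge, the key keeps one row per record.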
Merge strategy example
This example demonstrates the merge incremental strategy using our Otto's Expeditions Project:
import polars as pl
import pyarrow as pa

from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import read


@read(
    strategy="incremental",
    incremental_strategy="merge",
    unique_key="id",
    on_schema_change="sync_all_columns",
)
def read_inlinked(context: ComponentExecutionContext) -> pa.Table:
    """
Append strategy example
This example demonstrates the append incremental strategy with timestamp-based filtering:
from datetime import datetime

from ascend.application.context import IncrementalComponentExecutionContext
from ascend.resources import read

# Example data for timestamped incremental append
initial_data = {
    "key": [1, 2],
    "ts": [datetime(2020, 1, 1, 12, 0), datetime(2020, 1, 2, 12, 30)],
    "string": ["a", "b"],
    "integer": [1, 2],
}
updated_data = {
    "key": [1, 2, 3],
For more examples and advanced options, see our reference guide.
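The timestamp-based filtering idea behind the append example can be sketched without Ascend at all. The snippet below is an illustration only: it reuses the initial_data values from the example above, while later_data, max_timestamp, and rows_newer_than are hypothetical names with made-up values. How Ascend exposes the previous state to your function is not shown here, so the sketch passes the last-seen timestamp in explicitly.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional

Columns = Dict[str, List[Any]]

# Same values as initial_data in the append example above.
initial_data: Columns = {
    "key": [1, 2],
    "ts": [datetime(2020, 1, 1, 12, 0), datetime(2020, 1, 2, 12, 30)],
    "string": ["a", "b"],
    "integer": [1, 2],
}

# Hypothetical later snapshot: the first two rows are unchanged, the third is new.
later_data: Columns = {
    "key": [1, 2, 3],
    "ts": [
        datetime(2020, 1, 1, 12, 0),
        datetime(2020, 1, 2, 12, 30),
        datetime(2020, 1, 3, 9, 0),
    ],
    "string": ["a", "b", "c"],
    "integer": [1, 2, 3],
}


def max_timestamp(data: Columns) -> Optional[datetime]:
    """Return the latest timestamp seen so far, or None if there is no data."""
    return max(data["ts"]) if data["ts"] else None


def rows_newer_than(data: Columns, watermark: Optional[datetime]) -> Columns:
    """Keep only rows whose ts is strictly later than the watermark."""
    keep = [i for i, ts in enumerate(data["ts"]) if watermark is None or ts > watermark]
    return {column: [values[i] for i in keep] for column, values in data.items()}


first_run = rows_newer_than(initial_data, None)        # no prior state: keep everything
watermark = max_timestamp(first_run)                   # latest ts already ingested
second_run = rows_newer_than(later_data, watermark)    # only the row with key 3
```

The strict comparison means a row whose timestamp equals the watermark is treated as already ingested. If your source can emit several records with the same timestamp, a secondary tiebreaker may be needed.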
🎉 Congratulations! You've successfully created an Incremental Python Read Component in Ascend.