
Incremental Python Read

In this guide, we'll build an Incremental Python Read Component that ingests only new or updated records by leveraging Ascend's Incremental strategy.

Prerequisites

Create a new Component

From your Workspace Super Graph view, follow these steps to create your Component:

  1. Double-click the Flow where you want to create your Component
  2. Right-click anywhere in the Flow Graph
  3. Hover over Create Component, then over Read in the expanded menu, and click From Scratch
  4. Complete the form with these details:
    • Select your Flow
    • Enter a descriptive Component Name like read_sales
    • Select Python as your file type

Create your Incremental Python Read Component

Structure your Incremental Python Read Component following this pattern, based on our Otto's Expeditions Project:

  1. Import necessary packages: Include Ascend resources (read), context handlers (ComponentExecutionContext), data processing libraries (like Polars or pandas), and logging utilities (log)

  2. Apply the @read() decorator with incremental configuration:

    • Set strategy="incremental" to enable incremental processing
    • Choose incremental_strategy="merge" or "append" based on your data needs
    • Specify unique_key for merge operations
    • Set on_schema_change="sync_all_columns" to handle schema evolution
  3. Define your incremental read function: Implement logic that filters data based on previous state

  4. Return structured data: Return the processed data as a dataframe or table

The @read() decorator integrates your function into Ascend's stateful execution framework, automatically managing incremental state and schema updates.
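
To make step 3 more concrete, here is a minimal, library-independent sketch of filtering against previous state. It is plain Python and does not use Ascend's API: the function `filter_new_records`, the `updated_at` field, and the `previous_max` argument are hypothetical names chosen for illustration, and how Ascend hands the previous state to your function is not shown here. The idea is a high-water mark: keep only records whose timestamp is later than the latest one already ingested.

```python
from datetime import datetime
from typing import Optional


def filter_new_records(
    records: list[dict],
    previous_max: Optional[datetime],
    ts_field: str = "updated_at",  # hypothetical timestamp column name
) -> list[dict]:
    """Keep only records newer than the previous high-water mark."""
    if previous_max is None:
        # First run: there is no prior state, so ingest everything.
        return list(records)
    return [r for r in records if r[ts_field] > previous_max]


# Illustrative source rows (not taken from any real dataset).
source = [
    {"id": 1, "updated_at": datetime(2020, 1, 1, 12, 0)},
    {"id": 2, "updated_at": datetime(2020, 1, 2, 12, 30)},
    {"id": 3, "updated_at": datetime(2020, 1, 3, 9, 0)},
]

first_run = filter_new_records(source, previous_max=None)
next_run = filter_new_records(source, previous_max=datetime(2020, 1, 2, 12, 30))

print([r["id"] for r in first_run])  # every row on the first run
print([r["id"] for r in next_run])   # only rows newer than the mark
```

The strict `>` comparison means a record sharing the exact timestamp of the high-water mark is skipped. If your source can write several records with the same timestamp, you may prefer `>=` combined with the merge strategy so re-read rows are deduplicated by key.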

Choose an Incremental Strategy

Ascend offers two incremental strategies: merge and append. Choose based on your data requirements:

Strategy | Description | When to Use | Required parameters
Merge | Updates existing records based on a key and inserts new ones | For data that can be updated (user profiles, product info) | unique_key
Append | Simply adds new data to the existing dataset | For immutable data (logs, events) | None

For detailed explanations, see our incremental processing reference guide.
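
The difference between the two strategies can be illustrated with a small sketch in plain Python. This is only a conceptual model of the semantics described in the table, not how Ascend implements them: `append_rows` and `merge_rows` are hypothetical helpers, and the rows are made-up sample data.

```python
def append_rows(existing: list[dict], incoming: list[dict]) -> list[dict]:
    """Append: add incoming rows to the dataset without looking at keys."""
    return existing + incoming


def merge_rows(
    existing: list[dict], incoming: list[dict], unique_key: str
) -> list[dict]:
    """Merge: update rows whose key already exists, insert the rest."""
    by_key = {row[unique_key]: row for row in existing}
    for row in incoming:
        # An existing key is overwritten; a new key is added at the end.
        by_key[row[unique_key]] = row
    return list(by_key.values())


existing = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
incoming = [{"id": 2, "name": "b2"}, {"id": 3, "name": "c"}]

appended = append_rows(existing, incoming)
merged = merge_rows(existing, incoming, unique_key="id")

print(len(appended))  # id 2 now appears twice
print(merged)         # id 2 updated in place, id 3 inserted
```

With append, the row for `id` 2 ends up in the dataset twice, which is fine for immutable events but wrong for mutable entities. With merge, the same input leaves exactly one row per key, which is why merge requires `unique_key`.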

Merge strategy example

This example demonstrates the merge incremental strategy using our Otto's Expeditions Project:

incremental_read_merge.py
import polars as pl
import pyarrow as pa
from ascend.application.context import ComponentExecutionContext
from ascend.common.events import log
from ascend.resources import read


@read(
    strategy="incremental",
    incremental_strategy="merge",
    unique_key="id",
    on_schema_change="sync_all_columns",
)
def read_inlinked(context: ComponentExecutionContext) -> pa.Table:
    """

Append strategy example

This example demonstrates the append incremental strategy with timestamp-based filtering:

incremental_read_append.py
from datetime import datetime

from ascend.application.context import IncrementalComponentExecutionContext
from ascend.resources import read

# Example data for timestamped incremental append
initial_data = {
    "key": [1, 2],
    "ts": [datetime(2020, 1, 1, 12, 0), datetime(2020, 1, 2, 12, 30)],
    "string": ["a", "b"],
    "integer": [1, 2],
}

updated_data = {
    "key": [1, 2, 3],

For more examples and advanced options, see our reference guide.

🎉 Congratulations! You've successfully created an Incremental Python Read Component in Ascend.