PySpark·2026-03-22·5 min read·

SCD Type 2 in Delta Lake: tracking history without losing your mind

A complete implementation of Slowly Changing Dimensions Type 2 using Delta Lake MERGE, with real examples and production considerations.

Slowly Changing Dimensions Type 2 is one of those patterns that looks clean on a whiteboard and messy in production. The concept is simple: keep a full history of changes to a record, with start/end dates marking each version's validity window. The implementation — managing open-ended records, detecting what actually changed, closing old versions — requires careful engineering.

Delta Lake's MERGE operation makes this tractable. Here's how to implement it properly.

What SCD Type 2 looks like in the table

A customer changes their email address. The SCD Type 2 table should look like this:

| customer_id | email | status | valid_from | valid_to | is_current |
|---|---|---|---|---|---|
| 101 | old@email.com | active | 2024-01-15 | 2025-03-10 | false |
| 101 | new@email.com | active | 2025-03-10 | 9999-12-31 | true |

The current record has valid_to = 9999-12-31 (a sentinel date) and is_current = true. Historical records have the actual end date and is_current = false. To get the current state of all customers, filter WHERE is_current = true.
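
To make the close-and-open mechanics concrete, here is a toy in-memory sketch in plain Python, no Spark involved. The apply_change helper and its arguments are illustrative names, not part of any library; it only models what happens to the rows in the table above.

```python
from datetime import date

SENTINEL = date(9999, 12, 31)  # open-ended valid_to, as in the table above

def apply_change(rows, customer_id, changes, changed_on):
    """Close the current version of customer_id and open a new one.

    rows is a list of dicts with the columns from the example table.
    Returns a new list; the input rows are not modified.
    """
    result, new_version = [], None
    for row in rows:
        if row["customer_id"] == customer_id and row["is_current"]:
            # New version: old values plus the changes, starting at the change date.
            # valid_to (sentinel) and is_current (True) carry over from the old row.
            new_version = {**row, **changes, "valid_from": changed_on}
            # Old version: closed at the change date
            row = {**row, "valid_to": changed_on, "is_current": False}
        result.append(row)
    if new_version is not None:
        result.append(new_version)
    return result

history = [{
    "customer_id": 101, "email": "old@email.com", "status": "active",
    "valid_from": date(2024, 1, 15), "valid_to": SENTINEL, "is_current": True,
}]
history = apply_change(history, 101, {"email": "new@email.com"}, date(2025, 3, 10))
current = [r for r in history if r["is_current"]]
```

After the call, history holds the two rows from the table and current holds only the new@email.com version. The Delta implementation has to produce the same result at scale.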

The MERGE approach

The core operation has two parts. First, close old versions: when an existing current record has changed, set its valid_to to the change timestamp and is_current to false. Then insert new versions: one new current record for every new or changed key.

A matched source row can trigger only one action in a Delta Lake MERGE, so it can't both update the old version and insert the new one. We handle this in two steps:

from delta.tables import DeltaTable
from pyspark.sql import functions as F
from pyspark.sql import DataFrame

def apply_scd2(
    spark,
    source_df: DataFrame,
    target_table: str,
    key_cols: list[str],
    tracked_cols: list[str],
    timestamp_col: str = "updated_at"
):
    target = DeltaTable.forName(spark, target_table)

    # Build the join condition on natural keys
    key_condition = " AND ".join([
        f"target.{k} = source.{k}" for k in key_cols
    ])

    # Build the change detection condition
    change_condition = " OR ".join([
        f"target.{c} <> source.{c}" for c in tracked_cols
    ])

    # ── Step 1: Close outdated current records ──────────────────
    target.alias("target").merge(
        source=source_df.alias("source"),
        condition=f"{key_condition} AND target.is_current = true AND ({change_condition})"
    ).whenMatchedUpdate(set={
        "valid_to": f"source.{timestamp_col}",
        "is_current": "false"
    }).execute()

    # ── Step 2: Insert new current records ──────────────────────
    # Step 1 has already closed every changed record, so a source key with
    # no current row in the target is either brand new or was just closed.
    existing_current = spark.table(target_table).filter(F.col("is_current"))

    new_records = source_df.join(
        existing_current.select(*key_cols),
        on=key_cols,
        how="left_anti"  # keep source rows whose key has no current version
    )

    # Align with the target schema; this drops source-only columns such as
    # timestamp_col if the target does not carry them.
    target_cols = spark.table(target_table).columns

    (
        new_records
        .withColumn("valid_from", F.col(timestamp_col).cast("date"))
        .withColumn("valid_to", F.lit("9999-12-31").cast("date"))
        .withColumn("is_current", F.lit(True))
        .select(*target_cols)
        .write
        .format("delta")
        .mode("append")
        .saveAsTable(target_table)
    )

A cleaner approach

The two-step approach above works but is verbose. A cleaner pattern pre-computes the rows to close and the rows to insert as DataFrames, then applies them with one MERGE and one append:

def apply_scd2_clean(spark, source_df, target_table, key_cols, tracked_cols):
    target_df = spark.table(target_table)

    # Find records that changed
    join_cond = [source_df[k] == target_df[k] for k in key_cols]
    change_cond = " OR ".join([f"target.{c} <> source.{c}" for c in tracked_cols])

    changed = (
        source_df.alias("source")
        .join(
            target_df.filter(F.col("is_current") == True).alias("target"),
            on=join_cond,
            how="inner"
        )
        .where(change_cond)
        .select("source.*")
    )

    # Records to close: matched current records that changed
    to_close = (
        target_df.filter(F.col("is_current") == True)
        .join(changed.select(*key_cols), on=key_cols, how="inner")
        .withColumn("valid_to", F.current_date())
        .withColumn("is_current", F.lit(False))
    )

    # New records: brand new + changed (new version)
    new_inserts = (
        source_df
        .join(target_df.select(*key_cols), on=key_cols, how="left_anti")
        .unionByName(changed)
        .withColumn("valid_from", F.current_date())
        .withColumn("valid_to", F.lit("9999-12-31").cast("date"))
        .withColumn("is_current", F.lit(True))
        # Spark is lazy: materialize now, before the MERGE below rewrites the
        # rows that `changed` depends on. Otherwise this plan may be recomputed
        # against the updated table and miss the new versions.
        .localCheckpoint()
    )

    # Close the old versions: match the target rows that are still current
    DeltaTable.forName(spark, target_table).alias("t").merge(
        to_close.alias("s"),
        condition=" AND ".join([f"t.{k} = s.{k}" for k in key_cols])
              + " AND t.is_current = true AND t.valid_from = s.valid_from"
    ).whenMatchedUpdateAll().execute()

    # Append new versions
    new_inserts.write.format("delta").mode("append").saveAsTable(target_table)
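
The function above still issues a MERGE followed by an append, so a failure between the two leaves closed records without successors. A common way to fold both into one atomic MERGE is to stage every changed row twice: once with its real key, which matches and closes the old version, and once with a NULL merge key, which matches nothing and falls through to the INSERT. Below is a hedged sketch that only builds the SQL text. The helper name build_scd2_merge_sql, the merge_key_ prefix, and the updates view name are assumptions for illustration, and it uses Spark SQL's null-safe equality operator (<=>) for change detection.

```python
def build_scd2_merge_sql(target_table, source_view, key_cols, tracked_cols,
                         timestamp_col="updated_at"):
    """Return a single MERGE statement that closes old versions and inserts new ones.

    Assumes source_view holds one row per key with the key columns, the tracked
    columns and timestamp_col, and that the target has the valid_from, valid_to
    and is_current columns used in this article.
    """
    # Branch 1 of the staged source carries the real keys; branch 2 carries NULL
    # keys so its rows can never satisfy the ON clause and are always inserted.
    real_keys = ", ".join(f"u.{k} AS merge_key_{k}" for k in key_cols)
    null_keys = ", ".join(f"NULL AS merge_key_{k}" for k in key_cols)
    join_keys = " AND ".join(f"u.{k} = cur.{k}" for k in key_cols)
    changed_vs_current = " OR ".join(f"NOT (cur.{c} <=> u.{c})" for c in tracked_cols)

    on_clause = " AND ".join(f"t.{k} = s.merge_key_{k}" for k in key_cols)
    changed_vs_target = " OR ".join(f"NOT (t.{c} <=> s.{c})" for c in tracked_cols)

    data_cols = list(key_cols) + list(tracked_cols)
    insert_cols = ", ".join(data_cols + ["valid_from", "valid_to", "is_current"])
    insert_vals = ", ".join(
        [f"s.{c}" for c in data_cols]
        + [f"CAST(s.{timestamp_col} AS DATE)", "DATE '9999-12-31'", "true"]
    )

    return f"""
MERGE INTO {target_table} AS t
USING (
  SELECT {real_keys}, u.* FROM {source_view} AS u
  UNION ALL
  SELECT {null_keys}, u.*
  FROM {source_view} AS u
  JOIN {target_table} AS cur ON {join_keys}
  WHERE cur.is_current = true AND ({changed_vs_current})
) AS s
ON {on_clause} AND t.is_current = true
WHEN MATCHED AND ({changed_vs_target}) THEN
  UPDATE SET valid_to = CAST(s.{timestamp_col} AS DATE), is_current = false
WHEN NOT MATCHED THEN
  INSERT ({insert_cols}) VALUES ({insert_vals})
"""

sql = build_scd2_merge_sql(
    "gold.dim_customers", "updates", ["customer_id"], ["email", "status"]
)
# With a live session (not run here):
#   source_df.createOrReplaceTempView("updates")
#   spark.sql(sql)
```

Unchanged keys match their current row but fail the WHEN MATCHED condition, so nothing happens to them. Brand-new keys match nothing and are inserted. The source still needs to be deduplicated to one row per key, otherwise two source rows can match the same target row and the MERGE fails.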

Creating the initial table

The target table needs the SCD columns in addition to the natural schema:

spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.dim_customers (
        customer_id     BIGINT      NOT NULL,
        email           STRING,
        status          STRING,
        address         STRING,
        -- SCD columns
        valid_from      DATE        NOT NULL,
        valid_to        DATE        NOT NULL,
        is_current      BOOLEAN     NOT NULL
    )
    USING DELTA
    PARTITIONED BY (is_current)
    LOCATION 'abfss://gold@storage.dfs.core.windows.net/dim_customers'
""")

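Partitioning by is_current can be a significant optimization on large dimensions. Your most common query, WHERE is_current = true, reads only the true partition, which is a small fraction of the table once history has built up. Historical data accumulates in the false partition and is scanned only for point-in-time and history queries. The trade-off is that closing a version moves the row from one partition to the other, and on small tables a two-value partition column buys little, so measure before adopting it.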

Querying SCD Type 2 tables

Current state of all customers:

SELECT customer_id, email, status, address
FROM gold.dim_customers
WHERE is_current = true

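Point-in-time snapshot (what did the data look like on 2024-06-01?). A closed version's valid_to equals the next version's valid_from, so treat the window as half-open, with valid_from inclusive and valid_to exclusive; otherwise a query on the change date returns both versions:

SELECT customer_id, email, status, address
FROM gold.dim_customers
WHERE valid_from <= '2024-06-01'
  AND valid_to   >  '2024-06-01'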
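
The boundary case is easy to check outside Spark. Here is a minimal sketch in plain Python, using the two versions from the example table (as_of and as_of_inclusive are illustrative helpers): on the change date, a strict upper bound returns one version while an inclusive one returns both.

```python
from datetime import date

versions = [
    {"email": "old@email.com", "valid_from": date(2024, 1, 15), "valid_to": date(2025, 3, 10)},
    {"email": "new@email.com", "valid_from": date(2025, 3, 10), "valid_to": date(9999, 12, 31)},
]

def as_of(rows, day):
    """Versions valid on day, treating the window as [valid_from, valid_to)."""
    return [r for r in rows if r["valid_from"] <= day < r["valid_to"]]

def as_of_inclusive(rows, day):
    """Same lookup with an inclusive upper bound, shown for comparison."""
    return [r for r in rows if r["valid_from"] <= day <= r["valid_to"]]

change_day = date(2025, 3, 10)
half_open = as_of(versions, change_day)            # one row: the new version
inclusive = as_of_inclusive(versions, change_day)  # two rows: old and new
```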

Full history for a specific customer:

SELECT customer_id, email, status, valid_from, valid_to, is_current
FROM gold.dim_customers
WHERE customer_id = 101
ORDER BY valid_from

Production gotchas

Null handling in change detection. Standard inequality (<>) returns NULL when either side is NULL. A column that was NULL and becomes a value, or vice versa, won't be detected as changed. Fix with explicit null-safe comparison:

change_cond = " OR ".join([
    f"(target.{c} <> source.{c} OR (target.{c} IS NULL) <> (source.{c} IS NULL))"
    for c in tracked_cols
])
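
To see why the extra IS NULL term matters, here is a small pure-Python model of SQL's three-valued logic, with None standing in for NULL. The helper names are illustrative; a MERGE or WHERE condition only fires when the result is true, so both False and None mean no change is detected.

```python
def sql_ne(a, b):
    """Model of SQL a <> b: NULL (None) if either side is NULL."""
    return None if a is None or b is None else a != b

def sql_or(x, y):
    """Model of SQL three-valued OR."""
    if x is True or y is True:
        return True
    if x is None or y is None:
        return None
    return False

def changed_naive(old, new):
    # Mirrors: target.c <> source.c
    return sql_ne(old, new)

def changed_null_safe(old, new):
    # Mirrors: target.c <> source.c OR (target.c IS NULL) <> (source.c IS NULL)
    return sql_or(sql_ne(old, new), (old is None) != (new is None))

naive_result = changed_naive(None, "123 Main St")     # None: change is missed
safe_result = changed_null_safe(None, "123 Main St")  # True: change is detected
```

Spark SQL also provides a null-safe equality operator, <=>, so NOT (target.c <=> source.c) is a shorter equivalent that returns false rather than NULL when both sides are NULL.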

Deduplication of source data. If your source contains multiple rows for the same key in the same batch (two updates to the same customer in one day), deduplicate before applying SCD2. Keep only the latest version per key:

from pyspark.sql import Window

w = Window.partitionBy(*key_cols).orderBy(F.col("updated_at").desc())
source_df = source_df.withColumn("rn", F.row_number().over(w)).filter("rn = 1").drop("rn")

OPTIMIZE on the historical partition. The is_current = false partition grows with every run, and its small files are not compacted unless you run OPTIMIZE (or have auto compaction enabled). Schedule it, for example weekly:

spark.sql(f"OPTIMIZE {target_table} WHERE is_current = false ZORDER BY ({', '.join(key_cols)})")

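Avoid aggressive VACUUM. VACUUM deletes data files that the current table version no longer references and that are older than the retention threshold. It never removes SCD history rows, which are live data in the current version. What it does limit is time travel: every SCD2 run rewrites files, and once the superseded files are vacuumed you can no longer query or restore the table as it was before that run. If you rely on time travel to audit or roll back a bad load, keep the retention longer than that recovery window. The default is 7 days, which is likely too short.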
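
Retention is set per table through Delta table properties. Here is a short sketch that builds the statement, assuming the gold.dim_customers table from earlier and a 30-day window chosen purely for illustration; retention_ddl is a hypothetical helper name.

```python
def retention_ddl(table, days):
    """Build an ALTER TABLE statement that keeps removed data files for `days` days."""
    return (
        f"ALTER TABLE {table} SET TBLPROPERTIES ("
        f"'delta.deletedFileRetentionDuration' = 'interval {days} days')"
    )

stmt = retention_ddl("gold.dim_customers", 30)
# With a live session (not run here):
#   spark.sql(stmt)
```

Time travel also depends on the transaction log, which is governed separately by delta.logRetentionDuration (30 days by default), so a window longer than that needs both properties raised.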