Slowly Changing Dimensions Type 2 is one of those patterns that looks clean on a whiteboard and messy in production. The concept is simple: keep a full history of changes to a record, with start/end dates marking each version's validity window. The implementation — managing open-ended records, detecting what actually changed, closing old versions — requires careful engineering.
Delta Lake's MERGE operation makes this tractable. Here's how to implement it properly.
## What SCD Type 2 looks like in the table
A customer changes their email address. The SCD Type 2 table should look like this:
| customer_id | email | status | valid_from | valid_to | is_current |
|---|---|---|---|---|---|
| 101 | old@email.com | active | 2024-01-15 | 2025-03-10 | false |
| 101 | new@email.com | active | 2025-03-10 | 9999-12-31 | true |
The current record has `valid_to = 9999-12-31` (a sentinel date) and `is_current = true`. Historical records carry the actual end date and `is_current = false`. To get the current state of all customers, filter `WHERE is_current = true`.
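The versions for one key should chain without gaps, and exactly one should be open-ended. A quick invariant check in plain Python, using the two example rows above:

```python
from datetime import date

# (valid_from, valid_to, is_current) for customer 101, from the table above
versions = [
    (date(2024, 1, 15), date(2025, 3, 10), False),
    (date(2025, 3, 10), date(9999, 12, 31), True),
]

# each closed version ends exactly where the next one begins
assert all(v[1] == nxt[0] for v, nxt in zip(versions, versions[1:]))

# exactly one current version, and it is the open-ended one
current = [v for v in versions if v[2]]
assert len(current) == 1 and current[0][1] == date(9999, 12, 31)
```

Any SCD2 pipeline bug (a gap, an overlap, two current rows) shows up as a violation of one of these two invariants, which makes them worth asserting in a data-quality check.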
## The MERGE approach
The core operation is a two-part MERGE. First, close old versions: when an existing current record has changed, set its valid_to and is_current. Then insert new versions: the new current record.
A MERGE clause can't both update the matched target row and insert a new row for the same source match, so we handle this in two steps:
```python
from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql import functions as F


def apply_scd2(
    spark,
    source_df: DataFrame,
    target_table: str,
    key_cols: list[str],
    tracked_cols: list[str],
    timestamp_col: str = "updated_at",  # assumed DATE; cast upstream if it's a timestamp
):
    target = DeltaTable.forName(spark, target_table)

    # Build the join condition on natural keys
    key_condition = " AND ".join(
        f"target.{k} = source.{k}" for k in key_cols
    )

    # Build the change detection condition
    change_condition = " OR ".join(
        f"target.{c} <> source.{c}" for c in tracked_cols
    )

    # ── Step 1: Close outdated current records ──────────────────
    target.alias("target").merge(
        source=source_df.alias("source"),
        condition=f"{key_condition} AND target.is_current = true AND ({change_condition})",
    ).whenMatchedUpdate(set={
        "valid_to": f"source.{timestamp_col}",
        "is_current": "false",
    }).execute()

    # ── Step 2: Insert new current records ──────────────────────
    # Brand-new keys: not present among the target's current records
    existing_current = spark.table(target_table).filter(F.col("is_current") == True)
    brand_new = source_df.join(existing_current, on=key_cols, how="left_anti")

    # Changed keys: rows whose old version Step 1 just closed
    # (their valid_to now equals this batch's timestamp)
    just_closed = (
        spark.table(target_table)
        .filter(F.col("is_current") == False)
        .select(*key_cols, "valid_to")
    )
    changed = source_df.join(
        just_closed,
        on=[source_df[k] == just_closed[k] for k in key_cols]
        + [source_df[timestamp_col] == just_closed["valid_to"]],
        how="left_semi",  # keeps only source columns, avoiding ambiguous names
    )

    new_records = brand_new.unionByName(changed).distinct()

    (
        new_records
        .withColumn("valid_from", F.col(timestamp_col))
        .withColumn("valid_to", F.lit("9999-12-31").cast("date"))
        .withColumn("is_current", F.lit(True))
        .drop(timestamp_col)  # the target carries valid_from instead
        .write
        .format("delta")
        .mode("append")
        .saveAsTable(target_table)
    )
```
## A cleaner staged approach

The two-step approach above works but is verbose. A cleaner pattern pre-computes the rows to close and the rows to insert as staged DataFrames, then applies them with one MERGE and one append:
```python
def apply_scd2_clean(spark, source_df, target_table, key_cols, tracked_cols):
    # Assumes source_df carries exactly the target's natural columns
    # (keys + tracked + other attributes), without the SCD columns.
    target_df = spark.table(target_table)

    # Find records that changed
    join_cond = [source_df[k] == target_df[k] for k in key_cols]
    change_cond = " OR ".join(f"target.{c} <> source.{c}" for c in tracked_cols)

    changed = (
        source_df.alias("source")
        .join(
            target_df.filter(F.col("is_current") == True).alias("target"),
            on=join_cond,
            how="inner",
        )
        .where(change_cond)
        .select("source.*")
    )

    # Records to close: matched current records that changed
    to_close = (
        target_df.filter(F.col("is_current") == True)
        .join(changed.select(*key_cols), on=key_cols, how="left_semi")
        .withColumn("valid_to", F.current_date())
        .withColumn("is_current", F.lit(False))
    )

    # New records: brand new + new versions of changed keys
    new_inserts = (
        source_df
        .join(target_df.select(*key_cols), on=key_cols, how="left_anti")
        .unionByName(changed)
        .withColumn("valid_from", F.current_date())
        .withColumn("valid_to", F.lit("9999-12-31").cast("date"))
        .withColumn("is_current", F.lit(True))
    )

    # Close the old versions in place. Match on the open record:
    # in the target, is_current is still true at this point.
    DeltaTable.forName(spark, target_table).alias("t").merge(
        to_close.alias("s"),
        condition=" AND ".join(f"t.{k} = s.{k}" for k in key_cols)
        + " AND t.is_current = true AND t.valid_from = s.valid_from",
    ).whenMatchedUpdateAll().execute()

    # Append new versions
    new_inserts.write.format("delta").mode("append").saveAsTable(target_table)
```
## Creating the initial table
The target table needs the SCD columns in addition to the natural schema:
```python
spark.sql("""
    CREATE TABLE IF NOT EXISTS gold.dim_customers (
        customer_id BIGINT NOT NULL,
        email STRING,
        status STRING,
        address STRING,
        -- SCD columns
        valid_from DATE NOT NULL,
        valid_to DATE NOT NULL,
        is_current BOOLEAN NOT NULL
    )
    USING DELTA
    PARTITIONED BY (is_current)
    LOCATION 'abfss://gold@storage.dfs.core.windows.net/dim_customers'
""")
```
Partitioning by `is_current` is a significant optimization. Your most common query, `WHERE is_current = true`, reads only the true partition, which is typically a small fraction of the total table size. Historical data accumulates in the false partition and is scanned only for point-in-time queries. (Closing a record rewrites it from the true partition into the false one; MERGE handles this transparently.)
## Querying SCD Type 2 tables
Current state of all customers:
```sql
SELECT customer_id, email, status, address
FROM gold.dim_customers
WHERE is_current = true
```
Point-in-time snapshot (what did the data look like on 2024-06-01?). Because a closed record's `valid_to` equals its successor's `valid_from`, treat the window as half-open; an inclusive `valid_to >=` would return both versions on the change date:

```sql
SELECT customer_id, email, status, address
FROM gold.dim_customers
WHERE valid_from <= '2024-06-01'
  AND valid_to > '2024-06-01'
```
Full history for a specific customer:

```sql
SELECT customer_id, email, status, valid_from, valid_to, is_current
FROM gold.dim_customers
WHERE customer_id = 101
ORDER BY valid_from
```
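The same point-in-time logic, sketched in plain Python over the example rows, makes the half-open boundary easy to verify: on the change date itself, only the new version matches.

```python
from datetime import date

rows = [
    {"customer_id": 101, "email": "old@email.com",
     "valid_from": date(2024, 1, 15), "valid_to": date(2025, 3, 10)},
    {"customer_id": 101, "email": "new@email.com",
     "valid_from": date(2025, 3, 10), "valid_to": date(9999, 12, 31)},
]

def as_of(rows, day):
    # half-open window: valid_from <= day < valid_to
    return [r for r in rows if r["valid_from"] <= day < r["valid_to"]]

print(as_of(rows, date(2024, 6, 1))[0]["email"])   # old@email.com
print(as_of(rows, date(2025, 3, 10))[0]["email"])  # new@email.com (change date)
```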
## Production gotchas
**Null handling in change detection.** Standard inequality (`<>`) returns NULL when either side is NULL, and a NULL predicate behaves like false in a WHERE or MERGE condition. So a column that was NULL and becomes a value, or vice versa, won't be detected as changed. Spark SQL's null-safe equality operator `<=>` fixes this:

```python
change_cond = " OR ".join(
    f"NOT (target.{c} <=> source.{c})"  # <=> always yields true or false, never NULL
    for c in tracked_cols
)
```
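Why `<>` misses NULL transitions: SQL comparisons follow three-valued logic, and a NULL result filters the row out just like false. A small plain-Python simulation, with `None` standing in for NULL:

```python
def sql_neq(a, b):
    # SQL <>: yields NULL (None here) if either side is NULL
    if a is None or b is None:
        return None
    return a != b

def null_safe_changed(a, b):
    # NOT (a <=> b): null-safe, always True or False
    return a != b

# NULL -> 'x' transition: <> can't see it, the null-safe form can
print(sql_neq(None, "x"))            # None (row filtered out: change missed)
print(null_safe_changed(None, "x"))  # True
```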
**Deduplication of source data.** If your source contains multiple rows for the same key in the same batch (two updates to the same customer in one day), deduplicate before applying SCD2. Keep only the latest version per key:

```python
from pyspark.sql import Window

w = Window.partitionBy(*key_cols).orderBy(F.col("updated_at").desc())
source_df = (
    source_df
    .withColumn("rn", F.row_number().over(w))
    .filter("rn = 1")
    .drop("rn")
)
```
**OPTIMIZE on the historical partition.** The `is_current = false` partition grows continuously and never gets compacted unless you run OPTIMIZE explicitly. Schedule it weekly:

```python
spark.sql(
    f"OPTIMIZE {target_table} "
    f"WHERE is_current = false "
    f"ZORDER BY ({', '.join(key_cols)})"
)
```
**Avoid aggressive VACUUM.** Delta Lake's VACUUM permanently removes data files that are no longer referenced by the current table version. Your SCD rows are safe, since they live in the current version, but time travel is not: if an audit requires reproducing the table as it stood 90 days ago, your VACUUM retention must be at least that long. The default is 7 days, which is likely too short.
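Retention can be stated on the command itself; a sketch, assuming a 90-day audit window:

```sql
-- 90 days ≈ 2160 hours; adjust to your audit window
VACUUM gold.dim_customers RETAIN 2160 HOURS
```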