Running a full table reload every night is the first approach everyone tries. It works — until it doesn't. Tables grow, costs rise, and one day you're reprocessing 800 million rows because of three changed records. Incremental MERGE with UPSERT is the correct answer, and Delta Lake makes it straightforward once you understand the mechanics.
What MERGE actually does
A MERGE INTO statement in Delta Lake operates on two datasets: a target (the Delta table you're writing to) and a source (the dataframe with new or changed records). You define a match condition — usually the business key — and specify what happens on match and what happens when there's no match.
MERGE INTO silver.customers AS target
USING source_updates AS source
ON target.customer_id = source.customer_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
In PySpark, you use the DeltaTable API:
from delta.tables import DeltaTable
from pyspark.sql import functions as F
delta_target = DeltaTable.forName(spark, "silver.customers")
delta_target.alias("target").merge(
source=source_df.alias("source"),
condition="target.customer_id = source.customer_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
Clean and simple. But the real work is in building source_df correctly.
Building the watermark query
The whole point of incremental load is reading only what changed since the last run. You need a watermark: the maximum updated_at timestamp from the last successful execution, stored somewhere reliable.
from pyspark.sql import functions as F
# Read the current high watermark
watermark_df = spark.table("control.pipeline_watermarks").filter(
F.col("pipeline_name") == "customers_silver"
)
last_watermark = watermark_df.select("last_updated_at").first()[0]
# Read only changed records from Bronze
source_df = (
spark.table("bronze.customers")
.filter(F.col("updated_at") > last_watermark)
.filter(F.col("_is_deleted") == False)
)
print(f"Records to process: {source_df.count()}")
After a successful MERGE, you update the watermark:
new_watermark = source_df.select(F.max("updated_at")).first()[0]
spark.sql(f"""
UPDATE control.pipeline_watermarks
SET last_updated_at = '{new_watermark}',
last_run_at = current_timestamp()
WHERE pipeline_name = 'customers_silver'
""")
Never update the watermark before the MERGE succeeds. Always after.
Full notebook structure
Here's the complete notebook pattern I use in production:
# ── 0. Parameters ──────────────────────────────────────────────
dbutils.widgets.text("full_refresh", "false")
FULL_REFRESH = dbutils.widgets.get("full_refresh").lower() == "true"
PIPELINE_NAME = "customers_silver"
TARGET_TABLE = "silver.customers"
SOURCE_TABLE = "bronze.customers"
# ── 1. Full refresh path ────────────────────────────────────────
if FULL_REFRESH:
print("Full refresh mode — overwriting target table")
(
spark.table(SOURCE_TABLE)
.filter(F.col("_is_deleted") == False)
.write
.format("delta")
.mode("overwrite")
.option("overwriteSchema", "true")
.saveAsTable(TARGET_TABLE)
)
# Reset watermark to current max
new_wm = spark.table(SOURCE_TABLE).select(F.max("updated_at")).first()[0]
spark.sql(f"""
UPDATE control.pipeline_watermarks
SET last_updated_at = '{new_wm}', last_run_at = current_timestamp()
WHERE pipeline_name = '{PIPELINE_NAME}'
""")
dbutils.notebook.exit("full_refresh_complete")
# ── 2. Incremental path ─────────────────────────────────────────
last_wm = (
spark.table("control.pipeline_watermarks")
.filter(F.col("pipeline_name") == PIPELINE_NAME)
.select("last_updated_at")
.first()[0]
)
source_df = (
spark.table(SOURCE_TABLE)
.filter(F.col("updated_at") > last_wm)
.filter(F.col("_is_deleted") == False)
)
record_count = source_df.count()
print(f"Incremental records: {record_count}")
if record_count == 0:
print("No new records. Exiting.")
dbutils.notebook.exit("no_changes")
# ── 3. MERGE ────────────────────────────────────────────────────
DeltaTable.forName(spark, TARGET_TABLE).alias("t").merge(
source=source_df.alias("s"),
condition="t.customer_id = s.customer_id"
).whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()
# ── 4. Update watermark ─────────────────────────────────────────
new_wm = source_df.select(F.max("updated_at")).first()[0]
spark.sql(f"""
UPDATE control.pipeline_watermarks
SET last_updated_at = '{new_wm}', last_run_at = current_timestamp()
WHERE pipeline_name = '{PIPELINE_NAME}'
""")
print(f"Done. New watermark: {new_wm}")
Common pitfalls
Non-unique business keys. If your source has duplicate customer_id in the same batch, the MERGE behavior is non-deterministic. Deduplicate before merging:
from pyspark.sql import Window
w = Window.partitionBy("customer_id").orderBy(F.col("updated_at").desc())
source_df = (
source_df
.withColumn("rn", F.row_number().over(w))
.filter(F.col("rn") == 1)
.drop("rn")
)
Small files after MERGE. Each MERGE write creates new Parquet files. Over time this fragments the table. Run OPTIMIZE periodically:
spark.sql(f"OPTIMIZE {TARGET_TABLE} ZORDER BY (customer_id)")
Schedule this weekly, not after every MERGE run.
Matching on low-cardinality columns. If your match condition is a broad column (like country), Delta will scan the entire table. Always match on the actual business key and ensure your table is Z-Ordered by it.
Scheduling in Databricks Workflows
Set up a Databricks Job with:
- Job cluster (not interactive): cheaper, isolated, no shared state
- Retry policy: 2 retries with 5-minute delay — handles transient Azure storage issues
- Widget defaults:
full_refresh = falsein the job definition; set totruemanually when needed
The full_refresh widget is the safety valve. When schema changes break the incremental path, you run once with full_refresh=true and the pipeline resets cleanly.
Production checklist
- [ ] Watermark table exists and has a row for this pipeline before first run
- [ ] Source table has an indexed
updated_atcolumn (avoids full scans) - [ ] Deduplication step before MERGE
- [ ]
full_refreshwidget wired up and tested - [ ] OPTIMIZE scheduled separately (not inline)
- [ ] Job alerts configured for failure notification