Architecture · 2026-04-02 · 5 min read

How to set up your Silver layer in Databricks

Build a production-grade Silver layer in Databricks — deduplication, MERGE patterns, schema enforcement, and data quality checks with Delta Lake.

What is the Silver layer?

The Silver layer is where raw Bronze data becomes trusted data. It applies business rules, enforces schema, deduplicates records, and produces clean, conformed tables that downstream consumers — Gold aggregations, ML models, analysts — can rely on.

If Bronze is the raw signal, Silver is where the noise gets removed.

What Silver is responsible for

  • Deduplication — remove duplicate records from Bronze
  • Type enforcement — cast strings to proper types
  • Null handling — replace or flag nulls per business rules
  • Schema conformance — drop or rename columns to match the canonical model
  • MERGE / upsert — apply CDC events to maintain current state
  • Basic validation — reject or quarantine records that violate constraints

Schema first

Before writing any transformation logic, define the Silver schema explicitly. This prevents schema drift from surprising downstream tables.

from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, TimestampType, DecimalType, BooleanType
)

silver_orders_schema = StructType([
    StructField("order_id",      StringType(),            nullable=False),
    StructField("customer_id",   StringType(),            nullable=False),
    StructField("status",        StringType(),            nullable=True),
    StructField("total_amount",  DecimalType(18, 2),      nullable=True),
    StructField("order_date",    TimestampType(),         nullable=False),
    StructField("is_deleted",    BooleanType(),           nullable=False),
    StructField("_updated_at",   TimestampType(),         nullable=False),
])

Create the table with this schema before the first run:

CREATE TABLE IF NOT EXISTS catalog_prod.silver.orders (
    order_id      STRING        NOT NULL,
    customer_id   STRING        NOT NULL,
    status        STRING,
    total_amount  DECIMAL(18,2),
    order_date    TIMESTAMP     NOT NULL,
    is_deleted    BOOLEAN       NOT NULL,
    _updated_at   TIMESTAMP     NOT NULL
)
USING DELTA
TBLPROPERTIES (
    'delta.enableChangeDataFeed' = 'true',
    'delta.autoOptimize.optimizeWrite' = 'true',
    'delta.autoOptimize.autoCompact' = 'true'
);

Enabling Change Data Feed (CDF) on Silver tables lets Gold consumers read only changed rows, avoiding full table scans on every refresh.
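
One way to make the explicit schema pay off is to compare it with what actually arrives before anything is written. The sketch below is plain Python with no Spark dependency. It models each column as a (name, type, nullable) tuple; the names `Column`, `diff_schema`, and the sample `incoming` columns are hypothetical and exist only for illustration.

```python
from typing import NamedTuple


class Column(NamedTuple):
    name: str
    dtype: str
    nullable: bool


# Mirrors the Silver orders schema defined in this section.
EXPECTED = [
    Column("order_id", "string", False),
    Column("customer_id", "string", False),
    Column("status", "string", True),
    Column("total_amount", "decimal(18,2)", True),
    Column("order_date", "timestamp", False),
    Column("is_deleted", "boolean", False),
    Column("_updated_at", "timestamp", False),
]


def diff_schema(expected, actual):
    """Describe how `actual` drifts from `expected`, column by column."""
    exp = {c.name: c for c in expected}
    act = {c.name: c for c in actual}
    return {
        "missing": sorted(exp.keys() - act.keys()),
        "unexpected": sorted(act.keys() - exp.keys()),
        "type_mismatch": sorted(
            name for name in exp.keys() & act.keys()
            if exp[name].dtype != act[name].dtype
        ),
    }


# Hypothetical incoming data: total_amount arrived as a string,
# is_deleted is absent, and an extra column appeared upstream.
incoming = [c for c in EXPECTED if c.name not in ("total_amount", "is_deleted")]
incoming += [
    Column("total_amount", "string", True),
    Column("coupon_code", "string", True),
]

drift = diff_schema(EXPECTED, incoming)
print(drift)
```

In a PySpark job, the same tuples could be built from `df.schema.fields`, since each field exposes `name`, `dataType.simpleString()`, and `nullable`. Failing the run when `drift` is non-empty is one possible policy, not the only one.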

Reading from Bronze

Always read Bronze incrementally using Delta streaming:

df_bronze = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "false")        # Bronze is append-only
    .option("maxFilesPerTrigger", 1000)
    .table("catalog_prod.bronze.orders_cdc")
)

Parsing and cleaning CDC events

If Bronze holds raw CDC JSON (e.g., from Debezium), parse the envelope first:

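from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# cdc_schema is the StructType of the CDC envelope, defined elsewhere.
# Assumption: it exposes before, after, op, and ts_ms, as a Debezium envelope does.

# Delete events carry a null `after`, so fall back to the `before` row image.
# Otherwise order_id would be null for deletes and the MERGE could never match them.
row = F.coalesce(F.col("payload.after"), F.col("payload.before"))

# Parse the CDC envelope
df_parsed = (
    df_bronze
    .withColumn("payload", F.from_json(F.col("body"), cdc_schema))
    .select(
        row["order_id"].alias("order_id"),
        row["customer_id"].alias("customer_id"),
        row["status"].alias("status"),
        row["total_amount"].cast("decimal(18,2)").alias("total_amount"),
        F.to_timestamp(row["order_date"], "yyyy-MM-dd'T'HH:mm:ss").alias("order_date"),
        F.col("payload.op").alias("_op"),               # c=create, u=update, d=delete
        # Source event time (epoch millis), not processing time: every row in a
        # micro-batch shares the same current_timestamp(), which makes ordering meaningless
        (F.col("payload.ts_ms") / 1000).cast("timestamp").alias("_updated_at"),
    )
    .withColumn("is_deleted", F.col("_op") == "d")
)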
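
To see what the parsing step has to cope with, it can help to walk a single event through by hand. The following is a minimal plain-Python sketch, not the Spark job itself. It assumes a flattened Debezium-style envelope with `before`, `after`, `op`, and `ts_ms` keys; the function name `parse_cdc_event` and the sample event are hypothetical.

```python
import json
from datetime import datetime, timezone
from decimal import Decimal


def parse_cdc_event(body: str) -> dict:
    """Flatten one Debezium-style envelope into a Silver-shaped record."""
    payload = json.loads(body)
    op = payload["op"]  # c=create, u=update, d=delete
    # Deletes carry the row image in `before`; `after` is null.
    row = payload.get("after") or payload.get("before") or {}
    amount = row.get("total_amount")
    return {
        "order_id": row.get("order_id"),
        "customer_id": row.get("customer_id"),
        "status": row.get("status"),
        "total_amount": (
            Decimal(str(amount)).quantize(Decimal("0.01"))
            if amount is not None else None
        ),
        "order_date": row.get("order_date"),
        "is_deleted": op == "d",
        # Event time reported by the source, in epoch milliseconds.
        "_updated_at": datetime.fromtimestamp(payload["ts_ms"] / 1000, tz=timezone.utc),
    }


delete_event = json.dumps({
    "op": "d",
    "ts_ms": 1700000000000,
    "before": {"order_id": "o-1", "customer_id": "c-9", "status": "paid",
               "total_amount": 19.9, "order_date": "2023-11-14T10:00:00"},
    "after": None,
})

record = parse_cdc_event(delete_event)
print(record["order_id"], record["is_deleted"], record["total_amount"])
```

The point of the example is the delete event: its key only survives because the parser falls back to `before`. A parser that reads `after` alone would produce a record with a null `order_id`.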

MERGE — the core Silver pattern

MERGE handles inserts, updates, and soft deletes in a single atomic operation:

from delta.tables import DeltaTable
from pyspark.sql import Window, functions as F

def upsert_to_silver(batch_df, batch_id):
    silver_table = DeltaTable.forName(spark, "catalog_prod.silver.orders")

    # Deduplicate within the micro-batch before merging
    # Keep the latest record per order_id in this batch
    deduped = (
        batch_df
        .withColumn("_rank", F.row_number().over(
            Window.partitionBy("order_id").orderBy(F.col("_updated_at").desc())
        ))
        .filter(F.col("_rank") == 1)
        .drop("_rank", "_op")
    )

    (
        silver_table.alias("target")
        .merge(
            deduped.alias("source"),
            "target.order_id = source.order_id"
        )
        .whenMatchedUpdate(
            condition="source.is_deleted = false",
            set={
                "status":       "source.status",
                "total_amount": "source.total_amount",
                "order_date":   "source.order_date",
                "is_deleted":   "source.is_deleted",
                "_updated_at":  "source._updated_at",
            }
        )
        .whenMatchedUpdate(
            condition="source.is_deleted = true",
            set={"is_deleted": "true", "_updated_at": "source._updated_at"}
        )
        .whenNotMatchedInsertAll()
        .execute()
    )

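query = (
    df_parsed.writeStream
    .foreachBatch(upsert_to_silver)     # the sink is the MERGE itself, so no .format() is needed
    .option("checkpointLocation", "/lakehouse/silver/orders/_checkpoint")
    .trigger(availableNow=True)         # process everything available, then stop
    .start()
)
query.awaitTermination()                # block until the backlog has been processed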
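
The in-batch deduplication matters because a Delta MERGE fails when several source rows match the same target row. The rule itself (keep the latest record per key) can be reasoned about without Spark. Here is a minimal plain-Python sketch of that rule; `dedupe_latest` and the sample batch are hypothetical.

```python
from datetime import datetime


def dedupe_latest(records, key="order_id", order_by="_updated_at"):
    """Keep the record with the greatest `order_by` value for each `key`."""
    latest = {}
    for rec in records:
        current = latest.get(rec[key])
        # Strict comparison: on a tie, the first record seen is kept.
        if current is None or rec[order_by] > current[order_by]:
            latest[rec[key]] = rec
    return list(latest.values())


batch = [
    {"order_id": "o-1", "status": "created", "_updated_at": datetime(2026, 1, 1, 9, 0)},
    {"order_id": "o-1", "status": "paid",    "_updated_at": datetime(2026, 1, 1, 9, 5)},
    {"order_id": "o-2", "status": "created", "_updated_at": datetime(2026, 1, 1, 9, 1)},
]

deduped = dedupe_latest(batch)
print(sorted((r["order_id"], r["status"]) for r in deduped))
```

Note the tie case: this sketch keeps the first record seen, whereas `row_number()` over equal timestamps picks an arbitrary row. If ties are possible in your data, adding a secondary sort column to the window makes the result deterministic.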

Data quality with expectations

Use simple inline validations to quarantine bad records before they poison Silver:

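def apply_quality_checks(df):
    """Split df into (valid_df, quarantine_df) using row-level checks."""
    # Each entry is (condition that must hold, label recorded when it does not)
    checks = [
        (F.col("order_id").isNotNull(),    "order_id_null"),
        (F.col("customer_id").isNotNull(), "customer_id_null"),
        (F.col("order_date").isNotNull(),  "order_date_null"),
        # A null total_amount evaluates to NULL here and passes; the column is nullable
        (F.col("total_amount") >= 0,       "negative_amount"),
    ]

    # Fold the checks into one label column. A row that fails several checks
    # keeps the label of the last failing check in the list.
    failed_col = F.lit(None).cast(StringType())
    for condition, label in checks:
        failed_col = F.when(~condition, F.lit(label)).otherwise(failed_col)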

    df_with_flag = df.withColumn("_failed_check", failed_col)

    valid       = df_with_flag.filter(F.col("_failed_check").isNull()).drop("_failed_check")
    quarantine  = df_with_flag.filter(F.col("_failed_check").isNotNull())

    return valid, quarantine
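
The function above records one label per quarantined row. When triaging bad data it is often useful to see every rule a record broke. The sketch below shows that variant in plain Python so the rules can be unit-tested without a cluster. The names `RULES`, `failed_checks`, and `split_valid_quarantine` are hypothetical, and the rules simply mirror the four checks above.

```python
from decimal import Decimal

# Each rule is (label, predicate); the predicate returns True when the record FAILS.
RULES = [
    ("order_id_null",    lambda r: r.get("order_id") is None),
    ("customer_id_null", lambda r: r.get("customer_id") is None),
    ("order_date_null",  lambda r: r.get("order_date") is None),
    # A missing amount is allowed (the column is nullable); only negatives fail.
    ("negative_amount",  lambda r: r.get("total_amount") is not None
                                   and r["total_amount"] < 0),
]


def failed_checks(record):
    """Return the labels of every rule the record violates."""
    return [label for label, fails in RULES if fails(record)]


def split_valid_quarantine(records):
    """Return (valid, quarantine); quarantined records list their failures."""
    valid, quarantine = [], []
    for rec in records:
        failures = failed_checks(rec)
        if failures:
            quarantine.append({**rec, "_failed_checks": failures})
        else:
            valid.append(rec)
    return valid, quarantine


records = [
    {"order_id": "o-1", "customer_id": "c-1", "order_date": "2026-01-01",
     "total_amount": Decimal("10.00")},
    {"order_id": None, "customer_id": "c-2", "order_date": "2026-01-01",
     "total_amount": Decimal("-5.00")},
    {"order_id": "o-3", "customer_id": "c-3", "order_date": "2026-01-02",
     "total_amount": None},
]

valid, quarantine = split_valid_quarantine(records)
print(len(valid), quarantine[0]["_failed_checks"])
```

In Spark, a comparable result could be built by collecting the labels into an array column instead of a single string, at the cost of a slightly wider quarantine table.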

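Write quarantined records to a dedicated table so engineers can inspect and replay them later. Pass process_batch, rather than upsert_to_silver, to foreachBatch so that every micro-batch is validated before the merge:

def process_batch(batch_df, batch_id):
    # The batch feeds two writes; cache it so the source is not recomputed for each
    batch_df.persist()
    try:
        valid, quarantine = apply_quality_checks(batch_df)

        # isEmpty() stops at the first row instead of counting the whole batch
        if not quarantine.isEmpty():
            (
                quarantine
                .withColumn("_quarantined_at", F.current_timestamp())
                .withColumn("_batch_id", F.lit(batch_id))
                .write.format("delta")
                .mode("append")
                .saveAsTable("catalog_prod.bronze.orders_quarantine")
            )

        upsert_to_silver(valid, batch_id)
    finally:
        batch_df.unpersist()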

Handling late-arriving data

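Bronze may deliver records out of order. Silver must not silently overwrite newer data with older, replayed events. Guard every matched clause with a timestamp comparison. The guard is only meaningful when _updated_at carries the source event time rather than the processing time:

# When merging, always check whether the incoming record is newer
.whenMatchedUpdate(
    condition="source._updated_at > target._updated_at AND source.is_deleted = false",
    set={ ... }
)
# Apply the same guard to the soft-delete clause, or a replayed delete can
# hide a row that was re-created later
.whenMatchedUpdate(
    condition="source._updated_at > target._updated_at AND source.is_deleted = true",
    set={"is_deleted": "true", "_updated_at": "source._updated_at"}
)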
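
The "newer wins" rule is easy to get subtly wrong, so here is a small plain-Python model of it that can be tested in isolation. It treats the Silver table as a dict keyed by `order_id`. The function `merge_newer_wins` and the sample rows are hypothetical, and the model ignores everything MERGE does beyond the matching logic.

```python
from datetime import datetime


def merge_newer_wins(target, source_records):
    """Apply records to `target` (dict keyed by order_id), ignoring stale ones."""
    for rec in source_records:
        existing = target.get(rec["order_id"])
        if existing is None:
            # Not matched: insert the record as-is.
            target[rec["order_id"]] = dict(rec)
        elif rec["_updated_at"] > existing["_updated_at"]:
            if rec["is_deleted"]:
                # Soft delete: flip the flag, keep the last known attributes.
                existing.update(is_deleted=True, _updated_at=rec["_updated_at"])
            else:
                existing.update(rec)
        # Otherwise the event is stale and the target row stays untouched.
    return target


silver = {
    "o-1": {"order_id": "o-1", "status": "shipped", "is_deleted": False,
            "_updated_at": datetime(2026, 1, 1, 10, 0)},
}
events = [
    # Replayed event, older than what Silver holds: must be ignored.
    {"order_id": "o-1", "status": "created", "is_deleted": False,
     "_updated_at": datetime(2026, 1, 1, 9, 0)},
    # First sighting of this key: inserted.
    {"order_id": "o-2", "status": "created", "is_deleted": False,
     "_updated_at": datetime(2026, 1, 1, 9, 30)},
]

merge_newer_wins(silver, events)
print(silver["o-1"]["status"], sorted(silver))
```

Without the timestamp comparison, the replayed `created` event would have rolled order `o-1` back from `shipped`.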

Monitoring Silver quality

-- Daily null counts per column, plus soft-delete volume
SELECT
    DATE(_updated_at)        AS day,
    COUNT(*)                 AS total_records,
    SUM(CASE WHEN order_id IS NULL      THEN 1 ELSE 0 END) AS null_order_id,
    SUM(CASE WHEN customer_id IS NULL   THEN 1 ELSE 0 END) AS null_customer_id,
    SUM(CASE WHEN total_amount IS NULL  THEN 1 ELSE 0 END) AS null_total_amount,
    SUM(CASE WHEN is_deleted = true     THEN 1 ELSE 0 END) AS soft_deleted
FROM catalog_prod.silver.orders
WHERE DATE(_updated_at) >= CURRENT_DATE() - INTERVAL 7 DAYS
GROUP BY 1
ORDER BY 1 DESC;

-- Quarantine volume: should stay near zero
SELECT
    DATE(_quarantined_at)  AS day,
    _failed_check,
    COUNT(*)               AS records
FROM catalog_prod.bronze.orders_quarantine
WHERE DATE(_quarantined_at) >= CURRENT_DATE() - INTERVAL 7 DAYS
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC;
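
Counts only become actionable once they are compared with a threshold. As a rough sketch, the rows returned by the first query could be turned into alerts like this. The function name `null_rate_alerts`, the 1% default threshold, and the sample rows are all assumptions chosen for illustration.

```python
def null_rate_alerts(daily_rows, threshold=0.01):
    """Return (day, column, rate) for every null counter above `threshold`."""
    alerts = []
    for row in daily_rows:
        total = row["total_records"]
        if total == 0:
            continue  # nothing landed that day; avoid dividing by zero
        for column, count in row.items():
            # Only the null_<column> counters are rates of interest here.
            if column.startswith("null_") and count / total > threshold:
                alerts.append((row["day"], column[len("null_"):], count / total))
    return alerts


# Hypothetical result rows, shaped like the output of the daily query.
rows = [
    {"day": "2026-04-01", "total_records": 1000, "null_order_id": 0,
     "null_customer_id": 0, "null_total_amount": 25},
    {"day": "2026-03-31", "total_records": 800, "null_order_id": 0,
     "null_customer_id": 0, "null_total_amount": 4},
]

alerts = null_rate_alerts(rows)
print(alerts)
```

A sensible threshold depends on the column: a nullable field such as `total_amount` may tolerate some nulls, while any non-zero rate on a key column points to a bug upstream.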

What Silver is NOT

  • Silver does not aggregate data — that is Gold's job
  • Silver does not model business entities into star schema — that happens at Gold
  • Silver does not serve BI tools directly — it is an intermediate, not the final product

Keep Silver clean and consistent. Its tables should represent the current state of real-world entities (orders, customers, events) at full fidelity, free of report-specific aggregations or modeling choices.