What is the Silver layer?
The Silver layer is where raw Bronze data becomes trusted data. It applies business rules, enforces schema, deduplicates records, and produces clean, conformed tables that downstream consumers — Gold aggregations, ML models, analysts — can rely on.
If Bronze is the raw signal, Silver is where the noise gets removed.
What Silver is responsible for
- Deduplication — remove duplicate records from Bronze
- Type enforcement — cast strings to proper types
- Null handling — replace or flag nulls per business rules
- Schema conformance — drop or rename columns to match the canonical model
- MERGE / upsert — apply CDC events to maintain current state
- Basic validation — reject or quarantine records that violate constraints
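The first of these responsibilities — keep only the latest record per key — is the rule every later step builds on. A minimal sketch in plain Python (rather than Spark, for clarity; the field names mirror the orders table used throughout this section):

```python
# Keep the most recent record per order_id -- the same rule the Spark
# MERGE later in this section applies, sketched with plain dicts.
def dedupe_latest(records, key="order_id", ts="_updated_at"):
    latest = {}
    for rec in records:
        k = rec[key]
        if k not in latest or rec[ts] > latest[k][ts]:
            latest[k] = rec
    return list(latest.values())

batch = [
    {"order_id": "A1", "status": "pending", "_updated_at": 1},
    {"order_id": "A1", "status": "shipped", "_updated_at": 3},
    {"order_id": "B2", "status": "pending", "_updated_at": 2},
]
deduped = dedupe_latest(batch)
# Two records survive; A1 keeps its latest status ("shipped").
```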
Schema first
Before writing any transformation logic, define the Silver schema explicitly. This prevents schema drift from surprising downstream tables.
from pyspark.sql.types import (
    StructType, StructField,
    StringType, IntegerType, TimestampType, DecimalType, BooleanType
)

silver_orders_schema = StructType([
    StructField("order_id", StringType(), nullable=False),
    StructField("customer_id", StringType(), nullable=False),
    StructField("status", StringType(), nullable=True),
    StructField("total_amount", DecimalType(18, 2), nullable=True),
    StructField("order_date", TimestampType(), nullable=False),
    StructField("is_deleted", BooleanType(), nullable=False),
    StructField("_updated_at", TimestampType(), nullable=False),
])
Create the table with this schema before the first run:
CREATE TABLE IF NOT EXISTS catalog_prod.silver.orders (
  order_id STRING NOT NULL,
  customer_id STRING NOT NULL,
  status STRING,
  total_amount DECIMAL(18,2),
  order_date TIMESTAMP NOT NULL,
  is_deleted BOOLEAN NOT NULL,
  _updated_at TIMESTAMP NOT NULL
)
USING DELTA
TBLPROPERTIES (
  'delta.enableChangeDataFeed' = 'true',
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true'
);
Enabling Change Data Feed (CDF) on Silver tables lets Gold consumers read only changed rows, avoiding full table scans on every refresh.
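The benefit is easiest to see as a cursor over a change log. A plain-Python sketch of the consumption pattern (the versions, rows, and `_change_type` values are illustrative; Delta's CDF exposes `insert`, `update_preimage`, `update_postimage`, and `delete`):

```python
# A Gold job keeps a cursor (the last table version it processed) and
# reads only changes after it, instead of rescanning all of Silver.
change_log = [
    {"version": 1, "order_id": "A1", "_change_type": "insert"},
    {"version": 2, "order_id": "B2", "_change_type": "insert"},
    {"version": 3, "order_id": "A1", "_change_type": "update_postimage"},
]

def read_changes_since(log, last_version):
    # Equivalent in spirit to a CDF read starting after last_version
    return [row for row in log if row["version"] > last_version]

incremental = read_changes_since(change_log, last_version=2)
# Only the version-3 update comes back; versions 1-2 are skipped.
```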
Reading from Bronze
Always read Bronze incrementally using Delta streaming:
df_bronze = (
    spark.readStream
    .format("delta")
    .option("readChangeFeed", "false")  # Bronze is append-only
    .option("maxFilesPerTrigger", 1000)
    .table("catalog_prod.bronze.orders_cdc")
)
Parsing and cleaning CDC events
If Bronze holds raw CDC JSON (e.g., from Debezium), parse the envelope first:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Parse the CDC envelope. cdc_schema is the StructType matching your
# connector's envelope (before/after/op fields for Debezium).
df_parsed = (
    df_bronze
    .withColumn("payload", F.from_json(F.col("body"), cdc_schema))
    .select(
        # Debezium leaves "after" null on deletes, so fall back to
        # "before" to recover the merge key
        F.coalesce(
            F.col("payload.after.order_id"),
            F.col("payload.before.order_id"),
        ).alias("order_id"),
        F.col("payload.after.customer_id").alias("customer_id"),
        F.col("payload.after.status").alias("status"),
        F.col("payload.after.total_amount").cast("decimal(18,2)").alias("total_amount"),
        F.to_timestamp(F.col("payload.after.order_date"), "yyyy-MM-dd'T'HH:mm:ss").alias("order_date"),
        F.col("payload.op").alias("_op"),  # c=create, u=update, d=delete
        F.current_timestamp().alias("_updated_at"),
    )
    .withColumn("is_deleted", F.col("_op") == "d")
)
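The envelope being unpacked above can be illustrated without Spark. A trimmed Debezium-style payload in plain Python (the exact fields vary by connector; this is a sketch, not the full envelope):

```python
import json

# "op" is the change type; "after" holds the row state after the
# change (null for deletes, which is why the merge key may need to
# come from "before" instead).
raw = json.dumps({
    "op": "u",
    "after": {"order_id": "A1", "status": "shipped", "total_amount": "19.99"},
})

payload = json.loads(raw)
row = {
    **(payload["after"] or {}),   # flatten the post-change image
    "_op": payload["op"],
    "is_deleted": payload["op"] == "d",
}
```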
MERGE — the core Silver pattern
MERGE handles inserts, updates, and soft deletes in a single atomic operation:
from delta.tables import DeltaTable
from pyspark.sql.window import Window

def upsert_to_silver(batch_df, batch_id):
    silver_table = DeltaTable.forName(spark, "catalog_prod.silver.orders")

    # Deduplicate within the micro-batch before merging:
    # keep the latest record per order_id in this batch
    deduped = (
        batch_df
        .withColumn("_rank", F.row_number().over(
            Window.partitionBy("order_id").orderBy(F.col("_updated_at").desc())
        ))
        .filter(F.col("_rank") == 1)
        .drop("_rank", "_op")
    )

    (
        silver_table.alias("target")
        .merge(
            deduped.alias("source"),
            "target.order_id = source.order_id"
        )
        .whenMatchedUpdate(
            condition="source.is_deleted = false",
            set={
                "status": "source.status",
                "total_amount": "source.total_amount",
                "order_date": "source.order_date",
                "is_deleted": "source.is_deleted",
                "_updated_at": "source._updated_at",
            }
        )
        .whenMatchedUpdate(
            condition="source.is_deleted = true",
            set={"is_deleted": "true", "_updated_at": "source._updated_at"}
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
(
    df_parsed.writeStream
    .foreachBatch(upsert_to_silver)  # sink format is unnecessary with foreachBatch
    .option("checkpointLocation", "/lakehouse/silver/orders/_checkpoint")
    .trigger(availableNow=True)
    .start()
)
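The MERGE semantics above reduce to three branches. A plain-Python sketch against an in-memory "table" keyed by order_id (illustrative records, not the real API):

```python
# matched + not deleted -> update; matched + deleted -> flip the
# soft-delete flag; not matched -> insert. Mirrors the MERGE branches.
def merge_batch(table, batch):
    for rec in batch:
        key = rec["order_id"]
        if key in table and rec["is_deleted"]:
            table[key]["is_deleted"] = True
            table[key]["_updated_at"] = rec["_updated_at"]
        else:
            table[key] = rec

silver = {"A1": {"order_id": "A1", "status": "pending",
                 "is_deleted": False, "_updated_at": 1}}
merge_batch(silver, [
    {"order_id": "A1", "status": "x", "is_deleted": True, "_updated_at": 2},
    {"order_id": "B2", "status": "pending", "is_deleted": False, "_updated_at": 2},
])
# A1 is soft-deleted (its last known status is kept); B2 is inserted.
```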
Data quality with expectations
Use simple inline validations to quarantine bad records before they poison Silver:
def apply_quality_checks(df):
    """Return (valid_df, quarantine_df)."""
    checks = [
        (F.col("order_id").isNotNull(), "order_id_null"),
        (F.col("customer_id").isNotNull(), "customer_id_null"),
        (F.col("order_date").isNotNull(), "order_date_null"),
        (F.col("total_amount") >= 0, "negative_amount"),
    ]

    # Build a flag column; if several checks fail, the last one's label wins
    failed_col = F.lit(None).cast(StringType())
    for condition, label in checks:
        failed_col = F.when(~condition, F.lit(label)).otherwise(failed_col)

    df_with_flag = df.withColumn("_failed_check", failed_col)
    valid = df_with_flag.filter(F.col("_failed_check").isNull()).drop("_failed_check")
    quarantine = df_with_flag.filter(F.col("_failed_check").isNotNull())
    return valid, quarantine
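The labeling rule is easy to miss: checks are evaluated in order, and when several fail, the label of the last failing check is the one recorded. The same semantics in plain Python (hypothetical records, predicates mirroring the Spark checks):

```python
# Evaluate checks in order; keep the label of the LAST one that fails.
checks = [
    (lambda r: r.get("order_id") is not None, "order_id_null"),
    (lambda r: r.get("customer_id") is not None, "customer_id_null"),
    (lambda r: (r.get("total_amount") or 0) >= 0, "negative_amount"),
]

def failed_check(record):
    label = None
    for ok, name in checks:
        if not ok(record):
            label = name
    return label

bad = {"order_id": None, "customer_id": "C9", "total_amount": -5}
# Both order_id_null and negative_amount fail; the last label wins.
```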
Write quarantined records to a dedicated table so engineers can inspect and replay them later:
# Pass this to foreachBatch (in place of upsert_to_silver) so quality
# checks run before the merge
def process_batch(batch_df, batch_id):
    valid, quarantine = apply_quality_checks(batch_df)

    if quarantine.count() > 0:
        (
            quarantine
            .withColumn("_quarantined_at", F.current_timestamp())
            .withColumn("_batch_id", F.lit(batch_id))
            .write.format("delta")
            .mode("append")
            .saveAsTable("catalog_prod.bronze.orders_quarantine")
        )

    upsert_to_silver(valid, batch_id)
Handling late-arriving data
Bronze may deliver records out of order. Silver needs to handle this without silently overwriting newer data with older replayed events:
# When merging, always check whether the incoming record is newer
.whenMatchedUpdate(
    condition="source._updated_at > target._updated_at AND source.is_deleted = false",
    set={ ... }
)
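The guard condition is the whole trick. A plain-Python sketch of timestamp-guarded upserts (illustrative records; the same comparison the MERGE condition performs):

```python
# Apply an update only if the incoming record is newer than what the
# table already holds; replayed older events are ignored.
def apply_if_newer(table, rec):
    key = rec["order_id"]
    current = table.get(key)
    if current is None or rec["_updated_at"] > current["_updated_at"]:
        table[key] = rec
        return True
    return False  # stale replay: dropped

silver = {"A1": {"order_id": "A1", "status": "shipped", "_updated_at": 5}}
stale = {"order_id": "A1", "status": "pending", "_updated_at": 3}
applied = apply_if_newer(silver, stale)
# The stale event is rejected; "shipped" is not overwritten.
```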
Monitoring Silver quality
-- Daily null rate per column
SELECT
  DATE(_updated_at) AS day,
  COUNT(*) AS total_records,
  SUM(CASE WHEN order_id IS NULL THEN 1 ELSE 0 END) AS null_order_id,
  SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) AS null_customer_id,
  SUM(CASE WHEN total_amount IS NULL THEN 1 ELSE 0 END) AS null_total_amount,
  SUM(CASE WHEN is_deleted = true THEN 1 ELSE 0 END) AS soft_deleted
FROM catalog_prod.silver.orders
WHERE DATE(_updated_at) >= CURRENT_DATE() - INTERVAL 7 DAYS
GROUP BY 1
ORDER BY 1 DESC;

-- Quarantine volume — should stay near zero
SELECT
  DATE(_quarantined_at) AS day,
  _failed_check,
  COUNT(*) AS records
FROM catalog_prod.bronze.orders_quarantine
WHERE DATE(_quarantined_at) >= CURRENT_DATE() - INTERVAL 7 DAYS
GROUP BY 1, 2
ORDER BY 1 DESC, 3 DESC;
What Silver is NOT
- Silver does not aggregate data — that is Gold's job
- Silver does not model business entities into star schema — that happens at Gold
- Silver does not serve BI tools directly — it is an intermediate, not the final product
Keep Silver clean and consistent. Its tables should represent the current state of real-world entities — orders, customers, events — with full fidelity and no business bias.