Data quality problems always surface in Gold. A dashboard breaks, a metric looks wrong, a model produces nonsense. You trace it back through Silver, then Bronze, and find the root cause in a source system that changed its schema six weeks ago without telling anyone.
Data contracts don't solve every data quality problem. But they solve the one that costs the most: undocumented, unannounced breaking changes from producers.
What a data contract is
A data contract is a formal agreement between a data producer (source system, upstream team) and a data consumer (your pipeline, your Bronze table) about the structure and semantics of the data being exchanged.
At minimum, a contract defines the schema (column names, data types, nullable constraints), the semantics of each field beyond just the name, the SLA for freshness and acceptable latency, and who owns the contract. Contracts live in version control alongside your pipeline code. They are YAML files, not Confluence pages.
Defining a contract in YAML
A practical contract definition for a customers table:
```yaml
# contracts/customers.yaml
contract_version: "1.2.0"
table: bronze.customers
owner: "data-platform-team"
source_system: "crm-api"
sla:
  freshness_hours: 4
  row_count_min: 10000
schema:
  - name: customer_id
    type: integer
    nullable: false
    description: "Primary key. Unique identifier from CRM system."
  - name: email
    type: string
    nullable: false
    description: "Customer email address. Must be valid format."
    constraints:
      pattern: "^[^@]+@[^@]+\\.[^@]+$"
  - name: status
    type: string
    nullable: false
    description: "Account status."
    constraints:
      allowed_values: ["active", "inactive", "pending"]
  - name: created_at
    type: timestamp
    nullable: false
    description: "UTC timestamp of account creation."
  - name: updated_at
    type: timestamp
    nullable: false
    description: "UTC timestamp of last modification. Used as watermark."
```
Version the contract. When a producer adds a new nullable column, that's a non-breaking change — bump the minor version. When they rename or drop a column, that's a breaking change — bump the major version and coordinate with consumers.
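The bump decision can be automated in CI by diffing the old and new contract files. A minimal sketch, assuming the schema lists are loaded from YAML as dicts like those above (`classify_change` and the `"major"`/`"minor"`/`"patch"` labels are illustrative, not an existing tool):

```python
def classify_change(old_schema: list, new_schema: list) -> str:
    """Classify a contract schema change for semantic versioning.

    Hypothetical helper: compares two contract 'schema' lists and returns
    the version component to bump.
    """
    old = {c["name"]: c for c in old_schema}
    new = {c["name"]: c for c in new_schema}

    for name, spec in old.items():
        if name not in new:
            return "major"  # column dropped or renamed: breaks consumers
        if new[name]["type"] != spec["type"]:
            return "major"  # type change: breaks consumers
        if not spec.get("nullable", True) and new[name].get("nullable", True):
            return "major"  # a guaranteed non-null column became nullable

    if set(new) - set(old):
        return "minor"  # new columns only: consumers can ignore them

    return "patch"  # description or SLA tweaks
```

Wiring this into a pull-request check on the `contracts/` directory turns version discipline from a convention into a gate.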
Implementing the validator
The contract is useless without enforcement. Build a validation step that runs before data lands in Bronze:
```python
import yaml
from dataclasses import dataclass, field
from typing import List

from pyspark.sql import DataFrame
from pyspark.sql import functions as F


@dataclass
class ValidationResult:
    passed: bool
    violations: List[str] = field(default_factory=list)


def validate_against_contract(df: DataFrame, contract_path: str) -> ValidationResult:
    with open(contract_path) as f:
        contract = yaml.safe_load(f)

    violations = []
    schema_fields = {col["name"]: col for col in contract["schema"]}

    # 1. Check for missing required columns
    actual_columns = set(df.columns)
    required_columns = {
        name for name, spec in schema_fields.items()
        if not spec.get("nullable", True)
    }
    missing = required_columns - actual_columns
    if missing:
        violations.append(f"Missing required columns: {missing}")

    # 2. Check for null violations in non-nullable columns
    for col_name, spec in schema_fields.items():
        if col_name not in actual_columns:
            continue
        if not spec.get("nullable", True):
            null_count = df.filter(F.col(col_name).isNull()).count()
            if null_count > 0:
                violations.append(
                    f"Column '{col_name}' has {null_count} null values (not allowed)"
                )

    # 3. Check allowed-values constraints
    for col_name, spec in schema_fields.items():
        if col_name not in actual_columns:
            continue
        allowed = spec.get("constraints", {}).get("allowed_values")
        if allowed:
            invalid = df.filter(~F.col(col_name).isin(allowed)).count()
            if invalid > 0:
                violations.append(
                    f"Column '{col_name}' has {invalid} rows with values "
                    f"outside allowed set: {allowed}"
                )

    # 4. Check minimum row count SLA
    row_count = df.count()
    min_rows = contract.get("sla", {}).get("row_count_min", 0)
    if row_count < min_rows:
        violations.append(
            f"Row count {row_count} is below minimum SLA of {min_rows}"
        )

    return ValidationResult(passed=len(violations) == 0, violations=violations)
```
Call it in your ingestion notebook before the Bronze write:
```python
result = validate_against_contract(source_df, "contracts/customers.yaml")

if not result.passed:
    print("CONTRACT VIOLATIONS DETECTED:")
    for v in result.violations:
        print(f"  - {v}")

    # Log violations to an audit table
    run_id = (
        dbutils.notebook.entry_point.getDbutils()
        .notebook().getContext().currentRunId().get()
    )
    violations_df = (
        spark.createDataFrame(
            [(v,) for v in result.violations],
            ["violation_message"]
        )
        .withColumn("pipeline_run_id", F.lit(run_id))
        .withColumn("contract_table", F.lit("bronze.customers"))
        .withColumn("detected_at", F.current_timestamp())
    )
    violations_df.write.format("delta").mode("append") \
        .saveAsTable("control.contract_violations")

    raise Exception("Data contract validation failed. Pipeline halted.")

# Contract passed — write to Bronze
source_df.write.format("delta").mode("append").saveAsTable("bronze.customers")
```
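The contract's `freshness_hours` SLA can be checked in the same step. A minimal sketch, assuming `updated_at` is the watermark (in Spark you'd get the newest value with `df.agg(F.max("updated_at"))`; `check_freshness` is a hypothetical helper):

```python
from datetime import datetime, timedelta, timezone
from typing import List, Optional


def check_freshness(
    max_updated_at: datetime,
    freshness_hours: int,
    now: Optional[datetime] = None,
) -> List[str]:
    """Flag a violation when the newest watermark exceeds the freshness SLA."""
    now = now or datetime.now(timezone.utc)
    age = now - max_updated_at
    if age > timedelta(hours=freshness_hours):
        return [f"Newest record is {age} old, above the {freshness_hours}h freshness SLA"]
    return []
```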
Fail-fast vs. quarantine lane
The example above uses fail-fast: if the contract is violated, the pipeline stops. This is the right default for non-nullable columns and missing columns — you never want bad structural data in Bronze.
But not all violations warrant a full stop. A small percentage of rows with invalid status values might be acceptable. You quarantine them and proceed with the clean rows.
```python
# Split clean and dirty rows
valid_statuses = ["active", "inactive", "pending"]
clean_df = source_df.filter(F.col("status").isin(valid_statuses))
# Include null statuses explicitly: isin() never matches null, so without
# the isNull() check those rows would be silently dropped from both lanes
quarantine_df = source_df.filter(
    ~F.col("status").isin(valid_statuses) | F.col("status").isNull()
)

# Write clean rows to Bronze
clean_df.write.format("delta").mode("append").saveAsTable("bronze.customers")

# Write quarantined rows to a holding area for investigation
if quarantine_df.count() > 0:
    quarantine_df \
        .withColumn("quarantine_reason", F.lit("invalid_status_value")) \
        .withColumn("quarantined_at", F.current_timestamp()) \
        .write.format("delta").mode("append") \
        .saveAsTable("bronze._quarantine_customers")
```
The quarantine table accumulates violations for review. A weekly report shows producers what they're sending that doesn't meet the contract. Producers fix their output. The feedback loop closes.
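The report itself can be as simple as a count of quarantined rows per ISO week and reason (against the Delta table you'd express this as a `GROUP BY`); a plain-Python sketch with invented sample records:

```python
from collections import Counter
from datetime import date


def weekly_summary(records: list) -> Counter:
    """Count quarantined rows per (ISO week, reason) for the producer report."""
    return Counter(
        (r["quarantined_at"].isocalendar()[1], r["quarantine_reason"])
        for r in records
    )


# Two invalid-status rows quarantined in the same week
records = [
    {"quarantined_at": date(2024, 3, 4), "quarantine_reason": "invalid_status_value"},
    {"quarantined_at": date(2024, 3, 5), "quarantine_reason": "invalid_status_value"},
]
summary = weekly_summary(records)
```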
Integrating validation into ADF
If your Bronze ingestion runs through an ADF Copy activity, you can trigger validation as a pre-copy Web Activity that calls an Azure Function:
```json
{
    "name": "Validate_Contract",
    "type": "WebActivity",
    "typeProperties": {
        "url": "https://your-func.azurewebsites.net/api/validate",
        "method": "POST",
        "body": {
            "contract": "contracts/customers.yaml",
            "source_path": "abfss://landing@storage.dfs.core.windows.net/customers/"
        }
    },
    "dependsOn": [],
    "policy": { "timeout": "0.00:05:00", "retry": 1 }
}
```
The Azure Function reads the landing zone data, runs the validation, and returns HTTP 200 (pass) or HTTP 422 (fail with violation details). ADF fails the pipeline on a non-200 response, before any data reaches Bronze.
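The function's core is just a mapping from validation outcome to an HTTP response. A framework-agnostic sketch (the real handler would wrap this in an Azure Functions HTTP trigger; `validation_response` is a hypothetical helper):

```python
import json


def validation_response(passed: bool, violations: list) -> tuple:
    """Map a validation outcome to (HTTP status, JSON body) for ADF to consume."""
    if passed:
        return 200, json.dumps({"status": "pass"})
    return 422, json.dumps({"status": "fail", "violations": violations})


status, body = validation_response(False, ["Column 'email' has 3 null values"])
```

Returning the violation details in the body means the ADF failure message carries the root cause, not just a status code.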
Real example: the NULL explosion
Six weeks into a migration project, a source team updated their ETL, intending to filter out records with null emails. They didn't update the contract, and the change shipped with a bug that instead set email to null for every new record during a three-day window.
Because we had contract validation in place:
- The pipeline failed on the first run of the affected window
- The violation was logged immediately: "Column 'email' has 45,302 null values"
- The source team was notified within minutes via an ADF alert
- No null emails reached Bronze, Silver, or Gold
- The source team fixed the bug and resent the data
Without contracts: 45,302 null emails would have silently propagated through the entire Lakehouse. Gold-layer dashboards would have shown broken aggregations for three days before anyone noticed.
That one incident justified the entire contract framework.