todos os projetos

Global Inflation Tracker

↗ GitHubactive

Built on Databricks Community Edition, pulling CPI data from the FRED API for Brazil, Japan, UK, and USA. Bronze stores the raw API response. Silver removes duplicates and cuts each series to a shared date range so all four countries cover the same window. Gold maps FRED's series IDs to country names and builds the final table. The CSV here came straight out of that Gold layer.

PySparkDelta LakeFRED APIDatabricksPandas
pipeline
FRED APIBronzeSilverGoldCSV export
Databricks Community Edition — inflation pipeline workspace
01 · Bronze — ingestão FRED APIinflation.bronze.fred_global_inflation_raw
import requests
import pandas as pd

from pyspark.sql.functions import (
    col,
    current_timestamp,
    to_date
)

# COMMAND ----------

SERIES = [
    "FPCPITOTLZGBRA",  # Brazil
    "FPCPITOTLZGUSA",  # United States
    "FPCPITOTLZGJPN",  # Japan
    "FPCPITOTLZGGBR"   # United Kingdom
]

BASE_URL = "https://api.stlouisfed.org/fred/series/observations"

all_data = []

# COMMAND ----------

for series_id in SERIES:

    params = {
        "series_id": series_id,
        "api_key": API_KEY,
        "file_type": "json"
    }

    response = requests.get(BASE_URL, params=params, timeout=30)
    response.raise_for_status()

    observations = response.json().get("observations", [])

    for row in observations:
        all_data.append({
            "series_id": series_id,
            "date": row.get("date"),
            "value": row.get("value")
        })

# COMMAND ----------

pdf = pd.DataFrame(all_data)
spark_df = spark.createDataFrame(pdf)

# COMMAND ----------

bronze_df = (
    spark_df
    .filter(col("value") != ".")
    .withColumn("date", to_date(col("date")))
    .withColumn("value", col("value").cast("double"))
    .withColumn("ingestion_timestamp", current_timestamp())
)

# COMMAND ----------

bronze_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable("inflation.bronze.fred_global_inflation_raw")
02 · Silver — dedup & alinhamento de datasinflation.silver.fred_global_inflation_silver
from pyspark.sql.functions import (
    col,
    min as spark_min,
    max as spark_max,
    count,
    current_timestamp
)

# COMMAND ----------

BRONZE_TABLE = "inflation.bronze.fred_global_inflation_raw"
SILVER_TABLE = "inflation.silver.fred_global_inflation_silver"

spark.sql("CREATE SCHEMA IF NOT EXISTS inflation.silver")

bronze_df = spark.table(BRONZE_TABLE)

# COMMAND ----------

# Duplicate check
duplicate_check_df = (
    bronze_df
    .groupBy("series_id", "date")
    .agg(count("*").alias("duplicate_count"))
    .filter(col("duplicate_count") > 1)
)

dedup_df = bronze_df.dropDuplicates(["series_id", "date"])

# COMMAND ----------

# Align date ranges across all series
series_date_limits_df = (
    dedup_df
    .groupBy("series_id")
    .agg(
        spark_min("date").alias("series_min_date"),
        spark_max("date").alias("series_max_date"),
        count("*").alias("total_rows")
    )
)

global_date_limits = (
    series_date_limits_df
    .agg(
        spark_max("series_min_date").alias("global_min_date"),
        spark_min("series_max_date").alias("global_max_date")
    )
    .collect()[0]
)

global_min_date = global_date_limits["global_min_date"]
global_max_date = global_date_limits["global_max_date"]

# COMMAND ----------

silver_df = (
    dedup_df
    .filter(
        (col("date") >= global_min_date) &
        (col("date") <= global_max_date)
    )
    .withColumn("silver_processed_timestamp", current_timestamp())
)

# COMMAND ----------

silver_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(SILVER_TABLE)
03 · Gold — camada analíticainflation.gold.global_inflation_analytics
from pyspark.sql.functions import col, when, current_timestamp

# COMMAND ----------

SILVER_TABLE = "inflation.silver.fred_global_inflation_silver"
GOLD_TABLE   = "inflation.gold.global_inflation_analytics"

spark.sql("CREATE SCHEMA IF NOT EXISTS inflation.gold")

silver_df = spark.table(SILVER_TABLE)

# COMMAND ----------

# Map FRED series IDs to country names
gold_df = (
    silver_df
    .withColumn(
        "countries",
        when(col("series_id") == "FPCPITOTLZGBRA", "Brazil")
        .when(col("series_id") == "FPCPITOTLZGUSA", "United States")
        .when(col("series_id") == "FPCPITOTLZGJPN", "Japan")
        .when(col("series_id") == "FPCPITOTLZGGBR", "United Kingdom")
        .otherwise("Unknown")
    )
    .withColumnRenamed("value", "inflation")
    .drop("series_id")
)

# COMMAND ----------

gold_df = (
    gold_df
    .select(
        "countries",
        "date",
        "inflation",
        "ingestion_timestamp",
        "silver_processed_timestamp"
    )
    .orderBy("countries", "date")
)

# COMMAND ----------

gold_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(GOLD_TABLE)
arquitetura
FRED APIfonte externa
Federal Reserve Economic Data — séries anuais de IPC para Brasil, Japão, UK e EUA. IDs: FPCPITOTLZGBRA, FPCPITOTLZGUSA, FPCPITOTLZGJPN, FPCPITOTLZGGBR.
Camada Bronzeingestão bruta
Extrai respostas brutas da API, converte datas e valores numéricos, grava ingestion_timestamp. Sem lógica de negócio — fidelidade da fonte preservada.
Camada Silverdeduplicação & harmonização
Remove registros duplicados. Alinha as quatro séries na janela temporal compartilhada (máximo das datas de início, mínimo das datas de fim) e grava silver_processed_timestamp.
Camada Goldsemântico · pronto para análise
Mapeia IDs de séries FRED para nomes de países legíveis, renomeia value para inflation, reordena colunas. Dataset pronto para consumo analítico.
Streamlit Analyticsvisualização interativa
Lê o export CSV da camada Gold. Filtros por país, slider de período, alternância log/linear, tooltips com delta anual e indicador de pico, e gráfico Plotly interativo.
decisões de engenharia
Bronze preserva a fidelidade da fonte bruta
A camada Bronze escreve exatamente o que a API FRED retorna — sem transformações, sem renomeações. Se algo quebrar no downstream, o payload bruto está sempre disponível para re-derivação.
Silver harmoniza janelas temporais entre todas as séries
Séries diferentes do FRED começam e terminam em datas distintas. A Silver calcula as datas globais min/max compartilhadas pelas quatro e corta cada uma para essa janela — nenhum país tem vantagem de intervalo ao comparar tendências.
Gold aplica nomenclatura semântica da camada de negócio
IDs de séries FRED (FPCPITOTLZGBRA) são códigos de máquina. A Gold os mapeia para nomes de países — a camada analítica fala a linguagem do domínio, não da API. A coluna também é renomeada de value para inflation.
Delta Lake garante consistência transacional
Cada camada grava como tabela Delta com overwriteSchema habilitado. Transações ACID garantem que execuções com falha deixam o catálogo no estado anterior — ou o write é confirmado por completo ou não ocorre.
Arquitetura Medallion permite reconstrução independente de camadas
Bronze/Silver/Gold é uma separação intencional de responsabilidades: ingere bruto, limpa e harmoniza, serve pronto para análise. Cada camada tem responsabilidade única e pode ser reconstruída sem afetar as demais.
sobre o dataset
Fonte
FRED Economic Data (Federal Reserve)
Países
Brazil · Japan · United Kingdom · United States
Período histórico
1981 – 2024
Observações
176 linhas · 44 por país
Atualização
Anual · re-execução manual do pipeline
Formato de armazenamento
Delta Lake (Databricks Community Edition)
Arquitetura
Medallion — Bronze / Silver / Gold
análise interativa