LaelaZ's picture
Deploy CommercePipeline to HF Spaces (Docker)
16decd8 verified
"""Transform stage: build staging views and mart tables from SQL files.
The SQL lives in ``pipeline/sql`` and is executed in a deterministic,
dependency-aware order. Keeping the transformation in plain ``.sql`` files
(rather than inline strings) means the models read like a dbt project and can
be inspected or run by hand with the DuckDB CLI.
"""
from __future__ import annotations

import logging
from pathlib import Path

import duckdb

from pipeline.config import Settings, get_settings
log = logging.getLogger("commerce.transform")

# Model build order. Each entry is a path relative to the configured SQL
# directory (``Settings.sql_dir``) and is executed in list order.
#
# Stage 1 - staging: views over the raw data. These are built first.
STAGING_MODELS: list[str] = [
    "staging/stg_customers.sql",
    "staging/stg_products.sql",
    "staging/stg_orders.sql",
    "staging/stg_order_items.sql",
    "staging/stg_events.sql",
]

# Stage 2 - marts, built after staging. The intermediate model
# ``int_order_revenue`` lives alongside the marts but must be built before
# them, because the marts may read from it. The remaining marts are intended
# to be independent of one another.
MART_MODELS: list[str] = [
    "marts/int_order_revenue.sql",  # intermediate; keep first
    "marts/daily_revenue.sql",
    "marts/top_products.sql",
    "marts/customer_cohort_retention.sql",
    "marts/funnel_conversion.sql",
]
def _run_model(con: duckdb.DuckDBPyConnection, sql_dir: Path, rel_path: str) -> None:
    """Read one model's ``.sql`` file and execute it on ``con``.

    Args:
        con: Open DuckDB connection the model is built in.
        sql_dir: Root directory holding the model files.
        rel_path: Location of the ``.sql`` file relative to ``sql_dir``.

    Raises:
        OSError: If the SQL file cannot be read (e.g. ``FileNotFoundError``).
        duckdb.Error: If DuckDB rejects the SQL. The exception is re-raised
            unchanged after the failing model is logged at ERROR level.
    """
    path = sql_dir / rel_path
    sql = path.read_text(encoding="utf-8")
    log.info("building model %s", rel_path)
    try:
        con.execute(sql)
    except duckdb.Error:
        # DuckDB only sees the SQL text, so its error cannot name the file.
        # Record which model failed before letting the original error propagate.
        log.error("model %s failed (%s)", rel_path, path)
        raise
def run(con: duckdb.DuckDBPyConnection | None = None, settings: Settings | None = None) -> list[str]:
    """Build all staging views and mart models, in their declared order.

    Args:
        con: Open DuckDB connection to build into. When ``None``, a connection
            to ``settings.db_path`` is opened for this call and closed before
            returning. A caller-supplied connection is left open.
        settings: Pipeline settings; ``get_settings()`` is used when ``None``.

    Returns:
        The names of all relations present in the ``marts`` schema after the
        build, sorted by name, as reported by ``information_schema.tables``.
        This lists everything in that schema, so it may include relations
        that were not (re)built by this call.
    """
    s = settings if settings is not None else get_settings()
    # Decide ownership and open the connection from the same explicit
    # ``is None`` test instead of relying on the connection's truthiness.
    owns_con = con is None
    if owns_con:
        con = duckdb.connect(str(s.db_path))
    try:
        con.execute("CREATE SCHEMA IF NOT EXISTS staging;")
        con.execute("CREATE SCHEMA IF NOT EXISTS marts;")
        # Staging models run before mart models; the order within each list
        # is significant.
        for model in (*STAGING_MODELS, *MART_MODELS):
            _run_model(con, s.sql_dir, model)
        rows = con.execute(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = 'marts' ORDER BY table_name;"
        ).fetchall()
        marts = [row[0] for row in rows]
        log.info("built marts: %s", marts)
        return marts
    finally:
        if owns_con:
            con.close()
if __name__ == "__main__":  # pragma: no cover
    # Direct invocation: log progress as bare messages, build everything,
    # then echo the resulting list of mart relations.
    logging.basicConfig(format="%(message)s", level=logging.INFO)
    built = run()
    print(built)