LaelaZ committed on
Commit
16decd8
·
verified ·
1 Parent(s): 503e4c7

Deploy CommercePipeline to HF Spaces (Docker)

Browse files
.streamlit/config.toml ADDED
@@ -0,0 +1,57 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # CommercePipeline — dashboard theme.
2
+ # Identity: ANALYTICS / BI — crisp, data-forward, professional.
3
+ #
4
+ # Distinct visual identity: a confident TEAL accent on cool slate neutrals,
5
+ # with monospace numerals for figures (the data-tool hallmark). This is its
6
+ # own identity, deliberately separate from the sibling portfolio projects.
7
+
8
+ [theme]
9
+ base = "light"
10
+
11
+ # --- Brand ---
12
+ primaryColor = "#0d9488" # teal-600 — accent / interactive
13
+ backgroundColor = "#f5f7f9" # cool, slate-tinted canvas
14
+ secondaryBackgroundColor = "#ffffff" # white — cards / inputs
15
+ textColor = "#0f1b2a" # near slate-900 — body text
16
+ borderColor = "#e2e8f0" # hairline borders on cards/inputs
17
+ linkColor = "#0d9488"
18
+
19
+ # --- Type ---
20
+ font = "sans-serif" # clean system/Inter-like body stack
21
+ headingFont = "sans-serif"
22
+ codeFont = "monospace" # tabular numerals + code blocks
23
+ baseRadius = "0.85rem" # rounded-xl cards, consistent corners
24
+ showWidgetBorder = true
25
+
26
+ # --- Cohesive chart palettes (one identity across every Altair chart) ---
27
+ # Teal-led categorical scale (data-forward, not a rainbow).
28
+ chartCategoricalColors = [
29
+ "#0d9488", # teal-600
30
+ "#2563eb", # blue-600
31
+ "#7c3aed", # violet-600
32
+ "#d97706", # amber-600
33
+ "#059669", # emerald-600
34
+ "#db2777", # pink-600
35
+ "#0891b2", # cyan-600
36
+ ]
37
+ # Single-hue teal ramp for sequential / heatmap data.
38
+ chartSequentialColors = [
39
+ "#ecfeff", "#cffafe", "#99f6e4", "#5eead4",
40
+ "#2dd4bf", "#14b8a6", "#0d9488", "#0f766e", "#115e59",
41
+ ]
42
+
43
+ # --- Dark sidebar = a focused "control rail", BI-product style ---
44
+ [theme.sidebar]
45
+ backgroundColor = "#0f1b2a"
46
+ secondaryBackgroundColor = "#16263a"
47
+ textColor = "#cbd5e1"
48
+ primaryColor = "#2dd4bf" # brighter teal pops on dark
49
+ borderColor = "#22344a"
50
+ linkColor = "#2dd4bf"
51
+
52
+ [server]
53
+ headless = true
54
+ runOnSave = true
55
+
56
+ [browser]
57
+ gatherUsageStats = false
Dockerfile ADDED
@@ -0,0 +1,29 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # CommercePipeline -- single image that runs the pipeline and serves the dashboard.
2
+ FROM python:3.11-slim
3
+
4
+ ENV PYTHONUNBUFFERED=1 \
5
+ PYTHONDONTWRITEBYTECODE=1 \
6
+ PIP_NO_CACHE_DIR=1 \
7
+ PIP_DISABLE_PIP_VERSION_CHECK=1
8
+
9
+ WORKDIR /app
10
+
11
+ # Install dependencies first for better layer caching.
12
+ COPY requirements.txt ./
13
+ RUN pip install -r requirements.txt
14
+
15
+ # Copy the project.
16
+ COPY pipeline ./pipeline
17
+ COPY dashboard ./dashboard
18
+ COPY pyproject.toml README.md ./
19
+
20
+ # Build the warehouse at image-build time so the container starts with data ready.
21
+ # (Re-run at container start by the CMD below.)
22
+ RUN python -m pipeline run
23
+
24
+ EXPOSE 8501
25
+ HEALTHCHECK --interval=30s --timeout=5s --start-period=10s \
26
+ CMD python -c "import urllib.request,sys; sys.exit(0 if urllib.request.urlopen('http://localhost:8501/_stcore/health').read()==b'ok' else 1)"
27
+
28
+ # Rebuild marts on start (cheap, ~1s) then serve the dashboard.
29
+ CMD ["sh", "-c", "python -m pipeline run && streamlit run dashboard/app.py --server.port 8501 --server.address 0.0.0.0 --server.headless true"]
LICENSE ADDED
@@ -0,0 +1,21 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ MIT License
2
+
3
+ Copyright (c) 2026 Laela Zorana
4
+
5
+ Permission is hereby granted, free of charge, to any person obtaining a copy
6
+ of this software and associated documentation files (the "Software"), to deal
7
+ in the Software without restriction, including without limitation the rights
8
+ to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
+ copies of the Software, and to permit persons to whom the Software is
10
+ furnished to do so, subject to the following conditions:
11
+
12
+ The above copyright notice and this permission notice shall be included in all
13
+ copies or substantial portions of the Software.
14
+
15
+ THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
+ IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
+ FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
+ AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
+ LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
+ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
+ SOFTWARE.
Makefile ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # CommercePipeline -- Makefile DAG.
2
+ # The stage targets form a small DAG: ingest -> load -> transform -> quality,
3
+ # wired through file-stamp prerequisites so `make` only reruns what changed (`make pipeline` always runs the full flow).
4
+
5
+ PYTHON ?= python3
6
+ PORT ?= 8501
7
+ RAW_DIR := data/raw
8
+ WAREHOUSE := data/warehouse/commerce.duckdb
9
+ RAW_STAMP := $(RAW_DIR)/.ingested
10
+
11
+ .DEFAULT_GOAL := help
12
+
13
+ .PHONY: help install ingest load transform quality pipeline test dashboard demo clean lint
14
+
15
+ help: ## Show this help
16
+ @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) \
17
+ | awk 'BEGIN {FS = ":.*?## "}; {printf " \033[36m%-12s\033[0m %s\n", $$1, $$2}'
18
+
19
+ install: ## Install runtime + dev dependencies
20
+ $(PYTHON) -m pip install -r requirements.txt
21
+
22
+ $(RAW_STAMP): pipeline/ingest.py pipeline/config.py
23
+ $(PYTHON) -m pipeline ingest
24
+ @touch $(RAW_STAMP)
25
+
26
+ ingest: $(RAW_STAMP) ## Generate the synthetic raw dataset
27
+
28
+ $(WAREHOUSE): $(RAW_STAMP) pipeline/load.py pipeline/transform.py $(wildcard pipeline/sql/**/*.sql)
29
+ $(PYTHON) -m pipeline load
30
+ $(PYTHON) -m pipeline transform
31
+
32
+ load: $(WAREHOUSE) ## Load raw files into DuckDB
33
+ transform: $(WAREHOUSE) ## Build staging + mart models
34
+
35
+ quality: $(WAREHOUSE) ## Run the data-quality gate (fails on violations)
36
+ $(PYTHON) -m pipeline quality
37
+
38
+ pipeline: ## Run the full pipeline: ingest -> load -> transform -> quality
39
+ $(PYTHON) -m pipeline run
40
+
41
+ test: ## Run the test suite
42
+ $(PYTHON) -m pytest -q
43
+
44
+ dashboard: ## Serve the Streamlit dashboard (PORT overrideable)
45
+ $(PYTHON) -m streamlit run dashboard/app.py --server.port $(PORT)
46
+
47
+ demo: pipeline ## Run the pipeline, then launch the dashboard
48
+ $(MAKE) dashboard
49
+
50
+ clean: ## Remove generated data and caches
51
+ rm -rf $(RAW_DIR)/* data/warehouse/* .pytest_cache
52
+ find . -type d -name __pycache__ -prune -exec rm -rf {} +
dashboard/app.py ADDED
@@ -0,0 +1,649 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Streamlit dashboard for the CommercePipeline marts.
2
+
3
+ Reads the DuckDB warehouse produced by the pipeline (read-only) and presents it
4
+ as a polished, data-forward BI product: a branded header, a bento-style KPI
5
+ grid, monospace numerals, one cohesive teal chart palette, and a clear
6
+ lineage / quality-gate section.
7
+
8
+ The visual identity is ANALYTICS / BI — a confident teal accent on cool slate
9
+ neutrals, distinct from the rest of the portfolio. Theme tokens live in
10
+ ``.streamlit/config.toml``; this file mirrors the same tokens so the Altair
11
+ charts and custom CSS stay in lock-step.
12
+
13
+ Run with::
14
+
15
+ streamlit run dashboard/app.py
16
+
17
+ If the warehouse does not exist yet, the app explains how to build it rather
18
+ than crashing.
19
+ """
20
+
21
+ from __future__ import annotations
22
+
23
+ import sys
24
+ import textwrap
25
+ from pathlib import Path
26
+
27
+ import altair as alt
28
+ import duckdb
29
+ import pandas as pd
30
+ import streamlit as st
31
+
32
+ # Streamlit renders via markdown, which turns 4+ space-indented lines into a code
33
+ # block — that would print our inline HTML/CSS as visible text instead of applying
34
+ # it. Dedent every HTML string so tags start at column 0 and render as HTML.
35
+ _st_markdown = st.markdown
36
+
37
+
38
+ def _dedented_markdown(body, *args, **kwargs):
39
+ if isinstance(body, str):
40
+ body = textwrap.dedent(body).strip("\n")
41
+ return _st_markdown(body, *args, **kwargs)
42
+
43
+
44
+ st.markdown = _dedented_markdown
45
+
46
+ # Make the `pipeline` package importable when run via `streamlit run`.
47
+ sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
48
+ from pipeline.config import get_settings # noqa: E402
49
+
50
+ # --- Brand palette — ANALYTICS / BI identity (teal on slate) ------------------
51
+ # Mirrors .streamlit/config.toml so charts + CSS share one language.
52
+ TEAL = "#0d9488" # teal-600 — primary accent
53
+ TEAL_500 = "#14b8a6" # teal-500 — line strokes
54
+ TEAL_SOFT = "#99f6e4" # teal-200 — area fill
55
+ INK = "#0f1b2a" # near slate-900 — body / values
56
+ SLATE_500 = "#64748b" # muted labels
57
+ GRID = "#eef2f6" # hairline chart grid
58
+ # Ordered categorical scale used across every chart for one colour identity.
59
+ CATEGORY_RANGE = ["#0d9488", "#2563eb", "#7c3aed", "#d97706", "#059669", "#db2777", "#0891b2"]
60
+ # Single-hue teal ramp for sequential / heatmap encodings.
61
+ TEAL_RAMP = ["#ecfeff", "#99f6e4", "#2dd4bf", "#0d9488", "#0f766e", "#115e59"]
62
+ # Monospace stack for tabular numerals (the data-tool hallmark).
63
+ MONO = "ui-monospace, 'SF Mono', 'JetBrains Mono', 'Roboto Mono', Menlo, monospace"
64
+
65
+ st.set_page_config(
66
+ page_title="CommercePipeline — Analytics",
67
+ page_icon="📊",
68
+ layout="wide",
69
+ initial_sidebar_state="collapsed",
70
+ menu_items={"about": "CommercePipeline · trustworthy e-commerce analytics. Built by Laela Zorana."},
71
+ )
72
+
73
+ settings = get_settings()
74
+
75
+
76
+ # --- One cohesive Altair theme for every chart -------------------------------
77
+ # A single config so all charts share one visual language: a clean sans body,
78
+ # hairline horizontal grid, no chart borders, and teal-led colour ranges.
79
+ _LABEL = {"labelColor": SLATE_500, "titleColor": SLATE_500,
80
+ "labelFontSize": 11, "titleFontSize": 11, "labelFontWeight": 500}
81
+ _CHART_CONFIG = {
82
+ "config": {
83
+ "background": "transparent",
84
+ "font": "Inter, ui-sans-serif, system-ui, -apple-system, 'Segoe UI', Roboto, sans-serif",
85
+ "view": {"stroke": "transparent"},
86
+ "axis": {
87
+ "domain": False, "ticks": False, "labelPadding": 8, "titlePadding": 12,
88
+ "gridColor": GRID, "gridWidth": 1, **_LABEL,
89
+ },
90
+ "axisX": {"grid": False},
91
+ "axisY": {"grid": True},
92
+ "legend": {**_LABEL, "symbolType": "circle", "symbolSize": 90},
93
+ "range": {"category": CATEGORY_RANGE, "heatmap": TEAL_RAMP, "ramp": TEAL_RAMP},
94
+ "title": {"color": INK, "fontSize": 13, "fontWeight": 600, "anchor": "start", "dy": -4},
95
+ "bar": {"color": TEAL},
96
+ "rect": {"stroke": "#ffffff", "strokeWidth": 1.5},
97
+ }
98
+ }
99
+
100
+ # Altair 5.5+ uses ``alt.theme``; fall back to the legacy registry on older 5.x.
101
+ try: # pragma: no cover - thin shim around the charting lib
102
+ alt.theme.register("commerce_bi", enable=True)(lambda: _CHART_CONFIG)
103
+ except AttributeError: # pragma: no cover
104
+ alt.themes.register("commerce_bi", lambda: _CHART_CONFIG)
105
+ alt.themes.enable("commerce_bi")
106
+
107
+
108
+ # --- Global styling -----------------------------------------------------------
109
+ def inject_styles() -> None:
110
+ """Hide default Streamlit chrome and apply the teal BI identity: branded
111
+ header, bento KPI cards with an accent rail, and monospace numerals."""
112
+ st.markdown(
113
+ f"""
114
+ <style>
115
+ :root {{
116
+ --teal:#0d9488; --teal-700:#0f766e; --teal-50:#f0fdfa;
117
+ --ink:#0f1b2a; --muted:#64748b; --line:#e6ebf0; --mono:{MONO};
118
+ }}
119
+ html, body, [class*="css"], .stApp,
120
+ button, input, textarea, select {{ font-family:'Inter', ui-sans-serif, system-ui, -apple-system, 'Segoe UI', Roboto, sans-serif; }}
121
+
122
+ /* Remove default Streamlit clutter. */
123
+ #MainMenu, footer, header[data-testid="stHeader"] {{ visibility:hidden; }}
124
+ .stDeployButton {{ display:none; }}
125
+ .block-container {{ padding-top:2.1rem; padding-bottom:3rem; max-width:1200px; }}
126
+
127
+ /* Restrained, BI-grade canvas: a single cool wash, no rainbow. */
128
+ .stApp {{
129
+ background:
130
+ radial-gradient(46rem 40rem at 104% -10%, rgba(13,148,136,0.07), transparent 55%),
131
+ radial-gradient(40rem 38rem at -6% -8%, rgba(37,99,235,0.05), transparent 52%),
132
+ #f5f7f9;
133
+ }}
134
+
135
+ /* Branded header. */
136
+ .cp-header {{ display:flex; align-items:center; gap:0.85rem; margin-bottom:0.3rem; }}
137
+ .cp-logo {{
138
+ display:grid; place-items:center; width:46px; height:46px; border-radius:13px;
139
+ background:linear-gradient(135deg,#0d9488,#0f766e); color:#fff;
140
+ box-shadow:0 8px 18px -7px rgba(13,148,136,0.6); flex:0 0 auto;
141
+ }}
142
+ .cp-logo svg {{ width:25px; height:25px; }}
143
+ .cp-title {{ font-size:1.68rem; font-weight:800; letter-spacing:-0.022em; line-height:1.05; color:var(--ink); }}
144
+ .cp-title .grad {{
145
+ background:linear-gradient(100deg,#0d9488,#14b8a6);
146
+ -webkit-background-clip:text; background-clip:text; color:transparent;
147
+ }}
148
+ .cp-eyebrow {{ font-size:0.7rem; font-weight:700; letter-spacing:0.15em; text-transform:uppercase; color:#94a3b8; }}
149
+ .cp-lede {{ color:#475569; font-size:1.0rem; line-height:1.6; max-width:50rem; margin:0.55rem 0 0.2rem; }}
150
+ .cp-lede b {{ color:var(--teal-700); font-weight:600; }}
151
+
152
+ /* Pill / badge row under the header. */
153
+ .cp-pills {{ display:flex; flex-wrap:wrap; gap:0.5rem; margin-top:0.95rem; }}
154
+ .cp-pill {{
155
+ display:inline-flex; align-items:center; gap:0.45rem;
156
+ font-size:0.76rem; font-weight:600; color:#334e68;
157
+ background:#fff; border:1px solid var(--line);
158
+ padding:0.3rem 0.7rem; border-radius:999px;
159
+ box-shadow:0 1px 2px rgba(15,27,42,0.04);
160
+ }}
161
+ .cp-pill b {{ font-family:var(--mono); font-weight:700; font-variant-numeric:tabular-nums; color:var(--ink); }}
162
+ .cp-pill.ok {{ color:var(--teal-700); border-color:rgba(13,148,136,0.3); background:var(--teal-50); }}
163
+ .cp-pill.bad {{ color:#b91c1c; border-color:rgba(220,38,38,0.3); background:#fef2f2; }}
164
+ .cp-dot {{ width:7px; height:7px; border-radius:999px; background:currentColor;
165
+ box-shadow:0 0 0 3px color-mix(in srgb, currentColor 18%, transparent); }}
166
+
167
+ /* Section label. */
168
+ .cp-section {{ font-size:1.1rem; font-weight:700; color:var(--ink); letter-spacing:-0.01em;
169
+ margin:0.2rem 0 0.1rem; display:flex; align-items:baseline; gap:0.55rem; }}
170
+ .cp-section .num {{ font-family:var(--mono); font-size:0.8rem; color:var(--teal);
171
+ font-weight:700; letter-spacing:0.04em; }}
172
+ .cp-sub {{ color:var(--muted); font-size:0.88rem; margin:0 0 0.45rem; }}
173
+
174
+ /* Bento KPI cards. */
175
+ div[data-testid="stMetric"] {{
176
+ background:#ffffff; border:1px solid var(--line); border-radius:16px;
177
+ padding:1.0rem 1.15rem 0.9rem; position:relative; overflow:hidden;
178
+ box-shadow:0 1px 2px rgba(15,27,42,0.04), 0 12px 28px -18px rgba(15,27,42,0.22);
179
+ transition:transform .12s ease, box-shadow .12s ease, border-color .12s ease;
180
+ }}
181
+ div[data-testid="stMetric"]:hover {{
182
+ transform:translateY(-2px); border-color:#d7e0e8;
183
+ box-shadow:0 1px 2px rgba(15,27,42,0.05), 0 18px 36px -18px rgba(13,148,136,0.34);
184
+ }}
185
+ div[data-testid="stMetricLabel"] p {{
186
+ font-size:0.7rem !important; font-weight:600 !important; letter-spacing:0.07em;
187
+ text-transform:uppercase; color:var(--muted) !important;
188
+ }}
189
+ /* Monospace, tabular numerals — the data-tool hallmark. */
190
+ div[data-testid="stMetricValue"] {{
191
+ font-family:var(--mono) !important; font-size:1.7rem !important; font-weight:700 !important;
192
+ color:var(--ink) !important; letter-spacing:-0.01em; font-variant-numeric:tabular-nums;
193
+ }}
194
+ div[data-testid="stMetricDelta"] {{ font-size:0.76rem !important; font-weight:600 !important; }}
195
+ div[data-testid="stMetricDelta"] div {{ font-variant-numeric:tabular-nums; }}
196
+
197
+ /* Top "proof" cards get a teal accent rail + tinted ground. */
198
+ .cp-proof div[data-testid="stMetric"] {{ border-top:3px solid var(--teal); background:linear-gradient(180deg,#fbfffe,#ffffff); }}
199
+
200
+ /* Quality-gate check grid. */
201
+ .cp-gate {{ display:grid; grid-template-columns:repeat(auto-fill,minmax(225px,1fr)); gap:0.5rem; margin:0.3rem 0 0.2rem; }}
202
+ .cp-check {{ display:flex; align-items:center; gap:0.55rem; background:#fff; border:1px solid var(--line);
203
+ border-radius:11px; padding:0.55rem 0.75rem; font-size:0.8rem; }}
204
+ .cp-check .ic {{ flex:0 0 auto; width:18px; height:18px; border-radius:6px; display:grid; place-items:center;
205
+ font-size:0.7rem; font-weight:800; color:#fff; }}
206
+ .cp-check.pass .ic {{ background:var(--teal); }}
207
+ .cp-check.fail .ic {{ background:#dc2626; }}
208
+ .cp-check .nm {{ font-weight:600; color:#334e68; }}
209
+ .cp-check .rel {{ margin-left:auto; font-family:var(--mono); font-size:0.7rem; color:#94a3b8; }}
210
+
211
+ /* Lineage flow strip. */
212
+ .cp-flow {{ display:flex; flex-wrap:wrap; align-items:stretch; gap:0.4rem; margin:0.2rem 0 0.4rem; }}
213
+ .cp-stage {{ flex:1 1 150px; background:#fff; border:1px solid var(--line); border-radius:13px;
214
+ padding:0.7rem 0.85rem; box-shadow:0 1px 2px rgba(15,27,42,0.04); }}
215
+ .cp-stage .st-n {{ font-family:var(--mono); font-size:0.68rem; font-weight:700; color:var(--teal); }}
216
+ .cp-stage .st-t {{ font-size:0.9rem; font-weight:700; color:var(--ink); margin-top:0.1rem; }}
217
+ .cp-stage .st-d {{ font-size:0.74rem; color:var(--muted); margin-top:0.15rem; line-height:1.4; }}
218
+ .cp-stage .st-v {{ font-family:var(--mono); font-size:0.74rem; color:var(--teal-700); font-weight:600; margin-top:0.3rem; }}
219
+ .cp-arrow {{ align-self:center; color:#cbd5e1; font-weight:700; }}
220
+ .cp-stage.gate {{ border-color:rgba(13,148,136,0.35); background:var(--teal-50); }}
221
+
222
+ /* Tighten radio / slider chrome. */
223
+ div[data-testid="stRadio"] label p, div[data-testid="stSlider"] label p {{ font-weight:600; color:#334155; }}
224
+
225
+ /* Footer. */
226
+ .cp-footer {{ margin-top:2.4rem; padding-top:1.1rem; border-top:1px solid var(--line);
227
+ display:flex; flex-wrap:wrap; gap:0.5rem 1.2rem; align-items:center;
228
+ justify-content:space-between; color:var(--muted); font-size:0.82rem; }}
229
+ .cp-footer a {{ color:var(--teal); text-decoration:none; font-weight:600; }}
230
+ .cp-footer a:hover {{ text-decoration:underline; }}
231
+ .cp-footer code {{ font-family:var(--mono); background:var(--teal-50); color:var(--teal-700);
232
+ padding:0.08rem 0.4rem; border-radius:6px; font-size:0.76rem; }}
233
+ </style>
234
+ """,
235
+ unsafe_allow_html=True,
236
+ )
237
+
238
+
239
+ inject_styles()
240
+
241
+
242
+ # --- Data access --------------------------------------------------------------
243
+ @st.cache_data(show_spinner=False)
244
+ def load_mart(query: str) -> pd.DataFrame:
245
+ con = duckdb.connect(str(settings.db_path), read_only=True)
246
+ try:
247
+ return con.execute(query).df()
248
+ finally:
249
+ con.close()
250
+
251
+
252
+ @st.cache_data(show_spinner=False)
253
+ def pipeline_health() -> dict:
254
+ """Headline pipeline proof, derived read-only from the warehouse.
255
+
256
+ Returns raw rows ingested, mart count, and the data-quality gate result so
257
+ the dashboard can show the same trust signals the build enforces.
258
+ """
259
+ from pipeline import quality # local import keeps page load light
260
+
261
+ con = duckdb.connect(str(settings.db_path), read_only=True)
262
+ try:
263
+ raw_tables = [
264
+ r[0]
265
+ for r in con.execute(
266
+ "SELECT table_name FROM information_schema.tables "
267
+ "WHERE table_schema = 'raw' ORDER BY table_name;"
268
+ ).fetchall()
269
+ ]
270
+ raw_rows = sum(
271
+ con.execute(f"SELECT count(*) FROM raw.{t}").fetchone()[0] for t in raw_tables
272
+ )
273
+ marts = [
274
+ r[0]
275
+ for r in con.execute(
276
+ "SELECT table_name FROM information_schema.tables "
277
+ "WHERE table_schema = 'marts' AND table_name NOT LIKE 'int\\_%' ESCAPE '\\' "
278
+ "ORDER BY table_name;"
279
+ ).fetchall()
280
+ ]
281
+ results = quality.run(con, settings, raise_on_fail=False)
282
+ gates_passed = sum(1 for r in results if r.passed)
283
+ gates_total = len(results)
284
+ checks = [
285
+ {"name": r.name, "relation": r.relation, "passed": r.passed, "failing": r.failing_rows}
286
+ for r in results
287
+ ]
288
+ finally:
289
+ con.close()
290
+ return {
291
+ "raw_rows": raw_rows,
292
+ "raw_tables": len(raw_tables),
293
+ "marts": marts,
294
+ "gates_passed": gates_passed,
295
+ "gates_total": gates_total,
296
+ "checks": checks,
297
+ }
298
+
299
+
300
+ def warehouse_ready() -> bool:
301
+ if not settings.db_path.exists():
302
+ return False
303
+ try:
304
+ con = duckdb.connect(str(settings.db_path), read_only=True)
305
+ try:
306
+ con.execute("SELECT 1 FROM marts.daily_revenue LIMIT 1;")
307
+ return True
308
+ finally:
309
+ con.close()
310
+ except Exception:
311
+ return False
312
+
313
+
314
+ # --- Branded header -----------------------------------------------------------
315
+ st.markdown(
316
+ """
317
+ <div class="cp-header">
318
+ <span class="cp-logo">
319
+ <svg viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2.1"
320
+ stroke-linecap="round" stroke-linejoin="round">
321
+ <path d="M3 3v18h18"/><path d="M7 14l3-4 3 3 5-7"/>
322
+ </svg>
323
+ </span>
324
+ <div>
325
+ <div class="cp-eyebrow">E-commerce analytics pipeline</div>
326
+ <div class="cp-title">Commerce<span class="grad">Pipeline</span></div>
327
+ </div>
328
+ </div>
329
+ """,
330
+ unsafe_allow_html=True,
331
+ )
332
+ st.markdown(
333
+ "<p class='cp-lede'>Raw operational data &rarr; a DuckDB warehouse &rarr; "
334
+ "layered SQL marts, behind a <b>data-quality gate that fails the build when the "
335
+ "numbers can&rsquo;t be trusted</b>. Every figure below is served read-only straight "
336
+ "from the warehouse the pipeline produced.</p>",
337
+ unsafe_allow_html=True,
338
+ )
339
+
340
+ if not warehouse_ready():
341
+ st.markdown("<div class='cp-pills'><span class='cp-pill'>Warehouse not built yet</span></div>", unsafe_allow_html=True)
342
+ st.warning(
343
+ "No warehouse found yet. Build it first:\n\n"
344
+ "```bash\nmake pipeline # or: python -m pipeline run\n```\n\n"
345
+ f"Expected database at `{settings.db_path}`."
346
+ )
347
+ st.stop()
348
+
349
+ health = pipeline_health()
350
+ gate_ok = health["gates_passed"] == health["gates_total"]
351
+ st.markdown(
352
+ f"""
353
+ <div class="cp-pills">
354
+ <span class="cp-pill {'ok' if gate_ok else 'bad'}">
355
+ <span class="cp-dot"></span>
356
+ {'Quality gate passing' if gate_ok else 'Quality gate FAILED'} ·
357
+ <b>{health['gates_passed']}/{health['gates_total']}</b>
358
+ </span>
359
+ <span class="cp-pill"><b>{health['raw_rows']:,}</b>&nbsp;rows ingested · <b>{health['raw_tables']}</b>&nbsp;source tables</span>
360
+ <span class="cp-pill"><b>{len(health['marts'])}</b>&nbsp;analytics marts</span>
361
+ <span class="cp-pill">DuckDB · in-process warehouse</span>
362
+ </div>
363
+ """,
364
+ unsafe_allow_html=True,
365
+ )
366
+
367
+ st.write("")
368
+
369
+ # --- Headline proof KPIs ------------------------------------------------------
370
+ st.markdown(
371
+ "<div class='cp-section'><span class='num'>01</span>Pipeline health</div>"
372
+ "<div class='cp-sub'>The trust signals the build itself enforces, surfaced in the product.</div>",
373
+ unsafe_allow_html=True,
374
+ )
375
+ st.markdown("<div class='cp-proof'>", unsafe_allow_html=True)
376
+ p1, p2, p3, p4 = st.columns(4)
377
+ p1.metric("Rows processed", f"{health['raw_rows']:,}", f"{health['raw_tables']} source tables")
378
+ p2.metric("Analytics marts", f"{len(health['marts'])}", "staging → marts")
379
+ p3.metric(
380
+ "Data-quality gates",
381
+ f"{health['gates_passed']}/{health['gates_total']}",
382
+ "all passing" if gate_ok else "failing",
383
+ delta_color="normal" if gate_ok else "inverse",
384
+ )
385
+ p4.metric("Warehouse", "DuckDB", "no server required")
386
+ st.markdown("</div>", unsafe_allow_html=True)
387
+
388
+ # --- Business KPIs ------------------------------------------------------------
389
+ daily = load_mart(
390
+ "SELECT order_date, orders, customers, units_sold, revenue, gross_profit, "
391
+ "avg_order_value, margin_pct FROM marts.daily_revenue ORDER BY order_date"
392
+ )
393
+ daily["order_date"] = pd.to_datetime(daily["order_date"])
394
+
395
+ total_rev = float(daily["revenue"].sum())
396
+ total_orders = int(daily["orders"].sum())
397
+ total_profit = float(daily["gross_profit"].sum())
398
+ total_units = int(daily["units_sold"].sum())
399
+ aov = total_rev / total_orders if total_orders else 0.0
400
+ margin = (total_profit / total_rev * 100) if total_rev else 0.0
401
+
402
+ st.write("")
403
+ st.markdown(
404
+ "<div class='cp-section'><span class='num'>02</span>Business performance</div>"
405
+ f"<div class='cp-sub'>Modelled from completed orders across {len(daily):,} active days.</div>",
406
+ unsafe_allow_html=True,
407
+ )
408
+ k1, k2, k3, k4 = st.columns(4)
409
+ k1.metric("Revenue", f"${total_rev:,.0f}", f"{total_units:,} units sold")
410
+ k2.metric("Completed orders", f"{total_orders:,}")
411
+ k3.metric("Avg order value", f"${aov:,.2f}")
412
+ k4.metric("Gross margin", f"{margin:.1f}%", f"${total_profit:,.0f} profit")
413
+
414
+ st.write("")
415
+
416
+ # --- Revenue trend ------------------------------------------------------------
417
+ st.markdown(
418
+ "<div class='cp-section'><span class='num'>03</span>Revenue trend</div>",
419
+ unsafe_allow_html=True,
420
+ )
421
+ granularity = st.radio(
422
+ "Granularity", ["Daily", "Weekly", "Monthly"], horizontal=True, index=1, label_visibility="collapsed"
423
+ )
424
+ freq = {"Daily": "D", "Weekly": "W", "Monthly": "MS"}[granularity]
425
+ trend = (
426
+ daily.set_index("order_date")
427
+ .resample(freq)[["revenue", "gross_profit", "orders"]]
428
+ .sum()
429
+ .reset_index()
430
+ )
431
+ rev_chart = (
432
+ alt.Chart(trend)
433
+ .mark_area(
434
+ opacity=0.95,
435
+ line={"color": TEAL_500, "strokeWidth": 2.4},
436
+ color=alt.Gradient(
437
+ gradient="linear",
438
+ stops=[
439
+ alt.GradientStop(color="#ffffff", offset=0),
440
+ alt.GradientStop(color=TEAL_SOFT, offset=1),
441
+ ],
442
+ x1=1, x2=1, y1=1, y2=0,
443
+ ),
444
+ )
445
+ .encode(
446
+ x=alt.X("order_date:T", title=None, axis=alt.Axis(grid=False)),
447
+ y=alt.Y("revenue:Q", title="Revenue ($)", axis=alt.Axis(format="$~s", grid=True)),
448
+ tooltip=[
449
+ alt.Tooltip("order_date:T", title="Period"),
450
+ alt.Tooltip("revenue:Q", title="Revenue", format="$,.0f"),
451
+ alt.Tooltip("gross_profit:Q", title="Gross profit", format="$,.0f"),
452
+ alt.Tooltip("orders:Q", title="Orders", format=",.0f"),
453
+ ],
454
+ )
455
+ .properties(height=300)
456
+ )
457
+ st.altair_chart(rev_chart, use_container_width=True)
458
+
459
+ st.write("")
460
+
461
+ # --- Top products + funnel ----------------------------------------------------
462
+ left, right = st.columns([3, 2], gap="large")
463
+
464
+ with left:
465
+ st.markdown(
466
+ "<div class='cp-section'><span class='num'>04</span>Top products by revenue</div>",
467
+ unsafe_allow_html=True,
468
+ )
469
+ top_n = st.slider("Show top N", 5, 25, 10, label_visibility="collapsed")
470
+ top = load_mart(
471
+ "SELECT product_name, category, units_sold, revenue, margin_pct, revenue_rank "
472
+ f"FROM marts.top_products ORDER BY revenue_rank LIMIT {top_n}"
473
+ )
474
+ bar = (
475
+ alt.Chart(top)
476
+ .mark_bar(cornerRadiusEnd=4)
477
+ .encode(
478
+ x=alt.X("revenue:Q", title="Revenue ($)", axis=alt.Axis(format="$~s")),
479
+ y=alt.Y("product_name:N", sort="-x", title=None),
480
+ color=alt.Color(
481
+ "category:N",
482
+ scale=alt.Scale(range=CATEGORY_RANGE),
483
+ legend=alt.Legend(title="Category", orient="bottom", columns=3),
484
+ ),
485
+ tooltip=[
486
+ alt.Tooltip("product_name:N", title="Product"),
487
+ alt.Tooltip("category:N", title="Category"),
488
+ alt.Tooltip("units_sold:Q", title="Units", format=",.0f"),
489
+ alt.Tooltip("revenue:Q", title="Revenue", format="$,.0f"),
490
+ alt.Tooltip("margin_pct:Q", title="Margin", format=".1%"),
491
+ ],
492
+ )
493
+ .properties(height=380)
494
+ )
495
+ st.altair_chart(bar, use_container_width=True)
496
+
497
+ with right:
498
+ st.markdown(
499
+ "<div class='cp-section'><span class='num'>05</span>Conversion funnel</div>",
500
+ unsafe_allow_html=True,
501
+ )
502
+ funnel = load_mart(
503
+ "SELECT step_name, step_index, sessions, pct_of_top, step_conversion "
504
+ "FROM marts.funnel_conversion ORDER BY step_index"
505
+ )
506
+ funnel["label"] = funnel["step_name"].str.replace("_", " ").str.title()
507
+ base = alt.Chart(funnel).encode(
508
+ # Sort by the model's own step_index so order is data-driven, not list-driven.
509
+ y=alt.Y("label:N", sort=alt.SortField(field="step_index", order="ascending"), title=None),
510
+ )
511
+ funnel_bars = base.mark_bar(color=TEAL, cornerRadiusEnd=4).encode(
512
+ x=alt.X("sessions:Q", title="Sessions", axis=alt.Axis(format="~s")),
513
+ tooltip=[
514
+ alt.Tooltip("label:N", title="Step"),
515
+ alt.Tooltip("sessions:Q", title="Sessions", format=",.0f"),
516
+ alt.Tooltip("pct_of_top:Q", title="% of top", format=".1%"),
517
+ alt.Tooltip("step_conversion:Q", title="Step conversion", format=".1%"),
518
+ ],
519
+ )
520
+ funnel_text = base.mark_text(align="left", dx=5, color=SLATE_500, fontWeight=600).encode(
521
+ x=alt.X("sessions:Q"),
522
+ text=alt.Text("pct_of_top:Q", format=".0%"),
523
+ )
524
+ st.altair_chart((funnel_bars + funnel_text).properties(height=210), use_container_width=True)
525
+ overall = float(funnel.iloc[-1]["pct_of_top"]) if len(funnel) else 0.0
526
+ st.metric("Overall view → purchase", f"{overall:.1%}")
527
+
528
+ st.write("")
529
+
530
+ # --- Cohort retention heatmap -------------------------------------------------
531
+ st.markdown(
532
+ "<div class='cp-section'><span class='num'>06</span>Customer cohort retention</div>"
533
+ "<div class='cp-sub'>Share of each signup cohort still ordering N months later.</div>",
534
+ unsafe_allow_html=True,
535
+ )
536
+ cohort = load_mart(
537
+ "SELECT strftime(cohort_month, '%Y-%m') AS cohort, month_number, retention_rate "
538
+ "FROM marts.customer_cohort_retention "
539
+ "WHERE month_number BETWEEN 0 AND 11 ORDER BY cohort_month, month_number"
540
+ )
541
+ heat = (
542
+ alt.Chart(cohort)
543
+ .mark_rect(stroke="#ffffff", strokeWidth=1.5, cornerRadius=2)
544
+ .encode(
545
+ x=alt.X("month_number:O", title="Months since signup", axis=alt.Axis(labelAngle=0)),
546
+ y=alt.Y("cohort:O", title="Signup cohort"),
547
+ color=alt.Color(
548
+ "retention_rate:Q",
549
+ title="Retention",
550
+ scale=alt.Scale(range=TEAL_RAMP),
551
+ legend=alt.Legend(format=".0%", orient="right"),
552
+ ),
553
+ tooltip=[
554
+ alt.Tooltip("cohort:N", title="Cohort"),
555
+ alt.Tooltip("month_number:O", title="Month #"),
556
+ alt.Tooltip("retention_rate:Q", title="Retention", format=".1%"),
557
+ ],
558
+ )
559
+ .properties(height=330)
560
+ )
561
+ st.altair_chart(heat, use_container_width=True)
562
+
563
+ # --- Lineage / architecture ---------------------------------------------------
564
+ st.write("")
565
+ st.markdown(
566
+ "<div class='cp-section'><span class='num'>07</span>Lineage &amp; quality gate</div>"
567
+ "<div class='cp-sub'>A dependency-free flow composes four stages; this dashboard "
568
+ "reads only the marts the gate signed off on.</div>",
569
+ unsafe_allow_html=True,
570
+ )
571
+
572
+ # Visible lineage flow strip (bento stages → quality gate → dashboard).
573
+ marts_count = len(health["marts"])
574
+ st.markdown(
575
+ f"""
576
+ <div class="cp-flow">
577
+ <div class="cp-stage">
578
+ <div class="st-n">01 · INGEST</div><div class="st-t">Generate</div>
579
+ <div class="st-d">Seeded synthetic generator</div>
580
+ <div class="st-v">{health['raw_rows']:,} rows · {health['raw_tables']} tables</div>
581
+ </div>
582
+ <div class="cp-arrow">&rarr;</div>
583
+ <div class="cp-stage">
584
+ <div class="st-n">02 · LOAD</div><div class="st-t">Warehouse</div>
585
+ <div class="st-d">Register raw files into DuckDB</div>
586
+ <div class="st-v">schema: raw</div>
587
+ </div>
588
+ <div class="cp-arrow">&rarr;</div>
589
+ <div class="cp-stage">
590
+ <div class="st-n">03 · TRANSFORM</div><div class="st-t">SQL marts</div>
591
+ <div class="st-d">staging &rarr; intermediate &rarr; marts</div>
592
+ <div class="st-v">{marts_count} marts</div>
593
+ </div>
594
+ <div class="cp-arrow">&rarr;</div>
595
+ <div class="cp-stage gate">
596
+ <div class="st-n">04 · QUALITY GATE</div><div class="st-t">Fail-closed</div>
597
+ <div class="st-d">A single failure exits non-zero &amp; halts the build</div>
598
+ <div class="st-v">{health['gates_passed']}/{health['gates_total']} passing</div>
599
+ </div>
600
+ </div>
601
+ """,
602
+ unsafe_allow_html=True,
603
+ )
604
+
605
+ # Per-check status grid — the gate, made legible.
606
+ _checks = sorted(health["checks"], key=lambda c: (c["passed"], c["name"]))
607
+ _rows = "".join(
608
+ f"<div class='cp-check {'pass' if c['passed'] else 'fail'}'>"
609
+ f"<span class='ic'>{'✓' if c['passed'] else '!'}</span>"
610
+ f"<span class='nm'>{c['name']}</span>"
611
+ f"<span class='rel'>{c['relation']}</span></div>"
612
+ for c in _checks
613
+ )
614
+ st.markdown(f"<div class='cp-gate'>{_rows}</div>", unsafe_allow_html=True)
615
+
616
+ with st.expander("Stage-by-stage detail", expanded=False):
617
+ st.markdown(
618
+ f"""
619
+ A dependency-free flow composes four stages; the dashboard you are looking at
620
+ reads the marts the **quality gate** signed off on.
621
+
622
+ | # | Stage | What runs | Output |
623
+ |---|-------|-----------|--------|
624
+ | **1** | **Ingest** | Seeded synthetic generator | {health['raw_rows']:,} raw rows across {health['raw_tables']} Parquet/CSV tables |
625
+ | **2** | **Load** | Register raw files into DuckDB | `raw` schema |
626
+ | **3** | **Transform** | Layered SQL: staging → intermediate → marts | {len(health['marts'])} marts ({", ".join(f"`{m}`" for m in health['marts'])}) |
627
+ | **4** | **Quality gate** | Declarative checks (not-null, unique, ranges, accepted values, referential integrity, mart sanity) | **{health['gates_passed']}/{health['gates_total']} passing** — a single failure exits non-zero and **halts the build** |
628
+
629
+ ```text
630
+ ingest ─▶ load ─▶ transform ─▶ quality gate ─▶ dashboard
631
+ (gen) (DuckDB) (SQL marts) (fail-closed) (you are here)
632
+ ```
633
+
634
+ Rebuild any time with `make pipeline` (or `python -m pipeline run`). Stages are
635
+ addressable individually: `python -m pipeline {{ingest,load,transform,quality}}`.
636
+ """
637
+ )
638
+
639
+ # --- Footer -------------------------------------------------------------------
640
+ st.markdown(
641
+ f"""
642
+ <div class="cp-footer">
643
+ <span>Built by <b>Laela Zorana</b> · CommercePipeline — trustworthy e-commerce analytics.</span>
644
+ <span>Source: <code>{settings.db_path.name}</code> · {len(daily):,} active days ·
645
+ rebuild with <code>make pipeline</code></span>
646
+ </div>
647
+ """,
648
+ unsafe_allow_html=True,
649
+ )
docker-compose.yml ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Run the whole thing with: docker compose up --build
2
+ # Then open http://localhost:8501
3
+ services:
4
+ commerce-pipeline:
5
+ build: .
6
+ image: commerce-pipeline:latest
7
+ ports:
8
+ - "8501:8501"
9
+ environment:
10
+ # Override any synthetic-data knob here, e.g. a bigger dataset:
11
+ # CP_N_ORDERS: "50000"
12
+ CP_SEED: "42"
13
+ volumes:
14
+ # Persist the generated warehouse on the host (optional).
15
+ - ./data:/app/data
16
+ restart: unless-stopped
pipeline/__init__.py ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """CommercePipeline: a small but complete e-commerce analytics pipeline.
2
+
3
+ Stages
4
+ ------
5
+ ingest -> generate deterministic synthetic raw data (Parquet/CSV)
6
+ load -> register raw files into a DuckDB warehouse
7
+ transform -> build staging + mart models with SQL
8
+ quality -> enforce data-quality gates (fails the run on violation)
9
+
10
+ The stages are composed by :mod:`pipeline.flow` and exposed on the CLI.
11
+ """
12
+
13
+ from pipeline.config import Settings, get_settings
14
+
15
+ __all__ = ["Settings", "get_settings"]
16
+ __version__ = "1.0.0"
pipeline/__main__.py ADDED
@@ -0,0 +1,6 @@
 
 
 
 
 
 
 
1
+ """Enable ``python -m pipeline <stage>``."""
2
+
3
+ from pipeline.cli import main
4
+
5
+ if __name__ == "__main__":
6
+ raise SystemExit(main())
pipeline/cli.py ADDED
@@ -0,0 +1,79 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Command-line entry point for the pipeline.
2
+
3
+ Examples
4
+ --------
5
+ python -m pipeline run # full pipeline ingest -> load -> transform -> quality
6
+ python -m pipeline ingest # just regenerate raw data
7
+ python -m pipeline transform # rebuild marts from the existing warehouse
8
+ python -m pipeline quality # re-run the data-quality gate
9
+ """
10
+
11
+ from __future__ import annotations
12
+
13
+ import argparse
14
+ import logging
15
+ import sys
16
+
17
+ from pipeline import ingest, load, quality, transform
18
+ from pipeline.config import get_settings
19
+ from pipeline.flow import run_pipeline
20
+ from pipeline.load import connect
21
+
22
+
23
+ def _configure_logging(verbose: bool) -> None:
24
+ logging.basicConfig(
25
+ level=logging.INFO if verbose else logging.WARNING,
26
+ format="%(message)s",
27
+ stream=sys.stderr,
28
+ )
29
+
30
+
31
def main(argv: list[str] | None = None) -> int:
    """Parse the command line, execute the requested stage, and return an exit code.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Returns:
        0 on success, 1 when ``quality.DataQualityError`` is raised, 2 when
        ``FileNotFoundError`` is raised. Any other exception propagates.
    """
    cli = argparse.ArgumentParser(prog="commerce-pipeline", description=__doc__)
    cli.add_argument(
        "stage",
        choices=["run", "ingest", "load", "transform", "quality"],
        help="pipeline stage to execute ('run' = full pipeline)",
    )
    cli.add_argument("-q", "--quiet", action="store_true", help="suppress progress logs")
    options = cli.parse_args(argv)

    _configure_logging(verbose=not options.quiet)
    cfg = get_settings()
    stage = options.stage

    try:
        if stage == "run":
            report = run_pipeline(cfg)
            print(report.render())
        elif stage == "ingest":
            written = ingest.run(cfg)
            print(f"ingest complete: {sum(written.values()):,} rows across {len(written)} tables")
        else:
            # The remaining stages all work against an open warehouse connection.
            warehouse = connect(cfg)
            try:
                if stage == "load":
                    loaded = load.run(warehouse, cfg)
                    print(f"load complete: {loaded}")
                elif stage == "transform":
                    built = transform.run(warehouse, cfg)
                    print(f"transform complete: built marts {built}")
                elif stage == "quality":
                    outcomes = quality.run(warehouse, cfg)
                    print(f"quality complete: {len(outcomes)} checks passed")
            finally:
                warehouse.close()
    except quality.DataQualityError as exc:
        print(f"DATA QUALITY GATE FAILED: {exc}", file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"ERROR: {exc}", file=sys.stderr)
        return 2
    return 0
76
+
77
+
78
+ if __name__ == "__main__": # pragma: no cover
79
+ raise SystemExit(main())
pipeline/config.py ADDED
@@ -0,0 +1,56 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Central configuration for the pipeline.
2
+
3
+ Paths are resolved relative to the project root so the pipeline behaves the
4
+ same whether it is invoked from the repo root, a Makefile target, or CI.
5
+ Everything is overridable via environment variables, which keeps the code
6
+ container- and cloud-friendly without introducing a config framework.
7
+ """
8
+
9
+ from __future__ import annotations
10
+
11
+ import os
12
+ from dataclasses import dataclass, field
13
+ from functools import lru_cache
14
+ from pathlib import Path
15
+
16
+ # pipeline/config.py -> project root is two levels up.
17
+ PROJECT_ROOT = Path(__file__).resolve().parent.parent
18
+
19
+
20
+ def _env_path(var: str, default: Path) -> Path:
21
+ raw = os.environ.get(var)
22
+ return Path(raw).expanduser().resolve() if raw else default
23
+
24
+
25
@dataclass(frozen=True)
class Settings:
    """Immutable run configuration.

    Every environment-backed field uses a ``default_factory`` so the environment
    is consulted when a ``Settings`` instance is created, not when this module
    is imported. This keeps the synthetic-data knobs consistent with the path
    fields: an override exported (or monkeypatched) after import is honoured by
    the next ``Settings()``.

    Raises:
        ValueError: on instantiation, if an integer knob's environment value is
            not a valid integer.
    """

    project_root: Path = PROJECT_ROOT
    # Directories; overridable with CP_RAW_DIR / CP_WAREHOUSE_DIR.
    raw_dir: Path = field(default_factory=lambda: _env_path("CP_RAW_DIR", PROJECT_ROOT / "data" / "raw"))
    warehouse_dir: Path = field(
        default_factory=lambda: _env_path("CP_WAREHOUSE_DIR", PROJECT_ROOT / "data" / "warehouse")
    )
    sql_dir: Path = field(default_factory=lambda: PROJECT_ROOT / "pipeline" / "sql")

    # Synthetic-data knobs (deterministic given the seed).
    seed: int = field(default_factory=lambda: int(os.environ.get("CP_SEED", "42")))
    n_customers: int = field(default_factory=lambda: int(os.environ.get("CP_N_CUSTOMERS", "2000")))
    n_orders: int = field(default_factory=lambda: int(os.environ.get("CP_N_ORDERS", "12000")))
    n_products: int = field(default_factory=lambda: int(os.environ.get("CP_N_PRODUCTS", "120")))
    start_date: str = field(default_factory=lambda: os.environ.get("CP_START_DATE", "2024-01-01"))
    end_date: str = field(default_factory=lambda: os.environ.get("CP_END_DATE", "2024-12-31"))

    @property
    def db_path(self) -> Path:
        """Location of the DuckDB database file inside ``warehouse_dir``."""
        return self.warehouse_dir / "commerce.duckdb"

    def ensure_dirs(self) -> None:
        """Create ``raw_dir`` and ``warehouse_dir`` (with parents) if missing."""
        self.raw_dir.mkdir(parents=True, exist_ok=True)
        self.warehouse_dir.mkdir(parents=True, exist_ok=True)
51
+
52
+
53
@lru_cache(maxsize=1)
def get_settings() -> Settings:
    """Build the process-wide ``Settings`` once; later calls return the cached instance."""
    shared = Settings()
    return shared
pipeline/flow.py ADDED
@@ -0,0 +1,88 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Orchestration: run the full pipeline ingest -> load -> transform -> quality.
2
+
3
+ This is a plain, dependency-free DAG runner. Each stage is a function returning
4
+ a small summary; the flow threads a single DuckDB connection through load,
5
+ transform, and quality so they all operate on the same warehouse connection.
6
+
7
+ If `prefect` is installed, ``pipeline.orchestrate`` exposes the same DAG as a
8
+ Prefect flow for schedulable/observable runs -- but it is entirely optional and
9
+ the project runs end to end without it.
10
+ """
11
+
12
+ from __future__ import annotations
13
+
14
+ import logging
15
+ import time
16
+ from dataclasses import asdict, dataclass, field
17
+ from typing import Any
18
+
19
+ from pipeline import ingest, load, quality, transform
20
+ from pipeline.config import Settings, get_settings
21
+ from pipeline.load import connect
22
+
23
+ log = logging.getLogger("commerce.flow")
24
+
25
+
26
@dataclass
class RunSummary:
    """Per-stage results of one pipeline run, plus a printable report."""

    raw_counts: dict[str, int] = field(default_factory=dict)
    loaded_counts: dict[str, int] = field(default_factory=dict)
    marts: list[str] = field(default_factory=list)
    quality_passed: int = 0
    quality_failed: int = 0
    seconds: float = 0.0

    def as_dict(self) -> dict[str, Any]:
        """Return the summary as a plain dictionary (via ``dataclasses.asdict``)."""
        return asdict(self)

    def render(self) -> str:
        """Format the summary as a multi-line, rule-delimited text report."""
        rule = "=" * 60
        total_rows = sum(self.raw_counts.values())
        mart_names = ", ".join(self.marts)
        # One indented line per raw table, in the insertion order of raw_counts.
        per_table = [f" - {table:<12} {rows:>10,}" for table, rows in self.raw_counts.items()]
        report = [
            rule,
            " CommercePipeline run summary",
            rule,
            f" raw rows written : {total_rows:,}",
            *per_table,
            f" marts produced : {len(self.marts)} -> {mart_names}",
            f" quality gates : {self.quality_passed} passed, {self.quality_failed} failed",
            f" elapsed : {self.seconds:.2f}s",
            rule,
        ]
        return "\n".join(report)
54
+
55
+
56
def run_pipeline(settings: Settings | None = None) -> RunSummary:
    """Run ingest, load, transform and quality in order and summarise the run.

    Args:
        settings: Run configuration; falls back to ``get_settings()``.

    Returns:
        A ``RunSummary`` with row counts, mart names, gate tallies and elapsed time.

    Raises:
        quality.DataQualityError: if any quality check did not pass. The error
            is raised only after the warehouse connection has been closed.
    """
    cfg = settings or get_settings()
    started = time.perf_counter()
    report = RunSummary()

    log.info("[1/4] ingest: generating synthetic dataset (seed=%d)", cfg.seed)
    report.raw_counts = ingest.run(cfg)

    # One connection is shared by load, transform and quality.
    warehouse = connect(cfg)
    try:
        log.info("[2/4] load: registering raw files into DuckDB")
        report.loaded_counts = load.run(warehouse, cfg)

        log.info("[3/4] transform: building staging + mart models")
        report.marts = transform.run(warehouse, cfg)

        log.info("[4/4] quality: enforcing data-quality gates")
        # Collect every result first so the summary can tally passes and failures.
        outcomes = quality.run(warehouse, cfg, raise_on_fail=False)
        broken = [outcome for outcome in outcomes if not outcome.passed]
        report.quality_failed = len(broken)
        report.quality_passed = len(outcomes) - len(broken)
    finally:
        warehouse.close()

    report.seconds = time.perf_counter() - started

    if not broken:
        return report
    names = ", ".join(f"{r.name} ({r.failing_rows} rows)" for r in broken)
    raise quality.DataQualityError(
        f"pipeline halted: {len(broken)} data-quality gate(s) failed: {names}"
    )
pipeline/ingest.py ADDED
@@ -0,0 +1,240 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Ingest stage: generate a realistic, deterministic synthetic e-commerce dataset.
2
+
3
+ We use a seeded ``numpy`` generator so every run produces byte-identical data,
4
+ which makes the downstream marts and tests reproducible. Five raw tables are
5
+ emitted, mirroring a typical operational store:
6
+
7
+ * ``customers`` - one row per customer (signup date, marketing channel, country)
8
+ * ``products`` - product catalogue (category, price, cost)
9
+ * ``orders`` - one row per order (customer, status, timestamp)
10
+ * ``order_items`` - order line items (product, quantity, unit price)
11
+ * ``events`` - clickstream funnel events (view -> cart -> checkout -> purchase)
12
+
13
+ Outputs are written as Parquet (analytics-native, typed) with a CSV mirror of
14
+ ``orders`` so the raw layer is also human-inspectable.
15
+ """
16
+
17
+ from __future__ import annotations
18
+
19
+ import logging
20
+ from dataclasses import dataclass
21
+ from datetime import datetime, timedelta
22
+
23
+ import numpy as np
24
+ import pandas as pd
25
+
26
+ from pipeline.config import Settings, get_settings
27
+
28
+ log = logging.getLogger("commerce.ingest")
29
+
30
+ # Reference data ----------------------------------------------------------------
31
+ CHANNELS = ["organic", "paid_search", "social", "email", "referral", "affiliate"]
32
+ CHANNEL_WEIGHTS = [0.34, 0.22, 0.18, 0.12, 0.09, 0.05]
33
+ COUNTRIES = ["US", "GB", "DE", "FR", "CA", "AU", "NL", "SE"]
34
+ COUNTRY_WEIGHTS = [0.42, 0.14, 0.10, 0.08, 0.09, 0.07, 0.06, 0.04]
35
+ CATEGORIES = ["Apparel", "Home", "Electronics", "Beauty", "Outdoors", "Toys"]
36
+ ORDER_STATUSES = ["completed", "completed", "completed", "completed", "refunded", "cancelled"]
37
+ # Funnel steps, in order. The advance probabilities live in _generate_events.
38
+ FUNNEL_STEPS = ["view", "add_to_cart", "checkout", "purchase"]
39
+
40
+
41
@dataclass
class RawTables:
    """Container for the five in-memory raw tables built by ``generate``."""

    # Columns: customer_id, signup_date, channel, country.
    customers: pd.DataFrame
    # Columns: product_id, product_name, category, unit_price, unit_cost.
    products: pd.DataFrame
    # Columns: order_id, customer_id, order_ts, status.
    orders: pd.DataFrame
    # Columns: order_item_id, order_id, product_id, quantity, unit_price.
    order_items: pd.DataFrame
    # Columns: event_id, session_id, customer_id, event_type, event_ts.
    events: pd.DataFrame
48
+
49
+
50
+ def _daterange_days(start: str, end: str) -> int:
51
+ d0 = datetime.fromisoformat(start)
52
+ d1 = datetime.fromisoformat(end)
53
+ return (d1 - d0).days
54
+
55
+
56
def generate(settings: Settings | None = None) -> RawTables:
    """Build the in-memory raw tables. Pure function of ``settings`` + seed.

    All randomness comes from one ``numpy`` generator seeded with
    ``settings.seed``. The draws happen in a fixed order (customers, products,
    orders, order items, events), so reordering statements here changes the
    generated data.

    Args:
        settings: Run configuration; falls back to ``get_settings()``.

    Returns:
        A ``RawTables`` holding customers, products, orders, order_items and events.
    """
    s = settings or get_settings()
    rng = np.random.default_rng(s.seed)
    span_days = _daterange_days(s.start_date, s.end_date)
    start = datetime.fromisoformat(s.start_date)

    # --- customers -----------------------------------------------------------
    cust_ids = np.arange(1, s.n_customers + 1)
    # Signup day offsets from start_date, drawn over the whole date span (inclusive).
    signup_offsets = rng.integers(0, span_days + 1, size=s.n_customers)
    customers = pd.DataFrame(
        {
            "customer_id": cust_ids,
            "signup_date": [(start + timedelta(days=int(o))).date() for o in signup_offsets],
            "channel": rng.choice(CHANNELS, size=s.n_customers, p=CHANNEL_WEIGHTS),
            "country": rng.choice(COUNTRIES, size=s.n_customers, p=COUNTRY_WEIGHTS),
        }
    )

    # --- products ------------------------------------------------------------
    prod_ids = np.arange(1, s.n_products + 1)
    # Gamma-distributed list price plus a 5.0 floor, rounded to 2 decimals.
    base_price = np.round(rng.gamma(shape=2.2, scale=18.0, size=s.n_products) + 5.0, 2)
    margin = rng.uniform(0.35, 0.65, size=s.n_products)  # gross margin fraction
    products = pd.DataFrame(
        {
            "product_id": prod_ids,
            "product_name": [f"SKU-{i:04d}" for i in prod_ids],
            "category": rng.choice(CATEGORIES, size=s.n_products),
            "unit_price": base_price,
            "unit_cost": np.round(base_price * (1.0 - margin), 2),
        }
    )

    # --- orders --------------------------------------------------------------
    # Repeat-purchase behaviour: pick customers with replacement, weighted so a
    # subset of customers buy more often (a realistic long tail).
    cust_affinity = rng.gamma(shape=1.6, scale=1.0, size=s.n_customers)
    cust_affinity /= cust_affinity.sum()
    order_customers = rng.choice(cust_ids, size=s.n_orders, replace=True, p=cust_affinity)

    # Order date must be on/after that customer's signup date.
    signup_by_id = dict(zip(cust_ids, signup_offsets))
    order_offsets = np.empty(s.n_orders, dtype=int)
    for i, c in enumerate(order_customers):
        lo = int(signup_by_id[c])
        # A customer who signed up on the last day can only order on the last day.
        order_offsets[i] = rng.integers(lo, span_days + 1) if lo < span_days else span_days
    # NOTE(review): an earlier comment here mentioned a seasonal (Q4) lift, but no
    # such weighting is applied -- each order day is drawn by rng.integers between
    # the customer's signup day and the end of the span.
    order_ids = np.arange(1, s.n_orders + 1)
    order_ts = [start + timedelta(days=int(o), seconds=int(rng.integers(0, 86400))) for o in order_offsets]
    orders = pd.DataFrame(
        {
            "order_id": order_ids,
            "customer_id": order_customers,
            "order_ts": order_ts,
            "status": rng.choice(ORDER_STATUSES, size=s.n_orders),
        }
    ).sort_values("order_ts", kind="stable", ignore_index=True)
    # Reassign sequential ids by time so order_id is monotonic with order_ts.
    orders["order_id"] = np.arange(1, s.n_orders + 1)

    # --- order_items ---------------------------------------------------------
    # 1-4 line items per order.
    n_items = rng.integers(1, 5, size=s.n_orders)
    item_order_ids = np.repeat(orders["order_id"].to_numpy(), n_items)
    total_items = int(n_items.sum())
    item_products = rng.choice(prod_ids, size=total_items)
    # Quantity per line item: 1-5.
    quantities = rng.integers(1, 6, size=total_items)
    # Capture unit price at time of sale (small jitter to mimic promos/price drift).
    price_lookup = products.set_index("product_id")["unit_price"]
    sale_unit_price = np.round(
        price_lookup.loc[item_products].to_numpy() * rng.uniform(0.9, 1.0, size=total_items), 2
    )
    order_items = pd.DataFrame(
        {
            "order_item_id": np.arange(1, total_items + 1),
            "order_id": item_order_ids,
            "product_id": item_products,
            "quantity": quantities,
            "unit_price": sale_unit_price,
        }
    )

    # --- events (funnel) -----------------------------------------------------
    events = _generate_events(rng, orders, start, span_days)

    return RawTables(customers, products, orders, order_items, events)
142
+
143
+
144
def _generate_events(rng, orders: pd.DataFrame, start: datetime, span_days: int) -> pd.DataFrame:
    """Generate a clickstream funnel.

    Every completed order yields a full view->purchase chain (so purchase events
    reconcile with orders). On top of that we add abandoned sessions that drop
    out partway, which is what makes funnel_conversion interesting.

    Args:
        rng: The shared ``numpy`` random generator; draws are consumed in a
            fixed order, so statement order here affects the generated data.
        orders: Orders table with ``order_id``, ``customer_id``, ``order_ts``
            and ``status`` columns.
        start: First day of the simulated date span.
        span_days: Length of the span in days; abandoned sessions are placed on
            a day offset in ``[0, span_days]``.

    Returns:
        Events sorted by ``event_ts`` with columns ``event_id``, ``session_id``,
        ``customer_id``, ``event_type`` and ``event_ts``. When there are no
        completed orders the frame is empty but still carries those columns.
    """
    columns = ["event_id", "session_id", "customer_id", "event_type", "event_ts"]
    rows: list[dict] = []
    event_id = 1
    completed = orders[orders["status"] == "completed"]

    # Funded chains for real purchases. The session id reuses the order id.
    for order_id, customer_id, ts in zip(
        completed["order_id"], completed["customer_id"], completed["order_ts"]
    ):
        session = int(order_id)
        # The chain starts 3-39 minutes before the order timestamp.
        t = ts - timedelta(minutes=int(rng.integers(3, 40)))
        for step in FUNNEL_STEPS:
            rows.append(
                {
                    "event_id": event_id,
                    "session_id": session,
                    "customer_id": int(customer_id),
                    "event_type": step,
                    "event_ts": t,
                }
            )
            event_id += 1
            t += timedelta(seconds=int(rng.integers(20, 600)))

    # Abandoned sessions (no order). Roughly 2x the purchase sessions. These
    # never reach 'purchase' -- by construction the only sessions that purchase
    # are the funded chains above, so funnel purchases reconcile 1:1 with
    # completed orders. We advance at most to 'checkout'.
    n_abandoned = len(completed) * 2
    abandon_steps = FUNNEL_STEPS[1:-1]  # add_to_cart, checkout (no purchase)
    drop_probs = [0.55, 0.30]  # P(advance) into add_to_cart, then checkout
    # Only needed when sessions are generated. Skipping it otherwise avoids
    # int(NaN) on an empty orders table. Abandoned session ids start above the
    # highest order id so they cannot collide with purchase sessions.
    base_session = int(orders["order_id"].max()) + 1 if n_abandoned else 0
    for k in range(n_abandoned):
        session = base_session + k
        customer_id = int(rng.choice(orders["customer_id"]))
        offset = int(rng.integers(0, span_days + 1))
        t = start + timedelta(days=offset, seconds=int(rng.integers(0, 86400)))
        rows.append(
            {
                "event_id": event_id,
                "session_id": session,
                "customer_id": customer_id,
                "event_type": "view",
                "event_ts": t,
            }
        )
        event_id += 1
        for step, p in zip(abandon_steps, drop_probs):
            # Stop at the first step the session fails to advance into.
            if rng.random() > p:
                break
            t += timedelta(seconds=int(rng.integers(20, 600)))
            rows.append(
                {
                    "event_id": event_id,
                    "session_id": session,
                    "customer_id": customer_id,
                    "event_type": step,
                    "event_ts": t,
                }
            )
            event_id += 1

    # Explicit columns keep the schema (and the sort key) present even when no
    # rows were generated; for non-empty input the frame is unchanged.
    events = pd.DataFrame(rows, columns=columns)
    return events.sort_values("event_ts", kind="stable", ignore_index=True)
214
+
215
+
216
def write_raw(tables: RawTables, settings: Settings | None = None) -> dict[str, int]:
    """Write every raw table to ``raw_dir`` as Parquet, plus ``orders.csv``.

    Args:
        tables: The generated raw tables.
        settings: Run configuration; falls back to ``get_settings()``.

    Returns:
        Row count per table, keyed by table name.
    """
    cfg = settings or get_settings()
    cfg.ensure_dirs()
    table_names = ("customers", "products", "orders", "order_items", "events")
    row_counts: dict[str, int] = {}
    for table_name in table_names:
        frame: pd.DataFrame = getattr(tables, table_name)
        target = cfg.raw_dir / f"{table_name}.parquet"
        frame.to_parquet(target, index=False)
        row_counts[table_name] = len(frame)
    # Human-readable CSV mirror of the headline table.
    csv_target = cfg.raw_dir / "orders.csv"
    tables.orders.to_csv(csv_target, index=False)
    log.info("wrote raw tables: %s", row_counts)
    return row_counts
229
+
230
+
231
def run(settings: Settings | None = None) -> dict[str, int]:
    """Ingest entry point for the flow/CLI: generate the raw tables, then persist them.

    Returns:
        Row count per raw table, as reported by ``write_raw``.
    """
    cfg = settings or get_settings()
    return write_raw(generate(cfg), cfg)
236
+
237
+
238
+ if __name__ == "__main__": # pragma: no cover
239
+ logging.basicConfig(level=logging.INFO, format="%(message)s")
240
+ print(run())
pipeline/load.py ADDED
@@ -0,0 +1,53 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Load stage: register the raw Parquet files into a DuckDB warehouse.
2
+
3
+ We load into a ``raw`` schema with explicit ``CREATE TABLE AS SELECT`` so the
4
+ warehouse is materialised (self-contained, queryable with the DuckDB CLI, and
5
+ safe to ship to the dashboard) rather than relying on live file scans.
6
+ """
7
+
8
+ from __future__ import annotations
9
+
10
+ import logging
11
+
12
+ import duckdb
13
+
14
+ from pipeline.config import Settings, get_settings
15
+
16
+ log = logging.getLogger("commerce.load")
17
+
18
+ RAW_TABLES = ("customers", "products", "orders", "order_items", "events")
19
+
20
+
21
def connect(settings: Settings | None = None, read_only: bool = False) -> duckdb.DuckDBPyConnection:
    """Open a DuckDB connection to the warehouse database file.

    The raw and warehouse directories are created first if they do not exist.

    Args:
        settings: Run configuration; falls back to ``get_settings()``.
        read_only: Passed through to ``duckdb.connect``.
    """
    cfg = settings or get_settings()
    cfg.ensure_dirs()
    database_file = str(cfg.db_path)
    return duckdb.connect(database_file, read_only=read_only)
26
+
27
+
28
def run(con: duckdb.DuckDBPyConnection | None = None, settings: Settings | None = None) -> dict[str, int]:
    """Materialise each raw Parquet file as a ``raw.<name>`` table.

    Args:
        con: Open warehouse connection. When ``None`` a connection is opened
            here and closed again before returning.
        settings: Run configuration; falls back to ``get_settings()``.

    Returns:
        Row count per loaded table.

    Raises:
        FileNotFoundError: if a table's Parquet file is missing from ``raw_dir``.
    """
    cfg = settings or get_settings()
    close_when_done = con is None
    warehouse = con or connect(cfg)
    try:
        warehouse.execute("CREATE SCHEMA IF NOT EXISTS raw;")
        row_counts: dict[str, int] = {}
        for table in RAW_TABLES:
            parquet_file = cfg.raw_dir / f"{table}.parquet"
            if not parquet_file.exists():
                raise FileNotFoundError(
                    f"raw table {table!r} not found at {parquet_file} - run the ingest stage first"
                )
            # Table names come from the fixed RAW_TABLES tuple; the file path is bound as a parameter.
            create_sql = f"CREATE OR REPLACE TABLE raw.{table} AS SELECT * FROM read_parquet(?);"
            warehouse.execute(create_sql, [str(parquet_file)])
            count_row = warehouse.execute(f"SELECT count(*) FROM raw.{table};").fetchone()
            row_counts[table] = count_row[0]
        log.info("loaded raw tables into DuckDB: %s", row_counts)
        return row_counts
    finally:
        if close_when_done:
            warehouse.close()
49
+
50
+
51
+ if __name__ == "__main__": # pragma: no cover
52
+ logging.basicConfig(level=logging.INFO, format="%(message)s")
53
+ print(run())
pipeline/orchestrate.py ADDED
@@ -0,0 +1,74 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Optional Prefect orchestration of the same DAG exposed in :mod:`pipeline.flow`.
2
+
3
+ Prefect is **not** a hard dependency. The Makefile DAG and ``pipeline.flow`` run
4
+ the pipeline end to end without it. Install the extra to get a schedulable,
5
+ observable flow::
6
+
7
+ pip install prefect
8
+ python -m pipeline.orchestrate # run the flow once
9
+ prefect server start # (optional) UI for run history
10
+
11
+ Each stage is a Prefect ``@task`` so retries, logging, and the run graph come for
12
+ free; the orchestration mirrors ``flow.run_pipeline`` exactly.
13
+ """
14
+
15
+ from __future__ import annotations
16
+
17
+ import sys
18
+
19
+ from pipeline import ingest, load, quality, transform
20
+ from pipeline.config import get_settings
21
+ from pipeline.load import connect
22
+
23
+ try:
24
+ from prefect import flow, task
25
+ except ImportError: # pragma: no cover - optional dependency
26
+ print(
27
+ "Prefect is not installed. Install it with `pip install prefect`,\n"
28
+ "or run the dependency-free pipeline with `python -m pipeline run`.",
29
+ file=sys.stderr,
30
+ )
31
+ raise SystemExit(2)
32
+
33
+
34
@task(name="ingest", retries=1)
def ingest_task(settings) -> dict:
    """Prefect task wrapping ``ingest.run``; registered with one retry."""
    row_counts = ingest.run(settings)
    return row_counts
37
+
38
+
39
@task(name="load")
def load_task(settings) -> dict:
    """Prefect task: open a warehouse connection, run ``load.run``, always close it."""
    warehouse = connect(settings)
    try:
        loaded = load.run(warehouse, settings)
    finally:
        warehouse.close()
    return loaded
46
+
47
+
48
@task(name="transform")
def transform_task(settings) -> list:
    """Prefect task: open a warehouse connection, run ``transform.run``, always close it."""
    warehouse = connect(settings)
    try:
        built = transform.run(warehouse, settings)
    finally:
        warehouse.close()
    return built
55
+
56
+
57
@task(name="quality")
def quality_task(settings) -> int:
    """Prefect task: run the quality checks and return how many results came back."""
    # raise_on_fail=True so a failed gate fails the Prefect run.
    outcomes = quality.run(settings=settings, raise_on_fail=True)
    check_count = len(outcomes)
    return check_count
62
+
63
+
64
@flow(name="commerce-pipeline")
def commerce_pipeline_flow() -> None:
    """Prefect flow chaining ingest -> load -> transform -> quality.

    Each task is passed the previous task's result through ``wait_for`` so the
    stages are ordered one after another.
    """
    cfg = get_settings()
    ingested = ingest_task(cfg)
    loaded = load_task(cfg, wait_for=[ingested])
    built = transform_task(cfg, wait_for=[loaded])
    quality_task(cfg, wait_for=[built])
71
+
72
+
73
+ if __name__ == "__main__": # pragma: no cover
74
+ commerce_pipeline_flow()
pipeline/quality.py ADDED
@@ -0,0 +1,210 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Data-quality stage: declarative checks that gate the pipeline.
2
+
3
+ Each check is a small dataclass that compiles to a single COUNT query returning
4
+ the number of *violating* rows. A non-zero count fails the check; any failed
5
+ check makes :func:`run` raise :class:`DataQualityError`, which the flow surfaces
6
+ as a non-zero exit code. This is the "quality gate" -- bad data stops the run.
7
+
8
+ Supported check types:
9
+ * not_null - column has no NULLs
10
+ * unique - column (or column set) has no duplicates
11
+ * accepted_range - numeric column within [min, max]
12
+ * accepted_values - column only contains a given value set
13
+ * relationship - every FK value exists in a referenced PK column
14
+ * expression - arbitrary boolean SQL that must hold for every row
15
+ """
16
+
17
+ from __future__ import annotations
18
+
19
+ import logging
20
+ from dataclasses import dataclass, field
21
+ from typing import Iterable, Sequence
22
+
23
+ import duckdb
24
+
25
+ from pipeline.config import Settings, get_settings
26
+
27
+ log = logging.getLogger("commerce.quality")
28
+
29
+
30
class DataQualityError(RuntimeError):
    """Raised when one or more data-quality checks fail.

    The message is expected to name the failing checks; callers print it as-is.
    """
32
+
33
+
34
@dataclass
class CheckResult:
    """Outcome of running a single ``Check``."""

    # Name of the check that produced this result.
    name: str
    # Relation (table or view) the check was run against.
    relation: str
    # True when the violation query counted zero rows.
    passed: bool
    # Number of violating rows reported by the check's query.
    failing_rows: int
    # Optional free-form detail; defaults to an empty string.
    detail: str = ""
41
+
42
+
43
@dataclass
class Check:
    """One declarative data-quality assertion.

    ``kind`` selects the check type. The remaining optional fields supply the
    arguments that type needs (``column``/``columns``, ``min``/``max``,
    ``values``, ``to_relation``/``to_column``, ``expression``).
    """

    name: str
    relation: str
    kind: str
    column: str | None = None
    columns: Sequence[str] | None = None
    min: float | None = None
    max: float | None = None
    values: Sequence[object] | None = None
    to_relation: str | None = None
    to_column: str | None = None
    expression: str | None = None

    def _violation_query(self) -> str:
        """Compile this check to SQL returning the number of violating rows.

        Raises:
            ValueError: if ``kind`` is not one of the supported check types.
        """
        table, kind, col = self.relation, self.kind, self.column

        if kind == "not_null":
            return f"SELECT count(*) FROM {table} WHERE {col} IS NULL"

        if kind == "unique":
            key = ", ".join(self.columns or [col])  # type: ignore[list-item]
            # rows that participate in a duplicate group
            duplicate_groups = f" SELECT count(*) AS c FROM {table} GROUP BY {key} HAVING count(*) > 1"
            return f"SELECT COALESCE(sum(c), 0) FROM ({duplicate_groups}) d"

        if kind == "accepted_range":
            # Only the bounds that were supplied are enforced; NULL always violates.
            bounds = (("<", self.min), (">", self.max))
            clauses = [f"{col} {op} {limit}" for op, limit in bounds if limit is not None]
            clauses.append(f"{col} IS NULL")
            predicate = " OR ".join(clauses)
            return f"SELECT count(*) FROM {table} WHERE {predicate}"

        if kind == "accepted_values":
            allowed = ", ".join(_sql_literal(v) for v in (self.values or []))
            return f"SELECT count(*) FROM {table} WHERE {col} NOT IN ({allowed})"

        if kind == "relationship":
            parent_table, parent_col = self.to_relation, self.to_column
            # Anti-join: child rows with a non-NULL key that has no parent match.
            pieces = [
                f"SELECT count(*) FROM {table} child ",
                f"LEFT JOIN {parent_table} parent ",
                f" ON child.{col} = parent.{parent_col} ",
                f"WHERE child.{col} IS NOT NULL AND parent.{parent_col} IS NULL",
            ]
            return "".join(pieces)

        if kind == "expression":
            return f"SELECT count(*) FROM {table} WHERE NOT ({self.expression})"

        raise ValueError(f"unknown check kind: {self.kind!r}")

    def run(self, con: duckdb.DuckDBPyConnection) -> CheckResult:
        """Execute the violation query on ``con`` and wrap the count in a ``CheckResult``."""
        first_row = con.execute(self._violation_query()).fetchone()
        # A NULL count is treated as zero violations.
        violations = first_row[0] or 0
        return CheckResult(
            name=self.name,
            relation=self.relation,
            failing_rows=int(violations),
            passed=violations == 0,
        )
101
+
102
+
103
+ def _sql_literal(value: object) -> str:
104
+ if isinstance(value, str):
105
+ escaped = value.replace("'", "''")
106
+ return f"'{escaped}'"
107
+ if isinstance(value, bool):
108
+ return "TRUE" if value else "FALSE"
109
+ return str(value)
110
+
111
+
112
def default_checks() -> list[Check]:
    """The suite enforced on every pipeline run.

    Covers primary-key uniqueness and referential integrity on the raw
    tables, value/range constraints, and sanity bounds on the marts.
    """
    return [
        # --- raw integrity ---
        Check("customers.pk_unique", "raw.customers", "unique", column="customer_id"),
        Check("customers.id_not_null", "raw.customers", "not_null", column="customer_id"),
        Check("products.pk_unique", "raw.products", "unique", column="product_id"),
        Check("orders.pk_unique", "raw.orders", "unique", column="order_id"),
        Check("order_items.pk_unique", "raw.order_items", "unique", column="order_item_id"),
        # referential integrity
        Check(
            "orders.customer_fk",
            "raw.orders",
            "relationship",
            column="customer_id",
            to_relation="raw.customers",
            to_column="customer_id",
        ),
        Check(
            "order_items.order_fk",
            "raw.order_items",
            "relationship",
            column="order_id",
            to_relation="raw.orders",
            to_column="order_id",
        ),
        Check(
            "order_items.product_fk",
            "raw.order_items",
            "relationship",
            column="product_id",
            to_relation="raw.products",
            to_column="product_id",
        ),
        # accepted ranges / values
        Check("products.price_positive", "raw.products", "accepted_range", column="unit_price", min=0.01),
        Check("products.cost_nonneg", "raw.products", "accepted_range", column="unit_cost", min=0.0),
        Check("order_items.qty_range", "raw.order_items", "accepted_range", column="quantity", min=1, max=100),
        Check(
            "orders.status_values",
            "raw.orders",
            "accepted_values",
            column="status",
            values=["completed", "refunded", "cancelled"],
        ),
        # --- mart sanity ---
        Check("daily_revenue.nonneg", "marts.daily_revenue", "accepted_range", column="revenue", min=0.0),
        Check(
            "daily_revenue.margin_bounded",
            "marts.daily_revenue",
            "expression",
            expression="margin_pct BETWEEN -1 AND 1",
        ),
        Check(
            "cohort.retention_bounded",
            "marts.customer_cohort_retention",
            "expression",
            expression="retention_rate >= 0 AND retention_rate <= 1",
        ),
        Check(
            "funnel.monotonic_nonincreasing",
            "marts.funnel_conversion",
            "expression",
            # A funnel is non-increasing when no step has more sessions than the
            # one before it, i.e. step_conversion <= 1. step_conversion is NULL
            # for the first step (no predecessor), which must not count as a failure.
            expression=(
                "pct_of_top <= 1.0 "
                "AND (step_conversion IS NULL OR step_conversion <= 1.0)"
            ),
        ),
    ]
178
+
179
+
180
def run(
    con: duckdb.DuckDBPyConnection | None = None,
    settings: Settings | None = None,
    checks: Iterable[Check] | None = None,
    raise_on_fail: bool = True,
) -> list[CheckResult]:
    """Evaluate the quality gate and report every check's outcome.

    Args:
        con: Connection to reuse. When omitted, a read-only connection to
            ``settings.db_path`` is opened and closed by this function.
        settings: Configuration; falls back to :func:`get_settings`.
        checks: Checks to run; defaults to :func:`default_checks`.
        raise_on_fail: When true, raise if any check fails.

    Returns:
        One :class:`CheckResult` per check, in execution order.

    Raises:
        DataQualityError: if at least one check fails and ``raise_on_fail``
            is true.
    """
    cfg = settings or get_settings()
    opened_here = con is None
    if not con:
        con = duckdb.connect(str(cfg.db_path), read_only=True)
    suite = list(default_checks() if checks is None else checks)
    try:
        results = []
        for check in suite:
            results.append(check.run(con))
    finally:
        # Only close a connection this call created; a caller-supplied one stays open.
        if opened_here:
            con.close()

    failures = []
    for res in results:
        if not res.passed:
            failures.append(res)
    for res in results:
        label = "PASS" if res.passed else "FAIL"
        log.info(" [%s] %-32s %s (%d violating rows)", label, res.name, res.relation, res.failing_rows)
    if raise_on_fail and failures:
        summary = ", ".join(f"{res.name} ({res.failing_rows} rows)" for res in failures)
        raise DataQualityError(f"{len(failures)} data-quality check(s) failed: {summary}")
    return results
205
+
206
+
207
+ if __name__ == "__main__": # pragma: no cover
208
+ logging.basicConfig(level=logging.INFO, format="%(message)s")
209
+ run()
210
+ print("all data-quality checks passed")
pipeline/sql/marts/customer_cohort_retention.sql ADDED
@@ -0,0 +1,39 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- Mart: monthly cohort retention.
2
+ -- A customer's cohort is their signup month. For each cohort we measure how many
3
+ -- customers placed a completed order N months later (month_number = 0,1,2,...).
4
+ CREATE OR REPLACE TABLE marts.customer_cohort_retention AS
5
+ WITH cohorts AS (
6
+ SELECT customer_id, signup_month AS cohort_month
7
+ FROM staging.stg_customers
8
+ ),
9
+ -- Distinct active months per customer (from completed orders).
10
+ activity AS (
11
+ SELECT DISTINCT customer_id, order_month AS activity_month
12
+ FROM marts.int_order_revenue
13
+ WHERE is_completed
14
+ ),
15
+ cohort_size AS (
16
+ SELECT cohort_month, count(*) AS cohort_customers
17
+ FROM cohorts
18
+ GROUP BY cohort_month
19
+ ),
20
+ retained AS (
21
+ SELECT
22
+ c.cohort_month,
23
+ -- whole months between signup and activity
24
+ (date_diff('month', c.cohort_month, a.activity_month)) AS month_number,
25
+ count(DISTINCT c.customer_id) AS active_customers
26
+ FROM cohorts c
27
+ JOIN activity a USING (customer_id)
28
+ WHERE a.activity_month >= c.cohort_month
29
+ GROUP BY c.cohort_month, month_number
30
+ )
31
+ SELECT
32
+ r.cohort_month,
33
+ r.month_number,
34
+ s.cohort_customers,
35
+ r.active_customers,
36
+ CAST(r.active_customers AS DOUBLE) / NULLIF(s.cohort_customers, 0) AS retention_rate
37
+ FROM retained r
38
+ JOIN cohort_size s USING (cohort_month)
39
+ ORDER BY r.cohort_month, r.month_number;
pipeline/sql/marts/daily_revenue.sql ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- Mart: daily revenue and order KPIs from completed orders.
2
+ -- Refunded/cancelled orders are excluded entirely; only completed orders are counted.
3
+ CREATE OR REPLACE TABLE marts.daily_revenue AS
4
+ WITH completed AS (
5
+ SELECT * FROM marts.int_order_revenue WHERE is_completed
6
+ ),
7
+ by_day AS (
8
+ SELECT
9
+ order_date,
10
+ count(*) AS orders,
11
+ count(DISTINCT customer_id) AS customers,
12
+ sum(n_items) AS units_sold,
13
+ CAST(sum(gross_revenue) AS DECIMAL(14, 2)) AS revenue,
14
+ CAST(sum(gross_profit) AS DECIMAL(14, 2)) AS gross_profit
15
+ FROM completed
16
+ GROUP BY order_date
17
+ )
18
+ SELECT
19
+ order_date,
20
+ orders,
21
+ customers,
22
+ units_sold,
23
+ revenue,
24
+ gross_profit,
25
+ CAST(revenue / NULLIF(orders, 0) AS DECIMAL(12, 2)) AS avg_order_value,
26
+ CAST(gross_profit / NULLIF(revenue, 0) AS DECIMAL(6, 4)) AS margin_pct
27
+ FROM by_day
28
+ ORDER BY order_date;
pipeline/sql/marts/funnel_conversion.sql ADDED
@@ -0,0 +1,31 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- Mart: funnel conversion across the view -> add_to_cart -> checkout -> purchase steps.
2
+ -- Sessions are counted at the deepest step they reached, then we report the count
3
+ -- reaching each step and the step-over-step + overall conversion rates.
4
+ CREATE OR REPLACE TABLE marts.funnel_conversion AS
5
+ WITH session_depth AS (
6
+ SELECT session_id, max(funnel_step) AS max_step
7
+ FROM staging.stg_events
8
+ GROUP BY session_id
9
+ ),
10
+ steps(step_name, step_index) AS (
11
+ VALUES ('view', 1), ('add_to_cart', 2), ('checkout', 3), ('purchase', 4)
12
+ ),
13
+ reached AS (
14
+ SELECT
15
+ s.step_name,
16
+ s.step_index,
17
+ count(*) FILTER (WHERE d.max_step >= s.step_index) AS sessions
18
+ FROM steps s
19
+ CROSS JOIN session_depth d
20
+ GROUP BY s.step_name, s.step_index
21
+ )
22
+ SELECT
23
+ step_name,
24
+ step_index,
25
+ sessions,
26
+ CAST(sessions AS DOUBLE)
27
+ / NULLIF(first_value(sessions) OVER (ORDER BY step_index), 0) AS pct_of_top,
28
+ CAST(sessions AS DOUBLE)
29
+ / NULLIF(lag(sessions) OVER (ORDER BY step_index), 0) AS step_conversion
30
+ FROM reached
31
+ ORDER BY step_index;
pipeline/sql/marts/int_order_revenue.sql ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- Intermediate: one row per order with rolled-up revenue/profit from its items.
2
+ -- Shared by several marts, so we materialise it as a table.
3
+ CREATE OR REPLACE TABLE marts.int_order_revenue AS
4
+ SELECT
5
+ o.order_id,
6
+ o.customer_id,
7
+ o.order_date,
8
+ o.order_month,
9
+ o.status,
10
+ o.is_completed,
11
+ COALESCE(i.n_items, 0) AS n_items,
12
+ COALESCE(i.gross_revenue, 0) AS gross_revenue,
13
+ COALESCE(i.gross_profit, 0) AS gross_profit
14
+ FROM staging.stg_orders o
15
+ LEFT JOIN (
16
+ SELECT
17
+ order_id,
18
+ sum(quantity) AS n_items,
19
+ sum(line_revenue) AS gross_revenue,
20
+ sum(line_gross_profit) AS gross_profit
21
+ FROM staging.stg_order_items
22
+ GROUP BY order_id
23
+ ) i USING (order_id);
pipeline/sql/marts/top_products.sql ADDED
@@ -0,0 +1,23 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
-- Mart: product leaderboard by revenue, with category and units.
-- Built from completed orders only so it reflects realised sales.
-- Ties in revenue are broken by product_id so revenue_rank and the row order
-- are reproducible from run to run.
CREATE OR REPLACE TABLE marts.top_products AS
WITH completed_items AS (
    -- Keep only the line items that belong to a completed order.
    SELECT oi.*
    FROM staging.stg_order_items oi
    JOIN marts.int_order_revenue o USING (order_id)
    WHERE o.is_completed
)
SELECT
    p.product_id,
    p.product_name,
    p.category,
    sum(ci.quantity) AS units_sold,
    count(DISTINCT ci.order_id) AS orders,
    CAST(sum(ci.line_revenue) AS DECIMAL(14, 2)) AS revenue,
    CAST(sum(ci.line_gross_profit) AS DECIMAL(14, 2)) AS gross_profit,
    -- NULLIF guards against division by zero when a product has no revenue.
    CAST(sum(ci.line_gross_profit) / NULLIF(sum(ci.line_revenue), 0) AS DECIMAL(6, 4)) AS margin_pct,
    row_number() OVER (ORDER BY sum(ci.line_revenue) DESC, p.product_id) AS revenue_rank
FROM completed_items ci
JOIN staging.stg_products p USING (product_id)
GROUP BY p.product_id, p.product_name, p.category
ORDER BY revenue DESC, p.product_id;
pipeline/sql/staging/stg_customers.sql ADDED
@@ -0,0 +1,9 @@
 
 
 
 
 
 
 
 
 
 
1
+ -- Staging: typed customers with normalised channel/country and a derived signup cohort month.
2
+ CREATE OR REPLACE VIEW staging.stg_customers AS
3
+ SELECT
4
+ customer_id,
5
+ CAST(signup_date AS DATE) AS signup_date,
6
+ date_trunc('month', CAST(signup_date AS DATE)) AS signup_month,
7
+ lower(channel) AS channel,
8
+ upper(country) AS country
9
+ FROM raw.customers;
pipeline/sql/staging/stg_events.sql ADDED
@@ -0,0 +1,16 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- Staging: funnel events normalised to an ordinal step index.
2
+ CREATE OR REPLACE VIEW staging.stg_events AS
3
+ SELECT
4
+ event_id,
5
+ session_id,
6
+ customer_id,
7
+ lower(event_type) AS event_type,
8
+ CASE lower(event_type)
9
+ WHEN 'view' THEN 1
10
+ WHEN 'add_to_cart' THEN 2
11
+ WHEN 'checkout' THEN 3
12
+ WHEN 'purchase' THEN 4
13
+ ELSE 0
14
+ END AS funnel_step,
15
+ CAST(event_ts AS TIMESTAMP) AS event_ts
16
+ FROM raw.events;
pipeline/sql/staging/stg_order_items.sql ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- Staging: order line items with computed line revenue and cost.
2
+ CREATE OR REPLACE VIEW staging.stg_order_items AS
3
+ SELECT
4
+ oi.order_item_id,
5
+ oi.order_id,
6
+ oi.product_id,
7
+ oi.quantity,
8
+ CAST(oi.unit_price AS DECIMAL(10, 2)) AS unit_price,
9
+ CAST(oi.unit_price * oi.quantity AS DECIMAL(12, 2)) AS line_revenue,
10
+ CAST(p.unit_cost * oi.quantity AS DECIMAL(12, 2)) AS line_cost,
11
+ CAST((oi.unit_price - p.unit_cost) * oi.quantity AS DECIMAL(12, 2)) AS line_gross_profit
12
+ FROM raw.order_items oi
13
+ JOIN staging.stg_products p USING (product_id);
pipeline/sql/staging/stg_orders.sql ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ -- Staging: orders with split date/time parts for downstream rollups.
2
+ CREATE OR REPLACE VIEW staging.stg_orders AS
3
+ SELECT
4
+ order_id,
5
+ customer_id,
6
+ CAST(order_ts AS TIMESTAMP) AS order_ts,
7
+ CAST(order_ts AS DATE) AS order_date,
8
+ date_trunc('month', CAST(order_ts AS DATE)) AS order_month,
9
+ lower(status) AS status,
10
+ (lower(status) = 'completed') AS is_completed
11
+ FROM raw.orders;
pipeline/sql/staging/stg_products.sql ADDED
@@ -0,0 +1,10 @@
 
 
 
 
 
 
 
 
 
 
 
1
+ -- Staging: products with a derived unit gross margin.
2
+ CREATE OR REPLACE VIEW staging.stg_products AS
3
+ SELECT
4
+ product_id,
5
+ product_name,
6
+ category,
7
+ CAST(unit_price AS DECIMAL(10, 2)) AS unit_price,
8
+ CAST(unit_cost AS DECIMAL(10, 2)) AS unit_cost,
9
+ CAST(unit_price - unit_cost AS DECIMAL(10, 2)) AS unit_margin
10
+ FROM raw.products;
pipeline/transform.py ADDED
@@ -0,0 +1,73 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """Transform stage: build staging views and mart tables from SQL files.
2
+
3
+ The SQL lives in ``pipeline/sql`` and is executed in a deterministic, dependency
4
+ -aware order. Keeping the transformation in plain ``.sql`` files (rather than
5
+ inline strings) means the models read like a dbt project and can be inspected or
6
+ run by hand with the DuckDB CLI.
7
+ """
8
+
9
+ from __future__ import annotations
10
+
11
+ import logging
12
+
13
+ import duckdb
14
+
15
+ from pipeline.config import Settings, get_settings
16
+
17
+ log = logging.getLogger("commerce.transform")
18
+
19
+ # Execution order. Staging first (views over raw), then the intermediate model,
20
+ # then the marts (which may depend on the intermediate model but never on each other).
21
+ STAGING_MODELS = [
22
+ "staging/stg_customers.sql",
23
+ "staging/stg_products.sql",
24
+ "staging/stg_orders.sql",
25
+ "staging/stg_order_items.sql",
26
+ "staging/stg_events.sql",
27
+ ]
28
+ MART_MODELS = [
29
+ "marts/int_order_revenue.sql", # intermediate; must precede the marts below
30
+ "marts/daily_revenue.sql",
31
+ "marts/top_products.sql",
32
+ "marts/customer_cohort_retention.sql",
33
+ "marts/funnel_conversion.sql",
34
+ ]
35
+
36
+
37
def _run_model(con: duckdb.DuckDBPyConnection, sql_dir, rel_path: str) -> None:
    """Load one model's SQL from ``sql_dir / rel_path`` and execute it on ``con``.

    The file is read as UTF-8 and passed to ``con.execute`` in a single call.
    """
    statement = (sql_dir / rel_path).read_text(encoding="utf-8")
    log.info("building model %s", rel_path)
    con.execute(statement)
42
+
43
+
44
def run(con: duckdb.DuckDBPyConnection | None = None, settings: Settings | None = None) -> list[str]:
    """Build every staging and mart model, in dependency order.

    Args:
        con: Connection to reuse. When omitted, one is opened on
            ``settings.db_path`` and closed before returning.
        settings: Configuration; falls back to :func:`get_settings`.

    Returns:
        Names of all tables found in the ``marts`` schema after the build,
        sorted alphabetically.
    """
    cfg = settings or get_settings()
    opened_here = con is None
    if not con:
        con = duckdb.connect(str(cfg.db_path))
    try:
        # Target schemas must exist before any model can be created in them.
        for ddl in (
            "CREATE SCHEMA IF NOT EXISTS staging;",
            "CREATE SCHEMA IF NOT EXISTS marts;",
        ):
            con.execute(ddl)

        # Staging models first, then marts.
        for model in (*STAGING_MODELS, *MART_MODELS):
            _run_model(con, cfg.sql_dir, model)

        rows = con.execute(
            "SELECT table_name FROM information_schema.tables "
            "WHERE table_schema = 'marts' ORDER BY table_name;"
        ).fetchall()
        marts = [row[0] for row in rows]
        log.info("built marts: %s", marts)
        return marts
    finally:
        # Only close a connection this call created.
        if opened_here:
            con.close()
69
+
70
+
71
+ if __name__ == "__main__": # pragma: no cover
72
+ logging.basicConfig(level=logging.INFO, format="%(message)s")
73
+ print(run())
pyproject.toml ADDED
@@ -0,0 +1,34 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ [build-system]
2
+ requires = ["setuptools>=68"]
3
+ build-backend = "setuptools.build_meta"
4
+
5
+ [project]
6
+ name = "commerce-pipeline"
7
+ version = "1.0.0"
8
+ description = "An end-to-end e-commerce analytics pipeline: synthetic ingest, DuckDB warehouse, SQL marts, data-quality gates, and a Streamlit dashboard."
9
+ readme = "README.md"
10
+ requires-python = ">=3.9"
11
+ license = { text = "MIT" }
12
+ authors = [{ name = "Laela Zorana" }]
13
+ dependencies = [
14
+ "duckdb>=1.0",
15
+ "pandas>=2.0",
16
+ "pyarrow>=14.0",
17
+ ]
18
+
19
+ [project.optional-dependencies]
20
+ dashboard = ["streamlit>=1.30", "altair>=5.0"]
21
+ dev = ["pytest>=8.0"]
22
+
23
+ [project.scripts]
24
+ commerce-pipeline = "pipeline.cli:main"
25
+
26
+ [tool.setuptools.packages.find]
27
+ include = ["pipeline*"]
28
+
29
+ [tool.setuptools.package-data]
30
+ pipeline = ["sql/**/*.sql"]
31
+
32
+ [tool.pytest.ini_options]
33
+ addopts = "-q"
34
+ testpaths = ["tests"]
requirements.txt ADDED
@@ -0,0 +1,11 @@
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # Core pipeline (ingest -> load -> transform -> quality)
2
+ duckdb>=1.0
3
+ pandas>=2.0
4
+ pyarrow>=14.0
5
+
6
+ # Dashboard
7
+ streamlit>=1.30
8
+ altair>=5.0
9
+
10
+ # Tests
11
+ pytest>=8.0