from __future__ import annotations

import html
from datetime import datetime, timedelta, timezone

import duckdb
import pandas as pd
import streamlit as st

from data_access import download_gold_path, load_gold_table, load_log_table
# Browser-tab title/icon, wide layout, and an initially expanded sidebar.
# NOTE(review): the icon was mis-encoded in the source; the gear emoji is the
# best-effort restoration -- confirm the intended glyph.
st.set_page_config(
    page_title="WorldCup Pulse DataOps",
    page_icon="⚙️",
    layout="wide",
    initial_sidebar_state="expanded",
)

# Custom stylesheet injected as raw HTML. The literal currently holds only a
# newline, so no rules are applied.
# NOTE(review): restore the <style> rules here if they were dropped.
CSS = """
"""
st.markdown(CSS, unsafe_allow_html=True)
# Base names of the gold-layer parquet files, kept in display order.
_GOLD_TABLE_STEMS = (
    "kpi_summary",
    "goals_by_matchday",
    "goals_by_minute_bucket",
    "host_cities",
    "team_radar_stats",
    "team_key_metrics",
    "top_players",
    "team_table",
    "matches",
    "group_standings",
    "match_events",
    "substitutions",
    "lineups",
    "goalkeepers",
    "match_player_stats",
)

# File names of every gold table, each stem suffixed with ".parquet".
GOLD_TABLES = [f"{stem}.parquet" for stem in _GOLD_TABLE_STEMS]
def status_badge(status: str) -> str:
    """Map a raw pipeline status to the text shown in the runs table.

    Matching is case-insensitive and by substring: any status containing
    "success" becomes ``"Success"``, any containing "fail" becomes ``"Fail"``.
    Anything else is echoed back.

    Args:
        status: Raw status value; falsy values are treated as an empty string
            for matching purposes.

    Returns:
        The badge text. The echoed fallback is HTML-escaped because the
        result is placed in a table rendered without escaping.
    """
    normalized = str(status or "").lower()
    if "success" in normalized:
        return 'Success'
    if "fail" in normalized:
        return 'Fail'
    # Unknown statuses come straight from the log; escape them so characters
    # such as "<" or "&" cannot be interpreted as markup.
    return html.escape(f'{status}')
def next_run_text(*, now: datetime | None = None, interval_minutes: int = 5) -> str:
    """Return the next scheduled run time formatted as ``"HH:MM UTC"``.

    Run marks are the minutes of each hour that are multiples of
    *interval_minutes* (0, N, 2N, ...), restarting at ``:00`` every hour. The
    result is the first mark strictly after the current minute.

    Args:
        now: Reference instant. Defaults to the current UTC time. A naive
            value is interpreted as UTC; an aware value is converted to UTC.
        interval_minutes: Spacing between run marks in minutes. The default
            of 5 reproduces the previously hard-coded cadence.

    Returns:
        The next run mark, e.g. ``"12:05 UTC"``.

    Raises:
        ValueError: If *interval_minutes* is less than 1.
    """
    if interval_minutes < 1:
        raise ValueError("interval_minutes must be a positive integer")

    if now is None:
        now = datetime.now(timezone.utc)
    elif now.tzinfo is None:
        now = now.replace(tzinfo=timezone.utc)
    else:
        now = now.astimezone(timezone.utc)

    top_of_hour = now.replace(minute=0, second=0, microsecond=0)
    next_mark = (now.minute // interval_minutes + 1) * interval_minutes
    # A mark at or beyond minute 60 does not exist in this hour, so roll over
    # to the next ":00"; timedelta arithmetic also handles the day boundary.
    if next_mark >= 60:
        offset = timedelta(hours=1)
    else:
        offset = timedelta(minutes=next_mark)
    return (top_of_hour + offset).strftime("%H:%M UTC")
def card(label: str, value: str, icon: str):
    """Render one metric card showing an icon, a label, and a value.

    Args:
        label: Short caption for the metric.
        value: Metric value, already formatted as text.
        icon: Emoji or short glyph displayed with the metric.

    All three values are HTML-escaped before being embedded, because the
    snippet is rendered with ``unsafe_allow_html=True`` and callers pass
    values read from log tables.
    """
    # NOTE(review): the original card markup was lost from the source; the
    # class names below are placeholders and must match the stylesheet.
    snippet = (
        '<div class="metric-card">'
        f'<div class="metric-icon">{html.escape(str(icon))}</div>'
        f'<div class="metric-label">{html.escape(str(label))}</div>'
        f'<div class="metric-value">{html.escape(str(value))}</div>'
        '</div>'
    )
    st.markdown(snippet, unsafe_allow_html=True)
def heartbeat():
    """Render the pipeline heartbeat: four summary cards and a recent-runs table.

    Loads ``pipeline_runs.csv`` through ``load_log_table``, orders it newest
    first when a ``finished_at`` column exists, and treats the first row as
    the latest run. Up to 15 runs are shown in an HTML table; when there are
    no runs, or no ``status`` column, an informational notice is shown instead.
    """
    runs = load_log_table("pipeline_runs.csv")
    if "finished_at" in runs.columns:
        # Newest first, so the first row is the latest run.
        runs = runs.sort_values("finished_at", ascending=False)
    latest = runs.iloc[0].to_dict() if not runs.empty else {}

    # NOTE(review): the icons were mis-encoded in the source. The "Last Sync"
    # and "Gold Rows" glyphs in particular are best-effort guesses -- confirm.
    summary = [
        ("Latest Status", str(latest.get("status", "Unknown")), "🫀"),
        # Keep only the first 19 characters of the timestamp text.
        ("Last Sync from HF Dataset", str(latest.get("finished_at", "N/A"))[:19], "🔄"),
        ("Next Scheduled Run", next_run_text(), "⏱️"),
        ("Gold Rows", str(latest.get("rows_gold", "N/A")), "🏅"),
    ]
    for column, (label, value, icon) in zip(st.columns(4), summary):
        with column:
            card(label, value, icon)

    st.markdown("#### Pipeline Heartbeat")
    view = runs.head(15).copy()
    if view.empty or "status" not in view.columns:
        st.info("No pipeline runs available yet. Mock fallback is being used.")
        return

    view["status_badge"] = view["status"].map(status_badge)
    wanted = (
        "run_id",
        "started_at",
        "finished_at",
        "status_badge",
        "rows_bronze",
        "rows_silver",
        "rows_gold",
        "error_message",
    )
    shown = [name for name in wanted if name in view.columns]

    # The table is emitted with escape=False so the badge column can carry
    # markup. Every other string cell comes from the log and must therefore be
    # escaped by hand (error messages routinely contain "<...>"). Non-string
    # cells are left untouched so pandas formats them as before.
    for name in shown:
        if name != "status_badge":
            view[name] = view[name].map(
                lambda cell: html.escape(cell) if isinstance(cell, str) else cell
            )

    table_html = view[shown].to_html(escape=False, index=False)
    st.markdown(table_html, unsafe_allow_html=True)
def _lowered_column(frame: pd.DataFrame, column: str) -> pd.Series:
    """Return *column* as lower-cased strings, or all-empty strings if absent."""
    if column in frame.columns:
        return frame[column].astype(str).str.lower()
    return pd.Series("", index=frame.index, dtype="object")


def quality_tabs():
    """Render the data-quality checks in Overview / Bronze / Silver / Gold tabs.

    Loads ``quality_checks.csv`` through ``load_log_table``. The Overview tab
    shows every check; each other tab shows the checks whose ``layer`` matches
    the tab name case-insensitively. Every tab displays pass/fail counts and
    one status element per check. A check counts as passed only when its
    ``status`` equals "pass" (case-insensitive). A missing ``layer`` or
    ``status`` column is treated as all-empty values instead of raising.
    """
    checks = load_log_table("quality_checks.csv")
    layer_key = _lowered_column(checks, "layer")
    passed_flags = _lowered_column(checks, "status").eq("pass")

    # (tab title, lower-cased layer filter); None selects every check.
    tab_layers = [("Overview", None), ("Bronze", "bronze"), ("Silver", "silver"), ("Gold", "gold")]
    tabs = st.tabs([title for title, _ in tab_layers])

    for tab, (_, layer) in zip(tabs, tab_layers):
        with tab:
            if layer is None:
                in_layer = pd.Series(True, index=checks.index, dtype="bool")
            else:
                in_layer = layer_key.eq(layer)
            selected = checks[in_layer]
            if selected.empty:
                st.warning("No checks found for this layer.")
                continue

            selected_passed = passed_flags[in_layer]
            pass_count = int(selected_passed.sum())
            fail_count = int(len(selected) - pass_count)

            passed_col, failed_col = st.columns(2)
            with passed_col:
                card("Checks Passed", str(pass_count), "✅")
            with failed_col:
                card("Checks Failed", str(fail_count), "🚨")

            for (_, row), passed in zip(selected.iterrows(), selected_passed):
                title = f"{row.get('layer','')} · {row.get('table','')} · {row.get('check_name','')}"
                with st.status(title, state="complete" if passed else "error"):
                    st.write(row.get("message", ""))
def duckdb_console():
    """Render the DuckDB console: table picker, preview query, and row filter.

    The chosen gold table is resolved with ``download_gold_path``. When a
    path is returned, the first 10 rows are read from it with DuckDB;
    otherwise, or if the DuckDB read fails, the first 10 rows of
    ``load_gold_table`` are used. The query text is displayed, and an optional
    search box keeps only rows in which any cell contains the typed text
    (case-insensitive, literal match).
    """
    st.markdown("#### DuckDB Console")
    table = st.selectbox("Gold table", GOLD_TABLES)
    path = download_gold_path(table)

    if path:
        # Double any embedded single quote so the path stays one SQL literal.
        quoted_path = str(path).replace("'", "''")
        query = f"SELECT * FROM '{quoted_path}' LIMIT 10"
    else:
        query = f"SELECT * FROM mock_{table.replace('.parquet','')} LIMIT 10"
    # st.code shows the text verbatim, so no HTML escaping is needed.
    # NOTE(review): the original HTML wrapper around the query was lost from
    # the source; swap back to styled markup if the design requires it.
    st.code(query, language="sql")

    preview = None
    if path:
        try:
            preview = duckdb.sql(query).df()
        except Exception as exc:
            # Deliberately broad at the UI boundary: report the failure and
            # fall back rather than crash the page.
            st.error(f"DuckDB read failed: {exc}")
    if preview is None:
        preview = load_gold_table(table).head(10)

    search = st.text_input("Smart search", placeholder="Type to filter rows...")
    if search and not preview.empty:
        # regex=False: the box takes plain text, so characters such as "(" or
        # "*" must match literally instead of raising a regex error.
        hits = preview.astype(str).apply(
            lambda col: col.str.contains(search, case=False, na=False, regex=False)
        )
        preview = preview[hits.any(axis=1)]
    st.dataframe(preview, use_container_width=True, hide_index=True)
# Sidebar: title, a single-entry navigation radio, and the pipeline caption.
# NOTE(review): the gear and arrow glyphs were mis-encoded in the source and
# are best-effort restorations -- confirm the intended characters.
st.sidebar.markdown("### WorldCup Pulse")
st.sidebar.radio("Navigation", ["⚙️ Data Ops Monitor"], label_visibility="collapsed")
# NOTE(review): this snippet is an empty string in the source; restore its
# markup if it was dropped.
st.sidebar.markdown('', unsafe_allow_html=True)
st.sidebar.caption("Cloudflare cron → GitHub Actions → HF Dataset")

# Page header: title plus a one-line description.
# NOTE(review): the original wrapper markup was lost; plain <h1>/<p> is used.
st.markdown(
    "<h1>⚙️ WorldCup Pulse Data Ops Monitor</h1>"
    "<p>Operational view for near-real-time ETL, lakehouse quality checks, and DuckDB gold marts.</p>",
    unsafe_allow_html=True,
)

# Page sections, top to bottom, separated by horizontal rules.
heartbeat()
st.markdown("---")
st.markdown("### Data Quality Tabs")
quality_tabs()
st.markdown("---")
duckdb_console()