from __future__ import annotations from datetime import datetime, timedelta, timezone import duckdb import pandas as pd import streamlit as st from data_access import download_gold_path, load_gold_table, load_log_table st.set_page_config(page_title="WorldCup Pulse DataOps", page_icon="โš™๏ธ", layout="wide", initial_sidebar_state="expanded") CSS = """ """ st.markdown(CSS, unsafe_allow_html=True) GOLD_TABLES = [ "kpi_summary.parquet", "goals_by_matchday.parquet", "goals_by_minute_bucket.parquet", "host_cities.parquet", "team_radar_stats.parquet", "team_key_metrics.parquet", "top_players.parquet", "team_table.parquet", "matches.parquet", "group_standings.parquet", "match_events.parquet", "substitutions.parquet", "lineups.parquet", "goalkeepers.parquet", "match_player_stats.parquet", ] def status_badge(status: str) -> str: s = str(status or "").lower() if "success" in s: return 'Success' if "fail" in s: return 'Fail' return f'{status}' def next_run_text() -> str: now = datetime.now(timezone.utc) minute = ((now.minute // 5) + 1) * 5 nxt = now.replace(second=0, microsecond=0) if minute >= 60: nxt = nxt.replace(minute=0) + timedelta(hours=1) else: nxt = nxt.replace(minute=minute) return nxt.strftime("%H:%M UTC") def card(label: str, value: str, icon: str): st.markdown(f'
{icon} {label}
{value}
', unsafe_allow_html=True) def heartbeat(): runs = load_log_table("pipeline_runs.csv") if "finished_at" in runs.columns: runs = runs.sort_values("finished_at", ascending=False) latest = runs.iloc[0].to_dict() if not runs.empty else {} c1, c2, c3, c4 = st.columns(4) with c1: card("Latest Status", str(latest.get("status", "Unknown")), "๐Ÿซ€") with c2: card("Last Sync from HF Dataset", str(latest.get("finished_at", "N/A"))[:19], "๐Ÿ”„") with c3: card("Next Scheduled Run", next_run_text(), "โฑ๏ธ") with c4: card("Gold Rows", str(latest.get("rows_gold", "N/A")), "๐Ÿ…") st.markdown("#### Pipeline Heartbeat") view = runs.head(15).copy() if not view.empty and "status" in view.columns: view["status_badge"] = view["status"].map(status_badge) html = view[[c for c in ["run_id","started_at","finished_at","status_badge","rows_bronze","rows_silver","rows_gold","error_message"] if c in view.columns]].to_html(escape=False, index=False) st.markdown(html, unsafe_allow_html=True) else: st.info("No pipeline runs available yet. Mock fallback is being used.") def quality_tabs(): q = load_log_table("quality_checks.csv") tabs = st.tabs(["Overview", "Bronze", "Silver", "Gold"]) layers = [None, "Bronze", "Silver", "Gold"] for tab, layer in zip(tabs, layers): with tab: df = q.copy() if layer is None else q[q["layer"].astype(str).str.lower().eq(layer.lower())].copy() if df.empty: st.warning("No checks found for this layer.") continue pass_count = int(df["status"].astype(str).str.lower().eq("pass").sum()) fail_count = int(len(df) - pass_count) c1, c2 = st.columns(2) with c1: card("Checks Passed", str(pass_count), "โœ…") with c2: card("Checks Failed", str(fail_count), "๐Ÿšจ") for _, row in df.iterrows(): passed = str(row.get("status", "")).lower() == "pass" with st.status(f"{row.get('layer','')} ยท {row.get('table','')} ยท {row.get('check_name','')}", state="complete" if passed else "error"): st.write(row.get("message", "")) def duckdb_console(): st.markdown("#### DuckDB Console") table = st.selectbox("Gold table", GOLD_TABLES) path = download_gold_path(table) query = f"SELECT * FROM '{path}' LIMIT 10" if path else f"SELECT * FROM mock_{table.replace('.parquet','')} LIMIT 10" st.markdown(f'
{query}
', unsafe_allow_html=True) if path: try: df = duckdb.sql(query).df() except Exception as exc: st.error(f"DuckDB read failed: {exc}") df = load_gold_table(table).head(10) else: df = load_gold_table(table).head(10) search = st.text_input("Smart search", placeholder="Type to filter rows...") if search and not df.empty: mask = df.astype(str).apply(lambda col: col.str.contains(search, case=False, na=False)).any(axis=1) df = df[mask] st.dataframe(df, use_container_width=True, hide_index=True) st.sidebar.markdown("### WorldCup Pulse") st.sidebar.radio("Navigation", ["โš™๏ธ Data Ops Monitor"], label_visibility="collapsed") st.sidebar.markdown('', unsafe_allow_html=True) st.sidebar.caption("Cloudflare cron โ†’ GitHub Actions โ†’ HF Dataset") st.markdown('

โš™๏ธ WorldCup Pulse Data Ops Monitor

Operational view for near-real-time ETL, lakehouse quality checks, and DuckDB gold marts.

', unsafe_allow_html=True) heartbeat() st.markdown("---") st.markdown("### Data Quality Tabs") quality_tabs() st.markdown("---") duckdb_console()