Spaces:
Sleeping
Sleeping
| """ | |
| demo.py β Analytics Validation Demo | |
| Entry point for the analytics automation demo. Loads a CSV of daily business | |
| metrics, runs a deterministic rule-based validation engine, optionally enriches | |
| the findings with an LLM narrative, and prints a structured console report. | |
| Usage: | |
| python demo.py | |
| The script expects data.csv to be in the same directory as this file. | |
| """ | |
| from __future__ import annotations | |
| import sys | |
| from datetime import datetime | |
| from pathlib import Path | |
| import pandas as pd # type: ignore | |
| from llm_utils import generate_summary # type: ignore | |
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DIVIDER = "=" * 60      # heavy rule for the report header/footer
SECTION_DIV = "-" * 60  # light rule (not referenced in this file's visible code)

# A day-over-day drop beyond this threshold is flagged as an anomaly.
# Why 20%: this is a common business heuristic for "something unusual happened"
# in daily revenue or order metrics. Adjust for your domain as needed.
ANOMALY_THRESHOLD = -0.20

# Columns load_data() requires to be present in the input CSV.
REQUIRED_COLUMNS = {"date", "revenue", "orders"}
| # --------------------------------------------------------------------------- | |
| # Data loading | |
| # --------------------------------------------------------------------------- | |
def load_data(filepath: str) -> pd.DataFrame:
    """
    Read and validate the input CSV file.

    Args:
        filepath: Absolute or relative path to the CSV.

    Returns:
        A DataFrame with columns [date (datetime64), revenue (float), orders (numeric)],
        sorted ascending by date.

    Raises:
        FileNotFoundError: if the file does not exist.
        ValueError: if required columns are missing or dates cannot be parsed.
        pd.errors.EmptyDataError: if the file is empty.
    """
    # Read WITHOUT parse_dates: pandas raises its own ValueError when a
    # parse_dates column is absent, which would bypass the friendly
    # missing-column message below and leave it unreachable.
    df = pd.read_csv(filepath)

    # Validate required columns before any processing
    missing_cols = REQUIRED_COLUMNS - set(df.columns)
    if missing_cols:
        found = sorted(df.columns.tolist())
        raise ValueError(
            f"CSV is missing required column(s): {sorted(missing_cols)}. "
            f"Found: {found}"
        )

    # Parse dates now that the column is known to exist. errors="coerce"
    # turns unparseable entries into NaT so we can report a clean message
    # instead of pandas' internal parsing error. Cells that were already
    # empty in the raw file are tolerated (they parse to NaT from NaN),
    # matching the previous parse_dates behavior.
    parsed = pd.to_datetime(df["date"], errors="coerce")
    unparseable = parsed.isna() & df["date"].notna()
    if unparseable.any():
        raise ValueError(
            "The 'date' column could not be parsed as dates. "
            "Ensure dates are in YYYY-MM-DD format."
        )
    df["date"] = parsed

    # Sort ascending so day-over-day calculations are always forward-looking
    df = df.sort_values("date").reset_index(drop=True)
    return df
| # --------------------------------------------------------------------------- | |
| # Rule engine | |
| # --------------------------------------------------------------------------- | |
def run_checks(df: pd.DataFrame) -> dict:
    """
    Run all deterministic validation checks against the DataFrame.

    This function never calls any LLM or external service. Every result is
    derived purely from the data using pandas arithmetic and comparisons.

    Args:
        df: Clean DataFrame from load_data(). Assumed non-empty with a
            parsed datetime 'date' column -- TODO confirm callers never pass
            an empty frame (min()/max() below would yield NaT).

    Returns:
        A dict with keys:
            "issues": list of issue dicts (see _make_issue for structure)
            "stats": dict with "revenue", "orders", and "date_range" sub-dicts
    """
    issues: list[dict] = []

    # -- Check A: Missing values -----------------------------------------
    # NaN values in numeric columns make any aggregation over that period
    # potentially misleading. Surface them so analysts know before reporting.
    for col in ["revenue", "orders"]:
        missing_mask = df[col].isna()
        missing_count = int(missing_mask.sum())
        if missing_count > 0:
            # Record the exact affected dates so the report can point at rows.
            affected_dates = df.loc[missing_mask, "date"].dt.strftime("%Y-%m-%d").tolist()
            issues.append(
                _make_issue(
                    type_="missing_values",
                    severity="WARNING",
                    column=col,
                    detail=f"{missing_count} missing value(s) in '{col}' column",
                    dates=affected_dates,
                    value=float(missing_count),
                )
            )

    # -- Check B: Row count consistency ----------------------------------
    # A missing date in a time series is invisible to most BI tools and
    # creates silent gaps in trend lines. Flagging it early prevents charts
    # that appear continuous but are actually dropping a day of data.
    date_start = df["date"].min()
    date_end = df["date"].max()
    expected_rows = (date_end - date_start).days + 1
    actual_rows = len(df)
    # NOTE(review): row_gap can be negative when duplicate dates inflate the
    # row count; the != 0 test below flags that case as well.
    row_gap = expected_rows - actual_rows
    if row_gap != 0:
        issues.append(
            _make_issue(
                type_="row_count",
                severity="WARNING",
                column=None,
                detail=(
                    f"Expected {expected_rows} rows for date range "
                    f"{date_start.date()} to {date_end.date()}, "
                    f"found {actual_rows} (gap: {row_gap})"
                ),
                dates=[],
                value=float(row_gap),
            )
        )

    # -- Check C: Day-over-day anomaly drops -----------------------------
    # A >20% single-day drop almost always signals either a data quality
    # problem or a significant business event requiring executive attention.
    # We drop NaN rows before calling pct_change() to prevent a missing value
    # from propagating into the percentage calculation for adjacent rows.
    for col in ["revenue", "orders"]:
        series_clean = df[["date", col]].dropna(subset=[col]).copy()
        series_clean["pct_change"] = series_clean[col].pct_change()
        # The first row's pct_change is NaN; NaN < threshold evaluates False,
        # so it is excluded automatically.
        anomalies = series_clean[series_clean["pct_change"] < ANOMALY_THRESHOLD]
        for _, row in anomalies.iterrows():
            pct = float(row["pct_change"])
            issues.append(
                _make_issue(
                    type_="anomaly_drop",
                    severity="WARNING",
                    column=col,
                    detail=(
                        f"'{col}' dropped {pct:.1%} on "
                        f"{row['date'].strftime('%Y-%m-%d')}"
                    ),
                    dates=[str(row["date"].strftime("%Y-%m-%d"))],
                    value=round(float(pct * 100), 2),  # type: ignore
                )
            )

    # -- Check D: Duplicate dates ----------------------------------------
    # Duplicate dates cause silent double-counting in GROUP BY aggregations.
    # This is classified as ERROR (not WARNING) because it corrupts totals.
    dup_mask = df["date"].duplicated(keep=False)
    dup_count = int(dup_mask.sum())
    if dup_count > 0:
        raw_dates = df.loc[dup_mask, "date"].dt.strftime("%Y-%m-%d").unique().tolist()
        dup_dates: list[str] = sorted(str(d) for d in raw_dates)
        issues.append(
            _make_issue(
                type_="duplicate_dates",
                severity="ERROR",
                column="date",
                detail=f"{dup_count} rows share duplicate dates: {', '.join(dup_dates)}",
                dates=dup_dates,
                value=float(dup_count),
            )
        )

    # -- Statistics (always computed, skipna=True so one NaN doesn't block all stats)
    stats = _compute_stats(df, date_start, date_end, actual_rows, expected_rows, row_gap)
    return {"issues": issues, "stats": stats}
| def _make_issue( | |
| *, | |
| type_: str, | |
| severity: str, | |
| column: str | None, | |
| detail: str, | |
| dates: list[str], | |
| value: float | None, | |
| ) -> dict: | |
| """Return a consistently structured issue dict.""" | |
| return { | |
| "type": type_, | |
| "severity": severity, | |
| "column": column, | |
| "detail": detail, | |
| "dates": dates, | |
| "value": value, | |
| } | |
def _compute_stats(
    df: pd.DataFrame,
    date_start: pd.Timestamp,
    date_end: pd.Timestamp,
    actual_rows: int,
    expected_rows: int,
    row_gap: int,
) -> dict:
    """Build the descriptive-statistics payload for revenue and orders."""
    summary: dict = {}
    for metric in ("revenue", "orders"):
        series = df[metric]
        # All aggregations skip NaN so one missing cell never blanks a stat;
        # _safe_round maps an all-NaN result to None.
        summary[metric] = {
            "min": _safe_round(series.min(skipna=True), 2),
            "max": _safe_round(series.max(skipna=True), 2),
            "mean": _safe_round(series.mean(skipna=True), 2),
            "std": _safe_round(series.std(skipna=True), 2),
            "total": _safe_round(series.sum(skipna=True), 2),
            "missing_count": int(series.isna().sum()),
        }
    summary["date_range"] = {
        "start": date_start.strftime("%Y-%m-%d"),
        "end": date_end.strftime("%Y-%m-%d"),
        "actual_rows": actual_rows,
        "expected_rows": expected_rows,
        "row_gap": row_gap,
    }
    return summary
| def _safe_round(value: float | None, ndigits: int) -> float | None: | |
| """Round a value, returning None if it is NaN (e.g., all-NaN column).""" | |
| try: | |
| if value is None or (isinstance(value, float) and pd.isna(value)): | |
| return None | |
| return round(float(value), int(ndigits)) # type: ignore | |
| except (TypeError, ValueError): | |
| return None | |
| # --------------------------------------------------------------------------- | |
| # Console rendering | |
| # --------------------------------------------------------------------------- | |
def format_console_output(results: dict, llm_summary: str) -> None:
    """
    Print a structured, ASCII-safe validation report to stdout.

    This is a pure I/O function. It reads from results and llm_summary only;
    it does not compute, modify, or validate anything.

    Args:
        results: The dict returned by run_checks().
        llm_summary: The string returned by generate_summary().
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    dr = results["stats"]["date_range"]
    issues = results["issues"]

    # Header
    print(DIVIDER)
    print(" ANALYTICS VALIDATION REPORT")
    print(f" Generated : {timestamp}")
    # NOTE(review): file name is hard-coded here while main() resolves the
    # real path -- keep the two in sync if the data file ever moves.
    print(f" Data file : data.csv")
    print(DIVIDER)

    # Section 1: Data Overview
    _section("1: DATA OVERVIEW")
    print(f" Date Range : {dr['start']} to {dr['end']}")
    print(f" Actual Rows : {dr['actual_rows']}")
    print(f" Expected Rows : {dr['expected_rows']}")
    gap_label = f"{dr['row_gap']} row gap detected" if dr["row_gap"] != 0 else "Consistent"
    print(f" Row Status : {gap_label}")

    # Section 2: KPI Statistics
    _section("2: KPI STATISTICS")
    rev = results["stats"]["revenue"]
    print("\n REVENUE (USD)")
    print(f" Min : {_fmt_usd(rev['min'])}")
    print(f" Max : {_fmt_usd(rev['max'])}")
    print(f" Mean : {_fmt_usd(rev['mean'])}")
    print(f" Std Dev : {_fmt_usd(rev['std'])}")
    print(f" Total : {_fmt_usd(rev['total'])}")
    print(f" Missing : {rev['missing_count']} value(s)")
    # Trailing underscore avoids shadowing the builtin ord().
    ord_ = results["stats"]["orders"]
    print("\n ORDERS")
    print(f" Min : {_fmt_int(ord_['min'])}")
    print(f" Max : {_fmt_int(ord_['max'])}")
    print(f" Mean : {_fmt_int(ord_['mean'])}")
    print(f" Std Dev : {_fmt_int(ord_['std'])}")
    print(f" Total : {_fmt_int(ord_['total'])}")
    print(f" Missing : {ord_['missing_count']} value(s)")

    # Section 3: Detected Issues
    _section("3: DETECTED ISSUES")
    print(f"\n Total issues found: {len(issues)}")
    if not issues:
        print("\n No issues detected. Data appears clean.")
    else:
        for issue in issues:
            # Row-count issues carry column=None; render a placeholder.
            col_label = issue["column"] if issue["column"] else "N/A"
            print(f"\n [{issue['severity']}] {issue['type']} | Column: {col_label}")
            print(f" Detail : {issue['detail']}")
            if issue["dates"]:
                print(f" Dates : {', '.join(issue['dates'])}")

    # Section 4: Executive Summary
    _section("4: EXECUTIVE SUMMARY")
    print()
    print(llm_summary)

    # Footer
    print(f"\n{DIVIDER}")
    print(" END OF REPORT")
    print(DIVIDER)
| def _section(title: str) -> None: | |
| """Print a section divider line.""" | |
| header = f"---- [ SECTION {title} ] " | |
| print(f"\n{header}{'-' * max(0, 60 - len(header))}") | |
| def _fmt_usd(value: float | None) -> str: | |
| if value is None: | |
| return "N/A" | |
| return f"${value:,.2f}" | |
| def _fmt_int(value: float | None) -> str: | |
| if value is None: | |
| return "N/A" | |
| return f"{int(value):,}" | |
| # --------------------------------------------------------------------------- | |
| # Entry point | |
| # --------------------------------------------------------------------------- | |
def main() -> None:
    """Load the demo CSV, run the rule engine, and print the console report.

    Exits with status 1 when the data file is absent or malformed.
    """
    # Resolve data path relative to this script's directory so the demo works
    # regardless of where the user invokes it from (e.g., from the repo root).
    script_dir = Path(__file__).parent
    data_path = script_dir / "data.csv"

    # Load — surface file and format errors before doing any work.
    try:
        df = load_data(str(data_path))
    except FileNotFoundError:
        print(f"ERROR: data.csv not found at '{data_path}'")
        print("Ensure data.csv is in the same directory as demo.py.")
        sys.exit(1)
    except (ValueError, pd.errors.EmptyDataError) as exc:
        # Fixed: the separator in this message was a mis-encoded em dash.
        print(f"ERROR: Could not load data — {exc}")
        sys.exit(1)

    # Rule engine — always deterministic, never calls external services.
    results = run_checks(df)

    # LLM narration — may fall back to rule-based text (handled inside
    # generate_summary).
    llm_summary = generate_summary(results)

    # Render report.
    format_console_output(results, llm_summary)
| if __name__ == "__main__": | |
| main() | |