# analytics-bot / demo.py
# (Hugging Face upload header: commit 7878c12 "Upload 6 files", verified,
# by Arunvithyasegar — metadata only, not part of the program.)
"""
demo.py — Analytics Validation Demo
Entry point for the analytics automation demo. Loads a CSV of daily business
metrics, runs a deterministic rule-based validation engine, optionally enriches
the findings with an LLM narrative, and prints a structured console report.
Usage:
python demo.py
The script expects data.csv to be in the same directory as this file.
"""
from __future__ import annotations
import sys
from datetime import datetime
from pathlib import Path
import pandas as pd # type: ignore
from llm_utils import generate_summary # type: ignore
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Fixed-width console rulers for the 60-column report layout. DIVIDER frames
# the header/footer; SECTION_DIV is a section separator (not referenced
# elsewhere in this file — kept for external/legacy use; confirm before removing).
DIVIDER = "=" * 60
SECTION_DIV = "-" * 60
# A day-over-day drop beyond this threshold is flagged as an anomaly.
# Why 20%: this is a common business heuristic for "something unusual happened"
# in daily revenue or order metrics. Adjust for your domain as needed.
ANOMALY_THRESHOLD = -0.20
# Columns every input CSV must contain; enforced by load_data() before any
# downstream processing touches the frame.
REQUIRED_COLUMNS = {"date", "revenue", "orders"}
# ---------------------------------------------------------------------------
# Data loading
# ---------------------------------------------------------------------------
def load_data(filepath: str) -> pd.DataFrame:
    """
    Read and validate the input CSV file.

    Args:
        filepath: Absolute or relative path to the CSV.

    Returns:
        A DataFrame with columns [date (datetime64), revenue (float), orders (numeric)],
        sorted ascending by date.

    Raises:
        FileNotFoundError: if the file does not exist.
        ValueError: if required columns are missing, the file has no data rows,
            or dates cannot be parsed.
        pd.errors.EmptyDataError: if the file is empty.
    """
    # Read WITHOUT parse_dates: passing parse_dates=["date"] makes read_csv
    # raise its own, less actionable error when the 'date' column is absent,
    # which would bypass the friendly missing-column message below.
    df = pd.read_csv(filepath)

    # Validate required columns before any processing
    missing_cols = REQUIRED_COLUMNS - set(df.columns)
    if missing_cols:
        found = sorted(df.columns.tolist())
        raise ValueError(
            f"CSV is missing required column(s): {sorted(missing_cols)}. "
            f"Found: {found}"
        )

    # A header-only file passes the column check but would break every
    # downstream min()/max() date computation; fail fast with a clear message.
    if df.empty:
        raise ValueError("CSV contains no data rows.")

    # Parse dates explicitly. errors="coerce" turns unparseable entries into
    # NaT so a single bad cell is detected, not just a wholly-bad column.
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    if not pd.api.types.is_datetime64_any_dtype(df["date"]) or df["date"].isna().any():
        raise ValueError(
            "The 'date' column could not be parsed as dates. "
            "Ensure dates are in YYYY-MM-DD format."
        )

    # Sort ascending so day-over-day calculations are always forward-looking
    df = df.sort_values("date").reset_index(drop=True)
    return df
# ---------------------------------------------------------------------------
# Rule engine
# ---------------------------------------------------------------------------
def run_checks(df: pd.DataFrame) -> dict:
    """
    Run all deterministic validation checks against the DataFrame.

    This function never calls any LLM or external service. Every result is
    derived purely from the data using pandas arithmetic and comparisons.

    Args:
        df: Clean DataFrame from load_data().

    Returns:
        A dict with keys:
            "issues": list of issue dicts (see _make_issue for structure)
            "stats": dict with "revenue", "orders", and "date_range" sub-dicts
    """
    issues: list[dict] = []

    # -- Check A: Missing values -----------------------------------------
    # NaN values in numeric columns make any aggregation over that period
    # potentially misleading. Surface them so analysts know before reporting.
    for col in ["revenue", "orders"]:
        missing_mask = df[col].isna()
        missing_count = int(missing_mask.sum())
        if missing_count > 0:
            affected_dates = df.loc[missing_mask, "date"].dt.strftime("%Y-%m-%d").tolist()
            issues.append(
                _make_issue(
                    type_="missing_values",
                    severity="WARNING",
                    column=col,
                    detail=f"{missing_count} missing value(s) in '{col}' column",
                    dates=affected_dates,
                    value=float(missing_count),
                )
            )

    # -- Check B: Row count consistency ----------------------------------
    # A missing date in a time series is invisible to most BI tools and
    # creates silent gaps in trend lines. Flagging it early prevents charts
    # that appear continuous but are actually dropping a day of data.
    date_start = df["date"].min()
    date_end = df["date"].max()
    expected_rows = (date_end - date_start).days + 1
    actual_rows = len(df)
    # Positive gap = missing days; negative gap = extra rows (e.g. duplicates).
    row_gap = expected_rows - actual_rows
    if row_gap != 0:
        issues.append(
            _make_issue(
                type_="row_count",
                severity="WARNING",
                column=None,
                detail=(
                    f"Expected {expected_rows} rows for date range "
                    f"{date_start.date()} to {date_end.date()}, "
                    f"found {actual_rows} (gap: {row_gap})"
                ),
                dates=[],
                value=float(row_gap),
            )
        )

    # -- Check C: Day-over-day anomaly drops -----------------------------
    # A >20% single-day drop almost always signals either a data quality
    # problem or a significant business event requiring executive attention.
    # We drop NaN rows before calling pct_change() to prevent a missing value
    # from propagating into the percentage calculation for adjacent rows.
    for col in ["revenue", "orders"]:
        series_clean = df[["date", col]].dropna(subset=[col]).copy()
        # fill_method=None: the column is already NaN-free at this point, and
        # the implicit pad-fill default is deprecated (pandas 2.1+); be explicit.
        series_clean["pct_change"] = series_clean[col].pct_change(fill_method=None)
        anomalies = series_clean[series_clean["pct_change"] < ANOMALY_THRESHOLD]
        for _, row in anomalies.iterrows():
            pct = float(row["pct_change"])
            issues.append(
                _make_issue(
                    type_="anomaly_drop",
                    severity="WARNING",
                    column=col,
                    detail=(
                        f"'{col}' dropped {pct:.1%} on "
                        f"{row['date'].strftime('%Y-%m-%d')}"
                    ),
                    dates=[row["date"].strftime("%Y-%m-%d")],
                    value=round(pct * 100, 2),
                )
            )

    # -- Check D: Duplicate dates ----------------------------------------
    # Duplicate dates cause silent double-counting in GROUP BY aggregations.
    # This is classified as ERROR (not WARNING) because it corrupts totals.
    dup_mask = df["date"].duplicated(keep=False)
    dup_count = int(dup_mask.sum())
    if dup_count > 0:
        raw_dates = df.loc[dup_mask, "date"].dt.strftime("%Y-%m-%d").unique().tolist()
        dup_dates: list[str] = sorted(str(d) for d in raw_dates)
        issues.append(
            _make_issue(
                type_="duplicate_dates",
                severity="ERROR",
                column="date",
                detail=f"{dup_count} rows share duplicate dates: {', '.join(dup_dates)}",
                dates=dup_dates,
                value=float(dup_count),
            )
        )

    # -- Statistics (always computed, skipna=True so one NaN doesn't block all stats)
    stats = _compute_stats(df, date_start, date_end, actual_rows, expected_rows, row_gap)
    return {"issues": issues, "stats": stats}
def _make_issue(
*,
type_: str,
severity: str,
column: str | None,
detail: str,
dates: list[str],
value: float | None,
) -> dict:
"""Return a consistently structured issue dict."""
return {
"type": type_,
"severity": severity,
"column": column,
"detail": detail,
"dates": dates,
"value": value,
}
def _compute_stats(
    df: pd.DataFrame,
    date_start: pd.Timestamp,
    date_end: pd.Timestamp,
    actual_rows: int,
    expected_rows: int,
    row_gap: int,
) -> dict:
    """Compute descriptive statistics for revenue and orders."""

    def summarize(series: pd.Series) -> dict:
        # skipna=True so a single NaN does not blank out every statistic;
        # _safe_round maps an all-NaN result to None instead of NaN.
        return {
            "min": _safe_round(series.min(skipna=True), 2),
            "max": _safe_round(series.max(skipna=True), 2),
            "mean": _safe_round(series.mean(skipna=True), 2),
            "std": _safe_round(series.std(skipna=True), 2),
            "total": _safe_round(series.sum(skipna=True), 2),
            "missing_count": int(series.isna().sum()),
        }

    stats: dict = {col: summarize(df[col]) for col in ("revenue", "orders")}
    stats["date_range"] = {
        "start": date_start.strftime("%Y-%m-%d"),
        "end": date_end.strftime("%Y-%m-%d"),
        "actual_rows": actual_rows,
        "expected_rows": expected_rows,
        "row_gap": row_gap,
    }
    return stats
def _safe_round(value: float | None, ndigits: int) -> float | None:
"""Round a value, returning None if it is NaN (e.g., all-NaN column)."""
try:
if value is None or (isinstance(value, float) and pd.isna(value)):
return None
return round(float(value), int(ndigits)) # type: ignore
except (TypeError, ValueError):
return None
# ---------------------------------------------------------------------------
# Console rendering
# ---------------------------------------------------------------------------
def format_console_output(results: dict, llm_summary: str) -> None:
    """
    Print a structured, ASCII-safe validation report to stdout.

    This is a pure I/O function. It reads from results and llm_summary only;
    it does not compute, modify, or validate anything.

    Args:
        results: The dict returned by run_checks().
        llm_summary: The string returned by generate_summary().
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    dr = results["stats"]["date_range"]
    issues = results["issues"]
    # Header
    print(DIVIDER)
    print(" ANALYTICS VALIDATION REPORT")
    print(f" Generated : {timestamp}")
    # Plain string, not an f-string: there is no placeholder here (ruff F541).
    print(" Data file : data.csv")
    print(DIVIDER)
    # Section 1: Data Overview
    _section("1: DATA OVERVIEW")
    print(f" Date Range : {dr['start']} to {dr['end']}")
    print(f" Actual Rows : {dr['actual_rows']}")
    print(f" Expected Rows : {dr['expected_rows']}")
    gap_label = f"{dr['row_gap']} row gap detected" if dr["row_gap"] != 0 else "Consistent"
    print(f" Row Status : {gap_label}")
    # Section 2: KPI Statistics — revenue in dollars, orders as whole counts.
    _section("2: KPI STATISTICS")
    rev = results["stats"]["revenue"]
    print("\n REVENUE (USD)")
    print(f" Min : {_fmt_usd(rev['min'])}")
    print(f" Max : {_fmt_usd(rev['max'])}")
    print(f" Mean : {_fmt_usd(rev['mean'])}")
    print(f" Std Dev : {_fmt_usd(rev['std'])}")
    print(f" Total : {_fmt_usd(rev['total'])}")
    print(f" Missing : {rev['missing_count']} value(s)")
    ord_ = results["stats"]["orders"]
    print("\n ORDERS")
    print(f" Min : {_fmt_int(ord_['min'])}")
    print(f" Max : {_fmt_int(ord_['max'])}")
    print(f" Mean : {_fmt_int(ord_['mean'])}")
    print(f" Std Dev : {_fmt_int(ord_['std'])}")
    print(f" Total : {_fmt_int(ord_['total'])}")
    print(f" Missing : {ord_['missing_count']} value(s)")
    # Section 3: Detected Issues — one entry per issue dict from run_checks().
    _section("3: DETECTED ISSUES")
    print(f"\n Total issues found: {len(issues)}")
    if not issues:
        print("\n No issues detected. Data appears clean.")
    else:
        for issue in issues:
            # "column" may be None (e.g. row_count issues span the whole frame).
            col_label = issue["column"] if issue["column"] else "N/A"
            print(f"\n [{issue['severity']}] {issue['type']} | Column: {col_label}")
            print(f" Detail : {issue['detail']}")
            if issue["dates"]:
                print(f" Dates : {', '.join(issue['dates'])}")
    # Section 4: Executive Summary — LLM (or fallback) narrative, verbatim.
    _section("4: EXECUTIVE SUMMARY")
    print()
    print(llm_summary)
    # Footer
    print(f"\n{DIVIDER}")
    print(" END OF REPORT")
    print(DIVIDER)
def _section(title: str) -> None:
"""Print a section divider line."""
header = f"---- [ SECTION {title} ] "
print(f"\n{header}{'-' * max(0, 60 - len(header))}")
def _fmt_usd(value: float | None) -> str:
if value is None:
return "N/A"
return f"${value:,.2f}"
def _fmt_int(value: float | None) -> str:
if value is None:
return "N/A"
return f"{int(value):,}"
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main() -> None:
    """Entry point: load data.csv, run the rule engine, print the report."""
    # Anchor the CSV to this script's own directory so the demo works no
    # matter where the user launches it from (repo root, IDE, etc.).
    data_path = Path(__file__).parent / "data.csv"

    # Surface file and format problems before doing any real work.
    try:
        df = load_data(str(data_path))
    except FileNotFoundError:
        print(f"ERROR: data.csv not found at '{data_path}'")
        print("Ensure data.csv is in the same directory as demo.py.")
        sys.exit(1)
    except (ValueError, pd.errors.EmptyDataError) as exc:
        print(f"ERROR: Could not load data β€” {exc}")
        sys.exit(1)

    # Deterministic rule engine (no external services), then the LLM
    # narration (generate_summary falls back to rule-based text on its own),
    # then render everything to the console.
    results = run_checks(df)
    format_console_output(results, generate_summary(results))


if __name__ == "__main__":
    main()