| from __future__ import annotations
|
|
|
| import pandas as pd
|
|
|
| from .ai_engine import df_to_records
|
| from .models import ColumnItem, SeriesItem, Visualization
|
|
|
|
|
# Hex colour palette for chart series; entries are picked by position.
DEFAULT_COLORS = [
    "#2563eb", "#10b981", "#f59e0b",
    "#ef4444", "#8b5cf6", "#14b8a6",
]
|
|
|
|
|
| def _safe_float(x) -> float | None:
|
| try:
|
| if x is None:
|
| return None
|
| v = float(x)
|
| if pd.isna(v):
|
| return None
|
| return v
|
| except Exception:
|
| return None
|
|
|
|
|
| def _format_value(v) -> str:
|
| if v is None:
|
| return "—"
|
| try:
|
| if isinstance(v, (int, float)) and not pd.isna(v):
|
| if float(v).is_integer():
|
| return f"{int(v):,}"
|
| return f"{float(v):,.2f}"
|
| except Exception:
|
| pass
|
| try:
|
| if isinstance(v, pd.Timestamp):
|
| return str(v.to_pydatetime())
|
| except Exception:
|
| pass
|
| return str(v)
|
|
|
|
|
| def _summarize_small_df(df: pd.DataFrame) -> str | None:
|
| if df is None or df.empty:
|
| return None
|
|
|
|
|
| if len(df) > 5 or df.shape[1] > 4:
|
| return None
|
|
|
| sample = df.head(5)
|
|
|
|
|
| try:
|
| sample = sample.drop_duplicates().head(5)
|
| except Exception:
|
| pass
|
|
|
|
|
| if len(sample) == 1:
|
| row = sample.iloc[0]
|
| parts = []
|
| for c in sample.columns:
|
| parts.append(f"**{c}**: {_format_value(row.get(c))}")
|
| return ", ".join(parts)
|
|
|
|
|
| if sample.shape[1] == 2:
|
| c1 = str(sample.columns[0])
|
| c2 = str(sample.columns[1])
|
| lines = []
|
| for _, r in sample.iterrows():
|
| lines.append(f"- **{_format_value(r.get(c1))}**: {_format_value(r.get(c2))}")
|
| return "\n".join(lines)
|
|
|
| return None
|
|
|
|
|
| def _build_insights_for_table(df: pd.DataFrame) -> list[str]:
|
| if df is None or df.empty:
|
| return []
|
|
|
| insights: list[str] = []
|
| insights.append(f"Rows returned: **{len(df)}**")
|
|
|
|
|
| numeric_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c])]
|
| if numeric_cols:
|
| metric = str(numeric_cols[0])
|
| top_idx = df[metric].astype(float, errors='ignore').idxmax() if len(df) else None
|
| if top_idx is not None and top_idx in df.index:
|
| top_row = df.loc[top_idx]
|
| label_col = str(df.columns[0])
|
| label_val = str(top_row.get(label_col, ""))
|
| metric_val = _safe_float(top_row.get(metric))
|
| if metric_val is not None:
|
| insights.append(f"Top by **{metric}**: **{label_val}** ({metric_val:,.2f})")
|
|
|
| return insights[:3]
|
|
|
|
|
| def _build_insights_for_distribution(df: pd.DataFrame, category: str, value: str) -> list[str]:
|
| if df is None or df.empty or category not in df.columns or value not in df.columns:
|
| return []
|
|
|
| s = pd.to_numeric(df[value], errors="coerce")
|
| if s.dropna().empty:
|
| return []
|
|
|
| insights: list[str] = []
|
|
|
| try:
|
| total = float(s.sum(skipna=True))
|
| if total and total > 0:
|
| idx = s.astype(float).idxmax()
|
| top_cat = str(df.loc[idx, category]) if idx in df.index else None
|
| top_val = _safe_float(df.loc[idx, value]) if idx in df.index else None
|
| if top_cat is not None and top_val is not None:
|
| share = (top_val / total) * 100.0
|
| insights.append(f"Top category: **{top_cat}** ({top_val:,.0f}, {share:,.1f}% of total)")
|
| insights.append(f"Total (**{value}**): {total:,.0f}")
|
| except Exception:
|
| pass
|
|
|
| try:
|
|
|
| insights.append(f"Categories: **{df[category].nunique()}**")
|
| except Exception:
|
| pass
|
|
|
| return insights[:3]
|
|
|
|
|
| def _is_time_like(series: pd.Series) -> bool:
|
| try:
|
| if pd.api.types.is_datetime64_any_dtype(series):
|
| return True
|
| parsed = pd.to_datetime(series, errors="coerce")
|
| return parsed.notna().mean() >= 0.7
|
| except Exception:
|
| return False
|
|
|
|
|
| def _build_insights_for_series(df: pd.DataFrame, x: str, y: str) -> list[str]:
|
| if df is None or df.empty or y not in df.columns:
|
| return []
|
|
|
| s = pd.to_numeric(df[y], errors="coerce").dropna()
|
| if s.empty:
|
| return []
|
|
|
| insights: list[str] = []
|
|
|
|
|
| try:
|
| min_v = float(s.min())
|
| max_v = float(s.max())
|
| insights.append(f"Range (**{y}**): {min_v:,.2f} to {max_v:,.2f}")
|
| except Exception:
|
| pass
|
|
|
|
|
| if x in df.columns and _is_time_like(df[x]):
|
| try:
|
| first_v = float(s.iloc[0])
|
| last_v = float(s.iloc[-1])
|
| delta = last_v - first_v
|
| direction = "increased" if delta > 0 else "decreased" if delta < 0 else "remained flat"
|
| insights.append(f"Overall {direction} by {abs(delta):,.2f} ({first_v:,.2f} → {last_v:,.2f})")
|
| except Exception:
|
| pass
|
|
|
| try:
|
| last_x = df[x].iloc[len(df) - 1]
|
| last_v = float(s.iloc[-1])
|
| insights.append(f"Latest at **{x}={last_x}**: **{last_v:,.2f}**")
|
| except Exception:
|
| pass
|
|
|
| return insights[:3]
|
|
|
|
|
| def _guess_columns(df: pd.DataFrame, max_cols: int = 6) -> list[ColumnItem]:
|
| cols = list(df.columns)[:max_cols]
|
| return [ColumnItem(key=str(c), label=str(c)) for c in cols]
|
|
|
|
|
| def _guess_x_key(df: pd.DataFrame) -> str | None:
|
| if df.empty:
|
| return None
|
| keys = list(df.columns)
|
| preferred = ["date", "time", "day", "month", "year", "name", "region", "district", "label"]
|
| lower = {str(k).lower(): str(k) for k in keys}
|
| for p in preferred:
|
| if p in lower:
|
| return lower[p]
|
| return str(keys[0]) if keys else None
|
|
|
|
|
| def _guess_numeric_key(df: pd.DataFrame, exclude: str | None = None) -> str | None:
|
| if df.empty:
|
| return None
|
|
|
| for c in df.columns:
|
| if exclude and str(c) == exclude:
|
| continue
|
| if pd.api.types.is_numeric_dtype(df[c]):
|
| return str(c)
|
|
|
| for c in df.columns:
|
| if exclude and str(c) == exclude:
|
| continue
|
| try:
|
| pd.to_numeric(df[c])
|
| return str(c)
|
| except Exception:
|
| continue
|
| return None
|
|
|
|
|
def _append_small_df_summary(answer: str, df: pd.DataFrame) -> str:
    """Return *answer* with the small-result summary appended, if there is one."""
    summary = _summarize_small_df(df)
    if not summary:
        return answer
    text = answer.strip()
    return f"{text}\n\n{summary}" if text else summary


def _table_visualization(title: str, df: pd.DataFrame) -> Visualization:
    """Build a table visualization of *df*, including table insights."""
    return Visualization(
        type="table",
        title=title,
        data=df_to_records(df),
        columns=_guess_columns(df),
        insights=_build_insights_for_table(df),
    )


def build_visualization_from_intent(
    intent: str,
    reply: str,
    df: pd.DataFrame | None,
    sql: str | None,
    chart_config: dict | None,
) -> tuple[str, Visualization | None]:
    """Turn a reply and its result frame into answer text plus a visualization.

    Args:
        intent: ``"chat"``, ``"sql"`` or ``"analytics"``; any other value
            produces no visualization.
        reply: Text of the answer; ``None`` or empty is treated as ``""``.
        df: Result frame, if any.
        sql: Accepted for interface compatibility; not used by this function.
        chart_config: Optional chart settings read for ``"analytics"``:
            ``type`` (bar/line/area/pie, default line), ``title``, ``x``, ``y``.

    Returns:
        ``(answer, visualization)``:

        * ``"chat"``: the reply and no visualization.
        * no data: an empty table for ``"sql"`` / ``"analytics"``, else nothing.
        * ``"sql"``: a table with insights; small results are summarised in
          the answer.
        * ``"analytics"``: a chart, or a table when the chart type is not
          supported or no usable x / y column can be determined.
    """
    answer = reply or ""

    if intent == "chat":
        return answer, None

    if df is None or df.empty:
        if intent in ("sql", "analytics"):
            return answer, Visualization(type="table", title="Query Result", data=[], columns=[])
        return answer, None

    if intent == "sql":
        return _append_small_df_summary(answer, df), _table_visualization("Query Result", df)

    if intent != "analytics":
        return answer, None

    cfg = chart_config or {}
    # Compare the chart type case-insensitively so "Bar" and "bar" both work.
    ctype = str(cfg.get("type") or "line").strip().lower()
    title = cfg.get("title") or "Analytics Result"

    # A configured axis is used only when it names a real column; otherwise
    # it is guessed from the frame, as it is when nothing is configured.
    available = {str(c) for c in df.columns}
    x = cfg.get("x")
    x = str(x) if x and str(x) in available else _guess_x_key(df)
    y = cfg.get("y")
    y = str(y) if y and str(y) in available else _guess_numeric_key(df, exclude=x)

    if ctype not in ("bar", "line", "area", "pie") or not x or not y:
        return answer, _table_visualization(title, df)

    if ctype == "pie":
        insights = _build_insights_for_distribution(df, x, y)
    else:
        insights = _build_insights_for_series(df, x, y)

    vis = Visualization(
        type=ctype,
        title=title,
        data=df_to_records(df),
        xAxisKey=x,
        series=[SeriesItem(dataKey=y, name=y, color=DEFAULT_COLORS[0])],
        insights=insights,
    )
    return _append_small_df_summary(answer, df), vis
|
|
|