# Source path: AutoDataLab2.0/ceo_brief_env/experts/data_analyst.py
# Hosting-page metadata: uchihamadara1816 · "Upload 172 files" · commit d02bacd (verified)
from __future__ import annotations
import pandas as pd
from subenvs.autodatalab.analytics import (
clean_orders,
compute_kpis,
compute_revenue_share,
data_quality_score,
derive_revenue,
validate_schema,
)
from ..models import ExpertReport
_ANALYST_QUERY = (
"data quality cleaning duplicates imputation category median "
"KPIs total revenue top category data_quality_score"
)
class DataAnalystExpert:
    """Expert that cleans raw order data and reports data quality and revenue KPIs."""

    # Identifier stamped on every report produced by this expert.
    expert_id = "analyst"

    def run(
        self,
        task_name: str,
        question: str,
        raw_df: pd.DataFrame,
        focused: bool = False,
        use_rag: bool = False,
    ) -> ExpertReport:
        """Clean ``raw_df``, compute KPIs and package them as an ``ExpertReport``.

        Args:
            task_name: Accepted for interface compatibility; not used here.
            question: Accepted for interface compatibility; not used here.
            raw_df: Raw orders table handed to ``clean_orders``.
            focused: Accepted for interface compatibility; not used here.
            use_rag: When true, query the memory retriever with
                ``_ANALYST_QUERY`` and attach the resulting citations and
                snippets; the best hit is also mentioned in the summary
                and bullet points.

        Returns:
            An ``ExpertReport`` holding the summary, metrics, bullet points,
            schema risk flags (as ``risk:<name>`` issues), the first three
            categories of the revenue-share table as citations, and any
            retrieval citations/snippets.

        Raises:
            IndexError: If the revenue-share table is empty, because its
                first row is read unconditionally.
        """
        cleaned = clean_orders(raw_df)
        enriched = derive_revenue(cleaned.df)
        kpis = compute_kpis(cleaned.df)
        share = compute_revenue_share(cleaned.df)
        schema = validate_schema(enriched)

        # Computed once so the bullet text and the metric cannot disagree.
        quality_score = data_quality_score(cleaned.df)

        # NOTE(review): the first row is treated as the top category, so the
        # share table is presumably sorted by revenue descending - confirm
        # against compute_revenue_share.
        top_row = share.iloc[0]
        top_category = top_row["Category"]
        top_revenue = float(top_row["Revenue"])

        summary = (
            f"Cleaned {len(raw_df)} raw rows into {len(cleaned.df)} trusted rows. "
            f"Top category is {top_category} and total revenue is {kpis['total_revenue']:.2f}."
        )
        bullets = [
            f"Removed {cleaned.duplicates_removed} duplicate rows and imputed {cleaned.imputed_prices} missing prices.",
            f"Data quality score is {quality_score:.2f} with {schema['invalid_date_rows']} invalid date rows.",
            f"{top_category} leads revenue at {top_revenue:.2f}.",
        ]
        metrics = {
            "duplicates_removed": cleaned.duplicates_removed,
            "imputed_prices": cleaned.imputed_prices,
            "data_quality_score": quality_score,
            "total_revenue": kpis["total_revenue"],
            "avg_order_value": kpis["avg_order_value"],
            "top_category": str(top_category),
            "top_category_revenue": round(top_revenue, 2),
        }
        issues = [f"risk:{name}" for name in schema["risk_flags"]]
        citations = share["Category"].astype(str).head(3).tolist()

        memory_citations: list[str] = []
        memory_snippets: list[str] = []
        if use_rag:
            # Imported here so the memory package is only loaded when
            # retrieval is actually requested.
            from memory import get_retriever

            hits = get_retriever().query(_ANALYST_QUERY, k=2)
            memory_citations = [h.as_citation() for h in hits]
            memory_snippets = [h.snippet for h in hits]
            if hits:
                best_hit = hits[0]
                # Keep only the part of the source before the first '#'.
                sop_name = best_hit.source.split("#")[0]
                summary = summary + f" Grounded in SOP {sop_name}."
                bullets.append(
                    f"Grounded against SOP: {sop_name} (score {best_hit.score:.2f})."
                )

        return ExpertReport(
            # Use the class attribute so a subclass overriding expert_id is
            # reflected in its reports.
            expert_id=self.expert_id,
            title="Data Analyst Report",
            summary=summary,
            metrics=metrics,
            bullet_points=bullets,
            issues=issues,
            citations=citations,
            memory_citations=memory_citations,
            memory_snippets=memory_snippets,
        )