Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import pandas as pd | |
| from subenvs.autodatalab.analytics import ( | |
| clean_orders, | |
| compute_kpis, | |
| compute_revenue_share, | |
| data_quality_score, | |
| derive_revenue, | |
| validate_schema, | |
| ) | |
| from ..models import ExpertReport | |
| _ANALYST_QUERY = ( | |
| "data quality cleaning duplicates imputation category median " | |
| "KPIs total revenue top category data_quality_score" | |
| ) | |
class DataAnalystExpert:
    """Expert that cleans raw order data and reports headline KPIs."""

    expert_id = "analyst"

    def run(
        self,
        task_name: str,
        question: str,
        raw_df: pd.DataFrame,
        focused: bool = False,
        use_rag: bool = False,
    ) -> ExpertReport:
        """Clean ``raw_df``, compute KPIs and build the analyst report.

        Args:
            task_name: Accepted for interface compatibility; not used here.
            question: Accepted for interface compatibility; not used here.
            raw_df: Raw orders frame handed to ``clean_orders``.
            focused: Accepted for interface compatibility; not used here.
            use_rag: When true, query the memory retriever with
                ``_ANALYST_QUERY`` and attach the hits as citations/snippets.

        Returns:
            An ``ExpertReport`` carrying the summary, bullet points, metrics,
            schema risk flags (as ``risk:<name>`` issues), category citations
            and, optionally, memory citations/snippets.
        """
        cleaned = clean_orders(raw_df)
        enriched = derive_revenue(cleaned.df)
        kpis = compute_kpis(cleaned.df)
        share = compute_revenue_share(cleaned.df)
        schema = validate_schema(enriched)
        # Compute once so the bullet text and the metric always agree.
        quality = data_quality_score(cleaned.df)

        # The first row of the share table is treated as the leading category
        # (presumably it is sorted by revenue, descending -- verify in
        # compute_revenue_share). An empty table would make ``iloc[0]`` raise
        # IndexError, so fall back to placeholder values instead.
        if share.empty:
            top_category = "n/a"
            top_revenue = 0.0
        else:
            top_row = share.iloc[0]
            top_category = top_row["Category"]
            top_revenue = float(top_row["Revenue"])

        summary = (
            f"Cleaned {len(raw_df)} raw rows into {len(cleaned.df)} trusted rows. "
            f"Top category is {top_category} and total revenue is {kpis['total_revenue']:.2f}."
        )
        bullets = [
            f"Removed {cleaned.duplicates_removed} duplicate rows and imputed {cleaned.imputed_prices} missing prices.",
            f"Data quality score is {quality:.2f} with {schema['invalid_date_rows']} invalid date rows.",
            f"{top_category} leads revenue at {top_revenue:.2f}.",
        ]
        metrics = {
            "duplicates_removed": cleaned.duplicates_removed,
            "imputed_prices": cleaned.imputed_prices,
            "data_quality_score": quality,
            "total_revenue": kpis["total_revenue"],
            "avg_order_value": kpis["avg_order_value"],
            "top_category": str(top_category),
            "top_category_revenue": round(top_revenue, 2),
        }
        issues = [f"risk:{name}" for name in schema["risk_flags"]]
        citations = share["Category"].astype(str).head(3).tolist()

        memory_citations: list[str] = []
        memory_snippets: list[str] = []
        if use_rag:
            # Imported lazily so the retriever is only loaded when RAG is on.
            from memory import get_retriever

            hits = get_retriever().query(_ANALYST_QUERY, k=2)
            memory_citations = [h.as_citation() for h in hits]
            memory_snippets = [h.snippet for h in hits]
            if hits:
                best = hits[0]
                # Keep only the part of the source before any '#'.
                sop_name = best.source.split("#")[0]
                summary += f" Grounded in SOP {sop_name}."
                bullets.append(
                    f"Grounded against SOP: {sop_name} (score {best.score:.2f})."
                )

        return ExpertReport(
            # Use the class attribute so the id is defined in one place.
            expert_id=self.expert_id,
            title="Data Analyst Report",
            summary=summary,
            metrics=metrics,
            bullet_points=bullets,
            issues=issues,
            citations=citations,
            memory_citations=memory_citations,
            memory_snippets=memory_snippets,
        )