File size: 3,023 Bytes
d02bacd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
from __future__ import annotations

import pandas as pd

from subenvs.autodatalab.analytics import (
    clean_orders,
    compute_kpis,
    compute_revenue_share,
    data_quality_score,
    derive_revenue,
    validate_schema,
)

from ..models import ExpertReport

_ANALYST_QUERY = (
    "data quality cleaning duplicates imputation category median "
    "KPIs total revenue top category data_quality_score"
)


class DataAnalystExpert:
    """Expert that cleans raw order data and reports quality and revenue KPIs.

    The single entry point is :meth:`run`, which returns an ``ExpertReport``
    tagged with this class's ``expert_id``.
    """

    expert_id = "analyst"

    def run(
        self,
        task_name: str,
        question: str,
        raw_df: pd.DataFrame,
        focused: bool = False,
        use_rag: bool = False,
    ) -> ExpertReport:
        """Clean ``raw_df`` and build the analyst report.

        Args:
            task_name: Accepted for interface compatibility; not read by this
                method.
            question: Accepted for interface compatibility; not read by this
                method.
            raw_df: Raw orders frame handed to ``clean_orders``.
            focused: Accepted for interface compatibility; not read by this
                method.
            use_rag: When true, query the memory retriever with the fixed
                analyst query (``k=2``) and attach the hits as memory
                citations and snippets. The first hit, if any, is also
                mentioned in the summary and bullet points.

        Returns:
            An ``ExpertReport`` holding the summary text, bullet points,
            metrics, ``risk:``-prefixed schema flags, the first three
            categories of the revenue-share table as citations, and any
            memory citations/snippets.

        Note:
            If the revenue-share table is empty, the report is still
            produced: the top category is reported as ``"n/a"`` with revenue
            ``0.0`` and a ``risk:empty_revenue_share`` issue is added,
            instead of failing with ``IndexError``.
        """
        cleaned = clean_orders(raw_df)
        # Call order matches the original implementation in case the helpers
        # modify the frame they are given.
        enriched = derive_revenue(cleaned.df)
        kpis = compute_kpis(cleaned.df)
        share = compute_revenue_share(cleaned.df)
        schema = validate_schema(enriched)
        # Score once so the bullet text and the metric cannot disagree.
        quality_score = data_quality_score(cleaned.df)

        has_share = not share.empty
        if has_share:
            # The first row of the share table is reported as the top category.
            top_row = share.iloc[0]
            top_category = str(top_row["Category"])
            top_revenue = float(top_row["Revenue"])
            citations = share["Category"].astype(str).head(3).tolist()
        else:
            # Nothing to rank: fall back to neutral placeholders.
            top_category = "n/a"
            top_revenue = 0.0
            citations = []

        summary = (
            f"Cleaned {len(raw_df)} raw rows into {len(cleaned.df)} trusted rows. "
            f"Top category is {top_category} and total revenue is {kpis['total_revenue']:.2f}."
        )
        bullets = [
            f"Removed {cleaned.duplicates_removed} duplicate rows and imputed {cleaned.imputed_prices} missing prices.",
            f"Data quality score is {quality_score:.2f} with {schema['invalid_date_rows']} invalid date rows.",
        ]
        if has_share:
            bullets.append(f"{top_category} leads revenue at {top_revenue:.2f}.")
        else:
            bullets.append("No category revenue is available.")

        metrics = {
            "duplicates_removed": cleaned.duplicates_removed,
            "imputed_prices": cleaned.imputed_prices,
            "data_quality_score": quality_score,
            "total_revenue": kpis["total_revenue"],
            "avg_order_value": kpis["avg_order_value"],
            "top_category": top_category,
            "top_category_revenue": round(top_revenue, 2),
        }
        issues = [f"risk:{name}" for name in schema["risk_flags"]]
        if not has_share:
            issues.append("risk:empty_revenue_share")

        memory_citations: list[str] = []
        memory_snippets: list[str] = []
        if use_rag:
            # Local import: ``memory`` is only imported when RAG is requested.
            from memory import get_retriever

            hits = get_retriever().query(_ANALYST_QUERY, k=2)
            memory_citations = [h.as_citation() for h in hits]
            memory_snippets = [h.snippet for h in hits]
            if hits:
                best = hits[0]
                # Keep only the part of the source before the first '#'.
                sop_name = best.source.split("#")[0]
                summary += f" Grounded in SOP {sop_name}."
                bullets.append(
                    f"Grounded against SOP: {sop_name} (score {best.score:.2f})."
                )

        return ExpertReport(
            expert_id=self.expert_id,
            title="Data Analyst Report",
            summary=summary,
            metrics=metrics,
            bullet_points=bullets,
            issues=issues,
            citations=citations,
            memory_citations=memory_citations,
            memory_snippets=memory_snippets,
        )