Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import Dict, Iterable, List | |
| import pandas as pd | |
| from .models import Brief, ExpertReport | |
# Lower and upper bounds that ``_clamp`` applies to a score.
_SCORE_MIN = 0.001
_SCORE_MAX = 0.999
# RAG+ mode: Stooq watchlist citations (see ``ceo_brief_env.stooq_scrape``).
# ``_citation_grounds`` accepts a citation of the form ``stooq:<suffix>`` only
# when ``<suffix>`` is one of the entries in ``STOOQ_CITATION_SUFFIXES``.
STOOQ_CITATION_PREFIX = "stooq:"
STOOQ_CITATION_SUFFIXES = frozenset({"nvda.us", "aapl.us", "jpm.us"})
| def _citation_grounds(retriever_sources: set[str], c: str) -> bool: | |
| if not isinstance(c, str) or not c: | |
| return False | |
| if c.startswith("memory:") and c[len("memory:") :] in retriever_sources: | |
| return True | |
| if c.startswith(STOOQ_CITATION_PREFIX): | |
| rest = c[len(STOOQ_CITATION_PREFIX) :] | |
| return rest in STOOQ_CITATION_SUFFIXES | |
| return False | |
def grounding_score(reports: Dict[str, ExpertReport]) -> float:
    """Share of reports that carry at least one resolvable ``memory:`` or ``stooq:`` citation.

    Returns ``0.0`` for an empty ``reports`` mapping. When the ``memory`` module
    cannot be imported, the set of known retriever sources is treated as empty.
    """
    if not reports:
        return 0.0

    try:
        from memory import get_retriever
    except Exception:
        available: set[str] = set()
    else:
        available = set(get_retriever().sources())

    def _is_grounded(report: object) -> bool:
        # A missing or ``None`` ``memory_citations`` attribute counts as no citations.
        citations = getattr(report, "memory_citations", []) or []
        return any(_citation_grounds(available, cite) for cite in citations)

    hits = sum(1 for report in reports.values() if _is_grounded(report))
    return hits / max(len(reports), 1)
def _clamp(score: float) -> float:
    """Round ``score`` to four decimals and keep it within ``[_SCORE_MIN, _SCORE_MAX]``."""
    rounded = round(float(score), 4)
    # Conditional expressions mirror ``min``/``max`` comparison semantics exactly.
    capped = rounded if rounded < _SCORE_MAX else _SCORE_MAX
    return capped if capped > _SCORE_MIN else _SCORE_MIN
def load_metric_ground_truth(path: str) -> Dict[str, str]:
    """Read a CSV with ``metric`` and ``value`` columns into a ``{metric: value}`` mapping.

    Both keys and values are converted with ``str``. When a metric name appears
    more than once, the last row wins.
    """
    frame = pd.read_csv(path)
    truth: Dict[str, str] = {}
    for _, record in frame.iterrows():
        truth[str(record["metric"])] = str(record["value"])
    return truth
| def _try_float(value: object) -> float | None: | |
| try: | |
| return float(value) | |
| except (TypeError, ValueError): | |
| return None | |
def metric_match_score(expected: Dict[str, str], actual: Dict[str, object]) -> float:
    """Fraction of expected metrics that ``actual`` reproduces.

    When both values parse as numbers they match if they differ by at most
    ``max(0.5, 2% of |expected|)``; otherwise their ``str`` forms must be equal.
    An empty ``expected`` mapping scores ``1.0``.
    """
    if not expected:
        return 1.0

    def _agrees(want: object, got: object) -> bool:
        want_num = _try_float(want)
        got_num = _try_float(got)
        if want_num is None or got_num is None:
            # At least one side is non-numeric: fall back to exact text equality.
            return str(got) == str(want)
        slack = max(0.5, abs(want_num) * 0.02)
        return abs(want_num - got_num) <= slack

    hits = 0.0
    for name, want in expected.items():
        if _agrees(want, actual.get(name)):
            hits += 1.0
    return hits / max(len(expected), 1)
def strategy_rubric_score(recommendations: List[str], evidence_numbers: Iterable[str], categories: Iterable[str]) -> float:
    """Score ``recommendations`` against a four-part rubric worth 0.25 each.

    Credit is awarded when:
      * there are exactly three recommendations;
      * each of the first three recommendations contains at least one evidence value;
      * at least two distinct categories are mentioned across the recommendations;
      * any recommendation mentions "projection", "variance" or "break-even".

    All matching is done on lower-cased text. Returns ``0.001`` when there are
    no recommendations; otherwise the sum is passed through ``_clamp``.
    """
    if not recommendations:
        return 0.001

    lowered = [r.lower() for r in recommendations]
    score = 0.0

    if len(recommendations) == 3:
        score += 0.25

    # Evidence is lower-cased to match ``lowered``. Empty tokens are dropped
    # because ``"" in rec`` is always true and would hand out free credit.
    nums = [text for text in (str(n).lower() for n in evidence_numbers) if text]
    if nums and all(any(n in rec for n in nums) for rec in lowered[:3]):
        score += 0.25

    distinct_cats = {str(c).lower() for c in categories if c}
    cat_hits = sum(1 for cat in distinct_cats if any(cat in rec for rec in lowered))
    if cat_hits >= 2:
        score += 0.25

    keywords = ("projection", "variance", "break-even")
    if any(word in rec for rec in lowered for word in keywords):
        score += 0.25

    return _clamp(score)
def grade_episode(
    expected_metrics: Dict[str, str],
    task_meta: Dict[str, object],
    brief: Brief,
    reports: Dict[str, ExpertReport],
    use_rag: bool = False,
) -> float:
    """Combine the component scores of one episode into a single clamped grade.

    Components and weights:
      * coverage  (0.2)  - fraction of ``task_meta["required_experts"]`` found in
        ``brief.consulted_experts``;
      * metrics   (0.5, or 0.40 with ``use_rag``) - ``metric_match_score`` of
        ``brief.metrics`` against ``expected_metrics``;
      * hr        (0.15) - ``reports["hr"].score``, or ``0.001`` when the report
        or its score is missing;
      * strategy  (0.15) - ``strategy_rubric_score``; when "strategy" is not a
        required expert this weight is moved onto metrics;
      * grounding (0.10, only with ``use_rag``) - ``grounding_score(reports)``.

    The weighted sum is passed through ``_clamp``.
    """
    # ``or []`` tolerates the key being present with a ``None`` value.
    required_experts = list(task_meta.get("required_experts") or [])

    # NOTE(review): coverage stays 0.0 when no experts are required, which caps
    # the achievable grade below the maximum - confirm this is intended.
    coverage = 0.0
    if required_experts:
        covered = sum(1 for expert in required_experts if expert in brief.consulted_experts)
        coverage = covered / len(required_experts)

    metric_score = metric_match_score(expected_metrics, brief.metrics)

    hr_report = reports.get("hr")
    hr_score = 0.001
    if hr_report and hr_report.score is not None:
        hr_score = hr_report.score

    analyst_report = reports.get("analyst")
    analyst_cats = []
    if analyst_report:
        # ``or []`` guards a ``None`` citation list, as ``grounding_score`` does.
        analyst_cats = [c for c in (analyst_report.citations or []) if c]

    evidence_numbers = [str(v) for v in brief.metrics.values()]
    strategy_needed = "strategy" in required_experts
    if strategy_needed:
        strategy_score = strategy_rubric_score(brief.recommendations, evidence_numbers, analyst_cats)
    else:
        strategy_score = 1.0

    # Insertion order fixes the summation order: coverage, metrics, hr,
    # strategy, then grounding when RAG is enabled.
    weights = {
        "coverage": 0.2,
        "metrics": 0.40 if use_rag else 0.5,
        "hr": 0.15,
        "strategy": 0.15,
    }
    scores = {
        "coverage": coverage,
        "metrics": metric_score,
        "hr": hr_score,
        "strategy": strategy_score,
    }
    if use_rag:
        weights["grounding"] = 0.10
        scores["grounding"] = grounding_score(reports)

    if not strategy_needed:
        # Strategy is not graded for this task: hand its weight to metrics.
        weights["metrics"] += weights["strategy"]
        weights["strategy"] = 0.0

    total = sum(weights[name] * scores[name] for name in weights)
    return _clamp(total)