#!/usr/bin/env python3
"""
PGC RAGAS Evaluation Framework (v2026.1 — Cerebras Edition)

Evaluates the production Hybrid Retrieval Pipeline (BGE-M3 + FTS + RRF k=60)
with Student gpt-oss-120b (Cerebras) and Teacher gpt-4o-mini (OpenAI).

Metric Suites:
- Component-Level: Context Precision, Context Recall, Context Relevance, MRR
- End-to-End: Faithfulness, Answer Correctness (0.7/0.3), Answer Relevance
- PGC Logic: Temporal Adherence, Numerical Rigor, Constraint Satisfaction
- Indonesian Terminology Nuance (5-case sub-suite)
- Operational: Latency, TPS
- Youden's J calibration (golden_retrieval_cases.json)

Thesis train/test split:
- Calibration set: golden_retrieval_cases.json → thresholds, MRR
- Test set (100+ cases from synthetic + human-adversarial) → all RAGAS metrics
"""
from __future__ import annotations

import asyncio
import csv
import json
import os
import re
import sys
import time
import warnings
from collections import defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Set, Tuple

import numpy as np
# Make the directory two levels above this file importable.
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))

# Ensure UTF-8 stdout/stderr for Windows console compatibility (emojis, arrows
# in print). Each stream is checked on its own: a replaced stderr may lack
# ``reconfigure`` even when stdout has it.
for _stream in (sys.stdout, sys.stderr):
    if hasattr(_stream, "reconfigure"):
        _stream.reconfigure(encoding="utf-8")

from dotenv import load_dotenv

# Load environment variables from the .env file two levels above this file.
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
# =============================================================================
# CONFIGURATION
# =============================================================================
# Directories resolved relative to the directory two levels above this file.
RESULTS_DIR = Path(__file__).resolve().parent.parent / "results"
FIXTURES_DIR = Path(__file__).resolve().parent.parent / "tests" / "fixtures"
DATA_DIR = Path(__file__).resolve().parent.parent / "data"

OPENAI_MODEL = "gpt-4o-mini"
# ragas 0.4.3 InstructorLLM uses max_tokens which GPT-5 series rejects;
# gpt-4o-mini supports max_tokens and is the correct ragas critic model
RAGAS_CRITIC_MODEL = "gpt-4o-mini"
# API keys default to an empty string when the environment variable is unset.
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
CEREBRAS_API_KEY = os.getenv("CEREBRAS_API_KEY", "")

# RAGAS metrics batch size per critic call
CRITIC_BATCH_SIZE = 3

# Semaphores for async API throttling
CEREBRAS_SEMAPHORE = asyncio.Semaphore(3)
OPENAI_SEMAPHORE = asyncio.Semaphore(5)

# Retry parameters for RAGAS API calls
MAX_RETRIES = 3
RETRY_BASE_DELAY = 2.0
CRITIC_TIMEOUT = 60.0

# Numerical Rigor tolerance
NUMERICAL_TOLERANCE = 0.5
# Query cues (lower-case) that indicate which numeric parameter is asked about.
NUMERICAL_PARAM_CUES = {
    "temperature": ["suhu", "temperature", "temp", "°c", "celsius", "fahrenheit", "kelvin"],
    "humidity": ["kelembaban", "humidity", "rh", "relative humidity"],
    "light": ["cahaya", "light", "lux", "lumen", "ppfd"],
}

# Youden's J calibration parameters
CALIB_THRESHOLD = 0.20
CALIB_COUNT = 20
HISTOGRAM_BINS = 15
HISTOGRAM_LOW = 0.40
HISTOGRAM_HIGH = 0.82
CANDIDATE_RANGE = range(45, 81)
# =============================================================================
# RETRY HELPER FOR RAGAS METRICS
# =============================================================================
async def _retry_ragas_call(metric_call, metric_name, timeout=None, max_retries=None):
    """
    Retry a RAGAS metric async call with exponential backoff and timeout.

    Args:
        metric_call: Zero-argument callable returning an awaitable
            (e.g., lambda: f_metric.ascore(...)). It is invoked afresh on
            every attempt.
        metric_name: String for logging
        timeout: Seconds per attempt (default: CRITIC_TIMEOUT)
        max_retries: Max attempts, must be >= 1 (default: MAX_RETRIES)

    Returns:
        float score

    Raises:
        ValueError: If max_retries is smaller than 1.
        Exception: Immediately for authentication-style errors (message
            mentions api_key / authorization / invalid_api); otherwise the
            last exception once all attempts are exhausted.
    """
    timeout = timeout if timeout is not None else CRITIC_TIMEOUT
    max_retries = max_retries if max_retries is not None else MAX_RETRIES
    if max_retries < 1:
        # Without this guard the loop never runs and ``raise None`` below
        # would surface as an unrelated TypeError.
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    last_exception = None
    for attempt in range(1, max_retries + 1):
        try:
            score = await asyncio.wait_for(metric_call(), timeout=timeout)
            return float(score)
        except asyncio.TimeoutError:
            print(f"[RAGAS] {metric_name} attempt {attempt}/{max_retries} timed out ({timeout}s)")
            # Re-wrap with an explicit message: the original exception's
            # text may be empty, and _categorize_ragas_error matches on
            # the word "timeout".
            last_exception = asyncio.TimeoutError("timeout")
        except Exception as e:
            estr = str(e)
            print(f"[RAGAS] {metric_name} attempt {attempt}/{max_retries} failed: {estr[:120]}")
            last_exception = e
            # Credential problems will not fix themselves; fail fast.
            if any(kw in estr.lower() for kw in ("api_key", "authorization", "invalid_api")):
                raise

        # Exponential backoff between attempts (none after the final one).
        if attempt < max_retries:
            delay = RETRY_BASE_DELAY * (2 ** (attempt - 1))
            print(f"[RAGAS] Retrying {metric_name} in {delay:.1f}s...")
            await asyncio.sleep(delay)

    raise last_exception  # type: ignore[misc]
def _categorize_ragas_error(e: Exception) -> str:
    """Categorize a RAGAS exception into a short error string.

    Returns one of "error: timeout", "error: max_tokens", "error: rate_limit",
    or "error: <exception text>" when no category matches.
    """
    # Timeout exceptions often carry an empty message, so classify by type
    # before falling back to message keywords.
    if isinstance(e, (asyncio.TimeoutError, TimeoutError)):
        return "error: timeout"
    estr = str(e).lower()
    if "max_tokens" in estr or "length" in estr or "incomplete" in estr:
        return "error: max_tokens"
    if "timeout" in estr:
        return "error: timeout"
    if "rate" in estr and "limit" in estr:
        return "error: rate_limit"
    return f"error: {e}"
# =============================================================================
# PLANT FAMILY MAPPING (for Graded Relevance Scoring)
# =============================================================================
# Alternative plant identifiers mapped to the canonical identifier used as a
# key in PLANT_FAMILY_MAP.
PLANT_ALIAS_MAP = {
    "bok_choy": "pak_choy",
    "amaranth": "spinach_amaranth",
    "spinach": "spinach_amaranth",
}

# Canonical plant identifier -> botanical family name.
PLANT_FAMILY_MAP = {
    "lettuce": "Asteraceae",
    "pak_choy": "Brassicaceae",
    "spinach_amaranth": "Amaranthaceae",
    "mustard_greens": "Brassicaceae",
    "kailan": "Brassicaceae",
    "chinese_cabbage": "Brassicaceae",
    "water_spinach": "Convolvulaceae",
    "celery": "Apiaceae",
    "green_onion": "Amaryllidaceae",
    "chili_pepper": "Solanaceae",
    "tomato": "Solanaceae",
    "melon": "Cucurbitaceae",
    "watermelon": "Cucurbitaceae",
    "cucumber": "Cucurbitaceae",
    "eggplant": "Solanaceae",
    "pumpkin": "Cucurbitaceae",
    "cauliflower": "Brassicaceae",
    "shallot": "Amaryllidaceae",
    "papaya": "Caricaceae",
    "marigold": "Asteraceae",
    "cabbage": "Brassicaceae",
}


def _resolve_plant_id(plant_id: str) -> str:
    """Return the canonical id for *plant_id*; unknown ids pass through unchanged."""
    return PLANT_ALIAS_MAP.get(plant_id, plant_id)


def _get_plant_family(plant_id: str) -> str:
    """Return the botanical family for *plant_id*, or "" when it is not mapped."""
    canonical = _resolve_plant_id(plant_id)
    return PLANT_FAMILY_MAP.get(canonical, "")
def _get_source_category(source: str) -> str:
    """Classify a source name by the first matching keyword.

    Matching is a case-insensitive substring test, checked in the order
    sop -> handbook -> juknis -> buku/book. "handbook" is tested before
    "book" so handbooks are not reported as "buku". Returns "general" when
    nothing matches or when *source* is empty/None.
    """
    src_lower = (source or "").lower()
    keyword_table = (
        ("sop", ("sop",)),
        ("handbook", ("handbook",)),
        ("juknis", ("juknis",)),
        ("buku", ("buku", "book")),
    )
    for category, keywords in keyword_table:
        if any(keyword in src_lower for keyword in keywords):
            return category
    return "general"
def _build_source_plant_map(cases: list) -> dict:
    """Map each case's ``expected_source`` (stripped) to its ``expected_plant``.

    Cases with an empty/missing source or a falsy plant are skipped. When the
    same source appears more than once, the last case wins.
    """
    mapping = {}
    for case in cases:
        # ``or ""`` also covers a key that is present but explicitly None,
        # which ``.get(key, "")`` alone would not.
        source = (case.get("expected_source") or "").strip()
        plant = case.get("expected_plant")
        if source and plant:
            mapping[source] = plant
    return mapping
# =============================================================================
# IMPORTS (with graceful fallbacks)
# =============================================================================
# Every flag/name that later code may read is given a value up front so a
# failed optional import cannot leave it undefined.
HAS_RAGAS = False
HAS_RAGAS_METRICS = False
HAS_OPENAI = False
_RAGAS_METRIC_CLASSES: list = []

try:
    from ragas.llms import llm_factory
    from ragas.metrics.collections import (
        Faithfulness,
        ContextPrecision,
        ContextRecall,
        AnswerCorrectness,
    )
    # The answer-relevance metric is imported under whichever of the two
    # names the installed ragas provides.
    try:
        from ragas.metrics.collections import ResponseRelevancy as _AnswerRelevanceMetric
        ResponseRelevancy = _AnswerRelevanceMetric  # expose for monkeypatching / tests
    except ImportError:
        from ragas.metrics.collections import AnswerRelevancy as _AnswerRelevanceMetric
        AnswerRelevancy = _AnswerRelevanceMetric  # expose for monkeypatching / tests
    _RAGAS_METRIC_CLASSES = [
        Faithfulness,
        ContextPrecision,
        ContextRecall,
        AnswerCorrectness,
        _AnswerRelevanceMetric,
    ]
    HAS_RAGAS_METRICS = True
    HAS_RAGAS = True
except ImportError:
    HAS_RAGAS = False
    warnings.warn("RAGAS or langchain-openai not installed. Install with: pip install ragas langchain-openai")

try:
    from openai import OpenAI as OpenAIClient
    HAS_OPENAI = True
except ImportError:
    warnings.warn("openai not installed. Install with: pip install openai")

    class OpenAIClient:  # type: ignore[no-redef]
        """Placeholder base class used when the openai package is missing.

        Keeps subclasses of OpenAIClient definable at import time; any
        attempt to instantiate it reports the missing dependency.
        """

        def __init__(self, *args, **kwargs):
            raise ImportError("openai not installed. Install with: pip install openai")
# =============================================================================
# PROJECT IMPORTS
# =============================================================================
from app.ai_engine import generate_context_aware_response, call_llm_with_history
from app.retrieval_eval import load_golden_retrieval_cases
# =============================================================================
# LOGGING OPENAI WRAPPER (Critic Reasoning Capture)
# =============================================================================
class CriticReasoningLogger:
    """Logs every gpt-4o-mini critic call to a JSONL file for auditing.

    If a score is low, the user can inspect the reasoning to determine whether
    the Teacher (mini) misunderstood the agronomic context.
    """

    def __init__(self, log_path: Path):
        """Remember *log_path* and create its parent directory if needed."""
        self.log_path = Path(log_path)
        self.log_path.parent.mkdir(parents=True, exist_ok=True)
        # In-memory copy of everything logged through this instance.
        self._entries: List[dict] = []

    def log(self, entry: dict):
        """Stamp *entry* with a UTC timestamp and append it as one JSON line.

        The caller's dict is modified in place (a "timestamp" key is added)
        and also kept in ``self._entries``.
        """
        # Timezone-aware replacement for the deprecated datetime.utcnow();
        # tzinfo is dropped again so the text keeps the "...Z" form.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        entry["timestamp"] = now_utc.isoformat() + "Z"
        self._entries.append(entry)
        with open(self.log_path, "a", encoding="utf-8") as f:
            # default=str: an audit log must never abort the call being
            # audited because a value is not JSON-serializable.
            f.write(json.dumps(entry, default=str) + "\n")

    def flush(self):
        """No-op: ``log`` opens and closes the file on every call."""
        pass
class LoggingOpenAIClient(OpenAIClient):
    """OpenAI client subclass that logs every chat completion for auditing.

    Inherits from openai.OpenAI directly so llm_factory recognizes the type.
    Overrides chat.completions.create to log requests/responses.
    """

    def __init__(self, logger: "CriticReasoningLogger", **kwargs):
        """Forward **kwargs to the base client and keep *logger* for auditing."""
        super().__init__(**kwargs)
        self._critic_logger = logger

    @property
    def chat(self):
        """Return a logging wrapper around the base client's chat completions.

        Must be a property so ``client.chat.completions.create(...)`` keeps
        working as attribute access. A new wrapper is built on every access.
        """
        return _LoggingChatCompletions(super().chat.completions, self._critic_logger)
class _LoggingChatCompletions:
    """Wraps chat.completions to log requests/responses.

    Preserves the client.chat.completions.create() chain that Instructor expects.
    """

    def __init__(self, inner, logger: "CriticReasoningLogger"):
        self._inner = inner
        self._critic_logger = logger
        self.completions = self  # client.chat.completions.create() chain

    def create(self, *args, **kwargs):
        """Call the wrapped ``create`` and log a truncated request/response.

        Only keyword arguments ("model", "messages") are inspected for the
        log entry. The response is returned unchanged.
        """
        response = self._inner.create(*args, **kwargs)

        # Responses without ``choices``/``usage`` (e.g. a different response
        # type) are logged with empty previews instead of raising.
        choices = getattr(response, "choices", None)
        usage = getattr(response, "usage", None)

        response_preview = str(choices[0].message.content)[:500] if choices else ""
        if not usage:
            usage_payload = {}
        elif hasattr(usage, "model_dump"):
            # Pydantic-style objects: model_dump() also converts nested
            # sub-models, which a raw __dict__ copy would leave as objects.
            usage_payload = usage.model_dump()
        else:
            usage_payload = dict(vars(usage))

        self._critic_logger.log({
            "event": "critic_call",
            "model": kwargs.get("model", ""),
            "messages_preview": [str(m)[:200] for m in (kwargs.get("messages") or [])],
            "response_preview": response_preview,
            "usage": usage_payload,
        })
        return response
# =============================================================================
# RANGE-AWARE NUMERICAL RIGOR CHECKER
# =============================================================================
def _numeric_cue_present(cue: str, text_lower: str) -> bool:
    """Return True when *cue* occurs in *text_lower* (already lower-cased).

    Cues longer than three characters use plain substring matching so that
    affixed forms such as "kelembabannya" or "bersuhu" still match. Short
    cues ("rh", "lux", "°c") must not touch a neighbouring letter, otherwise
    "rh" would fire inside common words such as "terhadap".
    """
    if len(cue) > 3:
        return cue in text_lower
    pattern = re.escape(cue)
    # [^\W\d_] matches letters only, so digits may still abut the cue
    # ("25°c", "15000lux").
    if cue[:1].isalnum():
        pattern = r'(?<![^\W\d_])' + pattern
    if cue[-1:].isalnum():
        pattern = pattern + r'(?![^\W\d_])'
    return re.search(pattern, text_lower) is not None


def _requested_numeric_params(
    query: str,
    cues: Optional[Dict[str, List[str]]] = None,
) -> Set[str]:
    """Return the numeric parameters a query asks about.

    Args:
        query: User query text (matched case-insensitively).
        cues: Mapping of parameter name -> lower-case cue strings.
            Defaults to NUMERICAL_PARAM_CUES.

    Returns:
        The set of parameters with at least one cue in the query. When no
        cue matches, every parameter in the cue table is returned.
    """
    cue_table = NUMERICAL_PARAM_CUES if cues is None else cues
    query_lower = query.lower()
    requested = {
        param
        for param, param_cues in cue_table.items()
        if any(_numeric_cue_present(cue, query_lower) for cue in param_cues)
    }
    return requested or set(cue_table)
class NumericalRigorChecker:
    """Strict Data Rule: checks if numerical values in answer match ground truth.

    Pass condition (Agronomic Envelope Adherence):
    - Answer value is within the tolerance (default ±0.5) of the ground truth
      optimal value, OR
    - Answer value is within the ground truth safety range [min, max]

    Supports both range answers ("22 to 24 °C") and single values ("23.5°C").
    A unit is always required: °C / derajat celsius / degree(s) celsius,
    % / persen / percent, lux / lumen.
    """

    # Separator between the bounds of a range. This is an alternation of
    # whole tokens; a character class such as [-tohingga]+ would accept any
    # mix of those letters (e.g. "ton").
    _RANGE_SEP = r'(?:-|\u2013|to|hingga)'
    _NUM = r'(\d+(?:\.\d+)?)'
    # Lux values: either digit groups with thousands separators
    # ("15,000", "15.000", "15 000") or a plain number. The lookbehind stops
    # a match from starting in the middle of a longer number.
    _LUX_NUM = (
        r'(?<!\d)'
        r'([1-9]\d{0,2}(?:[.,\u00a0\u202f\u2009 ]\d{3})+|\d+(?:\.\d+)?)'
    )
    _TEMP_UNIT = r'(?:°C|derajat\s+celsius|degrees?\s*celsius)'
    _RH_UNIT = r'(?:%|persen|percent)'
    _LUX_UNIT = r'(?:lux|lumen)'

    _TEMP_RANGE = re.compile(
        _NUM + r'\s*' + _RANGE_SEP + r'\s*' + _NUM + r'\s*' + _TEMP_UNIT,
        re.IGNORECASE,
    )
    _TEMP_SINGLE = re.compile(_NUM + r'\s*' + _TEMP_UNIT, re.IGNORECASE)
    _RH_RANGE = re.compile(
        _NUM + r'\s*' + _RANGE_SEP + r'\s*' + _NUM + r'\s*' + _RH_UNIT,
        re.IGNORECASE,
    )
    _RH_SINGLE = re.compile(_NUM + r'\s*' + _RH_UNIT, re.IGNORECASE)
    _LUX_RANGE = re.compile(
        _LUX_NUM + r'\s*' + _RANGE_SEP + r'\s*' + _LUX_NUM + r'\s*' + _LUX_UNIT,
        re.IGNORECASE,
    )
    _LUX_SINGLE = re.compile(_LUX_NUM + r'\s*' + _LUX_UNIT, re.IGNORECASE)

    # Characters removed before float(): comma and the space variants that
    # may group digits.
    _SEPARATOR_STRIP = str.maketrans("", "", ", \u00a0\u202f\u2009")
    # "15.000" / "1.500.000": dot used as a thousands separator.
    _DOT_GROUPED = re.compile(r'[1-9]\d{0,2}(?:\.\d{3})+')

    @staticmethod
    def _parse_number(text: str, grouped: bool = False) -> Optional[float]:
        """Convert a captured number to float, or None if it is not numeric.

        Commas and (thin/non-breaking) spaces are always dropped. With
        ``grouped=True`` a dot followed by exactly three digits is also read
        as a thousands separator ("15.000" -> 15000.0).
        """
        normalized = text.translate(NumericalRigorChecker._SEPARATOR_STRIP)
        if grouped and NumericalRigorChecker._DOT_GROUPED.fullmatch(normalized):
            normalized = normalized.replace(".", "")
        if not normalized:
            return None
        try:
            return float(normalized)
        except ValueError:
            return None

    @staticmethod
    def _extract_all_params(
        answer: str,
        range_re: re.Pattern,
        single_re: re.Pattern,
        grouped: bool = False,
    ) -> List[Tuple[str, List[float]]]:
        """Extract ALL param values found. Returns list of (type, values) tuples.

        Range matches come first as ("range", [lo, hi]), followed by single
        matches as ("single", [value]). The upper bound of a range is
        normally reported a second time as a single value.
        """
        parse = NumericalRigorChecker._parse_number
        results = []
        for match in range_re.finditer(answer):
            lo_val = parse(match.group(1), grouped)
            hi_val = parse(match.group(2), grouped)
            if lo_val is not None and hi_val is not None:
                results.append(("range", [lo_val, hi_val]))
        for match in single_re.finditer(answer):
            parsed = parse(match.group(1), grouped)
            if parsed is not None:
                results.append(("single", [parsed]))
        return results

    @classmethod
    def _check_value(
        cls,
        ans_val: float,
        label: str,
        gt_value: Optional[float],
        gt_min: Optional[float],
        gt_max: Optional[float],
        tolerance: Optional[float] = None,
    ) -> Optional[Dict]:
        """Check one answer value against ground truth. Returns detail dict on PASS, None on fail.

        ``tolerance`` defaults to NUMERICAL_TOLERANCE.
        """
        tol = NUMERICAL_TOLERANCE if tolerance is None else tolerance
        # Check tolerance around the optimal value
        if gt_value is not None and abs(ans_val - gt_value) <= tol:
            return {"label": label, "ans_val": ans_val, "gt_val": gt_value, "method": "optimal_tolerance", "pass": True}
        # Check if within safety range
        if gt_min is not None and gt_max is not None and gt_min <= ans_val <= gt_max:
            return {"label": label, "ans_val": ans_val, "range": f"[{gt_min}, {gt_max}]", "method": "safety_range", "pass": True}
        return None

    @classmethod
    def evaluate_answer(
        cls,
        answer: str,
        ground_truth: Dict[str, Dict],
        requested_params: Optional[Set[str]] = None,
        tolerance: Optional[float] = None,
    ) -> Dict:
        """Evaluate all numerical parameters in the answer.

        Scans ALL temperature/humidity/light values in the answer.
        A parameter PASSES if ANY extracted value satisfies Agronomic
        Envelope Adherence:
        - single value within the tolerance of the ground truth optimal, OR
        - single value within the ground truth safety range [min, max], OR
        - answer range containing the ground truth optimal, or else the
          midpoint of the ground truth safety range.

        ground_truth schema: {
            "temperature": {"value": 20.0, "min": 18.0, "max": 22.0},
            "humidity": {"value": 85.0, "min": 75.0, "max": 90.0},
            "light": {"value": 15000, "min": 12000, "max": 20000},
        }

        Args:
            answer: Model answer text.
            ground_truth: See schema above.
            requested_params: Parameters to check; defaults to every key of
                ``ground_truth``.
            tolerance: Allowed deviation from the optimal value; defaults to
                NUMERICAL_TOLERANCE.

        Returns:
            Dict with "applicable", "requested_params", "param_results",
            "overall_pass" and "factual_score_override". Requested
            parameters without ground truth are skipped, so the result is a
            pass when none of them could be checked.
        """
        results = {}
        requested = set(ground_truth.keys()) if requested_params is None else set(requested_params)
        all_pass = True
        # (param, range regex, single regex, dot-as-thousands parsing)
        param_configs = [
            ("temperature", cls._TEMP_RANGE, cls._TEMP_SINGLE, False),
            ("humidity", cls._RH_RANGE, cls._RH_SINGLE, False),
            ("light", cls._LUX_RANGE, cls._LUX_SINGLE, True),
        ]
        for param, range_re, single_re, grouped in param_configs:
            if param not in requested:
                continue
            gt = ground_truth.get(param) or {}
            gt_value = gt.get("value")
            gt_min = gt.get("min")
            gt_max = gt.get("max")
            if gt_value is None and gt_min is None:
                continue  # No ground truth for this param, skip

            extracted = cls._extract_all_params(answer, range_re, single_re, grouped)
            if not extracted:
                results[param] = {"status": "NOT_FOUND", "reason": f"No {param} value found in answer"}
                all_pass = False
                continue

            # Try each extracted value; PASS if ANY matches
            passed_details = []
            for val_type, values in extracted:
                if val_type == "single":
                    detail = cls._check_value(values[0], param, gt_value, gt_min, gt_max, tolerance)
                    if detail:
                        passed_details.append(detail)
                elif val_type == "range":
                    # sorted(): tolerate ranges written high-to-low.
                    ans_min, ans_max = sorted(values)
                    # Check if ground truth optimal falls within answer range
                    if gt_value is not None and ans_min <= gt_value <= ans_max:
                        passed_details.append({"method": "optimal_in_range", "ans_range": f"[{ans_min}, {ans_max}]"})
                    # Otherwise check the midpoint of the safety range
                    elif gt_min is not None and gt_max is not None:
                        gt_mid = (gt_min + gt_max) / 2.0
                        if ans_min <= gt_mid <= ans_max:
                            passed_details.append({"method": "midpoint_in_range", "ans_range": f"[{ans_min}, {ans_max}]"})

            if passed_details:
                results[param] = {"status": "PASS", "details": passed_details}
            else:
                results[param] = {"status": "FAIL", "extracted_values": extracted, "gt": gt}
                all_pass = False

        return {
            "applicable": bool(requested),
            "requested_params": sorted(requested),
            "param_results": results,
            "overall_pass": all_pass,
            "factual_score_override": 1.0 if all_pass else 0.0,
        }
# =============================================================================
# TEMPORAL ADHERENCE CHECKER
# =============================================================================
class TemporalAdherenceChecker:
    """Binary check: does the answer reference the correct day/night phase?

    Uses the resolved_phase from _evaluation_metadata:
    - "day": answer must contain a day phrase ("siklus siang", "fase siang",
      "siang hari", "daytime", "day cycle", "day schedule")
    - "night": answer must contain the corresponding night phrase
    - None or "general": not applicable, always pass
    - any other value: reported as applicable and passing, found="unknown"

    The phase is compared case-insensitively.
    """

    _DAY_PATTERNS = re.compile(
        r'\bsiklus\s+siang\b|\bfase\s+siang\b|\bday\s+schedule\b|\bdaytime\b|\bday\s+cycle\b|\bsiang\s*hari\b',
        re.IGNORECASE,
    )
    _NIGHT_PATTERNS = re.compile(
        r'\bsiklus\s+malam\b|\bfase\s+malam\b|\bnight\s+schedule\b|\bnighttime\b|\bnight\s+cycle\b|\bmalam\s*hari\b',
        re.IGNORECASE,
    )
    # Lower-case phrases marking a "no live data" disclaimer answer.
    _NO_DATA_INDICATORS = (
        "not currently online", "no live chamber data", "tidak ada data sensor",
        "chamber is not currently", "tidak terhubung", "tidak online",
    )

    @classmethod
    def check(cls, answer: str, resolved_phase: Optional[str]) -> Dict:
        """Return the temporal-adherence verdict for *answer*.

        The result always has "applicable" and "pass". Non-applicable
        results add "status"; applicable ones add "resolved_phase" (as
        passed in), "expected" and "found".
        """
        # Normalise only for comparison; the caller's value is echoed back.
        phase = resolved_phase.strip().lower() if isinstance(resolved_phase, str) else resolved_phase
        if phase is None or phase == "general":
            return {"applicable": False, "status": "N/A", "pass": True}

        # If answer is a no-data disclaimer, skip temporal check
        answer_lower = answer.lower()
        if any(indicator in answer_lower for indicator in cls._NO_DATA_INDICATORS):
            return {"applicable": False, "status": "NO_LIVE_DATA", "pass": True}

        if phase == "day":
            passed = bool(cls._DAY_PATTERNS.search(answer))
            expected = "day/siklus siang"
            found = "day" if passed else "none"
        elif phase == "night":
            passed = bool(cls._NIGHT_PATTERNS.search(answer))
            expected = "night/siklus malam"
            found = "night" if passed else "none"
        else:
            passed = True
            expected = resolved_phase
            found = "unknown"

        return {
            "applicable": True,
            "resolved_phase": resolved_phase,
            "expected": expected,
            "found": found,
            "pass": passed,
        }
# =============================================================================
# CONSTRAINT SATISFACTION CHECKER
# =============================================================================
class ConstraintSatisfactionChecker:
    """Three-state context-aware constraint checker.

    State A — Qualitative/SOP mode (use_structured_params=False):
        Forbidden terms are ALLOWED because the AI is quoting verified documents.
        Always passes.
    State B — Guarded mode (use_structured_params=True, no explicit request):
        Forbidden terms are HALLUCINATIONS unless they appear only within the
        system-approved breadcrumb text (see BREADCRUMB_EXCERPTS). Fails if
        forbidden terms found outside breadcrumb.
    State C — Explicit request mode (use_structured_params=True, user asked):
        Forbidden terms are ALLOWED but the answer MUST contain a bifurcation
        warning phrase (see BIFURCATION_WARNING_EXCERPTS, e.g.
        'di luar kendali otomatis' / 'outside automatic control').
        Passes if warning present, fails if missing.

    Before States B/C are decided, an answer also passes when it contains a
    document citation (📖) or any phrase from NOT_AVAILABLE_PATTERNS.
    """

    FORBIDDEN_TERMS = ["ph", "ec", "co2", "co\u2082", "o2", "o\u2082",
                       "fertilizer", "fertiliser", "pupuk",
                       "spacing", "jarak tanam", "ppm", "conductivity", "tds"]
    EXPLICIT_REQUEST_TERMS = ["ph", "ec", "nutrisi", "nutrient",
                              "pupuk", "fertilizer", "conductivity",
                              "ppm", "co2", "co\u2082", "o2", "o\u2082",
                              "karbon dioksida", "oksigen",
                              "kadar nutrisi", "ph air", "ec larutan",
                              "soil", "tanah", "larutan"]
    # System-approved breadcrumb text — forbidden terms inside this text are
    # intentionally placed by Rule 9 and do NOT count as hallucinations.
    # Entries are lower-case and are removed in list order.
    BREADCRUMB_EXCERPTS = [
        # Indonesian breadcrumb — full text variations
        "seperti ph, ec, co\u2082, atau o\u2082",
        "seperti ph, ec, co2, atau o2",
        "seperti ph, ec, co\u2082, dan o\u2082",
        "(seperti ph, ec",
        "(seperti ph, ec, co",
        "panduan manual untuk nutrisi (seperti ph, ec",
        "panduan manual untuk nutrisi seperti ph, ec",
        "parameter terverifikasi pgc hanya mencakup suhu, kelembaban, dan cahaya",
        # English breadcrumb — full text variations
        "such as ph, ec, co\u2082, or o\u2082",
        "such as ph, ec, co2, or o2",
        "(such as ph, ec",
        "(such as ph, ec, co",
        "manual guidance for nutrition (such as ph, ec",
        "manual guidance for nutrition such as ph, ec",
        "pgc verified parameters cover temperature, humidity, and light only",
        # Extra catch-alls for leftover fragments
        "seperti ph, ec, co", "such as ph, ec, co",
        "ph, ec, atau o", "ph, ec, or o",
        "ph, ec, dan o", "ph, ec, dan",
        "ec, co\u2082, atau", "ec, co2, atau",
        "ec, co\u2082, dan", "ec, co2, dan",
        "ec, co\u2082, or", "ec, co2, or",
    ]
    # Patterns that indicate the AI is correctly stating a parameter's
    # unavailability rather than presenting it as a value.
    NOT_AVAILABLE_PATTERNS = [
        "tidak tersedia", "not available", "not found", "tidak ditemukan",
        "tidak ada di dokumen", "not in my document",
        "hanya menyimpan data suhu", "only stores temperature",
        "hanya menyimpan suhu, kelembaban",
        "only temp", "only temperature, humidity",
    ]
    BIFURCATION_WARNING_EXCERPTS = [
        "di luar kendali otomatis",
        "outside automatic control",
        "tidak dikendalikan oleh pgc",
        "not controlled by pgc",
        "panduan manual",
        "manual guidance",
        "bersifat panduan manual",
        "manual guidance only",
    ]
    # Lazily compiled term regexes. Short terms require word-boundary
    # matching (avoid false positives like "ph" inside "aphanadermatum" or
    # "ec" inside "Perkecambahan").
    _FORBIDDEN_WORD_RE = None
    _EXPLICIT_REQUEST_RE = None

    @staticmethod
    def _build_term_regex(terms):
        """Compile one case-insensitive alternation for *terms*.

        Terms of three characters or fewer are wrapped in ``\\b`` word
        boundaries; longer terms match as plain substrings.
        """
        patterns = []
        for term in terms:
            escaped = re.escape(term)
            patterns.append(r'\b' + escaped + r'\b' if len(term) <= 3 else escaped)
        return re.compile('|'.join(patterns), re.IGNORECASE)

    @classmethod
    def _compile_re(cls):
        """Return the (cached) regex for FORBIDDEN_TERMS."""
        if cls._FORBIDDEN_WORD_RE is None:
            cls._FORBIDDEN_WORD_RE = cls._build_term_regex(cls.FORBIDDEN_TERMS)
        return cls._FORBIDDEN_WORD_RE

    @classmethod
    def _compile_explicit_re(cls):
        """Return the (cached) regex for EXPLICIT_REQUEST_TERMS."""
        if cls._EXPLICIT_REQUEST_RE is None:
            cls._EXPLICIT_REQUEST_RE = cls._build_term_regex(cls.EXPLICIT_REQUEST_TERMS)
        return cls._EXPLICIT_REQUEST_RE

    @classmethod
    def _find_forbidden_terms(cls, text: str) -> list:
        """Find forbidden terms using word-boundary-aware matching.

        Returns the distinct matches, lower-cased and sorted.
        """
        return sorted({m.group().lower() for m in cls._compile_re().finditer(text)})

    @classmethod
    def _strip_breadcrumb(cls, text: str) -> str:
        """Remove system-approved breadcrumb text so forbidden terms inside it
        are not counted as hallucinations.

        *text* is expected to be lower-cased already, since the excerpts are
        lower-case and compared verbatim.
        """
        for excerpt in cls.BREADCRUMB_EXCERPTS:
            text = text.replace(excerpt, "")
        return text

    @classmethod
    def _has_warning(cls, text: str) -> bool:
        """Return True if *text* contains any bifurcation warning phrase."""
        lower = text.lower()
        return any(w in lower for w in cls.BIFURCATION_WARNING_EXCERPTS)

    @classmethod
    def _also_has_doc_citation(cls, answer: str) -> bool:
        """Check if answer contains verified document citations (📖)."""
        return '📖' in answer

    @classmethod
    def check(cls, answer: str, query: str, use_structured_params: bool) -> Dict:
        """Classify *answer* into one of the constraint modes.

        Returns a dict with "applicable", "pass", "mode", "found_terms" and,
        for failures, "reason".
        """
        answer_lower = answer.lower()

        # State A: Qualitative/SOP — quoting documents is allowed
        if not use_structured_params:
            return {
                "applicable": True,
                "pass": True,
                "mode": "qualitative_quoted",
                "found_terms": [],
            }

        # Strip the system-approved breadcrumb before checking
        check_text = cls._strip_breadcrumb(answer_lower)
        found_terms = cls._find_forbidden_terms(check_text)

        # No forbidden terms outside breadcrumb → clean pass
        if not found_terms:
            return {
                "applicable": True,
                "pass": True,
                "mode": "guarded",
                "found_terms": [],
            }

        # If answer has verified document citations (📖), forbidden terms
        # are from the quoted document, not hallucinations.
        if cls._also_has_doc_citation(answer):
            return {
                "applicable": True,
                "pass": True,
                "mode": "document_quoted",
                "found_terms": found_terms,
            }

        # An unavailability disclaimer anywhere in the answer (e.g. "pH is
        # not available in documents") is taken as the answer acknowledging
        # its limitations.
        # NOTE(review): this does not verify that every forbidden term sits
        # inside the disclaimer — confirm that is the intended leniency.
        if any(p in answer_lower for p in cls.NOT_AVAILABLE_PATTERNS):
            return {
                "applicable": True,
                "pass": True,
                "mode": "unavailable_disclaimed",
                "found_terms": found_terms,
            }

        # State C: User explicitly asked for out-of-scope metrics. Uses the
        # same word-boundary rule as forbidden-term detection so that "ec"
        # inside "perkecambahan" is not read as a request for EC.
        explicit_request = cls._compile_explicit_re().search(query) is not None
        if explicit_request:
            if cls._has_warning(answer_lower):
                return {
                    "applicable": True,
                    "pass": True,
                    "mode": "explicit_request_warned",
                    "found_terms": found_terms,
                }
            return {
                "applicable": True,
                "pass": False,
                "mode": "explicit_request_unwarned",
                "found_terms": found_terms,
                "reason": "User asked for out-of-scope params but AI omitted mandatory bifurcation warning",
            }

        # State B: AI hallucinated unprompted (outside breadcrumb)
        return {
            "applicable": True,
            "pass": False,
            "mode": "unprompted_hallucination",
            "found_terms": found_terms,
            "reason": "AI mentioned forbidden terms without explicit user request",
        }
# =============================================================================
# CITATION ACCURACY CHECKER (Emoji Audit)
# =============================================================================
class CitationAccuracyChecker:
    """Verifies correct emoji usage (📚, 📖, ⚠️) based on retrieval tier metadata.

    Rules:
    - If use_structured_params=True → expect 📚
    - If verified chunks present and alias filter passes → expect 📖
    - ⚠️ must not appear together with 📚
    - 📚 without use_structured_params, or 📖 without any verified chunk,
      is reported as unexpected
    - With neither structured params nor retrieved chunks, ⚠️ is required
    """

    _EMOJI_PATTERNS = {
        "db": re.compile(r'📚'),
        "doc": re.compile(r'📖'),
        # Warning sign with or without the emoji variation selector, since
        # answers may contain the bare U+26A0 character.
        "ai": re.compile('\u26a0\ufe0f?'),
    }

    @classmethod
    def check(cls, answer: str, metadata: Dict) -> Dict:
        """Audit the citation emojis in *answer* against *metadata*.

        Reads "use_structured_params", "retrieved_chunks" and
        "plant_aliases" from *metadata* (all optional).

        Returns a dict with "found_emojis", "expected_emojis" (both in the
        fixed order 📚, 📖, ⚠️), "issues" and "pass".
        """
        has_db = bool(cls._EMOJI_PATTERNS["db"].search(answer))
        has_doc = bool(cls._EMOJI_PATTERNS["doc"].search(answer))
        has_ai = bool(cls._EMOJI_PATTERNS["ai"].search(answer))
        use_structured = metadata.get("use_structured_params", False)
        # ``or []`` also covers an explicit None value.
        chunks = metadata.get("retrieved_chunks") or []
        aliases = metadata.get("plant_aliases")

        # Determine expected emojis
        expected = []
        if use_structured:
            expected.append("📚")
        any_verified = False
        if chunks:
            # Imported lazily so the project module is only needed when
            # there are chunks to inspect.
            from app.vector_store import _is_verified, _chunk_mentions_plant
            verified_chunks = [c for c in chunks if _is_verified(c)]
            any_verified = bool(verified_chunks)
            # Check if any chunk is verified AND passes alias filter
            if any(aliases is None or _chunk_mentions_plant(c, aliases) for c in verified_chunks):
                expected.append("📖")

        found = []
        if has_db:
            found.append("📚")
        if has_doc:
            found.append("📖")
        if has_ai:
            found.append("⚠️")

        issues = []
        # RULE: ⚠️ cannot coexist with 📚 (verified DB)
        if has_db and has_ai:
            issues.append("⚠️ mixed with 📚 — AI estimate cannot appear alongside verified database data")
        # Warning: if 📚 is found but shouldn't be
        if not use_structured and has_db:
            issues.append("Unexpected 📚 (use_structured_params=False)")
        # Warning: if 📖 is found but no verified docs
        if has_doc and not any_verified:
            issues.append("Unexpected 📖 (no verified chunks)")
        # Warning: if no ⚠️ but answer uses AI content
        if not has_ai and not use_structured and not chunks:
            issues.append("Missing ⚠️ (AI-generated content without disclaimer)")

        return {
            "found_emojis": found,
            "expected_emojis": expected,
            "issues": issues,
            "pass": len(issues) == 0,
        }
# =============================================================================
# INDONESIAN TERMINOLOGY NUANCE CHECKER
# =============================================================================
class TerminologyNuanceChecker:
    """5-case sub-suite for Indonesian agricultural terminology accuracy.

    Every check is a case-insensitive keyword test on the answer text and
    returns a dict with a "case" label, the individual findings, and "pass".
    """

    @staticmethod
    def check_kecambah_tunas(answer: str) -> Dict:
        """Case 1: Must distinguish kecambah from tunas correctly.

        Passes when a sprout/mung-bean term is present, "tunas" is not cited
        as a source (📚/📖 followed by Source/Sumber ... tunas on the same
        line), and a vegetative term is present.
        """
        text = answer.lower()
        has_mungbean = any(t in text for t in ["mung bean sprouts", "mung bean", "kacang hijau", "kecambah", "toge", "tauge"])
        has_tunas_as_plant = bool(re.search(r'(?:📚|📖)\s*S(?:ource|umber).*tunas', answer, re.IGNORECASE))
        has_vegetative = any(t in text for t in ["vegetatif", "vegetative", "tunas"])
        return {
            "case": "kecambah_vs_tunas",
            "mungbean_identified": has_mungbean,
            "tunas_not_misidentified_as_plant": not has_tunas_as_plant,
            "tunas_as_vegetative": has_vegetative,
            "pass": has_mungbean and not has_tunas_as_plant and has_vegetative,
        }

    @staticmethod
    def check_layu_fusarium(answer: str) -> Dict:
        """Case 2: Must distinguish Fusarium wilt from drought wilt.

        Only the mention of "fusarium" decides the verdict; the diagnostic
        keyword finding is reported for information.
        """
        text = answer.lower()
        has_fusarium = "fusarium" in text
        has_diagnosis = any(t in text for t in ["pembuluh", "vascular", "bercak", "layu"])
        return {
            "case": "layu_fusarium_vs_kekeringan",
            "fusarium_mentioned": has_fusarium,
            "has_diagnostic_content": has_diagnosis,
            "pass": has_fusarium,
        }

    @staticmethod
    def check_busuk_akar_pythium(answer: str) -> Dict:
        """Case 3: Must explain busuk akar (symptom) vs Pythium (pathogen)."""
        text = answer.lower()
        has_pythium = "pythium" in text
        has_hierarchy = any(t in text for t in ["disebabkan", "caused by", "patogen", "pathogen", "jamur air", "water mold"])
        return {
            "case": "busuk_akar_vs_pythium",
            "pythium_mentioned": has_pythium,
            "has_hierarchy_explanation": has_hierarchy,
            "pass": has_pythium and has_hierarchy,
        }

    @staticmethod
    def check_kacang_hijau(answer: str) -> Dict:
        """Case 4: Must resolve kacang hijau to mung bean (not green beans/buncis)."""
        text = answer.lower()
        has_mung = "mung" in text or "kacang hijau" in text
        has_wrong = any(t in text for t in ["buncis", "green bean", "snap bean", "string bean"])
        return {
            "case": "kacang_hijau",
            "correct_plant": has_mung,
            "wrong_plant": has_wrong,
            "pass": has_mung and not has_wrong,
        }

    @staticmethod
    def check_baginda_f1(answer: str) -> Dict:
        """Case 5: Must resolve Baginda F1 to watermelon.

        Also requires at least one parameter/unit keyword in the answer.
        """
        text = answer.lower()
        has_watermelon = any(t in text for t in ["watermelon", "semangka"])
        has_parameters = any(t in text for t in ["°c", "derajat", "celsius", "lux", "%", "persen", "kelembaban"])
        return {
            "case": "baginda_f1",
            "watermelon_resolved": has_watermelon,
            "has_parameters": has_parameters,
            "pass": has_watermelon and has_parameters,
        }
| def evaluate_all(answer: str) -> Dict: | |
| results = { | |
| "kecambah_vs_tunas": TerminologyNuanceChecker.check_kecambah_tunas(answer), | |
| "layu_fusarium_vs_kekeringan": TerminologyNuanceChecker.check_layu_fusarium(answer), | |
| "busuk_akar_vs_pythium": TerminologyNuanceChecker.check_busuk_akar_pythium(answer), | |
| "kacang_hijau": TerminologyNuanceChecker.check_kacang_hijau(answer), | |
| "baginda_f1": TerminologyNuanceChecker.check_baginda_f1(answer), | |
| } | |
| passed = sum(1 for r in results.values() if r["pass"]) | |
| total = len(results) | |
| return { | |
| "results": results, | |
| "total": total, | |
| "passed": passed, | |
| "accuracy": passed / total if total > 0 else 0, | |
| } | |
| # ============================================================================= | |
| # GRADED RELEVANCE COMPUTATION (Phase 1) | |
| # ============================================================================= | |
def compute_relevance_grade(
    chunk_source: str,
    content: str,
    expected_source: str,
    expected_plant: str,
    expected_keywords: list,
    source_plant_map: dict,
) -> float:
    """Compute graded relevance (0.0, 0.25, 0.5, 1.0) for a retrieved chunk.

    Grades, checked in this order:
      * 1.0  - chunk source equals the expected source and a keyword matches.
      * 0.0  - no expected keyword occurs in the content.
      * 0.5  - keyword matches and the chunk's plant (looked up in
               ``source_plant_map``) shares a family with ``expected_plant``.
      * 0.25 - keyword matches and both sources fall in the same
               non-"general" source category.
      * 0.0  - otherwise.

    Source names are whitespace-stripped on both sides and keyword matching
    is case-insensitive on both sides; ``None`` content or source is treated
    as an empty string.
    """
    chunk_source = (chunk_source or "").strip()
    expected_source = (expected_source or "").strip()
    # Lower-case the content too: the keywords are lowered, so comparing them
    # against raw-case content would silently miss matches.
    haystack = (content or "").lower()

    source_match = chunk_source == expected_source
    keyword_match = any(kw.lower() in haystack for kw in (expected_keywords or []))

    if source_match and keyword_match:
        return 1.0
    if not keyword_match:
        return 0.0

    expected_family = _get_plant_family(expected_plant) if expected_plant else ""
    if expected_family:
        chunk_plant = source_plant_map.get(chunk_source, "")
        chunk_family = _get_plant_family(chunk_plant) if chunk_plant else ""
        if chunk_family and chunk_family == expected_family:
            return 0.5

    expected_cat = _get_source_category(expected_source)
    chunk_cat = _get_source_category(chunk_source)
    if expected_cat != "general" and chunk_cat != "general" and expected_cat == chunk_cat:
        return 0.25
    return 0.0
| # ============================================================================= | |
| # RETRIEVAL SPECIFICITY CLASSIFICATION (Phase 4) | |
| # ============================================================================= | |
def classify_top1_retrieval(
    chunk: dict,
    case: dict,
    source_plant_map: dict,
) -> str:
    """Classify the top-1 RRF result into Exact/Family/Topic/Irrelevant match.

    * "Exact Match"  - chunk source equals the case's expected source and at
                       least one expected keyword occurs in the content.
    * "Family Match" - a keyword matches and the chunk's plant (looked up in
                       ``source_plant_map``) is in the expected plant's family.
    * "Topic Match"  - a keyword matches but neither of the above holds.
    * "Irrelevant"   - no expected keyword occurs in the content.

    Missing or ``None`` fields in ``chunk`` / ``case`` are treated as empty.
    """
    # ``or ""`` rather than a .get default: a key that is present but null
    # would otherwise reach .strip() / .lower() as None.
    chunk_source = (chunk.get("source") or "").strip()
    content = (chunk.get("content") or "").lower()
    expected_source = (case.get("expected_source") or "").strip()
    expected_keywords = case.get("expected_content_keywords") or []
    expected_plant = case.get("expected_plant") or ""

    keyword_match = any(kw.lower() in content for kw in expected_keywords)
    if not keyword_match:
        return "Irrelevant"
    if chunk_source == expected_source:
        return "Exact Match"

    expected_family = _get_plant_family(expected_plant) if expected_plant else ""
    if expected_family:
        chunk_plant = source_plant_map.get(chunk_source, "")
        chunk_family = _get_plant_family(chunk_plant) if chunk_plant else ""
        if chunk_family and chunk_family == expected_family:
            return "Family Match"
    return "Topic Match"
| # ============================================================================= | |
| # SYSTEM PRECISION EVALUATOR (Phase 3) | |
| # ============================================================================= | |
class SystemPrecisionEvaluator:
    """End-to-end correctness audit of the full pipeline.

    Each evaluated case yields three booleans (one per entry in
    ``DIMENSIONS``); ``compute_precision`` averages the per-dimension pass
    rates into a single "system precision" figure.
    """

    DIMENSIONS = ["numerical_rigor", "citation_accuracy", "constraint_satisfaction"]

    def __init__(self):
        # One dict per evaluated case, in evaluation order.
        self.results: List[Dict] = []

    async def evaluate_case(self, case: Dict) -> Dict:
        """Generate an answer for ``case`` and score it on every dimension.

        ``case`` must contain ``"query"``; ``case_id``, ``expected_plant``,
        ``expected_stage`` and ``case_group`` are optional. The result dict
        is appended to ``self.results`` and returned.
        """
        query = case["query"]
        case_id = case.get("case_id", "unknown")

        result = await generate_context_aware_response(
            query=query,
            sensors=None,
            has_live_sensors=False,
            plant_override=case.get("expected_plant"),
            stage_override=case.get("expected_stage"),
            history=None,
        )
        answer = result.get("response", "")
        metadata = result.get("_evaluation_metadata", {})
        use_structured = metadata.get("use_structured_params", False)

        # Ground-truth parameters come from the local plant DB; the stage
        # falls back to "vegetative" when the case does not name one.
        gt_params = {}
        if case.get("expected_plant"):
            from app.local_plant_db import get_plant_parameters
            params = get_plant_parameters(case["expected_plant"], case.get("expected_stage") or "vegetative")
            if params:
                gt_params["temperature"] = {"value": params.get("ideal_temp_optimal"), "min": params.get("ideal_temp_min"), "max": params.get("ideal_temp_max")}
                gt_params["humidity"] = {"value": params.get("ideal_rh_optimal"), "min": params.get("ideal_rh_min"), "max": params.get("ideal_rh_max")}
                gt_params["light"] = {"value": params.get("ideal_light_optimal") or params.get("ideal_light_min"), "min": params.get("ideal_light_min"), "max": params.get("ideal_light_max")}

        # Numerical rigor only applies to quantitative cases that were
        # answered from structured parameters and have ground truth.
        should_score_numerical = (
            case.get("case_group") == "quantitative"
            and use_structured
            and bool(gt_params)
        )
        if should_score_numerical:
            numerical = NumericalRigorChecker.evaluate_answer(
                answer,
                gt_params,
                requested_params=_requested_numeric_params(query),
            )
        else:
            # Not applicable counts as a pass so it does not drag the score.
            numerical = {"applicable": False, "overall_pass": True}

        constraint = ConstraintSatisfactionChecker.check(answer, query, use_structured)
        citation = CitationAccuracyChecker.check(answer, metadata)

        if not constraint["pass"]:
            print(f" [DEBUG] Case {case_id} failed constraint: mode={constraint.get('mode','?')}, terms={constraint.get('found_terms',[])}")
        else:
            print(f" [DEBUG] Case {case_id} constraint: mode={constraint.get('mode','?')}")

        eval_result = {
            "case_id": case_id,
            "query": query,
            "answer": answer[:300],  # stored answer is capped at 300 characters
            "numerical_rigor": numerical["overall_pass"],
            "citation_accuracy": citation["pass"],
            "constraint_satisfaction": constraint["pass"],
        }
        self.results.append(eval_result)
        return eval_result

    def compute_precision(self) -> Dict:
        """Return per-dimension pass rates and their unweighted mean.

        With no results the summary is ``system_precision`` 0.0, empty
        ``dimension_scores`` and ``n`` 0. Rates are rounded to 4 decimals.
        """
        if not self.results:
            return {"system_precision": 0.0, "dimension_scores": {}, "n": 0}
        n = len(self.results)
        dim_scores = {
            dim: round(sum(1 for r in self.results if r.get(dim, False)) / n, 4)
            for dim in self.DIMENSIONS
        }
        overall = sum(dim_scores.values()) / len(self.DIMENSIONS)
        return {
            "system_precision": round(overall, 4),
            "dimension_scores": dim_scores,
            "n": n,
        }

    def print_report(self):
        """Print the summary from ``compute_precision`` as a small table."""
        summary = self.compute_precision()
        print()
        print("-" * 50)
        print(" SYSTEM PRECISION (End-to-End Audit)")
        print("-" * 50)
        print(f" Cases evaluated: {summary['n']}")
        for dim, score in summary["dimension_scores"].items():
            label = dim.replace("_", " ").title()
            print(f" {label:25} {score:.1%}")
        print(f" {'System Precision':25} {summary['system_precision']:.1%}")
        print("-" * 50)

    def export(self, path: Path):
        """Write all per-case results plus the summary to ``path`` as JSON."""
        with open(path, "w", encoding="utf-8") as f:
            # ensure_ascii=False keeps Indonesian text and emoji readable in
            # the UTF-8 file instead of \uXXXX escapes.
            json.dump(
                {"results": self.results, "summary": self.compute_precision()},
                f,
                indent=2,
                ensure_ascii=False,
            )
| # ============================================================================= | |
| # YOUDEN'S J CALIBRATION | |
| # ============================================================================= | |
class YoudenJCalibrator:
    """Youden's J = Sensitivity + Specificity - 1 for threshold optimization.

    Records are collected with ``add_record`` (binary relevance) or
    ``add_record_graded`` (graded relevance). ``compute`` / ``compute_graded``
    sweep the candidate thresholds in ``CANDIDATE_RANGE`` (each divided by
    100) and return the threshold with the highest J = TPR - FPR.
    """

    def __init__(self):
        self.records: List[Tuple[float, bool, bool, str, str]] = []  # (similarity, is_tp, is_cross_modal, category, case_id)
        self.graded_records: List[Tuple[float, float, bool, str, str]] = []  # (similarity, relevance_grade, is_cross_modal, category, case_id)
        self.category_records: Dict[str, List[Tuple[float, bool, str]]] = {}  # category -> [(similarity, is_tp, case_id)]

    def add_record(self, similarity: float, is_tp: bool, is_cross_modal: bool, category: str = "unknown", case_id: str = ""):
        """Store one binary-relevance observation, also indexed by category."""
        self.records.append((similarity, is_tp, is_cross_modal, category, case_id))
        self.category_records.setdefault(category, []).append((similarity, is_tp, case_id))

    def add_record_graded(self, similarity: float, relevance_grade: float, is_cross_modal: bool, category: str = "unknown", case_id: str = ""):
        """Store one graded-relevance observation."""
        self.graded_records.append((similarity, relevance_grade, is_cross_modal, category, case_id))

    def compute_graded(self, cross_modal_only: bool = False) -> Tuple[float, float]:
        """Return ``(best_threshold, best_j)`` using graded relevance.

        TPR is the share of total positive grade mass retrieved at or above
        the threshold; FPR is the share of zero-grade records at or above it.
        Returns ``(0.0, 0.0)`` when either group is empty.
        """
        subset = [(s, rg) for s, rg, cm, _, _ in self.graded_records if not cross_modal_only or cm]
        if not subset:
            return 0.0, 0.0
        grade_pos = [(s, rg) for s, rg in subset if rg > 0]
        grade_neg = [(s, rg) for s, rg in subset if rg == 0]
        if not grade_pos or not grade_neg:
            return 0.0, 0.0
        total_possible_grade = sum(rg for _, rg in grade_pos)
        neg_count = len(grade_neg)
        best_t, best_j = 0.0, -99.0
        for ti in CANDIDATE_RANGE:
            t = ti / 100.0
            retrieved_grade = sum(rg for s, rg in grade_pos if s >= t)
            tpr_graded = retrieved_grade / total_possible_grade if total_possible_grade > 0 else 0
            fpr_graded = sum(1 for s, _ in grade_neg if s >= t) / neg_count if neg_count > 0 else 0
            j = tpr_graded - fpr_graded
            # Strict ">" keeps the first (lowest) threshold among ties.
            if j > best_j:
                best_j = j
                best_t = t
        return best_t, best_j

    def compute(self, cross_modal_only: bool = False) -> Tuple[float, float]:
        """Return ``(best_threshold, best_j)`` using binary relevance.

        Returns ``(0.0, 0.0)`` when there are no positives or no negatives
        in the selected subset.
        """
        subset = [(s, tp) for s, tp, cm, _, _ in self.records if not cross_modal_only or cm]
        if not subset:
            return 0.0, 0.0
        tp_scores = [s for s, tp in subset if tp]
        tn_scores = [s for s, tp in subset if not tp]
        if not tp_scores or not tn_scores:
            return 0.0, 0.0
        best_t, best_j = 0.0, -99.0
        for ti in CANDIDATE_RANGE:
            t = ti / 100.0
            tpr = sum(1 for s in tp_scores if s >= t) / len(tp_scores)
            fpr = sum(1 for s in tn_scores if s >= t) / len(tn_scores)
            j = tpr - fpr
            # Strict ">" keeps the first (lowest) threshold among ties.
            if j > best_j:
                best_j = j
                best_t = t
        return best_t, best_j

    def report_per_category(self, threshold: float) -> str:
        """Build per-category accuracy report for thesis Results chapter.

        For every category: number of records, relevant records at or above
        ``threshold`` (TP), non-relevant records at or above it (FP), and
        "Acc" = TP / total records.
        """
        lines = ["\n--- Per-Category Accuracy ---"]
        lines.append(f"{'Category':<25} {'Cases':>6} {'TP':>4} {'FP':>4} {'Acc':>6}")
        lines.append("-" * 50)
        for cat in sorted(self.category_records):
            records = self.category_records[cat]
            tp = sum(1 for s, is_tp, _ in records if is_tp and s >= threshold)
            fp = sum(1 for s, is_tp, _ in records if not is_tp and s >= threshold)
            total = len(records)
            # NOTE(review): "Acc" counts only accepted relevant records, so
            # correctly rejected non-relevant records do not raise it.
            # Confirm this is the definition the report intends.
            acc = tp / total if total > 0 else 0.0
            lines.append(f"{cat:<25} {total:>6} {tp:>4} {fp:>4} {acc:>6.0%}")
        return "\n".join(lines)

    def build_histogram(self) -> str:
        """Build ASCII histogram of TP vs TN score distributions.

        Scores are bucketed into ``HISTOGRAM_BINS`` half-open bins
        ``[lo, hi)`` between ``HISTOGRAM_LOW`` and ``HISTOGRAM_HIGH``; bars
        are scaled so the fullest bin is 24 characters wide. Returns
        ``"(no data)"`` when nothing has been recorded.
        """
        if not self.records:
            return "(no data)"
        tp_scores = [s for s, tp, _, _, _ in self.records if tp]
        tn_scores = [s for s, tp, _, _, _ in self.records if not tp]

        lines = []
        lines.append(f"All dense chunks (n={len(self.records)}, TP={len(tp_scores)}, TN={len(tn_scores)})")
        # Legend names the glyphs actually used for the bars below.
        lines.append(f"{'Bucket':>14} {'TP':>4} {'TN':>4} {'TP (#)':25} {'TN (~)':25}")
        lines.append("-" * 75)

        bin_width = (HISTOGRAM_HIGH - HISTOGRAM_LOW) / HISTOGRAM_BINS
        # Count every bin once and reuse the counts for scaling and display.
        # Bins are half-open, so a score exactly equal to the top edge of
        # the last bin is not counted.
        bins = []
        for i in range(HISTOGRAM_BINS):
            lo = HISTOGRAM_LOW + i * bin_width
            hi = lo + bin_width
            tp_n = sum(1 for s in tp_scores if lo <= s < hi)
            tn_n = sum(1 for s in tn_scores if lo <= s < hi)
            bins.append((lo, hi, tp_n, tn_n))
        # "or 1" avoids dividing by zero when every bin is empty.
        max_count = max(max(tp_n, tn_n) for _, _, tp_n, tn_n in bins) or 1

        for lo, hi, tp_n, tn_n in bins:
            tp_bar = "#" * int(tp_n / max_count * 24)
            tn_bar = "~" * int(tn_n / max_count * 24)
            lines.append(f" {lo:.2f}–{hi:.2f} {tp_n:>4} {tn_n:>4} {tp_bar:<25} {tn_bar}")
        return "\n".join(lines)
| # ============================================================================= | |
| # DATASET LOADING | |
| # ============================================================================= | |
def load_golden_qa_cases() -> List[Dict]:
    """Load the golden QA cases from the fixtures directory.

    Raises:
        FileNotFoundError: if ``golden_qa_cases.json`` does not exist.
    """
    golden_path = FIXTURES_DIR / "golden_qa_cases.json"
    if golden_path.exists():
        with open(golden_path, encoding="utf-8") as handle:
            return json.load(handle)
    raise FileNotFoundError(f"Golden QA cases not found at {golden_path}")
def load_synthetic_qa_cases() -> List[Dict]:
    """Load previously saved synthetic QA cases, or ``[]`` if none exist."""
    synthetic_path = FIXTURES_DIR / "synthetic_qa_cases.json"
    if synthetic_path.exists():
        with open(synthetic_path, encoding="utf-8") as handle:
            return json.load(handle)
    return []
def save_synthetic_qa_cases(cases: List[Dict]):
    """Write ``cases`` to ``synthetic_qa_cases.json`` in the fixtures directory.

    The parent directory is created if needed; non-ASCII text is written
    as-is (UTF-8) rather than escaped.
    """
    target = FIXTURES_DIR / "synthetic_qa_cases.json"
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w", encoding="utf-8") as handle:
        json.dump(cases, handle, indent=2, ensure_ascii=False)
def load_cross_lingual_cases() -> List[Dict]:
    """Load the cross-lingual cases fixture, or ``[]`` if the file is absent."""
    cross_lingual_path = FIXTURES_DIR / "cross_lingual_cases.json"
    if cross_lingual_path.exists():
        with open(cross_lingual_path, encoding="utf-8") as handle:
            return json.load(handle)
    return []
async def generate_synthetic_dataset() -> List[Dict]:
    """Generate synthetic QA pairs using Ragas TestsetGenerator with gpt-4o-mini.

    Reads ``vector_database.json`` from ``DATA_DIR``, asks Ragas for a
    60-question test set, converts each row into the case-dict shape used by
    the evaluation fixtures, saves the list via ``save_synthetic_qa_cases``
    and returns it.

    Raises:
        RuntimeError: if RAGAS is not installed.
    """
    if not HAS_RAGAS:
        raise RuntimeError("RAGAS not installed. Cannot generate synthetic dataset.")
    print("[Synthetic] Generating synthetic QA dataset with gpt-4o-mini...")
    print("[Synthetic] This may take several minutes and cost ~$0.50-$1.00.")

    # Two import layouts are supported; when the evolutions module is not
    # importable no question-type distribution is passed to the generator.
    try:
        from ragas.testset.generator import TestsetGenerator
        from ragas.testset.evolutions import simple, reasoning, multi_context
    except ImportError:
        from ragas.testset import TestsetGenerator
        simple, reasoning, multi_context = None, None, None

    with open(DATA_DIR / "vector_database.json", encoding="utf-8") as f:
        raw_docs = json.load(f)

    from openai import OpenAI
    generator_llm = OpenAI(api_key=OPENAI_API_KEY)
    critic_llm = OpenAI(api_key=OPENAI_API_KEY)
    from ragas.llms import llm_factory
    gen_wrapper = llm_factory(OPENAI_MODEL, client=generator_llm)
    crit_wrapper = llm_factory(OPENAI_MODEL, client=critic_llm)
    generator = TestsetGenerator(
        generator_llm=gen_wrapper,
        critic_llm=crit_wrapper,
    )

    if simple is not None:
        distributions = {simple: 0.5, reasoning: 0.3, multi_context: 0.2}
    else:
        distributions = None

    # NOTE(review): raw_docs is the parsed JSON, passed straight to a method
    # named for LangChain documents - confirm the expected document type
    # against the installed Ragas version.
    testset = generator.generate_with_langchain_docs(
        raw_docs,
        test_size=60,
        distributions=distributions,
    )

    df = testset.to_pandas()
    cases = []
    for idx, (_, row) in enumerate(df.iterrows(), start=1):
        question = row.get("question", "")
        cases.append({
            # The running index keeps IDs unique: the 20-character question
            # prefix alone collides for questions that start the same way.
            "case_id": f"synthetic_{idx:03d}_{str(question)[:20]}",
            "case_group": "synthetic",
            "query": question,
            "ground_truth": row.get("ground_truth", ""),
            "query_type": "plant_specific",
            "response_language": _detect_language(str(question)),
            "expected_emojis": ["⚠️"],
        })

    save_synthetic_qa_cases(cases)
    print(f"[Synthetic] Generated {len(cases)} synthetic QA pairs.")
    return cases
| # ============================================================================= | |
| # OPERATIONAL METRICS | |
| # ============================================================================= | |
def compute_tps(usage: dict, latency_ms: float) -> float:
    """Compute tokens per second from usage and latency.

    Args:
        usage: Token-usage mapping; only ``"completion_tokens"`` is read.
            ``None`` or a missing/``None`` count is treated as 0 tokens.
        latency_ms: Generation latency in milliseconds.

    Returns:
        Completion tokens per second rounded to one decimal, or ``0.0`` when
        there are no completion tokens or the latency is not positive.
    """
    completion = (usage or {}).get("completion_tokens") or 0
    if completion > 0 and latency_ms and latency_ms > 0:
        return round(completion / (latency_ms / 1000.0), 1)
    return 0.0
| def _detect_language(text: str) -> str: | |
| id_markers = {"suhu", "berapa", "kelembaban", "cahaya", "tanaman", | |
| "membutuhkan", "pertumbuhan", "perkecambahan", "berapakah", | |
| "kangkung", "selada", "bayam", "kailan", "cabai", | |
| "pakcoy", "seledri", "terong", "pada", "fase", "yang", | |
| "untuk", "dan", "dengan", "adalah", "secara"} | |
| en_markers = {"what", "how", "does", "need", "give", "is", "the", | |
| "temperature", "humidity", "light", "for", "during"} | |
| words = set(re.findall(r'\b\w+\b', text.lower())) | |
| id_score = sum(1 for m in id_markers if m in words) | |
| en_score = sum(1 for m in en_markers if m in words) | |
| if id_score > en_score: | |
| return "id" | |
| if en_score > id_score: | |
| return "en" | |
| return "id" if id_score > 0 else "en" | |
| # ============================================================================= | |
| # GROUND TRUTH VALIDATION | |
| # ============================================================================= | |
def validate_ground_truths(human_cases: List[Dict]) -> List[str]:
    """Check hard-coded ground truths against live plants_database_day_night.json.

    For every non-negative case that names an ``expected_plant``, the plant
    is resolved via ``_resolve_plant_id`` and looked up in the database. A
    warning is produced when the case's stage is missing from the plant's
    lifecycle, or when the stage's daytime ``temp_optimal_c`` does not occur
    (as written or as a truncated integer) in the case's ground-truth text.

    Returns:
        The list of warning strings; a single "not found" warning when the
        database file does not exist.
    """
    db_path = DATA_DIR / "plants_database_day_night.json"
    if not db_path.exists():
        return ["[WARN] plants_database_day_night.json not found -- skipping validation"]
    with open(db_path, encoding="utf-8") as f:
        plant_db = json.load(f)
    plants = plant_db.get("plants", [])

    warnings_list = []
    for case in human_cases:
        plant_id = case.get("expected_plant")
        if not plant_id or case.get("is_negative_test"):
            continue
        resolved_id = _resolve_plant_id(plant_id)
        case_id = case.get("case_id", "unknown")
        # "or" rather than a .get default so an explicit null stage also
        # falls back to "vegetative", as the evaluators do.
        stage = case.get("expected_stage") or "vegetative"
        gt = case.get("ground_truth") or ""

        for plant in plants:
            if plant.get("id") != resolved_id:
                continue
            lifecycle = plant.get("lifecycle", {})
            if stage not in lifecycle:
                warnings_list.append(
                    f"[WARN] {case_id}: stage '{stage}' not found in DB for {resolved_id}. "
                    f"Available: {list(lifecycle.keys())}"
                )
                continue
            db_day = lifecycle[stage].get("day", {})
            if not db_day:
                continue
            db_opt = db_day.get("temp_optimal_c")
            # Substring check only: accepts the value either as stored or
            # truncated to an integer.
            if db_opt and str(db_opt) not in gt and str(int(db_opt)) not in gt:
                warnings_list.append(
                    f"[WARN] {case_id}: ground_truth may be stale. "
                    f"DB temp_optimal_c={db_opt} not found in ground_truth text."
                )
    return warnings_list
| # ============================================================================= | |
| # MAIN EVALUATION ENGINE | |
| # ============================================================================= | |
| class EvaluationEngine: | |
def __init__(self, results_dir: Path = RESULTS_DIR):
    """Prepare the output directory, the critic log, and empty accumulators."""
    results_dir.mkdir(parents=True, exist_ok=True)
    self.results_dir = results_dir
    # The critic reasoning log lives next to the other result files.
    self.critic_logger = CriticReasoningLogger(results_dir / "critic_reasoning_log.jsonl")
    self.cost_tracker = {"cases_completed": 0, "total_estimated_cost": 0.0}
    self.all_results: List[Dict] = []
async def evaluate_single_case(self, case: Dict) -> Dict:
    """Run one test case through the pipeline and score it on every suite.

    Steps: generate an answer, run the rule-based checkers (numerical rigor,
    temporal adherence, constraint satisfaction, citation accuracy and -
    for cases with ``risk_flag`` - terminology nuance), compute the RAGAS
    scores, optionally retry low-faithfulness answers with a strict
    context-only prompt, estimate cost, and record the result.

    ``case`` must contain ``"query"``; all other keys are optional. The
    result dict is appended to ``self.all_results``, the cost tracker is
    updated, and the dict is returned.
    """
    query = case["query"]
    # Read the id up front with a fallback so a missing key cannot raise
    # after the LLM calls have already been made.
    result_case_id = case.get("case_id", "unknown")
    ground_truth = case.get("ground_truth", "")
    acceptable_answers = case.get("acceptable_answers", [])
    response_language = case.get("response_language")
    temporal_context = case.get("temporal_context")

    t_start = time.perf_counter()
    result = await generate_context_aware_response(
        query=query,
        sensors=None,
        has_live_sensors=False,
        plant_override=case.get("expected_plant"),
        stage_override=case.get("expected_stage"),
        history=None,
        response_language=response_language,
        temporal_context=temporal_context,
    )
    t_generation = time.perf_counter()

    answer = result.get("response", "")
    metadata = result.get("_evaluation_metadata", {})
    # Prefer the latency reported by the pipeline; fall back to wall clock.
    meta_latency = metadata.get("latency_ms", 0)
    latency_ms = meta_latency if meta_latency > 0 else round((t_generation - t_start) * 1000, 1)
    retrieved_chunks = metadata.get("retrieved_chunks", [])
    contexts = [c.get("content", "") for c in retrieved_chunks if c.get("content")]
    use_structured = metadata.get("use_structured_params", False)
    resolved_phase = metadata.get("resolved_phase")
    model_used = metadata.get("model_used", "unknown")
    usage = metadata.get("token_usage", {})
    semantic_scores = metadata.get("semantic_scores", [])
    fts_scores = metadata.get("fts_scores", [])
    rrf_ranks = metadata.get("rrf_ranks", [])
    bge_top_doc = metadata.get("bge_top_doc", "")
    fts_top_doc = metadata.get("fts_top_doc", "")
    tie_breaker_flag = metadata.get("tie_breaker_flag", False)

    # Ground-truth parameters come from the local plant DB; the stage falls
    # back to "vegetative" when the case does not name one.
    gt_params = {}
    if case.get("expected_plant"):
        from app.local_plant_db import get_plant_parameters
        params = get_plant_parameters(case["expected_plant"], case.get("expected_stage") or "vegetative")
        if params:
            gt_params["temperature"] = {"value": params.get("ideal_temp_optimal"), "min": params.get("ideal_temp_min"), "max": params.get("ideal_temp_max")}
            gt_params["humidity"] = {"value": params.get("ideal_rh_optimal"), "min": params.get("ideal_rh_min"), "max": params.get("ideal_rh_max")}
            gt_params["light"] = {"value": params.get("ideal_light_optimal") or params.get("ideal_light_min"), "min": params.get("ideal_light_min"), "max": params.get("ideal_light_max")}

    # Numerical rigor only applies to quantitative cases that were answered
    # from structured parameters and have ground truth to compare against.
    should_score_numerical = (
        case.get("case_group") == "quantitative"
        and use_structured
        and bool(gt_params)
    )
    requested_params = _requested_numeric_params(query) if should_score_numerical else set()
    if should_score_numerical:
        numerical_result = NumericalRigorChecker.evaluate_answer(
            answer,
            gt_params,
            requested_params=requested_params,
        )
    else:
        numerical_result = {
            "applicable": False,
            "status": "NOT_APPLICABLE",
            "requested_params": [],
            "param_results": {},
            "overall_pass": True,
            "factual_score_override": 1.0,
        }

    temporal_result = TemporalAdherenceChecker.check(answer, resolved_phase)
    constraint_result = ConstraintSatisfactionChecker.check(answer, query, use_structured)
    citation_result = CitationAccuracyChecker.check(answer, metadata)
    terminology_result = TerminologyNuanceChecker.evaluate_all(answer) if case.get("risk_flag") else None
    tps = compute_tps(usage, latency_ms)

    def _sync_numerical_override(scores: Dict) -> Dict:
        # The scorer derives "_numerical_rigor_override" from
        # self.all_results[-1], which at that point is the PREVIOUS case
        # (this case is appended only at the end of this method). Replace
        # it with a flag based on this case's own numerical result. Error
        # results never carried the flag and are left without it.
        scores.pop("_numerical_rigor_override", None)
        if "error" not in scores and numerical_result.get("overall_pass") is False:
            scores["_numerical_rigor_override"] = True
        return scores

    ragas_scores = _sync_numerical_override(
        await self._compute_ragas_scores(
            question=query, answer=answer, contexts=contexts,
            ground_truth=ground_truth, acceptable_answers=acceptable_answers,
        )
    )

    # D1 Guardrail: retry low-faith cases with strict document synthesis
    # NOTE(review): when the retry answer is adopted, the rule-based checker
    # results, latency, TPS and token usage recorded below still describe
    # the first answer - confirm that is intended.
    _guardrail_applied = False
    try:
        raw_faith = ragas_scores.get("faithfulness", "")
        if isinstance(raw_faith, (int, float)) and float(raw_faith) < 0.3 and len(contexts) > 0:
            case_id = case.get("case_id", "?")
            print(f"[Guardrail] Low faith ({raw_faith:.4f}) with {len(contexts)} chunks — retrying {case_id}")
            # Build strict context-only instruction
            ctx_text = "\n\n".join(
                f"--- Document {i+1} ---\n{c[:1000]}"
                for i, c in enumerate(contexts[:5])
            )
            guardrail_prompt = (
                "You are Veridia, an agricultural assistant. Answer the user's question "
                "using ONLY the provided context below. Follow these rules strictly:\n"
                "1. If the context contains the answer, summarize it directly.\n"
                "2. Do NOT say 'tidak ditemukan dalam dokumen' or 'not found in documents'.\n"
                "3. Do NOT use your own training knowledge — only the context below.\n"
                "4. If the context does not contain relevant information, say: "
                "'The available documents do not contain this specific information.'\n\n"
                f"CONTEXT:\n{ctx_text}"
            )
            retry_answer_raw = await call_llm_with_history(
                system_prompt=guardrail_prompt,
                user_message=query,
                temperature=0.3,
            )
            retry_answer = retry_answer_raw if isinstance(retry_answer_raw, str) else retry_answer_raw.get("content", "")
            # Very short retries are ignored rather than scored.
            if retry_answer and len(retry_answer) > 50:
                retry_scores = _sync_numerical_override(
                    await self._compute_ragas_scores(
                        question=query, answer=retry_answer, contexts=contexts,
                        ground_truth=ground_truth, acceptable_answers=acceptable_answers,
                    )
                )
                retry_faith = retry_scores.get("faithfulness", "")
                # Adopt the retry only if faithfulness strictly improved.
                if (isinstance(retry_faith, (int, float))
                        and float(retry_faith) > float(raw_faith)):
                    print(f"[Guardrail] Improved faith: {raw_faith:.4f} -> {retry_faith:.4f}")
                    ragas_scores = retry_scores
                    answer = retry_answer
                    _guardrail_applied = True
    except Exception as guardrail_err:
        # Best effort: a failed retry must not lose the original scores.
        print(f"[Guardrail] Error during retry: {guardrail_err}")

    # Cost estimate from fixed per-million-token rates and a fixed per-metric
    # token allowance for the critic model.
    prompt_tokens = usage.get("prompt_tokens", 0)
    completion_tokens = usage.get("completion_tokens", 0)
    generation_cost = (prompt_tokens * 0.15 + completion_tokens * 0.60) / 1_000_000
    critic_cost = (len(ragas_scores.get("_metrics_computed", [])) * 2000 * 0.75 + 300 * 4.50) / 1_000_000
    estimated_cost = round(generation_cost + critic_cost, 6)

    eval_result = {
        "case_id": result_case_id,
        "case_group": case.get("case_group", "unknown"),
        "query": query,
        "answer": answer,
        "ground_truth": ground_truth,
        "latency_ms": latency_ms,
        "model_used": model_used,
        "tps": tps,
        "token_usage": usage,
        "estimated_cost_usd": estimated_cost,
        "retrieved_chunks_count": len(retrieved_chunks),
        "retrieval_mode": metadata.get("retrieval_mode", "unknown"),
        "parent_expansion": metadata.get("parent_expansion", False),
        "category": case.get("category", "unknown"),
        "semantic_scores": semantic_scores,
        "fts_scores": fts_scores,
        "rrf_ranks": rrf_ranks,
        "bge_top_doc": bge_top_doc,
        "fts_top_doc": fts_top_doc,
        "tie_breaker_flag": tie_breaker_flag,
        "numerical_rigor": numerical_result,
        "temporal_adherence": temporal_result,
        "constraint_satisfaction": constraint_result,
        "citation_accuracy": citation_result,
        "terminology_nuance": terminology_result,
        "ragas_scores": ragas_scores,
        "guardrail_applied": _guardrail_applied,
    }
    self.all_results.append(eval_result)
    self.cost_tracker["cases_completed"] += 1
    self.cost_tracker["total_estimated_cost"] += estimated_cost
    return eval_result
async def _compute_ragas_scores(self, question, answer, contexts, ground_truth, acceptable_answers=None) -> Dict:
    """Score one answer with the RAGAS metrics, one metric at a time.

    Each metric is isolated: if it fails, its entry holds the categorised
    error string instead of a score and the remaining metrics still run.
    Metrics whose inputs are missing get a ``"skipped: ..."`` string. The
    names of the metrics that produced a score are listed under
    ``"_metrics_computed"``.

    Returns a dict with ``"error"`` and an empty ``"_metrics_computed"``
    when RAGAS/OpenAI are unavailable or setup fails.
    """
    if not (HAS_RAGAS and HAS_OPENAI):
        return {"error": "RAGAS or OpenAI not available", "_metrics_computed": []}
    try:
        from openai import AsyncOpenAI as _AsyncOAI
        async_client = _AsyncOAI(api_key=OPENAI_API_KEY)
        critic_llm = llm_factory(RAGAS_CRITIC_MODEL, client=async_client, max_tokens=4096)

        # Normalise contexts to a list (a bare value becomes a 1-item list).
        if isinstance(contexts, list):
            ctx = contexts
        elif contexts:
            ctx = [contexts]
        else:
            ctx = []

        result = {}
        computed = []

        async def _score_context_metric(key, needs_reference, invoke):
            # Shared flow for the three context-based metrics: skip when
            # inputs are missing, otherwise score and record.
            try:
                if not ctx:
                    result[key] = "skipped: no retrieved context"
                elif needs_reference and not ground_truth:
                    result[key] = "skipped: no ground_truth"
                else:
                    # Truncate contexts to avoid max_tokens issues in critic LLM generation
                    clipped = [c[:1200] for c in ctx][:7]
                    value = await _retry_ragas_call(lambda tc=clipped: invoke(tc), key)
                    result[key] = round(value, 4)
                    computed.append(key)
            except Exception as exc:
                result[key] = _categorize_ragas_error(exc)

        # Faithfulness: answer grounded in retrieved context (skip when no chunks)
        f_metric = Faithfulness(llm=critic_llm)
        await _score_context_metric(
            "faithfulness",
            False,
            lambda tc: f_metric.ascore(user_input=question, response=answer, retrieved_contexts=tc),
        )

        # Context Precision: ranking quality (needs ground_truth + contexts)
        cp_metric = ContextPrecision(llm=critic_llm)
        await _score_context_metric(
            "context_precision",
            True,
            lambda tc: cp_metric.ascore(user_input=question, reference=ground_truth, retrieved_contexts=tc),
        )

        # Context Recall: did we find all needed facts (needs ground_truth + contexts)
        cr_metric = ContextRecall(llm=critic_llm)
        await _score_context_metric(
            "context_recall",
            True,
            lambda tc: cr_metric.ascore(user_input=question, retrieved_contexts=tc, reference=ground_truth),
        )

        # Answer Correctness: LLM-based only (weights=[1.0, 0.0] disables embedding component)
        try:
            ac_metric = AnswerCorrectness(llm=critic_llm, weights=[1.0, 0.0])
            references = [ref for ref in [ground_truth] + (acceptable_answers or []) if ref]
            if not references:
                result["answer_correctness"] = "skipped: no ground_truth"
            else:
                # Score against every acceptable reference and keep the best.
                top_score = 0.0
                for idx, candidate in enumerate(references):
                    value = await _retry_ragas_call(
                        lambda q=question, a=answer, gt=candidate: ac_metric.ascore(
                            user_input=q, response=a, reference=gt
                        ),
                        f"answer_correctness[{idx}]",
                    )
                    if value > top_score:
                        top_score = value
                result["answer_correctness"] = round(top_score, 4)
                computed.append("answer_correctness")
        except Exception as exc:
            result["answer_correctness"] = _categorize_ragas_error(exc)

        # Answer Relevance: needs ragas-native OpenAI embeddings (not langchain's)
        try:
            import sys as _sys
            from ragas.embeddings import OpenAIEmbeddings as _RagasEmbed
            this_module = _sys.modules[__name__]
            # Use whichever relevance metric class this module has in scope.
            relevance_cls = (
                getattr(this_module, "ResponseRelevancy", None)
                or getattr(this_module, "AnswerRelevancy", None)
                or _AnswerRelevanceMetric
            )
            ar_metric = relevance_cls(llm=critic_llm, embeddings=_RagasEmbed(client=async_client))
            value = await _retry_ragas_call(
                lambda q=question, a=answer: ar_metric.ascore(user_input=q, response=a),
                "answer_relevance",
            )
            result["answer_relevance"] = round(value, 4)
            computed.append("answer_relevance")
        except Exception as exc:
            result["answer_relevance"] = _categorize_ragas_error(exc)

        # Strict Data Rule override for custom Numerical Rigor
        # NOTE(review): self.all_results[-1] is the most recently *appended*
        # result. If the current case is appended only after scoring, this
        # reads the previous case's numerical rigor - verify against caller.
        previous = self.all_results[-1] if self.all_results else None
        if previous and previous.get("numerical_rigor", {}).get("overall_pass") is False:
            result["_numerical_rigor_override"] = True

        result["_metrics_computed"] = computed
        return result
    except Exception as e:
        print(f"[RAGAS] Error computing metrics: {e}")
        return {"error": str(e), "_metrics_computed": []}
def export_csv(self):
    """Dump one row per evaluated case into ``results_detail.csv``.

    Reads ``self.all_results`` and writes under ``self.results_dir``. Every
    row carries the operational fields, the four PGC pass flags, the five
    RAGAS scores (blank when a score is absent) and the estimated cost.
    Cases with a ``terminology_nuance`` entry additionally get a
    ``terminology_accuracy`` column plus one ``tn_<key>`` column per sub-case.

    The CSV header is the union of all row keys in first-seen order; cells a
    row does not provide are written as empty strings. When there are no
    results a notice is printed and no file is written.
    """
    path = self.results_dir / "results_detail.csv"
    if not self.all_results:
        print("[Export] No results to export.")
        return

    score_names = (
        "faithfulness",
        "context_precision",
        "context_recall",
        "answer_correctness",
        "answer_relevance",
    )

    records = []
    for case in self.all_results:
        scores = case.get("ragas_scores", {})
        record = {
            "case_id": case["case_id"],
            "case_group": case["case_group"],
            "category": case.get("category", "unknown"),
            "latency_ms": case["latency_ms"],
            "tps": case["tps"],
            "model_used": case["model_used"],
            "retrieved_chunks": case["retrieved_chunks_count"],
            "retrieval_mode": case["retrieval_mode"],
            "parent_expansion": case["parent_expansion"],
            "bge_top_doc": case.get("bge_top_doc", ""),
            "fts_top_doc": case.get("fts_top_doc", ""),
            "tie_breaker_flag": case.get("tie_breaker_flag", False),
            "numerical_rigor_pass": case["numerical_rigor"]["overall_pass"],
            "temporal_adherence_pass": case["temporal_adherence"]["pass"],
            "constraint_satisfaction_pass": case["constraint_satisfaction"]["pass"],
            "citation_accuracy_pass": case["citation_accuracy"]["pass"],
        }
        # RAGAS columns sit between the pass flags and the cost column.
        for name in score_names:
            record[name] = scores.get(name, "")
        record["estimated_cost_usd"] = case["estimated_cost_usd"]

        # Terminology nuance scores are appended only for cases that have them.
        nuance = case.get("terminology_nuance")
        if nuance:
            record["terminology_accuracy"] = nuance["accuracy"]
            record.update(
                (f"tn_{key}", sub["pass"]) for key, sub in nuance["results"].items()
            )
        records.append(record)

    # A dict keeps insertion order, giving a first-seen union of column names.
    header = {}
    for record in records:
        for column in record:
            header.setdefault(column)

    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(header), restval="")
        writer.writeheader()
        writer.writerows(records)
    print(f"[Export] Results written to {path}")
def export_summary(self):
    """Export summary JSON and methodology note.

    Writes two files into ``self.results_dir``:

    * ``operational_metrics.json`` - case count, estimated cost, the set of
      RAGAS metrics that were computed, latency percentiles, average TPS,
      PGC pass rates, terminology accuracy and answer-level RAGAS averages.
    * ``methodology_note.txt`` - a plain-text description of the
      calibration/test split, with case counts read from the fixture loaders.

    Returns without writing anything when ``self.all_results`` is empty.
    """
    results = self.all_results
    if not results:
        return
    total = len(results)

    def nearest_rank(ordered, fraction):
        """Return the nearest-rank percentile of a sorted sample (0 if empty)."""
        if not ordered:
            return 0
        # Clamp the index so a one-element sample reports that element for
        # every percentile instead of falling back to 0.
        index = min(int(len(ordered) * fraction), len(ordered) - 1)
        return round(ordered[index], 1)

    def pass_rate(section):
        """Percentage of all cases whose ``section`` check passed."""
        hits = sum(1 for r in results if r[section]["pass"])
        return round(hits / total * 100, 1)

    # Overall metrics
    rag_scores = [r.get("ragas_scores", {}) for r in results if r.get("ragas_scores")]
    metrics_computed = set()
    for scores in rag_scores:
        metrics_computed.update(scores.get("_metrics_computed", []))

    def numeric_scores(metric):
        """Collect the numeric values of one RAGAS metric (skips status strings)."""
        return [
            s.get(metric) for s in rag_scores
            if isinstance(s.get(metric), (int, float))
        ]

    # Non-positive latencies are excluded from every latency statistic.
    latencies = [r["latency_ms"] for r in results if r["latency_ms"] > 0]
    ordered_latencies = sorted(latencies)

    # Numerical rigor is rated only over cases flagged as applicable
    # (a missing "applicable" key counts as applicable).
    nr_results = [r["numerical_rigor"] for r in results]
    nr_applicable = [r for r in nr_results if r.get("applicable", True)]
    nr_pass = sum(1 for r in nr_applicable if r.get("overall_pass"))

    summary = {
        "total_cases": total,
        "total_estimated_cost_usd": round(self.cost_tracker["total_estimated_cost"], 4),
        # Sorted so the exported JSON is stable from run to run.
        "metrics_computed": sorted(metrics_computed),
        "latency_ms": {
            "p50": nearest_rank(ordered_latencies, 0.50),
            "p95": nearest_rank(ordered_latencies, 0.95),
            "p99": nearest_rank(ordered_latencies, 0.99),
            "avg": round(sum(latencies) / len(latencies), 1) if latencies else 0,
        },
        "avg_tps": round(sum(r["tps"] for r in results) / total, 1),
        "numerical_rigor_pass_rate": round(
            nr_pass / len(nr_applicable) * 100, 1
        ) if nr_applicable else 0.0,
        "numerical_rigor_applicable_count": len(nr_applicable),
        "numerical_rigor_skipped_count": len(nr_results) - len(nr_applicable),
        "temporal_adherence_pass_rate": pass_rate("temporal_adherence"),
        "constraint_satisfaction_pass_rate": pass_rate("constraint_satisfaction"),
        "citation_accuracy_pass_rate": pass_rate("citation_accuracy"),
    }

    # Terminology nuance summary
    tn_results = [r.get("terminology_nuance") for r in results if r.get("terminology_nuance")]
    if tn_results:
        avg_acc = sum(tn["accuracy"] for tn in tn_results) / len(tn_results)
        summary["terminology_accuracy_avg"] = round(avg_acc, 3)

    # Answer-level RAGAS metric averages with explicit accounting
    ac_vals = numeric_scores("answer_correctness")
    ac_skipped = sum(
        1 for s in rag_scores
        if s.get("answer_correctness") == "skipped: no ground_truth"
    )
    ar_vals = numeric_scores("answer_relevance")
    summary["answer_correctness_avg"] = round(sum(ac_vals) / len(ac_vals), 4) if ac_vals else 0
    summary["answer_correctness_case_count"] = len(ac_vals)
    summary["answer_correctness_skipped_count"] = ac_skipped
    summary["answer_relevance_avg"] = round(sum(ar_vals) / len(ar_vals), 4) if ar_vals else 0
    summary["answer_relevance_case_count"] = len(ar_vals)

    path = self.results_dir / "operational_metrics.json"
    with open(path, "w", encoding="utf-8") as f:
        json.dump(summary, f, indent=2)
    print(f"[Export] Summary written to {path}")

    # Methodology note
    note_path = self.results_dir / "methodology_note.txt"
    calib_cases = load_golden_retrieval_cases()
    qa_cases = load_golden_qa_cases()
    synthetic_cases = load_synthetic_qa_cases()
    note_lines = [
        "PGC RAGAS Evaluation — Methodology Note\n",
        "=" * 50 + "\n\n",
        "Train/Test Split:\n",
        f" Calibration set: golden_retrieval_cases.json ({len(calib_cases)} cases)\n",
        " → Youden's J threshold calibration + retrieval MRR only.\n",
        " → NOT used for any end-to-end RAGAS metrics.\n\n",
        f" Test set: golden_qa_cases.json ({len(qa_cases)} cases)"
        f" + synthetic_qa_cases.json ({len(synthetic_cases)} cases)\n",
        " → All reported Faithfulness, Context Precision, Context Recall,\n",
        " Temporal Adherence, Numerical Rigor, Citation Accuracy,\n",
        " and Constraint Satisfaction scores.\n",
        " → Answer Correctness is computed only for cases with a populated\n",
        " ground_truth field; cases without references are excluded from\n",
        " that average (see answer_correctness_skipped_count in JSON).\n",
        " → Answer Relevance is reference-free and computed for all cases.\n\n",
        " Thresholds tested: Dense 0.70, Hybrid 0.70\n",
        f" Numerical tolerance: ±{NUMERICAL_TOLERANCE} units\n\n",
        f" Total estimated cost: ${self.cost_tracker['total_estimated_cost']:.2f}\n",
        " Student model: gpt-oss-120b via Cerebras\n",
        f" Teacher model: {OPENAI_MODEL} via OpenAI\n\n",
        " Calibration categories:\n",
        " - standard, tie_breaker, species_mismatch, phase_mismatch,\n",
        " unit_conversion, out_of_scope, negative_control\n",
        " Per-category accuracy reported in per-category table above.\n",
    ]
    with open(note_path, "w", encoding="utf-8") as f:
        f.writelines(note_lines)
    print(f"[Export] Methodology note written to {note_path}")
def print_cost_summary(self, budget_usd: float = 5.00):
    """Print the running evaluation cost against a spending budget.

    Reads ``total_estimated_cost`` and ``cases_completed`` from
    ``self.cost_tracker`` and prints a boxed summary, followed by a warning
    line when the estimated cost is above the budget.

    Args:
        budget_usd: Budget the estimated cost is compared against. Defaults
            to 5.00, the grant amount this report was originally hard-wired
            to, so existing no-argument calls print exactly the same text.
    """
    total = self.cost_tracker["total_estimated_cost"]
    completed = self.cost_tracker["cases_completed"]
    remaining_budget = budget_usd - total
    rule = "=" * 50

    print(f"\n{rule}")
    print(" EVALUATION COST SUMMARY")
    print(rule)
    print(f" Cases completed: {completed}")
    print(f" Estimated cost: ${total:.4f}")
    print(f" Remaining budget: ${remaining_budget:.4f}")
    if remaining_budget < 0:
        print(f" ⚠️ BUDGET EXCEEDED! Estimated cost exceeds ${budget_usd:.2f} grant.")
    print(f"{rule}\n")
def export_results_log(self):
    """Dump the raw per-case result dicts to ``results_log.json``.

    The file is written under ``self.results_dir`` as indented UTF-8 JSON
    with non-ASCII characters kept verbatim (``ensure_ascii=False``). It is
    meant for debugging. When ``self.all_results`` is empty only a notice is
    printed and no file is created.
    """
    records = self.all_results
    if records:
        target = self.results_dir / "results_log.json"
        with target.open("w", encoding="utf-8") as sink:
            json.dump(records, sink, indent=2, ensure_ascii=False)
        print(f"[Export] Full results log written to {target}")
    else:
        print("[Export] No results to export.")
def export_results_data(self):
    """Export flattened per-case metrics to results_data.csv for charting.

    Each case in ``self.all_results`` becomes one CSV row holding the
    operational fields, token usage, PGC check outcomes and RAGAS scores.
    Missing check dictionaries fall back to defaults (pass flags default to
    ``True``, the rigor score override to ``1.0``, text fields to ""). Cases
    with a ``terminology_nuance`` entry add ``terminology_accuracy`` and one
    ``tn_<key>`` column per sub-case.

    The header is the first-seen union of all row keys; absent cells are
    written as empty strings. Prints a notice and writes nothing when there
    are no results.
    """
    if not self.all_results:
        print("[Export] No results to export.")
        return

    flat_rows = []
    columns = {}  # insertion-ordered union of every column name seen so far
    for case in self.all_results:
        scores = case.get("ragas_scores", {})
        rigor = case.get("numerical_rigor", {})
        temporal = case.get("temporal_adherence", {})
        constraint = case.get("constraint_satisfaction", {})
        citation = case.get("citation_accuracy", {})
        nuance = case.get("terminology_nuance")
        usage = case.get("token_usage", {})

        flat = {
            "case_id": case["case_id"],
            "case_group": case["case_group"],
            "category": case.get("category", ""),
            "query_type": case.get("query_type", ""),
            "latency_ms": case["latency_ms"],
            "tps": case["tps"],
            "model_used": case["model_used"],
            "retrieved_chunks": case["retrieved_chunks_count"],
            "retrieval_mode": case["retrieval_mode"],
            "parent_expansion": case["parent_expansion"],
            "estimated_cost_usd": case["estimated_cost_usd"],
            "prompt_tokens": usage.get("prompt_tokens", 0),
            "completion_tokens": usage.get("completion_tokens", 0),
            "bge_top_doc": case.get("bge_top_doc", ""),
            "fts_top_doc": case.get("fts_top_doc", ""),
            "tie_breaker_flag": case.get("tie_breaker_flag", False),
            "numerical_rigor_pass": rigor.get("overall_pass", True),
            "numerical_rigor_score_override": rigor.get("factual_score_override", 1.0),
            "temporal_adherence_pass": temporal.get("pass", True),
            "temporal_adherence_phase": temporal.get("resolved_phase", ""),
            "constraint_satisfaction_pass": constraint.get("pass", True),
            "constraint_satisfaction_mode": constraint.get("mode", ""),
            "citation_accuracy_pass": citation.get("pass", True),
        }
        for metric in (
            "faithfulness",
            "context_precision",
            "context_recall",
            "answer_correctness",
            "answer_relevance",
        ):
            flat[metric] = scores.get(metric, "")

        if nuance:
            flat["terminology_accuracy"] = nuance["accuracy"]
            for key, sub in nuance.get("results", {}).items():
                flat[f"tn_{key}"] = sub["pass"]

        for name in flat:
            columns.setdefault(name)
        flat_rows.append(flat)

    path = self.results_dir / "results_data.csv"
    with open(path, "w", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=list(columns), restval="")
        writer.writeheader()
        writer.writerows(flat_rows)
    print(f"[Export] Chart data written to {path}")
def export_thesis_tables(self, calib_result: Optional[Dict] = None, cross_result: Optional[Dict] = None, only: Optional[str] = None):
    r"""Export LaTeX tabular environments for direct \input{} inclusion.

    Builds up to five ``table`` environments from ``self.all_results`` and
    writes them to ``thesis_tables.tex`` under ``self.results_dir``:

    1. Overall performance summary (rows depend on ``only``).
    2. System precision by dimension.
    3. Youden's J threshold calibration (omitted for ``only="adversarial"``).
    4. Cross-lingual MRR (omitted for ``only="adversarial"``).
    5. Evaluation cost breakdown.

    Args:
        calib_result: Calibration dict with ``youden_dense``,
            ``youden_graded_dense`` and ``youden_hybrid`` entries. When it is
            missing, empty, or carries an ``"error"`` key, placeholder rows
            are emitted instead.
        cross_result: Cross-lingual dict with ``id_mrr``, ``en_mrr``,
            ``delta_mrr``, ``id_cases`` and ``en_cases``. Handled like
            ``calib_result``; in particular an ``{"error": ...}`` payload is
            rendered as placeholders rather than as zero scores.
        only: ``"rag"`` limits Table 1 to RAGAS averages, ``"adversarial"``
            limits it to the PGC pass rates; any other value shows both.

    Prints a notice and writes nothing when there are no results.
    """
    if not self.all_results:
        print("[Export] No results to export.")
        return

    results = self.all_results
    n = len(results)

    def row(*cells):
        r"""Join cells into one tabular row ending in a single LaTeX ``\\``."""
        return " & ".join(str(cell) for cell in cells) + r" \\"

    def open_table(caption, label, colspec, header):
        """Return the opening lines of a booktabs-style table float."""
        return [
            r"\begin{table}[ht]",
            r"\centering",
            caption,
            r"\label{" + label + "}",
            r"\begin{tabular}{" + colspec + "}",
            r"\toprule",
            header,
            r"\midrule",
        ]

    close_table = [r"\bottomrule", r"\end{tabular}", r"\end{table}", ""]

    def has_data(payload):
        """True for a populated result dict that is not an error marker."""
        return bool(payload) and "error" not in payload

    # NOTE(review): these pass counts use every case as denominator, while the
    # JSON summary rates numerical rigor over "applicable" cases only - confirm
    # which definition the thesis should report.
    nr_pass = sum(1 for r in results if r["numerical_rigor"]["overall_pass"])
    ta_pass = sum(1 for r in results if r["temporal_adherence"]["pass"])
    cs_pass = sum(1 for r in results if r["constraint_satisfaction"]["pass"])
    ca_pass = sum(1 for r in results if r["citation_accuracy"]["pass"])

    # Non-positive latencies are excluded from the latency statistics.
    latencies = [r["latency_ms"] for r in results if r["latency_ms"] > 0]
    ordered_latencies = sorted(latencies)

    def percentile(fraction):
        """Nearest-rank percentile of the latency sample (0 when empty)."""
        if not ordered_latencies:
            return 0
        # Clamped so a single-sample run reports that sample, not 0.
        index = min(int(len(ordered_latencies) * fraction), len(ordered_latencies) - 1)
        return round(ordered_latencies[index], 1)

    p50 = percentile(0.50)
    p95 = percentile(0.95)
    avg_lat = round(sum(latencies) / len(latencies), 1) if latencies else 0
    avg_tps = round(sum(r["tps"] for r in results) / n, 1)

    ragas_scores = [r.get("ragas_scores", {}) for r in results]

    def mean_score(metric):
        """Average the numeric values of one RAGAS metric (0 when none)."""
        values = [
            s.get(metric) for s in ragas_scores
            if isinstance(s.get(metric), (int, float))
        ]
        return round(sum(values) / len(values), 4) if values else 0

    avg_faith = mean_score("faithfulness")
    avg_cp = mean_score("context_precision")
    avg_cr = mean_score("context_recall")
    avg_ac = mean_score("answer_correctness")
    avg_ar = mean_score("answer_relevance")

    lines = [
        "% Thesis Result Tables — auto-generated by evaluate_ragas.py",
        "% \\input{} each table into your LaTeX document.",
        "",
    ]

    # Table 1: Overall Performance Summary
    if only == "rag":
        caption = r"\caption{RAG Evaluation Performance Summary}"
    elif only == "adversarial":
        caption = r"\caption{Adversarial Evaluation Performance Summary}"
    else:
        caption = (
            r"\caption{Overall System Performance Summary. "
            r"Ground-truth reference used for Answer Correctness; "
            r"cases without populated references are excluded from that average.}"
        )
    lines += open_table(caption, "tab:overall_performance", "lcc", r"Metric & Value & Notes \\")
    lines.append(row("Total Cases", n, "Human-adversarial + synthetic"))
    if only != "rag":
        # PGC logic pass rates (shown for "adversarial" and for the full report).
        lines.append(row("Numerical Rigor Pass Rate", f"{nr_pass / n * 100:.1f}\\%", f"$\\pm${NUMERICAL_TOLERANCE}°C tolerance"))
        lines.append(row("Temporal Adherence Pass Rate", f"{ta_pass / n * 100:.1f}\\%", "Day/night phase correctness"))
        lines.append(row("Constraint Satisfaction Pass Rate", f"{cs_pass / n * 100:.1f}\\%", "3-state context-aware"))
        lines.append(row("Citation Accuracy Pass Rate", f"{ca_pass / n * 100:.1f}\\%", "Emoji prefix audit"))
    if only != "adversarial":
        # RAGAS averages (shown for "rag" and for the full report).
        lines.append(row("Average Faithfulness", f"{avg_faith:.3f}", "RAGAS metric"))
        lines.append(row("Average Context Precision", f"{avg_cp:.3f}", "RAGAS metric"))
        lines.append(row("Average Context Recall", f"{avg_cr:.3f}", "RAGAS metric"))
        lines.append(row("Average Answer Correctness", f"{avg_ac:.3f}", "RAGAS metric"))
        lines.append(row("Average Answer Relevance", f"{avg_ar:.3f}", "RAGAS metric"))
    lines.append(row("Average Latency", f"{avg_lat:,}ms", f"P50: {p50:,}ms, P95: {p95:,}ms"))
    lines.append(row("Average Throughput", f"{avg_tps} TPS", "Tokens per second"))
    lines += close_table

    # Table 2: System Precision by Dimension
    dims = {"Numerical Rigor": nr_pass, "Citation Accuracy": ca_pass, "Constraint Satisfaction": cs_pass}
    lines += open_table(
        r"\caption{System Precision by Dimension}",
        "tab:system_precision",
        "lcc",
        r"Dimension & Pass Rate & Description \\",
    )
    for label, passed in dims.items():
        pct = passed / n * 100
        lines.append(f"{label} & {pct:.1f}\\% & See Section~\\\\")
    overall_sp = sum(passed / n for passed in dims.values()) / len(dims) * 100
    lines.append(r"\midrule")
    lines.append(row("System Precision", f"{overall_sp:.1f}\\%", "Average of three dimensions"))
    lines += close_table

    if only != "adversarial":
        # Table 3: Youden's J Calibration
        lines += open_table(
            r"\caption{Youden's J Threshold Calibration}",
            "tab:youden_calibration",
            "lccc",
            r"Mode & Optimal Threshold & Youden's J & Deployed \\",
        )
        if has_data(calib_result):
            jd = calib_result.get("youden_dense", {})
            jg = calib_result.get("youden_graded_dense", {})
            jh = calib_result.get("youden_hybrid", {})
            lines.append(row("Binary Dense", f"{jd.get('optimal_threshold', 0):.2f}", f"${jd.get('j', 0):+.3f}$", "0.70"))
            lines.append(row("Graded Dense", f"{jg.get('optimal_threshold', 0):.2f}", f"${jg.get('j', 0):+.3f}$", "---"))
            lines.append(row("Hybrid (Dense+FTS)", f"{jh.get('optimal_threshold', 0):.2f}", f"${jh.get('j', 0):+.3f}$", "0.70"))
        else:
            lines.append(r"% WARNING: No calibration data -- values are placeholders")
            lines.append(row("Binary Dense", "---", "---", "0.70"))
            lines.append(row("Graded Dense", "---", "---", "---"))
            lines.append(row("Hybrid (Dense+FTS)", "---", "---", "0.70"))
        lines += close_table

        # Table 4: Cross-Lingual MRR
        lines += open_table(
            r"\caption{Cross-Lingual MRR Results}",
            "tab:cross_lingual_mrr",
            "lcc",
            r"Language Group & MRR & Cases \\",
        )
        if has_data(cross_result):
            lines.append(row(r"Indonesian $\rightarrow$ English", f"{cross_result.get('id_mrr', 0):.3f}", cross_result.get("id_cases", 0)))
            lines.append(row(r"English $\rightarrow$ English", f"{cross_result.get('en_mrr', 0):.3f}", cross_result.get("en_cases", 0)))
            lines.append(row(r"$\Delta$MRR", f"{cross_result.get('delta_mrr', 0):.3f}", "---"))
        else:
            lines.append(r"% WARNING: No cross-lingual data -- values are placeholders")
            lines.append(row(r"Indonesian $\rightarrow$ English", "---", "---"))
            lines.append(row(r"English $\rightarrow$ English", "---", "---"))
            lines.append(row(r"$\Delta$MRR", "---", "---"))
        lines += close_table

    # Table 5: Evaluation Cost Breakdown
    total_cost = round(self.cost_tracker["total_estimated_cost"], 4)
    lines += open_table(
        r"\caption{Evaluation Cost Breakdown}",
        "tab:evaluation_cost",
        "lc",
        r"Item & Value \\",
    )
    lines.append(row("Total Cases", n))
    lines.append(row("Total Estimated Cost", f"\\${total_cost:.4f}"))
    lines.append(row("Teacher Model", OPENAI_MODEL))
    lines.append(row("Student Model", r"\texttt{gpt-oss-120b} (Cerebras)"))
    lines += close_table

    body = "\n".join(lines)
    path = self.results_dir / "thesis_tables.tex"
    with open(path, "w", encoding="utf-8") as f:
        f.write(body)
    print(f"[Export] Thesis LaTeX tables written to {path}")
| # ============================================================================= | |
| # YOUDEN'S J RUNNER (Calibration) | |
| # ============================================================================= | |
async def run_youden_calibration() -> Dict:
    """Run Youden's J threshold calibration on ``golden_retrieval_cases.json``.

    Only cases with ``expected_mode == "vector_rag"`` that expect a hit and
    name an ``expected_source`` take part. For each such case the function:

    1. Runs the dense search and the FTS search, then records every dense
       chunk with the calibrator twice: as a binary true/false positive
       (source match plus at least one expected keyword) and with a graded
       relevance score. A chunk counts as cross-modal when its
       ``(filename, page_number)`` also appears in the FTS results.
    2. Classifies the top-1 dense chunk for the retrieval-specificity table.
    3. Feeds the case to ``SystemPrecisionEvaluator``; per-case failures are
       printed and do not stop the run.

    Side effects: prints the histogram and several report tables, and writes
    ``system_precision.json``, ``similarity_histogram.txt`` and
    ``threshold_calibration.json`` into ``RESULTS_DIR``.

    Returns:
        Dict with the case and data-point counts, the binary dense, graded
        dense and hybrid optimal thresholds with their J statistics, the
        deployed thresholds, the specificity counts and the system precision.
    """
    from app.vector_store import search_knowledge, search_knowledge_fts

    grade_labels = ("1.0_Exact", "0.5_Family", "0.25_Topic", "0.0_None")

    def grade_bucket(grade):
        """Map a relevance grade onto its distribution label."""
        if grade >= 1.0:
            return "1.0_Exact"
        if grade >= 0.5:
            return "0.5_Family"
        if grade >= 0.25:
            return "0.25_Topic"
        return "0.0_None"

    cases = load_golden_retrieval_cases()
    total_cases = len(cases)
    rag_cases = [
        c for c in cases
        if c.get("expected_mode") == "vector_rag" and c.get("expected_found") and c.get("expected_source")
    ]
    print(f"\n[Calibration] Running Youden's J on golden_retrieval_cases.json ({total_cases} total, {len(rag_cases)} vector_rag)...\n")

    calibrator = YoudenJCalibrator()
    source_plant_map = _build_source_plant_map(cases)
    grade_debug = dict.fromkeys(grade_labels, 0)
    per_case_grade = defaultdict(lambda: dict.fromkeys(grade_labels, 0))

    for case in rag_cases:
        query = str(case["query"])
        source = str(case["expected_source"])
        keywords = case.get("expected_content_keywords") or []
        case_id = str(case["case_id"])
        category = case.get("category", "unknown")
        expected_plant = case.get("expected_plant", "")

        dense_chunks = await search_knowledge(
            query=query, match_threshold=CALIB_THRESHOLD,
            match_count=CALIB_COUNT, query_label=f"calib:{case_id}",
        )
        fts_chunks = await search_knowledge_fts(query=query, match_count=CALIB_COUNT)
        fts_keys = {(c.get("filename"), c.get("page_number")) for c in fts_chunks}

        for chunk in dense_chunks:
            sim = chunk.get("similarity", 0.0)
            key = (chunk.get("filename"), chunk.get("page_number"))
            is_cm = key in fts_keys
            chunk_source = chunk.get("source", "").strip()
            content = (chunk.get("content") or "").lower()
            source_match = chunk_source == source.strip()
            is_tp = source_match and any(kw.lower() in content for kw in keywords)
            calibrator.add_record(sim, is_tp, is_cm, category, case_id)

            relevance_grade = compute_relevance_grade(
                chunk_source=chunk_source,
                content=content,
                expected_source=source,
                expected_plant=expected_plant,
                expected_keywords=keywords,
                source_plant_map=source_plant_map,
            )
            calibrator.add_record_graded(sim, relevance_grade, is_cm, category, case_id)

            # Track grade distribution, overall and per case.
            bucket = grade_bucket(relevance_grade)
            grade_debug[bucket] += 1
            per_case_grade[case_id][bucket] += 1

    # Binary Youden's J
    dense_t, dense_j = calibrator.compute(cross_modal_only=False)
    hybrid_t, hybrid_j = calibrator.compute(cross_modal_only=True)
    # Graded Youden's J
    graded_dense_t, graded_dense_j = calibrator.compute_graded(cross_modal_only=False)

    histogram = calibrator.build_histogram()
    print(histogram)

    # Grade distribution debug
    total_graded = sum(grade_debug.values())
    print("\n--- Relevance Grade Distribution (all chunks) ---")
    print(f" {'Grade':<20} {'Count':>6} {'Pct':>8}")
    print(f" {'-'*36}")
    for label in grade_labels:
        cnt = grade_debug[label]
        pct = cnt / total_graded * 100 if total_graded > 0 else 0
        print(f" {label:<20} {cnt:>6} ({pct:>5.1f}%)")
    print(f" {'TOTAL':<20} {total_graded:>6}")
    print()

    # Per-case grade debug
    print("--- Per-Case Grade Breakdown ---")
    print(f" {'Case ID':<35} {'Exact':>6} {'Family':>6} {'Topic':>6} {'None':>6}")
    print(f" {'-'*60}")
    for case_id in sorted(per_case_grade.keys()):
        g = per_case_grade[case_id]
        print(f" {case_id:<35} {g['1.0_Exact']:>6} {g['0.5_Family']:>6} {g['0.25_Topic']:>6} {g['0.0_None']:>6}")
    print()

    print(f"\n[Calibration] Youden's J Results:")
    print(f" Binary Dense threshold: t={dense_t:.2f}, J={dense_j:+.3f}")
    print(f" Graded Dense threshold: t={graded_dense_t:.2f}, J={graded_dense_j:+.3f}")
    print(f" Hybrid threshold: t={hybrid_t:.2f}, J={hybrid_j:+.3f}")
    print(f" Deployed: Dense=0.70, Hybrid=0.70")

    # Print per-category accuracy at deployed thresholds
    print(calibrator.report_per_category(0.70))
    print()
    # Index 3 of a calibrator record is compared against the category name,
    # as in the original counts.
    for label, category_name in (
        ("Tie-Breaker Summary", "tie_breaker"),
        ("Species Mismatch", "species_mismatch"),
        ("Phase Mismatch", "phase_mismatch"),
        ("Unit Conversion", "unit_conversion"),
        ("Negative Control", "negative_control"),
    ):
        matching = sum(1 for rec in calibrator.records if rec[3] == category_name)
        print(f" {label}: {matching} records")

    # Phase 4: Retrieval Specificity Breakdown
    # NOTE(review): the heading says "RRF" but the lookup below is the dense
    # search only - confirm which ranking this table is meant to describe.
    print("\n--- Retrieval Specificity (Top-1 RRF per case) ---")
    spec_counts = {"Exact Match": 0, "Family Match": 0, "Topic Match": 0, "Irrelevant": 0}
    for case in rag_cases:
        query = str(case["query"])
        case_id = str(case["case_id"])
        dense_chunks = await search_knowledge(
            query=query, match_threshold=CALIB_THRESHOLD,
            match_count=1, query_label=f"spec:{case_id}",
        )
        # Cases with no chunk above the threshold are left out of the counts.
        if dense_chunks:
            classification = classify_top1_retrieval(dense_chunks[0], case, source_plant_map)
            spec_counts[classification] += 1

    total_spec = sum(spec_counts.values())
    print(f" {'Classification':<20} {'Count':>6} {'Pct':>8}")
    print(f" {'-'*36}")
    for cls, count in spec_counts.items():
        pct = count / total_spec * 100 if total_spec > 0 else 0
        print(f" {cls:<20} {count:>4}/{total_spec} ({pct:>5.0f}%)")
    print()

    # Phase 3: System Precision
    print("[Calibration] Running System Precision evaluation on all cases...")
    system_evaluator = SystemPrecisionEvaluator()
    for i, case in enumerate(rag_cases, 1):
        print(f" [{i}/{len(rag_cases)}] {case.get('case_id', '')}...")
        try:
            await system_evaluator.evaluate_case(case)
        except Exception as e:
            # Best effort: one failing case must not abort the calibration run.
            print(f" ERROR: {e}")
    system_evaluator.print_report()

    # Make sure the output directory exists before the first file is written.
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    system_evaluator.export(RESULTS_DIR / "system_precision.json")

    result = {
        "calibration_cases": len(rag_cases),
        "total_data_points": len(calibrator.records),
        "youden_dense": {"optimal_threshold": round(dense_t, 2), "j": round(dense_j, 3)},
        "youden_graded_dense": {"optimal_threshold": round(graded_dense_t, 2), "j": round(graded_dense_j, 3)},
        "youden_hybrid": {"optimal_threshold": round(hybrid_t, 2), "j": round(hybrid_j, 3)},
        "deployed_thresholds": {"dense": 0.70, "hybrid": 0.70},
        "retrieval_specificity": spec_counts,
        "system_precision": system_evaluator.compute_precision(),
    }

    # Save histogram
    with open(RESULTS_DIR / "similarity_histogram.txt", "w", encoding="utf-8") as f:
        f.write(histogram)
    # Save calibration JSON
    with open(RESULTS_DIR / "threshold_calibration.json", "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    print(f"\n[Calibration] Results saved to {RESULTS_DIR}")
    return result
| # ============================================================================= | |
| # CROSS-LINGUAL MRR EXPERIMENT (Phase 2) | |
| # ============================================================================= | |
async def run_cross_lingual_experiment() -> Dict:
    """Run cross-lingual MRR experiment: ID→EN vs EN→EN retrieval.

    Loads the cross-lingual cases, splits them by ``query_lang`` ("id" and
    "en"), and computes the mean reciprocal rank of the first dense-search
    chunk whose source equals the expected source and whose content contains
    at least one expected keyword. A case with no such chunk in the top 20
    contributes a reciprocal rank of 0.

    Side effects: prints a result table and writes
    ``cross_lingual_results.json`` into ``RESULTS_DIR``.

    Returns:
        ``{"error": "no cases"}`` when no cases are available, otherwise a
        dict with the case counts, both MRR values and their absolute
        difference (all rounded to four decimals).
    """
    from app.vector_store import search_knowledge

    cases = load_cross_lingual_cases()
    if not cases:
        print("[Cross-Lingual] No cross-lingual cases found. Skipping.")
        return {"error": "no cases"}

    print(f"\n[Cross-Lingual] Running ΔMRR experiment on {len(cases)} cases...")
    id_cases = [c for c in cases if c.get("query_lang") == "id"]
    en_cases = [c for c in cases if c.get("query_lang") == "en"]

    async def compute_mrr(case_list: list) -> float:
        """Return the mean reciprocal rank over ``case_list`` (0.0 if empty)."""
        if not case_list:
            return 0.0
        reciprocal_ranks = []
        for case in case_list:
            query = str(case["query"])
            expected_source = str(case["expected_source"]).strip()
            keywords = case.get("expected_content_keywords") or []
            dense_chunks = await search_knowledge(
                query=query, match_threshold=CALIB_THRESHOLD,
                match_count=20, query_label=f"cross:{case.get('case_id','')}",
            )
            reciprocal = 0.0
            for position, chunk in enumerate(dense_chunks, start=1):
                if chunk.get("source", "").strip() != expected_source:
                    continue
                content = (chunk.get("content") or "").lower()
                # any() over an empty keyword list is False, so a case without
                # keywords can never register a hit.
                if any(kw.lower() in content for kw in keywords):
                    reciprocal = 1.0 / position
                    break
            reciprocal_ranks.append(reciprocal)
        return sum(reciprocal_ranks) / len(reciprocal_ranks)

    id_mrr = await compute_mrr(id_cases)
    en_mrr = await compute_mrr(en_cases)
    delta_mrr = abs(id_mrr - en_mrr)

    print()
    print("-" * 50)
    print(" CROSS-LINGUAL MRR RESULTS")
    print("-" * 50)
    print(f" Indonesian → English MRR: {id_mrr:.3f} ({len(id_cases)} cases)")
    print(f" English → English MRR: {en_mrr:.3f} ({len(en_cases)} cases)")
    print(f" ΔMRR: {delta_mrr:.3f}")
    if not id_cases or not en_cases:
        # An empty group has MRR 0.0 by construction, so the gap says nothing
        # about robustness; do not print a pass/fail verdict on it.
        print(" ⚠️ ΔMRR not meaningful → at least one language group has no cases")
    elif delta_mrr < 0.15:
        print(f" ✅ ΔMRR < 0.15 → Cross-lingual robustness demonstrated")
    else:
        print(f" ⚠️ ΔMRR >= 0.15 → Cross-lingual gap may need mitigation")
    print("-" * 50)
    print()

    result = {
        "total_cases": len(cases),
        "id_cases": len(id_cases),
        "en_cases": len(en_cases),
        "id_mrr": round(id_mrr, 4),
        "en_mrr": round(en_mrr, 4),
        "delta_mrr": round(delta_mrr, 4),
    }

    # Make sure the output directory exists before writing into it.
    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
    with open(RESULTS_DIR / "cross_lingual_results.json", "w", encoding="utf-8") as f:
        json.dump(result, f, indent=2)
    print(f"[Cross-Lingual] Results saved to {RESULTS_DIR / 'cross_lingual_results.json'}")
    return result
| # ============================================================================= | |
| # DRY RUN | |
| # ============================================================================= | |
async def run_dry_run(engine: EvaluationEngine):
    """Run the full metric suite on 3 representative cases to verify the pipeline.

    Three hard-coded Indonesian-language cases (quantitative, phase-aware and
    terminology-nuance) are passed one by one to
    ``engine.evaluate_single_case``. For each case the answer preview, run
    metadata, audit verdicts, RAGAS scores and estimated cost are printed.
    Afterwards the cost summary is printed, the critic reasoning log is
    counted, and the engine's log / data / thesis-table exports are written.

    Args:
        engine: Evaluation engine used to evaluate the cases and export results.
    """
    print("\n" + "=" * 60)
    print(" DRY RUN MODE — 3 Cases, Full Pipeline")
    print("=" * 60)

    dry_cases = [
        {
            "case_id": "dry_quantitative",
            "case_group": "quantitative",
            "query": "Berapa suhu optimal selada fase vegetatif?",
            "ground_truth": "Suhu optimal untuk selada fase vegetatif adalah 20°C dengan rentang 18-24°C.",
            "expected_plant": "lettuce",
            "expected_stage": "vegetative",
            "response_language": "id",
            "expected_emojis": ["📚"],
        },
        {
            "case_id": "dry_phase_aware",
            "case_group": "phase_aware",
            "query": "Apa kondisi ideal chamber sekarang?",
            "ground_truth": "Chamber sedang dalam siklus malam dengan parameter suhu 18-22°C...",
            "expected_plant": None,
            "expected_stage": None,
            "response_language": "id",
            "temporal_context": {"local_hour": 23, "startNight": 22, "startDay": 6},
            "expected_emojis": ["⚠️"],
        },
        {
            "case_id": "dry_nuance_kecambah",
            "case_group": "linguistic",
            "query": "Apa perbedaan perawatan kecambah dan tunas pada fase awal?",
            "ground_truth": "Kecambah merujuk pada mung bean sprouts yang memerlukan...",
            "expected_plant": "mung_bean_sprouts",
            "expected_stage": "germination",
            "response_language": "id",
            "expected_emojis": ["📚", "📖"],
            "risk_flag": "high_risk",
        },
    ]

    # (printed label, result key, boolean flag inside that audit dict)
    audits = (
        ("Numerical Rigor", "numerical_rigor", "overall_pass"),
        ("Temporal Adherence", "temporal_adherence", "pass"),
        ("Constraint Satisfaction", "constraint_satisfaction", "pass"),
        ("Citation Accuracy", "citation_accuracy", "pass"),
    )

    for i, case in enumerate(dry_cases, 1):
        print(f"\n{'-'*50}")
        print(f" DRY RUN CASE {i}: {case['case_id']}")
        print(f" Query: {case['query']}")
        print(f"{'-'*50}")

        result = await engine.evaluate_single_case(case)

        print(f"\n Answer: {result['answer'][:200]}...")
        print("\n Metadata:")
        print(f" Model: {result['model_used']}")
        print(f" Latency: {result['latency_ms']}ms")
        print(f" Tokens: {result['token_usage']}")
        print(f" Chunks: {result['retrieved_chunks_count']}")
        print(f" Parent Expansion: {result['parent_expansion']}")

        print("\n Audits:")
        for label, key, flag in audits:
            verdict = "[OK] PASS" if result[key][flag] else "[FAIL] FAIL"
            print(f" {label}: {verdict}")

        ragas = result.get("ragas_scores", {})
        if "error" not in ragas:
            for metric in ragas.get("_metrics_computed", []):
                print(f" RAGAS {metric}: {ragas.get(metric, 'N/A')}")
        else:
            print(f" RAGAS: {ragas.get('error', 'N/A')}")

        tn = result.get("terminology_nuance")
        if tn:
            print(f" Terminology Accuracy: {tn['accuracy']:.0%} ({tn['passed']}/{tn['total']})")
        print(f" Est. Cost: ${result['estimated_cost_usd']:.6f}")

    print(f"\n{'='*60}")
    print(" DRY RUN COMPLETE")
    engine.print_cost_summary()

    # Check for anomalous critic reasoning.
    # NOTE(review): this always looks under the module-level RESULTS_DIR, even
    # when the engine was constructed with a different results_dir — confirm
    # where the critic log is actually written.
    log_path = RESULTS_DIR / "critic_reasoning_log.jsonl"
    if log_path.exists():
        entries = []
        malformed = 0
        with open(log_path, encoding="utf-8") as f:
            for line in f:
                if not line.strip():
                    continue
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    # This is only a diagnostic count: a truncated or corrupt
                    # line must not abort the run before the exports below.
                    malformed += 1
        if entries:
            print(f" Critic calls logged: {len(entries)}")
            print(f" Check {log_path} for detailed reasoning.")
        if malformed:
            print(f" WARNING: skipped {malformed} malformed line(s) in {log_path}.")

    engine.export_results_log()
    engine.export_results_data()
    engine.export_thesis_tables()

    print("\n WARNING: Review critic reasoning for Indonesian terminology interpretation.")
    print(" If reasoning sounds inconsistent, consider upgrading to gpt-5.5 for final run.\n")
| # ============================================================================= | |
| # FULL EVALUATION RUN | |
| # ============================================================================= | |
async def run_full_evaluation(engine: EvaluationEngine, only: Optional[str] = None):
    """Evaluate every selected test case and export all result artefacts.

    Both case sets (golden/human and synthetic) are always loaded; ``only``
    decides which of them are actually evaluated:

    * ``None``          – all human cases plus all synthetic cases
    * ``"human"``       – all human cases
    * ``"rag"``         – human cases whose ``case_group`` is ``rag_qualitative``
    * ``"adversarial"`` – human cases whose ``case_group`` is anything else
    * ``"synthetic"``   – synthetic cases only

    Unless ``only == "adversarial"``, the Youden calibration and the
    cross-lingual experiment run before the cases and their results are handed
    to ``engine.export_thesis_tables``. Returns ``None``; returns early when no
    case was selected.
    """
    heavy_rule = "=" * 60
    light_rule = "-" * 50

    print(f"\n{heavy_rule}")
    print(" FULL EVALUATION RUN")
    print(heavy_rule)

    human_cases = load_golden_qa_cases()
    synthetic_cases = load_synthetic_qa_cases()

    # Pick the human-case arm first, then optionally append the synthetic arm.
    if only is None or only == "human":
        selected = list(human_cases)
    elif only == "rag":
        selected = [hc for hc in human_cases if hc.get("case_group") == "rag_qualitative"]
    elif only == "adversarial":
        selected = [hc for hc in human_cases if hc.get("case_group") != "rag_qualitative"]
    else:
        selected = []
    if only is None or only == "synthetic":
        selected += synthetic_cases

    if not selected:
        print("[Evaluation] No test cases found. Use --regenerate-synthetic to generate synthetic cases.")
        return

    total = len(selected)
    print(f"\n Loaded {len(human_cases)} human-adversarial + {len(synthetic_cases)} synthetic cases.")
    print(f" Total: {total} cases.")

    # Ground-truth validation always covers the full human set.
    for gt_warning in validate_ground_truths(human_cases):
        print(f" {gt_warning}")

    # Calibration and the cross-lingual experiment precede the case loop.
    calib_result = None
    cross_result = None
    if only != "adversarial":
        calib_result = await run_youden_calibration()
        cross_result = await run_cross_lingual_experiment()

    print(f"\n{light_rule}")
    print(f" Starting evaluation of {total} cases...")
    print(light_rule)

    for position, case in enumerate(selected, start=1):
        label = case.get('case_id', case['query'][:40])
        print(f"\n [{position}/{total}] {label}...")
        await engine.evaluate_single_case(case)
        # Periodic cost checkpoint every tenth case.
        if not position % 10:
            engine.print_cost_summary()

    engine.export_csv()
    engine.export_summary()
    engine.export_results_log()
    engine.export_results_data()
    engine.export_thesis_tables(calib_result, cross_result, only=only)
    engine.print_cost_summary()

    print(f"\n{heavy_rule}")
    print(" EVALUATION COMPLETE")
    print(f" Results: {RESULTS_DIR}")
    print(f"{heavy_rule}\n")
| # ============================================================================= | |
| # CLI ENTRY POINT | |
| # ============================================================================= | |
async def run_calibration_only():
    """Run the Youden calibration and the cross-lingual experiment, then print a summary.

    The summary shows the binary and graded Youden's J of the dense retriever,
    the system precision and the cross-lingual ΔMRR; any value missing from
    the result dicts is shown as 0.

    Returns:
        The dict produced by ``run_youden_calibration()``.
    """
    calibration = await run_youden_calibration()
    cross_lingual = await run_cross_lingual_experiment()

    rule = "=" * 50
    print()
    print(rule)
    print(" CALIBRATION SUMMARY")
    print(rule)

    # Each section defaults to an empty dict so the inner .get() calls below
    # fall back to 0 when the section is absent.
    binary_youden = calibration.get("youden_dense", {})
    graded_youden = calibration.get("youden_graded_dense", {})
    precision_block = calibration.get("system_precision", {})

    print(f" Retriever Youden's J (binary): {binary_youden.get('j', 0):+.3f}")
    print(f" Retriever Youden's J (graded): {graded_youden.get('j', 0):+.3f}")
    print(f" System Precision: {precision_block.get('system_precision', 0):.2%}")
    print(f" Cross-Lingual ΔMRR: {cross_lingual.get('delta_mrr', 0):.3f}")
    print(rule)
    print()
    return calibration
def main():
    """Command-line entry point for the evaluation framework.

    Flow:
      1. Parse the CLI arguments.
      2. Exit with status 1 when ``OPENAI_API_KEY`` is unset; only warn when
         ``CEREBRAS_API_KEY`` is unset.
      3. With ``--regenerate-synthetic``, rebuild the synthetic dataset
         (exits with status 1 when RAGAS is not installed).
      4. Resolve the output directory: ``--output-dir`` wins; otherwise
         ``--only rag|adversarial|synthetic`` uses ``RESULTS_DIR/<only>``;
         otherwise ``RESULTS_DIR``.
      5. Run the selected mode (``dry_run``, ``calibrate`` or ``full``).
      6. With ``--run-cross-lingual``, run the cross-lingual MRR experiment
         if the selected mode did not already request it.
    """
    import argparse

    parser = argparse.ArgumentParser(
        description="PGC RAGAS Evaluation Framework (Cerebras Edition)",
    )
    parser.add_argument(
        "--mode", choices=["dry_run", "full", "calibrate"], default="dry_run",
        help="dry_run (3 cases), full (all 100+ cases), calibrate (J + cross-lingual only)",
    )
    parser.add_argument(
        "--only", choices=["human", "synthetic", "rag", "adversarial"], default=None,
        help="Restrict evaluation arm: 'human' (all 65 golden cases), 'synthetic', "
             "'rag' (20 rag_qualitative cases only), 'adversarial' (original 45 non-RAG cases)",
    )
    parser.add_argument(
        "--output-dir", default=None,
        help="Override output directory (default: results/<only>/ or results/)",
    )
    parser.add_argument(
        "--regenerate-synthetic", action="store_true",
        help="Force regenerating the synthetic dataset",
    )
    parser.add_argument(
        "--run-cross-lingual", action="store_true",
        help="Also run cross-lingual MRR experiment",
    )
    args = parser.parse_args()

    if not OPENAI_API_KEY:
        print("ERROR: OPENAI_API_KEY environment variable not set.")
        print("Add it to AI Chatbot/.env or set it as an environment variable.")
        print("This is required for the gpt-4o-mini critic (Teacher model).")
        sys.exit(1)
    if not CEREBRAS_API_KEY:
        # Deliberately non-fatal: only a warning is printed here.
        print("WARNING: CEREBRAS_API_KEY not set. Generation calls will fail.")
        print("Add it to AI Chatbot/.env")

    if args.regenerate_synthetic:
        if not HAS_RAGAS:
            print("ERROR: Cannot regenerate synthetic dataset. RAGAS not installed.")
            sys.exit(1)
        asyncio.run(generate_synthetic_dataset())

    # Determine output directory: --output-dir overrides, then auto-subdirectory for named arms
    if args.output_dir:
        results_dir = Path(args.output_dir)
    elif args.only in ("rag", "adversarial", "synthetic"):
        results_dir = RESULTS_DIR / args.only
    else:
        results_dir = RESULTS_DIR  # default: results/ (unchanged for --only human or unset)

    engine = EvaluationEngine(results_dir=results_dir)

    if args.mode == "dry_run":
        asyncio.run(run_dry_run(engine))
    elif args.mode == "calibrate":
        asyncio.run(run_calibration_only())
    else:
        asyncio.run(run_full_evaluation(engine, only=args.only))

    # --run-cross-lingual used to be parsed and then ignored. "calibrate" and
    # "full" (except the adversarial arm) already request the experiment, so
    # only trigger it here for the remaining modes to avoid running it twice.
    mode_covers_cross_lingual = args.mode == "calibrate" or (
        args.mode == "full" and args.only != "adversarial"
    )
    if args.run_cross_lingual and not mode_covers_cross_lingual:
        asyncio.run(run_cross_lingual_experiment())
# Script entry guard: run the CLI only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()