"""Chat benchmark runner for JSON eval datasets.""" from __future__ import annotations import asyncio import json from collections.abc import Awaitable, Callable from dataclasses import asdict, dataclass from datetime import UTC, datetime from pathlib import Path from statistics import mean from time import perf_counter from typing import Any import httpx from maris_core.code.execution_eval import ( CodeExecutionResult, CodeExecutionSpec, evaluate_code_response, ) from maris_core.text.evals import ChatEvalCase, ChatEvalResult, evaluate_chat_case MEMORY_QUALITY_CATEGORIES = ( "multi_turn_continuity", "cross_session_recall", "user_preferences_recall", "cross_lingual_retrieval", "stale_memory_rejection", ) TOOL_USE_TAGS = ("tool", "tools", "tool_use", "grounding", "repo", "browser") MULTIMODAL_TAGS = ("multimodal", "vision", "image", "video", "audio", "voice", "music") MULTIMODAL_CATEGORIES = ( "multimodal", "vision", "vision_analysis", "image_generation", "video_generation", "voice_conversation", "audio", "music_generation", ) HALLUCINATION_FACTUALITY_THRESHOLD = 0.6 TREND_TRACKED_METRICS = ( "average_overall_score", "average_latency_ms", "latency_p95_ms", "average_tokens_used", "success_rate", "reasoning", "coding", "safety", "execution_pass_rate", "grounding_pass_rate", "tool_use_pass_rate", "memory_retrieval_pass_rate", "multimodal_pass_rate", "hallucination_rate", "production_like_pass_rate", "pairwise_win_rate", ) @dataclass(frozen=True, slots=True) class ChatBenchmarkCase: name: str message: str history: tuple[dict[str, str], ...] = () vision_context: dict[str, Any] | None = None profile: str | None = None persona_id: str | None = None session_id: str | None = None expected_terms: tuple[str, ...] = () forbidden_terms: tuple[str, ...] = () tags: tuple[str, ...] = () reference_answer: str | None = None reference_facts: tuple[str, ...] = () expects_code: bool = False execution_language: str | None = None execution_test_code: str | None = None execution_timeout_seconds: float = 8.0 execution_compile_only: bool = False min_tool_steps: int = 0 min_grounding_sources: int = 0 expected_grounding_terms: tuple[str, ...] = () branches: tuple[str, ...] = () level: str = "ci" difficulty: str = "standard" category: str = "general" failure_bucket: str = "general" risk_level: str = "standard" production_like: bool = False @dataclass(frozen=True, slots=True) class ChatBenchmarkResult: name: str ok: bool latency_ms: int status_code: int | None response: str model: str tokens_used: int eval: ChatEvalResult | None execution: CodeExecutionResult | None grounding_required: bool = False grounding_ok: bool = True error: str | None = None tags: tuple[str, ...] = () level: str = "ci" difficulty: str = "standard" category: str = "general" failure_bucket: str = "general" risk_level: str = "standard" production_like: bool = False execution_language: str | None = None def load_chat_benchmark_dataset(path: str | Path) -> list[ChatBenchmarkCase]: raw = json.loads(Path(path).read_text(encoding="utf-8")) entries = raw.get("cases", raw) if isinstance(raw, dict) else raw if not isinstance(entries, list): raise ValueError("Benchmark datasetam jābūt JSON masīvam vai objektam ar `cases`.") cases: list[ChatBenchmarkCase] = [] for entry in entries: if not isinstance(entry, dict): raise ValueError("Katram benchmark ierakstam jābūt JSON objektam.") name = str(entry.get("name", "")).strip() message = str(entry.get("message", "")).strip() if not name or not message: raise ValueError("Benchmark ierakstam obligāti vajag `name` un `message`.") history = tuple( item for item in entry.get("history", []) if isinstance(item, dict) and item.get("role") and item.get("content") ) cases.append( ChatBenchmarkCase( name=name, message=message, history=history, vision_context=_normalize_optional_mapping(entry.get("vision_context")), profile=_normalize_optional_text(entry.get("profile")), persona_id=_normalize_optional_text(entry.get("persona_id")), session_id=_normalize_optional_text(entry.get("session_id")), expected_terms=_normalize_text_list(entry.get("expected_terms")), forbidden_terms=_normalize_text_list(entry.get("forbidden_terms")), tags=_normalize_text_list(entry.get("tags")), reference_answer=_normalize_optional_text(entry.get("reference_answer")), reference_facts=_normalize_text_list(entry.get("reference_facts")), expects_code=bool(entry.get("expects_code", False)), execution_language=_normalize_optional_text(entry.get("execution_language")), execution_test_code=_normalize_optional_text(entry.get("execution_test_code")), execution_timeout_seconds=float(entry.get("execution_timeout_seconds", 8.0) or 8.0), execution_compile_only=bool(entry.get("execution_compile_only", False)), min_tool_steps=max(0, int(entry.get("min_tool_steps", 0) or 0)), min_grounding_sources=max(0, int(entry.get("min_grounding_sources", 0) or 0)), expected_grounding_terms=_normalize_text_list( entry.get("expected_grounding_terms") ), branches=_normalize_text_list(entry.get("branches")), level=_normalize_optional_text(entry.get("level")) or "ci", difficulty=_normalize_optional_text(entry.get("difficulty")) or "standard", category=_normalize_optional_text(entry.get("category")) or "general", failure_bucket=_normalize_optional_text(entry.get("failure_bucket")) or "general", risk_level=_normalize_optional_text(entry.get("risk_level")) or "standard", production_like=bool(entry.get("production_like", False)), ) ) return cases def summarize_chat_benchmark(results: list[ChatBenchmarkResult]) -> dict[str, Any]: successful = [result for result in results if result.ok] failed = [result for result in results if not result.ok] evals = [result.eval for result in successful if result.eval is not None] category_scores = _group_average_scores(results, key=lambda item: item.category) failure_bucket_scores = _group_average_scores(results, key=lambda item: item.failure_bucket) level_scores = _group_average_scores(results, key=lambda item: item.level) difficulty_scores = _group_average_scores(results, key=lambda item: item.difficulty) risk_level_scores = _group_average_scores(results, key=lambda item: item.risk_level) tag_scores = _group_tag_average_scores(results) tag_pass_rates = _group_tag_pass_rates(results) execution_supported = [ result for result in results if result.execution is not None and result.execution.available ] execution_skipped = [ result for result in results if result.execution is not None and not result.execution.available ] execution_passed = [result for result in execution_supported if result.execution.passed] grounding_cases = [result for result in results if result.grounding_required] grounding_passed = [ result for result in grounding_cases if result.grounding_ok is True and result.error != "grounding requirements not met" ] execution_language_pass_rates = _group_execution_pass_rates( execution_supported, key=lambda item: item.execution_language or "unspecified", ) execution_language_scores = _group_average_scores( [result for result in results if result.execution_language], key=lambda item: item.execution_language or "unspecified", ) category_execution_pass_rates = _group_execution_pass_rates( execution_supported, key=lambda item: item.category, ) production_like_results = [result for result in results if result.production_like] production_like_passed = [result for result in production_like_results if result.ok] memory_results = [ result for result in results if result.category in MEMORY_QUALITY_CATEGORIES or "memory" in result.tags ] memory_passed = [result for result in memory_results if result.ok] memory_quality_scores = { key: value for key, value in category_scores.items() if key in MEMORY_QUALITY_CATEGORIES } memory_quality_pass_rates = _group_case_pass_rates( memory_results, key=lambda item: item.category, ) tool_use_results = [result for result in results if _is_tool_use_result(result)] tool_use_passed = [result for result in tool_use_results if result.ok and result.grounding_ok] multimodal_results = [result for result in results if _is_multimodal_result(result)] multimodal_passed = [result for result in multimodal_results if result.ok] hallucination_population = [result for result in results if result.eval is not None] hallucination_incidents = [ result for result in hallucination_population if _is_hallucination_incident(result) ] latency_p95_ms = _latency_percentile_ms(results, percentile=95.0) judge_summary = _summarize_judge_results(evals) score_manifest = { "overall": round(mean(item.overall for item in evals), 3) if evals else 0.0, "reasoning": _mean_eval_metric(evals, "reasoning"), "factuality": _mean_eval_metric(evals, "factuality"), "latvian_quality": _mean_eval_metric(evals, "latvian_quality"), "coding": _mean_eval_metric(evals, "coding"), "long_context": _mean_eval_metric(evals, "long_context"), "helpfulness": _mean_eval_metric(evals, "helpfulness"), "safety": _mean_eval_metric(evals, "safety"), "tool_use_pass_rate": round(len(tool_use_passed) / len(tool_use_results), 3) if tool_use_results else 1.0, "multimodal_pass_rate": round(len(multimodal_passed) / len(multimodal_results), 3) if multimodal_results else 1.0, "hallucination_rate": round( len(hallucination_incidents) / len(hallucination_population), 3, ) if hallucination_population else 0.0, "latency_p95_ms": latency_p95_ms, "execution": round(len(execution_passed) / len(execution_supported), 3) if execution_supported else 0.0, "grounding": round(len(grounding_passed) / len(grounding_cases), 3) if grounding_cases else 1.0, "judge_overall": judge_summary["overall"], "judge_task_completion": judge_summary["dimension_scores"]["task_completion"], "judge_instruction_following": judge_summary["dimension_scores"]["instruction_following"], "judge_grounding": judge_summary["dimension_scores"]["grounding"], "judge_safety": judge_summary["dimension_scores"]["safety"], "judge_multi_turn_continuity": judge_summary["dimension_scores"]["multi_turn_continuity"], "judge_code_quality": judge_summary["dimension_scores"]["code_quality"], "judge_regression_risk": judge_summary["dimension_scores"]["regression_risk"], "memory_retrieval_pass_rate": round(len(memory_passed) / len(memory_results), 3) if memory_results else 1.0, } for category, score in memory_quality_scores.items(): score_manifest[f"memory_{category}"] = score quality_dimensions = { "reasoning": {"score": score_manifest["reasoning"], "cases": len(results)}, "coding": {"score": score_manifest["coding"], "cases": len(results)}, "tool_use": { "cases": len(tool_use_results), "pass_rate": score_manifest["tool_use_pass_rate"], }, "memory": { "cases": len(memory_results), "pass_rate": score_manifest["memory_retrieval_pass_rate"], }, "multimodality": { "cases": len(multimodal_results), "pass_rate": score_manifest["multimodal_pass_rate"], }, "latency": { "average_ms": round(mean(result.latency_ms for result in successful), 1) if successful else 0.0, "p95_ms": latency_p95_ms, }, "hallucination": { "cases": len(hallucination_population), "incident_rate": score_manifest["hallucination_rate"], }, "safety": {"score": score_manifest["safety"], "cases": len(results)}, } return { "total_cases": len(results), "successful_cases": len(successful), "failed_cases": len(failed), "success_rate": round(len(successful) / len(results), 3) if results else 0.0, "average_latency_ms": round(mean(result.latency_ms for result in successful), 1) if successful else 0.0, "latency_p95_ms": latency_p95_ms, "average_tokens_used": round(mean(result.tokens_used for result in successful), 1) if successful else 0.0, "average_overall_score": round(mean(item.overall for item in evals), 3) if evals else 0.0, "execution_cases": len(execution_supported), "execution_skipped": len(execution_skipped), "execution_passed": len(execution_passed), "execution_pass_rate": round(len(execution_passed) / len(execution_supported), 3) if execution_supported else 0.0, "grounding_cases": len(grounding_cases), "grounding_pass_rate": round(len(grounding_passed) / len(grounding_cases), 3) if grounding_cases else 1.0, "category_scores": category_scores, "failure_bucket_scores": failure_bucket_scores, "execution_language_pass_rates": execution_language_pass_rates, "execution_language_scores": execution_language_scores, "category_execution_pass_rates": category_execution_pass_rates, "level_scores": level_scores, "difficulty_scores": difficulty_scores, "risk_level_scores": risk_level_scores, "tag_scores": tag_scores, "tag_pass_rates": tag_pass_rates, "memory_retrieval_cases": len(memory_results), "memory_retrieval_pass_rate": round(len(memory_passed) / len(memory_results), 3) if memory_results else 1.0, "tool_use_cases": len(tool_use_results), "tool_use_pass_rate": round(len(tool_use_passed) / len(tool_use_results), 3) if tool_use_results else 1.0, "multimodal_cases": len(multimodal_results), "multimodal_pass_rate": round(len(multimodal_passed) / len(multimodal_results), 3) if multimodal_results else 1.0, "hallucination_eval_cases": len(hallucination_population), "hallucination_incidents": len(hallucination_incidents), "hallucination_rate": round( len(hallucination_incidents) / len(hallucination_population), 3, ) if hallucination_population else 0.0, "memory_quality_scores": memory_quality_scores, "memory_quality_pass_rates": memory_quality_pass_rates, "production_like_cases": len(production_like_results), "production_like_pass_rate": round( len(production_like_passed) / len(production_like_results), 3 ) if production_like_results else 1.0, "quality_dimensions": quality_dimensions, "judge_summary": judge_summary, "score_manifest": score_manifest, "results": [benchmark_result_to_dict(result) for result in results], } def build_chat_benchmark_manifest( results: list[ChatBenchmarkResult], *, benchmark_name: str, branch: str, model: str, human_eval_summary: dict[str, Any] | None = None, ) -> dict[str, Any]: summary = summarize_chat_benchmark(results) if isinstance(human_eval_summary, dict): summary["human_eval_summary"] = human_eval_summary summary["score_manifest"]["pairwise_win_rate"] = round( float(human_eval_summary.get("pairwise_win_rate", 0.0) or 0.0), 3 ) summary["score_manifest"]["human_eval_confidence"] = round( float(human_eval_summary.get("average_confidence", 0.0) or 0.0), 3 ) summary["human_eval_cadence"] = _resolve_human_eval_cadence( benchmark_name=benchmark_name, branch=branch, human_eval_summary=human_eval_summary, ) return { "benchmark_name": benchmark_name, "branch": branch, "model": model, "generated_at": _utc_timestamp(), "artifact_type": "chat-benchmark-manifest", **summary, } def build_chat_benchmark_history_artifact( current_manifest: dict[str, Any], *, previous_history: dict[str, Any] | None = None, max_runs: int = 30, ) -> dict[str, Any]: existing_runs = _coerce_history_runs(previous_history) current_run = _build_benchmark_history_run(current_manifest) baseline = _select_previous_history_baseline(existing_runs, current_run) runs = list(existing_runs) if runs and _same_benchmark_identity(runs[-1], current_run): runs[-1] = current_run else: runs.append(current_run) runs = runs[-max(1, max_runs) :] regression_report = build_chat_benchmark_regression_report( current_manifest, previous_run=baseline, ) return { "artifact_type": "chat-benchmark-history", "benchmark_name": str(current_manifest.get("benchmark_name", "")).strip(), "branch": str(current_manifest.get("branch", "")).strip(), "model": str(current_manifest.get("model", "")).strip(), "latest_generated_at": current_run["generated_at"], "run_count": len(runs), "runs": runs, "trend_summary": _build_benchmark_trend_summary(runs), "latest_regression_summary": { "status": regression_report["status"], "has_baseline": regression_report["has_baseline"], "has_regressions": regression_report["has_regressions"], "regression_count": regression_report["regression_count"], }, } def build_chat_benchmark_regression_report( current_manifest: dict[str, Any], *, previous_run: dict[str, Any] | None = None, ) -> dict[str, Any]: current = _extract_benchmark_reference_metrics(current_manifest) previous = _extract_benchmark_reference_metrics(previous_run) if previous is None: return { "artifact_type": "chat-benchmark-regression-report", "benchmark_name": current["benchmark_name"], "branch": current["branch"], "model": current["model"], "current_generated_at": current["generated_at"], "previous_generated_at": None, "has_baseline": False, "has_regressions": False, "regression_count": 0, "status": "no-baseline", "score_manifest_deltas": {}, "category_score_deltas": {}, "failure_bucket_score_deltas": {}, "execution_language_pass_rate_deltas": {}, "execution_language_score_deltas": {}, "category_execution_pass_rate_deltas": {}, "memory_quality_score_deltas": {}, "memory_quality_pass_rate_deltas": {}, "regressions": {}, } score_manifest_deltas = _calculate_metric_deltas( current["score_manifest"], previous["score_manifest"], ) category_score_deltas = _calculate_metric_deltas( current["category_scores"], previous["category_scores"], ) failure_bucket_score_deltas = _calculate_metric_deltas( current["failure_bucket_scores"], previous["failure_bucket_scores"], ) execution_language_pass_rate_deltas = _calculate_metric_deltas( current["execution_language_pass_rates"], previous["execution_language_pass_rates"], ) execution_language_score_deltas = _calculate_metric_deltas( current["execution_language_scores"], previous["execution_language_scores"], ) category_execution_pass_rate_deltas = _calculate_metric_deltas( current["category_execution_pass_rates"], previous["category_execution_pass_rates"], ) memory_quality_score_deltas = _calculate_metric_deltas( current["memory_quality_scores"], previous["memory_quality_scores"], ) memory_quality_pass_rate_deltas = _calculate_metric_deltas( current["memory_quality_pass_rates"], previous["memory_quality_pass_rates"], ) regressions = { "score_manifest": _filter_negative_deltas(score_manifest_deltas), "category_scores": _filter_negative_deltas(category_score_deltas), "failure_bucket_scores": _filter_negative_deltas(failure_bucket_score_deltas), "execution_language_pass_rates": _filter_negative_deltas( execution_language_pass_rate_deltas ), "execution_language_scores": _filter_negative_deltas(execution_language_score_deltas), "category_execution_pass_rates": _filter_negative_deltas( category_execution_pass_rate_deltas ), "memory_quality_scores": _filter_negative_deltas(memory_quality_score_deltas), "memory_quality_pass_rates": _filter_negative_deltas(memory_quality_pass_rate_deltas), } regression_count = sum(len(payload) for payload in regressions.values()) return { "artifact_type": "chat-benchmark-regression-report", "benchmark_name": current["benchmark_name"], "branch": current["branch"], "model": current["model"], "current_generated_at": current["generated_at"], "previous_generated_at": previous["generated_at"], "has_baseline": True, "has_regressions": regression_count > 0, "regression_count": regression_count, "status": "regression-detected" if regression_count > 0 else "ok", "score_manifest_deltas": score_manifest_deltas, "category_score_deltas": category_score_deltas, "failure_bucket_score_deltas": failure_bucket_score_deltas, "execution_language_pass_rate_deltas": execution_language_pass_rate_deltas, "execution_language_score_deltas": execution_language_score_deltas, "category_execution_pass_rate_deltas": category_execution_pass_rate_deltas, "memory_quality_score_deltas": memory_quality_score_deltas, "memory_quality_pass_rate_deltas": memory_quality_pass_rate_deltas, "regressions": regressions, } def select_chat_benchmark_cases( cases: list[ChatBenchmarkCase], *, levels: list[str] | tuple[str, ...], branch: str | None = None, ) -> list[ChatBenchmarkCase]: selected = [case for case in cases if case.level in set(levels)] if not branch: return selected normalized_branch = branch.strip().lower() return [ case for case in selected if not case.branches or normalized_branch in {item.strip().lower() for item in case.branches} ] async def run_chat_benchmark( cases: list[ChatBenchmarkCase], *, url: str, concurrency: int = 1, timeout_seconds: float = 120.0, transport: httpx.AsyncBaseTransport | None = None, ) -> list[ChatBenchmarkResult]: async def responder(case: ChatBenchmarkCase) -> dict[str, Any]: async with httpx.AsyncClient( timeout=timeout_seconds, follow_redirects=True, transport=transport, ) as client: response = await client.post( url, json={ "message": case.message, "history": list(case.history), "vision_context": case.vision_context, "profile": case.profile, "persona_id": case.persona_id, "session_id": case.session_id, }, ) response.raise_for_status() return response.json() return await run_chat_benchmark_with_responder( cases, responder=responder, concurrency=concurrency ) async def run_chat_benchmark_with_responder( cases: list[ChatBenchmarkCase], *, responder: Callable[[ChatBenchmarkCase], Awaitable[dict[str, Any]]], concurrency: int = 1, ) -> list[ChatBenchmarkResult]: semaphore = asyncio.Semaphore(max(1, concurrency)) async def run_case(case: ChatBenchmarkCase) -> ChatBenchmarkResult: async with semaphore: return await _execute_case(responder, case) return await asyncio.gather(*(run_case(case) for case in cases)) def benchmark_result_to_dict(result: ChatBenchmarkResult) -> dict[str, Any]: payload = asdict(result) if result.eval is not None: payload["eval"] = asdict(result.eval) payload["eval"]["overall"] = result.eval.overall return payload def _normalize_optional_text(value: Any) -> str | None: normalized = str(value or "").strip() return normalized or None def _normalize_text_list(value: Any) -> tuple[str, ...]: if not isinstance(value, list): return () return tuple(str(item).strip() for item in value if str(item).strip()) def _normalize_optional_mapping(value: Any) -> dict[str, Any] | None: if not isinstance(value, dict): return None return dict(value) async def _execute_case( responder: Callable[[ChatBenchmarkCase], Awaitable[dict[str, Any]]], case: ChatBenchmarkCase, ) -> ChatBenchmarkResult: started_at = perf_counter() try: data = await responder(case) latency_ms = int((perf_counter() - started_at) * 1000) response_text = str(data.get("response", "")).strip() execution_result = ( evaluate_code_response( response_text, CodeExecutionSpec( language=case.execution_language, test_code=case.execution_test_code or "", timeout_seconds=case.execution_timeout_seconds, compile_only=case.execution_compile_only, ), ) if case.execution_language else None ) grounding_ok = _grounding_requirements_met(data, case) eval_result = evaluate_chat_case( ChatEvalCase( name=case.name, prompt=case.message, response=response_text, persona_title=str(data.get("persona_title") or "Core Assistant"), reference_answer=case.reference_answer or "", reference_facts=case.reference_facts, expected_terms=case.expected_terms, forbidden_terms=case.forbidden_terms, history_turns=len(case.history), expects_code=case.expects_code, level=case.level, difficulty=case.difficulty, category=case.category, failure_bucket=case.failure_bucket, risk_level=case.risk_level, production_like=case.production_like, ) ) execution_ok = ( execution_result is None or not execution_result.available or execution_result.passed ) ok = execution_ok and grounding_ok return ChatBenchmarkResult( name=case.name, ok=ok, latency_ms=latency_ms, status_code=int(data.get("status_code", 200) or 200), response=response_text, model=str(data.get("model", "")).strip(), tokens_used=int(data.get("tokens_used", 0) or 0), eval=eval_result, execution=execution_result, grounding_required=_case_requires_grounding(case), grounding_ok=grounding_ok, error=_failure_summary(execution_result=execution_result, grounding_ok=grounding_ok), tags=case.tags, level=case.level, difficulty=case.difficulty, category=case.category, failure_bucket=case.failure_bucket, risk_level=case.risk_level, production_like=case.production_like, execution_language=case.execution_language, ) except (httpx.HTTPError, ValueError, TypeError) as exc: latency_ms = int((perf_counter() - started_at) * 1000) status_code = getattr(getattr(exc, "response", None), "status_code", None) return ChatBenchmarkResult( name=case.name, ok=False, latency_ms=latency_ms, status_code=status_code, response="", model="", tokens_used=0, eval=None, execution=None, grounding_required=_case_requires_grounding(case), grounding_ok=False, error=str(exc), tags=case.tags, level=case.level, difficulty=case.difficulty, category=case.category, failure_bucket=case.failure_bucket, risk_level=case.risk_level, production_like=case.production_like, execution_language=case.execution_language, ) def _case_requires_grounding(case: ChatBenchmarkCase) -> bool: return ( case.min_tool_steps > 0 or case.min_grounding_sources > 0 or bool(case.expected_grounding_terms) ) def _grounding_requirements_met(data: dict[str, Any], case: ChatBenchmarkCase) -> bool: if not _case_requires_grounding(case): return True tool_trace = data.get("tool_trace") if not isinstance(tool_trace, dict): return False steps = tool_trace.get("steps") grounding_sources = tool_trace.get("grounding_sources") if case.min_tool_steps > 0 and ( not isinstance(steps, list) or len(steps) < case.min_tool_steps ): return False if case.min_grounding_sources > 0 and ( not isinstance(grounding_sources, list) or len(grounding_sources) < case.min_grounding_sources ): return False if not case.expected_grounding_terms: return True haystacks: list[str] = [] if isinstance(grounding_sources, list): for source in grounding_sources: if not isinstance(source, dict): continue haystacks.extend( [ str(source.get("label", "")).lower(), str(source.get("uri", "")).lower(), str(source.get("snippet", "")).lower(), ] ) combined = "\n".join(haystacks) return all(term.lower() in combined for term in case.expected_grounding_terms) def _failure_summary( *, execution_result: CodeExecutionResult | None, grounding_ok: bool ) -> str | None: if execution_result is not None and execution_result.available and not execution_result.passed: return execution_result.summary if not grounding_ok: return "grounding requirements not met" return None def _mean_eval_metric(evals: list[ChatEvalResult], field: str) -> float: if not evals: return 0.0 return round(mean(float(getattr(item, field, 0.0)) for item in evals), 3) def _group_average_scores( results: list[ChatBenchmarkResult], *, key: Callable[[ChatBenchmarkResult], str], ) -> dict[str, float]: buckets: dict[str, list[float]] = {} for result in results: if result.eval is None: continue bucket = key(result).strip() if not bucket: continue buckets.setdefault(bucket, []).append(result.eval.overall) return {bucket: round(mean(scores), 3) for bucket, scores in sorted(buckets.items()) if scores} def _group_case_pass_rates( results: list[ChatBenchmarkResult], *, key: Callable[[ChatBenchmarkResult], str], ) -> dict[str, float]: buckets: dict[str, list[float]] = {} for result in results: bucket = key(result).strip() if not bucket: continue buckets.setdefault(bucket, []).append(1.0 if result.ok else 0.0) return {bucket: round(mean(scores), 3) for bucket, scores in sorted(buckets.items()) if scores} def _group_tag_average_scores(results: list[ChatBenchmarkResult]) -> dict[str, float]: buckets: dict[str, list[float]] = {} for result in results: if result.eval is None: continue for tag in result.tags: normalized = str(tag).strip() if normalized: buckets.setdefault(normalized, []).append(result.eval.overall) return {bucket: round(mean(scores), 3) for bucket, scores in sorted(buckets.items()) if scores} def _group_tag_pass_rates(results: list[ChatBenchmarkResult]) -> dict[str, float]: buckets: dict[str, list[float]] = {} for result in results: for tag in result.tags: normalized = str(tag).strip() if normalized: buckets.setdefault(normalized, []).append(1.0 if result.ok else 0.0) return {bucket: round(mean(scores), 3) for bucket, scores in sorted(buckets.items()) if scores} def _group_execution_pass_rates( results: list[ChatBenchmarkResult], *, key: Callable[[ChatBenchmarkResult], str], ) -> dict[str, float]: buckets: dict[str, list[float]] = {} for result in results: if result.execution is None or not result.execution.available: continue bucket = key(result).strip() if not bucket: continue buckets.setdefault(bucket, []).append(1.0 if result.execution.passed else 0.0) return {bucket: round(mean(scores), 3) for bucket, scores in sorted(buckets.items()) if scores} def _is_tool_use_result(result: ChatBenchmarkResult) -> bool: if result.grounding_required: return True normalized_tags = {str(tag).strip().lower() for tag in result.tags} return any(tag in normalized_tags for tag in TOOL_USE_TAGS) def _is_multimodal_result(result: ChatBenchmarkResult) -> bool: normalized_category = result.category.strip().lower() if normalized_category in MULTIMODAL_CATEGORIES: return True normalized_tags = {str(tag).strip().lower() for tag in result.tags} return any(tag in normalized_tags for tag in MULTIMODAL_TAGS) def _is_hallucination_incident(result: ChatBenchmarkResult) -> bool: if result.eval is None: return False if result.eval.factuality < HALLUCINATION_FACTUALITY_THRESHOLD: return True judge = result.eval.judge if judge is None: return False return ( result.category in {"grounding", "factuality", "multimodal"} and not judge.grounding.passed ) def _latency_percentile_ms( results: list[ChatBenchmarkResult], *, percentile: float, ) -> float: if not results: return 0.0 latencies = sorted(result.latency_ms for result in results) index = max(0, min(len(latencies) - 1, round((len(latencies) - 1) * (percentile / 100.0)))) return round(float(latencies[index]), 1) def _summarize_judge_results(evals: list[ChatEvalResult]) -> dict[str, Any]: dimension_names = ( "task_completion", "instruction_following", "grounding", "safety", "multi_turn_continuity", "code_quality", "regression_risk", ) judge_results = [item.judge for item in evals if item.judge is not None] if not judge_results: return { "overall": 0.0, "pass_rate": 0.0, "dimension_scores": {name: 0.0 for name in dimension_names}, "failure_reasons": {}, } failure_reasons: dict[str, int] = {} for judge in judge_results: for reason in judge.failure_reasons: failure_reasons[reason] = failure_reasons.get(reason, 0) + 1 return { "overall": round(mean(judge.overall for judge in judge_results), 3), "pass_rate": round( sum(1 for judge in judge_results if judge.passed) / len(judge_results), 3, ), "dimension_scores": { name: round( mean(float(getattr(judge, name).score) for judge in judge_results), 3, ) for name in dimension_names }, "failure_reasons": dict(sorted(failure_reasons.items())), } def _utc_timestamp() -> str: return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z") def _normalize_metric_map(value: Any) -> dict[str, float]: if not isinstance(value, dict): return {} normalized: dict[str, float] = {} for key, item in value.items(): try: normalized[str(key)] = round(float(item), 3) except (TypeError, ValueError): continue return normalized def _extract_benchmark_reference_metrics(reference: dict[str, Any] | None) -> dict[str, Any] | None: if not isinstance(reference, dict): return None return { "benchmark_name": str(reference.get("benchmark_name", "")).strip(), "branch": str(reference.get("branch", "")).strip(), "model": str(reference.get("model", "")).strip(), "generated_at": str(reference.get("generated_at") or _utc_timestamp()), "score_manifest": _normalize_metric_map(reference.get("score_manifest")), "category_scores": _normalize_metric_map(reference.get("category_scores")), "failure_bucket_scores": _normalize_metric_map(reference.get("failure_bucket_scores")), "tag_scores": _normalize_metric_map(reference.get("tag_scores")), "tag_pass_rates": _normalize_metric_map(reference.get("tag_pass_rates")), "execution_language_pass_rates": _normalize_metric_map( reference.get("execution_language_pass_rates") ), "execution_language_scores": _normalize_metric_map( reference.get("execution_language_scores") ), "category_execution_pass_rates": _normalize_metric_map( reference.get("category_execution_pass_rates") ), "memory_quality_scores": _normalize_metric_map(reference.get("memory_quality_scores")), "memory_quality_pass_rates": _normalize_metric_map( reference.get("memory_quality_pass_rates") ), "risk_level_scores": _normalize_metric_map(reference.get("risk_level_scores")), "average_overall_score": float(reference.get("average_overall_score", 0.0) or 0.0), "average_latency_ms": float(reference.get("average_latency_ms", 0.0) or 0.0), "average_tokens_used": float(reference.get("average_tokens_used", 0.0) or 0.0), "success_rate": float(reference.get("success_rate", 0.0) or 0.0), "execution_pass_rate": float(reference.get("execution_pass_rate", 0.0) or 0.0), "grounding_pass_rate": float(reference.get("grounding_pass_rate", 0.0) or 0.0), "memory_retrieval_pass_rate": float( reference.get("memory_retrieval_pass_rate", 0.0) or 0.0 ), "production_like_pass_rate": float(reference.get("production_like_pass_rate", 0.0) or 0.0), "human_eval_cadence": str(reference.get("human_eval_cadence", "") or ""), "total_cases": int(reference.get("total_cases", 0) or 0), "failed_cases": int(reference.get("failed_cases", 0) or 0), } def _build_benchmark_history_run(manifest: dict[str, Any]) -> dict[str, Any]: extracted = _extract_benchmark_reference_metrics(manifest) if extracted is None: raise ValueError("Benchmark manifest history snapshotam jābūt objektam.") return extracted def _coerce_history_runs(history: dict[str, Any] | None) -> list[dict[str, Any]]: if not isinstance(history, dict): return [] runs = history.get("runs") if not isinstance(runs, list): return [] return [dict(item) for item in runs if isinstance(item, dict)] def _same_benchmark_identity(left: dict[str, Any], right: dict[str, Any]) -> bool: return ( str(left.get("benchmark_name", "")).strip() == str(right.get("benchmark_name", "")).strip() and str(left.get("branch", "")).strip() == str(right.get("branch", "")).strip() and str(left.get("model", "")).strip() == str(right.get("model", "")).strip() and str(left.get("generated_at", "")).strip() == str(right.get("generated_at", "")).strip() ) def _select_previous_history_baseline( runs: list[dict[str, Any]], current_run: dict[str, Any] ) -> dict[str, Any] | None: if not runs: return None if _same_benchmark_identity(runs[-1], current_run): return runs[-2] if len(runs) > 1 else None return runs[-1] def _calculate_metric_deltas( current_metrics: dict[str, float], previous_metrics: dict[str, float], ) -> dict[str, dict[str, float]]: shared_keys = sorted(set(current_metrics) & set(previous_metrics)) return { key: { "current": round(current_metrics[key], 3), "previous": round(previous_metrics[key], 3), "delta": round(current_metrics[key] - previous_metrics[key], 3), } for key in shared_keys } def _filter_negative_deltas( deltas: dict[str, dict[str, float]], ) -> dict[str, dict[str, float]]: return { key: payload for key, payload in deltas.items() if float(payload.get("delta", 0.0) or 0.0) < 0.0 } def _resolve_human_eval_cadence( *, benchmark_name: str, branch: str, human_eval_summary: dict[str, Any] | None, ) -> str: if isinstance(human_eval_summary, dict): cadence = str(human_eval_summary.get("cadence", "") or "").strip() if cadence: return cadence normalized_benchmark = benchmark_name.strip().lower() normalized_branch = branch.strip().lower() if "memory" in normalized_benchmark or normalized_branch == "master": return "weekly + pre-release" if normalized_branch in {"coder", "planner"}: return "per release" return "per release" def _build_benchmark_trend_summary(runs: list[dict[str, Any]]) -> dict[str, dict[str, Any]]: if not runs: return {} summary: dict[str, dict[str, Any]] = {} for metric in TREND_TRACKED_METRICS: recent_values = [_history_metric_value(run, metric) for run in runs] recent_values = [round(value, 3) for value in recent_values if value is not None] if not recent_values: continue latest = recent_values[-1] baseline = recent_values[0] summary[metric] = { "latest": latest, "baseline": baseline, "delta": round(latest - baseline, 3), "recent_values": recent_values[-5:], } return summary def _history_metric_value(run: dict[str, Any], metric: str) -> float | None: if metric in run: try: return float(run[metric]) except (TypeError, ValueError): return None score_manifest = run.get("score_manifest") if isinstance(score_manifest, dict) and metric in score_manifest: try: return float(score_manifest[metric]) except (TypeError, ValueError): return None return None