# Page header captured with the file: "MarisUK's picture" / "Maris AI model sync" / "f440f03 verified".
"""Chat benchmark runner for JSON eval datasets."""
from __future__ import annotations
import asyncio
import json
from collections.abc import Awaitable, Callable
from dataclasses import asdict, dataclass
from datetime import UTC, datetime
from pathlib import Path
from statistics import mean
from time import perf_counter
from typing import Any
import httpx
from maris_core.code.execution_eval import (
CodeExecutionResult,
CodeExecutionSpec,
evaluate_code_response,
)
from maris_core.text.evals import ChatEvalCase, ChatEvalResult, evaluate_chat_case
# Result categories that count as memory cases in the benchmark summary.
MEMORY_QUALITY_CATEGORIES = (
    "multi_turn_continuity",
    "cross_session_recall",
    "user_preferences_recall",
    "cross_lingual_retrieval",
    "stale_memory_rejection",
)
# Tags (compared lower-cased) that mark a result as a tool-use case.
TOOL_USE_TAGS = ("tool", "tools", "tool_use", "grounding", "repo", "browser")
# Tags (compared lower-cased) that mark a result as a multimodal case.
MULTIMODAL_TAGS = ("multimodal", "vision", "image", "video", "audio", "voice", "music")
# Categories (compared lower-cased) that mark a result as a multimodal case.
MULTIMODAL_CATEGORIES = (
    "multimodal",
    "vision",
    "vision_analysis",
    "image_generation",
    "video_generation",
    "voice_conversation",
    "audio",
    "music_generation",
)
# An eval factuality score strictly below this value counts as a hallucination incident.
HALLUCINATION_FACTUALITY_THRESHOLD = 0.6
# Metric names tracked across runs.
# NOTE(review): not referenced in the visible part of this file; presumably
# consumed by the trend-summary builder — confirm against its definition.
TREND_TRACKED_METRICS = (
    "average_overall_score",
    "average_latency_ms",
    "latency_p95_ms",
    "average_tokens_used",
    "success_rate",
    "reasoning",
    "coding",
    "safety",
    "execution_pass_rate",
    "grounding_pass_rate",
    "tool_use_pass_rate",
    "memory_retrieval_pass_rate",
    "multimodal_pass_rate",
    "hallucination_rate",
    "production_like_pass_rate",
    "pairwise_win_rate",
)
@dataclass(frozen=True, slots=True)
class ChatBenchmarkCase:
    """One benchmark case as loaded from a JSON eval dataset.

    Instances are immutable. Only ``name`` and ``message`` are required by the
    loader; every other field has a default.
    """

    # Identifier of the case and the user message sent to the chat endpoint.
    name: str
    message: str
    # Request context posted together with the message.
    history: tuple[dict[str, str], ...] = ()
    vision_context: dict[str, Any] | None = None
    profile: str | None = None
    persona_id: str | None = None
    session_id: str | None = None
    # Expectations handed to the chat evaluator.
    expected_terms: tuple[str, ...] = ()
    forbidden_terms: tuple[str, ...] = ()
    # Free-form labels copied onto the result and used for per-tag grouping.
    tags: tuple[str, ...] = ()
    reference_answer: str | None = None
    reference_facts: tuple[str, ...] = ()
    expects_code: bool = False
    # Code-execution check; it only runs when ``execution_language`` is set.
    execution_language: str | None = None
    execution_test_code: str | None = None
    execution_timeout_seconds: float = 8.0
    execution_compile_only: bool = False
    # Grounding requirements checked against the response's ``tool_trace``.
    min_tool_steps: int = 0
    min_grounding_sources: int = 0
    expected_grounding_terms: tuple[str, ...] = ()
    # Branches the case applies to; an empty tuple means every branch.
    branches: tuple[str, ...] = ()
    # Classification metadata used for case selection and grouped reporting.
    level: str = "ci"
    difficulty: str = "standard"
    category: str = "general"
    failure_bucket: str = "general"
    risk_level: str = "standard"
    production_like: bool = False
@dataclass(frozen=True, slots=True)
class ChatBenchmarkResult:
    """Outcome of running a single benchmark case."""

    name: str
    # True when the execution check (if any) and the grounding check both passed;
    # False when the responder raised a handled error.
    ok: bool
    # Wall-clock duration of the responder call, in milliseconds.
    latency_ms: int
    # Taken from the response payload (default 200) or from the raised HTTP
    # error's response; None when neither is available.
    status_code: int | None
    response: str
    model: str
    tokens_used: int
    # None when the responder failed before an evaluation could be produced.
    eval: ChatEvalResult | None
    # None when the case has no execution language or the responder failed.
    execution: CodeExecutionResult | None
    grounding_required: bool = False
    grounding_ok: bool = True
    # Failure summary or exception text; None when nothing failed.
    error: str | None = None
    # Classification metadata copied from the originating case.
    tags: tuple[str, ...] = ()
    level: str = "ci"
    difficulty: str = "standard"
    category: str = "general"
    failure_bucket: str = "general"
    risk_level: str = "standard"
    production_like: bool = False
    execution_language: str | None = None
def load_chat_benchmark_dataset(path: str | Path) -> list[ChatBenchmarkCase]:
    """Load benchmark cases from a UTF-8 JSON file.

    The file must contain either a JSON array of case objects or an object
    whose ``cases`` key holds that array.

    Args:
        path: Location of the dataset file.

    Returns:
        The parsed cases, in file order.

    Raises:
        ValueError: If the top-level value is not a list of objects, or a case
            lacks a non-empty ``name`` or ``message`` (JSON ``null`` counts as
            missing).
    """
    raw = json.loads(Path(path).read_text(encoding="utf-8"))
    entries = raw.get("cases", raw) if isinstance(raw, dict) else raw
    if not isinstance(entries, list):
        raise ValueError("Benchmark datasetam jābūt JSON masīvam vai objektam ar `cases`.")
    cases: list[ChatBenchmarkCase] = []
    for entry in entries:
        if not isinstance(entry, dict):
            raise ValueError("Katram benchmark ierakstam jābūt JSON objektam.")
        raw_name = entry.get("name")
        raw_message = entry.get("message")
        # str(None) would yield the truthy text "None", so treat null as missing.
        name = "" if raw_name is None else str(raw_name).strip()
        message = "" if raw_message is None else str(raw_message).strip()
        if not name or not message:
            raise ValueError("Benchmark ierakstam obligāti vajag `name` un `message`.")
        raw_history = entry.get("history")
        # A null or non-list history is treated as "no history" instead of crashing.
        history_items = raw_history if isinstance(raw_history, list) else []
        history = tuple(
            item
            for item in history_items
            if isinstance(item, dict) and item.get("role") and item.get("content")
        )
        cases.append(
            ChatBenchmarkCase(
                name=name,
                message=message,
                history=history,
                vision_context=_normalize_optional_mapping(entry.get("vision_context")),
                profile=_normalize_optional_text(entry.get("profile")),
                persona_id=_normalize_optional_text(entry.get("persona_id")),
                session_id=_normalize_optional_text(entry.get("session_id")),
                expected_terms=_normalize_text_list(entry.get("expected_terms")),
                forbidden_terms=_normalize_text_list(entry.get("forbidden_terms")),
                tags=_normalize_text_list(entry.get("tags")),
                reference_answer=_normalize_optional_text(entry.get("reference_answer")),
                reference_facts=_normalize_text_list(entry.get("reference_facts")),
                expects_code=bool(entry.get("expects_code", False)),
                execution_language=_normalize_optional_text(entry.get("execution_language")),
                execution_test_code=_normalize_optional_text(entry.get("execution_test_code")),
                # Falsy values (null, 0) fall back to the default timeout.
                execution_timeout_seconds=float(entry.get("execution_timeout_seconds", 8.0) or 8.0),
                execution_compile_only=bool(entry.get("execution_compile_only", False)),
                min_tool_steps=max(0, int(entry.get("min_tool_steps", 0) or 0)),
                min_grounding_sources=max(0, int(entry.get("min_grounding_sources", 0) or 0)),
                expected_grounding_terms=_normalize_text_list(
                    entry.get("expected_grounding_terms")
                ),
                branches=_normalize_text_list(entry.get("branches")),
                level=_normalize_optional_text(entry.get("level")) or "ci",
                difficulty=_normalize_optional_text(entry.get("difficulty")) or "standard",
                category=_normalize_optional_text(entry.get("category")) or "general",
                failure_bucket=_normalize_optional_text(entry.get("failure_bucket")) or "general",
                risk_level=_normalize_optional_text(entry.get("risk_level")) or "standard",
                production_like=bool(entry.get("production_like", False)),
            )
        )
    return cases
def summarize_chat_benchmark(results: list[ChatBenchmarkResult]) -> dict[str, Any]:
    """Aggregate per-case results into a summary dictionary.

    The summary holds case counts, pass rates, latency and token averages,
    grouped eval scores, a flat ``score_manifest``, a ``quality_dimensions``
    overview, the judge summary and the serialized per-case results.

    Averages of latency, tokens and eval scores use successful results only;
    the latency percentile is computed over every result.
    """

    def rate(passed: list[Any], total: list[Any], *, when_empty: float) -> float:
        # Share of ``total`` found in ``passed``, rounded to three decimals.
        return round(len(passed) / len(total), 3) if total else when_empty

    successful = [item for item in results if item.ok]
    failed = [item for item in results if not item.ok]
    evals = [item.eval for item in successful if item.eval is not None]

    # Eval score averages grouped by each classification field.
    category_scores = _group_average_scores(results, key=lambda item: item.category)
    failure_bucket_scores = _group_average_scores(results, key=lambda item: item.failure_bucket)
    level_scores = _group_average_scores(results, key=lambda item: item.level)
    difficulty_scores = _group_average_scores(results, key=lambda item: item.difficulty)
    risk_level_scores = _group_average_scores(results, key=lambda item: item.risk_level)
    tag_scores = _group_tag_average_scores(results)
    tag_pass_rates = _group_tag_pass_rates(results)

    # Code execution: "supported" means an execution result exists and was available.
    with_execution = [item for item in results if item.execution is not None]
    execution_supported = [item for item in with_execution if item.execution.available]
    execution_skipped = [item for item in with_execution if not item.execution.available]
    execution_passed = [item for item in execution_supported if item.execution.passed]

    grounding_cases = [item for item in results if item.grounding_required]
    grounding_passed = [
        item
        for item in grounding_cases
        if item.grounding_ok is True and item.error != "grounding requirements not met"
    ]

    execution_language_pass_rates = _group_execution_pass_rates(
        execution_supported,
        key=lambda item: item.execution_language or "unspecified",
    )
    execution_language_scores = _group_average_scores(
        [item for item in results if item.execution_language],
        key=lambda item: item.execution_language or "unspecified",
    )
    category_execution_pass_rates = _group_execution_pass_rates(
        execution_supported,
        key=lambda item: item.category,
    )

    production_like_results = [item for item in results if item.production_like]
    production_like_passed = [item for item in production_like_results if item.ok]

    memory_results = [
        item
        for item in results
        if item.category in MEMORY_QUALITY_CATEGORIES or "memory" in item.tags
    ]
    memory_passed = [item for item in memory_results if item.ok]
    memory_quality_scores = {
        name: score
        for name, score in category_scores.items()
        if name in MEMORY_QUALITY_CATEGORIES
    }
    memory_quality_pass_rates = _group_case_pass_rates(
        memory_results,
        key=lambda item: item.category,
    )

    tool_use_results = [item for item in results if _is_tool_use_result(item)]
    tool_use_passed = [item for item in tool_use_results if item.ok and item.grounding_ok]
    multimodal_results = [item for item in results if _is_multimodal_result(item)]
    multimodal_passed = [item for item in multimodal_results if item.ok]
    hallucination_population = [item for item in results if item.eval is not None]
    hallucination_incidents = [
        item for item in hallucination_population if _is_hallucination_incident(item)
    ]

    latency_p95_ms = _latency_percentile_ms(results, percentile=95.0)
    judge_summary = _summarize_judge_results(evals)

    # Scalars that appear in more than one section of the summary.
    average_overall = round(mean(item.overall for item in evals), 3) if evals else 0.0
    average_latency_ms = (
        round(mean(item.latency_ms for item in successful), 1) if successful else 0.0
    )
    average_tokens_used = (
        round(mean(item.tokens_used for item in successful), 1) if successful else 0.0
    )
    execution_pass_rate = rate(execution_passed, execution_supported, when_empty=0.0)
    grounding_pass_rate = rate(grounding_passed, grounding_cases, when_empty=1.0)
    tool_use_pass_rate = rate(tool_use_passed, tool_use_results, when_empty=1.0)
    multimodal_pass_rate = rate(multimodal_passed, multimodal_results, when_empty=1.0)
    memory_pass_rate = rate(memory_passed, memory_results, when_empty=1.0)
    hallucination_rate = rate(
        hallucination_incidents, hallucination_population, when_empty=0.0
    )
    production_like_pass_rate = rate(
        production_like_passed, production_like_results, when_empty=1.0
    )

    # Built step by step so the key order matches the published manifest layout.
    score_manifest: dict[str, Any] = {"overall": average_overall}
    for metric in (
        "reasoning",
        "factuality",
        "latvian_quality",
        "coding",
        "long_context",
        "helpfulness",
        "safety",
    ):
        score_manifest[metric] = _mean_eval_metric(evals, metric)
    score_manifest["tool_use_pass_rate"] = tool_use_pass_rate
    score_manifest["multimodal_pass_rate"] = multimodal_pass_rate
    score_manifest["hallucination_rate"] = hallucination_rate
    score_manifest["latency_p95_ms"] = latency_p95_ms
    score_manifest["execution"] = execution_pass_rate
    score_manifest["grounding"] = grounding_pass_rate
    score_manifest["judge_overall"] = judge_summary["overall"]
    judge_dimensions = judge_summary["dimension_scores"]
    for dimension in (
        "task_completion",
        "instruction_following",
        "grounding",
        "safety",
        "multi_turn_continuity",
        "code_quality",
        "regression_risk",
    ):
        score_manifest[f"judge_{dimension}"] = judge_dimensions[dimension]
    score_manifest["memory_retrieval_pass_rate"] = memory_pass_rate
    for category, score in memory_quality_scores.items():
        score_manifest[f"memory_{category}"] = score

    total_cases = len(results)
    quality_dimensions = {
        "reasoning": {"score": score_manifest["reasoning"], "cases": total_cases},
        "coding": {"score": score_manifest["coding"], "cases": total_cases},
        "tool_use": {
            "cases": len(tool_use_results),
            "pass_rate": score_manifest["tool_use_pass_rate"],
        },
        "memory": {
            "cases": len(memory_results),
            "pass_rate": score_manifest["memory_retrieval_pass_rate"],
        },
        "multimodality": {
            "cases": len(multimodal_results),
            "pass_rate": score_manifest["multimodal_pass_rate"],
        },
        "latency": {"average_ms": average_latency_ms, "p95_ms": latency_p95_ms},
        "hallucination": {
            "cases": len(hallucination_population),
            "incident_rate": score_manifest["hallucination_rate"],
        },
        "safety": {"score": score_manifest["safety"], "cases": total_cases},
    }

    return {
        "total_cases": total_cases,
        "successful_cases": len(successful),
        "failed_cases": len(failed),
        "success_rate": rate(successful, results, when_empty=0.0),
        "average_latency_ms": average_latency_ms,
        "latency_p95_ms": latency_p95_ms,
        "average_tokens_used": average_tokens_used,
        "average_overall_score": average_overall,
        "execution_cases": len(execution_supported),
        "execution_skipped": len(execution_skipped),
        "execution_passed": len(execution_passed),
        "execution_pass_rate": execution_pass_rate,
        "grounding_cases": len(grounding_cases),
        "grounding_pass_rate": grounding_pass_rate,
        "category_scores": category_scores,
        "failure_bucket_scores": failure_bucket_scores,
        "execution_language_pass_rates": execution_language_pass_rates,
        "execution_language_scores": execution_language_scores,
        "category_execution_pass_rates": category_execution_pass_rates,
        "level_scores": level_scores,
        "difficulty_scores": difficulty_scores,
        "risk_level_scores": risk_level_scores,
        "tag_scores": tag_scores,
        "tag_pass_rates": tag_pass_rates,
        "memory_retrieval_cases": len(memory_results),
        "memory_retrieval_pass_rate": memory_pass_rate,
        "tool_use_cases": len(tool_use_results),
        "tool_use_pass_rate": tool_use_pass_rate,
        "multimodal_cases": len(multimodal_results),
        "multimodal_pass_rate": multimodal_pass_rate,
        "hallucination_eval_cases": len(hallucination_population),
        "hallucination_incidents": len(hallucination_incidents),
        "hallucination_rate": hallucination_rate,
        "memory_quality_scores": memory_quality_scores,
        "memory_quality_pass_rates": memory_quality_pass_rates,
        "production_like_cases": len(production_like_results),
        "production_like_pass_rate": production_like_pass_rate,
        "quality_dimensions": quality_dimensions,
        "judge_summary": judge_summary,
        "score_manifest": score_manifest,
        "results": [benchmark_result_to_dict(item) for item in results],
    }
def build_chat_benchmark_manifest(
    results: list[ChatBenchmarkResult],
    *,
    benchmark_name: str,
    branch: str,
    model: str,
    human_eval_summary: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Build the benchmark manifest: run identity plus the full summary.

    When ``human_eval_summary`` is a dict it is attached to the summary and its
    ``pairwise_win_rate`` and ``average_confidence`` values are copied into the
    score manifest, rounded to three decimals (missing or falsy values become
    0.0). The human-eval cadence entry is resolved in every case.
    """
    summary = summarize_chat_benchmark(results)
    if isinstance(human_eval_summary, dict):
        summary["human_eval_summary"] = human_eval_summary
        manifest_scores = summary["score_manifest"]
        # (score manifest key, key read from the human-eval summary)
        for target, source in (
            ("pairwise_win_rate", "pairwise_win_rate"),
            ("human_eval_confidence", "average_confidence"),
        ):
            manifest_scores[target] = round(
                float(human_eval_summary.get(source, 0.0) or 0.0), 3
            )
    summary["human_eval_cadence"] = _resolve_human_eval_cadence(
        benchmark_name=benchmark_name,
        branch=branch,
        human_eval_summary=human_eval_summary,
    )
    manifest: dict[str, Any] = {
        "benchmark_name": benchmark_name,
        "branch": branch,
        "model": model,
        "generated_at": _utc_timestamp(),
        "artifact_type": "chat-benchmark-manifest",
    }
    # Summary entries follow the identity header and win on any key clash.
    manifest.update(summary)
    return manifest
def build_chat_benchmark_history_artifact(
    current_manifest: dict[str, Any],
    *,
    previous_history: dict[str, Any] | None = None,
    max_runs: int = 30,
) -> dict[str, Any]:
    """Merge the current manifest into the run history and summarize it.

    The current run replaces the latest stored run when both share the same
    benchmark identity; otherwise it is appended. Only the newest ``max_runs``
    runs are kept, and at least one is always kept. The regression summary
    compares the current manifest with a baseline chosen from the runs that
    existed before this call.
    """
    prior_runs = _coerce_history_runs(previous_history)
    latest = _build_benchmark_history_run(current_manifest)
    baseline = _select_previous_history_baseline(prior_runs, latest)

    history = list(prior_runs)
    replaces_latest = bool(history) and _same_benchmark_identity(history[-1], latest)
    if replaces_latest:
        history[-1] = latest
    else:
        history.append(latest)
    keep = max(1, max_runs)
    history = history[-keep:]

    regression_report = build_chat_benchmark_regression_report(
        current_manifest,
        previous_run=baseline,
    )
    regression_summary = {
        field: regression_report[field]
        for field in ("status", "has_baseline", "has_regressions", "regression_count")
    }
    return {
        "artifact_type": "chat-benchmark-history",
        "benchmark_name": str(current_manifest.get("benchmark_name", "")).strip(),
        "branch": str(current_manifest.get("branch", "")).strip(),
        "model": str(current_manifest.get("model", "")).strip(),
        "latest_generated_at": latest["generated_at"],
        "run_count": len(history),
        "runs": history,
        "trend_summary": _build_benchmark_trend_summary(history),
        "latest_regression_summary": regression_summary,
    }
def build_chat_benchmark_regression_report(
    current_manifest: dict[str, Any],
    *,
    previous_run: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Compare the current manifest with a previous run.

    Without a usable baseline the report has status ``"no-baseline"`` and empty
    delta maps. Otherwise each metric section gets a delta map and the
    filtered negative deltas are collected under ``regressions``; the status is
    ``"regression-detected"`` when any section reports one, else ``"ok"``.
    """
    # (metrics section, report key under which that section's deltas appear)
    sections = (
        ("score_manifest", "score_manifest_deltas"),
        ("category_scores", "category_score_deltas"),
        ("failure_bucket_scores", "failure_bucket_score_deltas"),
        ("execution_language_pass_rates", "execution_language_pass_rate_deltas"),
        ("execution_language_scores", "execution_language_score_deltas"),
        ("category_execution_pass_rates", "category_execution_pass_rate_deltas"),
        ("memory_quality_scores", "memory_quality_score_deltas"),
        ("memory_quality_pass_rates", "memory_quality_pass_rate_deltas"),
    )
    current = _extract_benchmark_reference_metrics(current_manifest)
    previous = _extract_benchmark_reference_metrics(previous_run)

    report: dict[str, Any] = {
        "artifact_type": "chat-benchmark-regression-report",
        "benchmark_name": current["benchmark_name"],
        "branch": current["branch"],
        "model": current["model"],
        "current_generated_at": current["generated_at"],
    }

    if previous is None:
        report["previous_generated_at"] = None
        report["has_baseline"] = False
        report["has_regressions"] = False
        report["regression_count"] = 0
        report["status"] = "no-baseline"
        for _, delta_key in sections:
            report[delta_key] = {}
        report["regressions"] = {}
        return report

    deltas = {
        section: _calculate_metric_deltas(current[section], previous[section])
        for section, _ in sections
    }
    regressions = {
        section: _filter_negative_deltas(deltas[section]) for section, _ in sections
    }
    regression_count = sum(len(found) for found in regressions.values())

    report["previous_generated_at"] = previous["generated_at"]
    report["has_baseline"] = True
    report["has_regressions"] = regression_count > 0
    report["regression_count"] = regression_count
    report["status"] = "regression-detected" if regression_count > 0 else "ok"
    for section, delta_key in sections:
        report[delta_key] = deltas[section]
    report["regressions"] = regressions
    return report
def select_chat_benchmark_cases(
    cases: list[ChatBenchmarkCase],
    *,
    levels: list[str] | tuple[str, ...],
    branch: str | None = None,
) -> list[ChatBenchmarkCase]:
    """Select the cases that match the requested levels and branch.

    Args:
        cases: Candidate cases.
        levels: Levels to keep; a case is kept when its ``level`` is one of them.
        branch: Optional branch name, compared case-insensitively and ignoring
            surrounding whitespace. ``None``, empty or whitespace-only means no
            branch filtering.

    Returns:
        Matching cases in their original order. A case with no ``branches``
        applies to every branch.
    """
    # Build the lookup set once instead of once per case.
    wanted_levels = set(levels)
    selected = [case for case in cases if case.level in wanted_levels]
    normalized_branch = (branch or "").strip().lower()
    if not normalized_branch:
        return selected
    return [
        case
        for case in selected
        if not case.branches
        or normalized_branch in {item.strip().lower() for item in case.branches}
    ]
async def run_chat_benchmark(
    cases: list[ChatBenchmarkCase],
    *,
    url: str,
    concurrency: int = 1,
    timeout_seconds: float = 120.0,
    transport: httpx.AsyncBaseTransport | None = None,
) -> list[ChatBenchmarkResult]:
    """Run every case against an HTTP chat endpoint.

    Args:
        cases: Cases to execute.
        url: Endpoint that receives one JSON POST per case.
        concurrency: Maximum number of cases in flight at once.
        timeout_seconds: Timeout applied to the HTTP client.
        transport: Optional custom transport handed to the HTTP client.

    Returns:
        One result per case, in the order of ``cases``.
    """
    # One client for the whole run: connections are reused across cases, and a
    # caller-supplied transport is closed once at the end rather than after the
    # first case.
    async with httpx.AsyncClient(
        timeout=timeout_seconds,
        follow_redirects=True,
        transport=transport,
    ) as client:

        async def responder(case: ChatBenchmarkCase) -> dict[str, Any]:
            response = await client.post(
                url,
                json={
                    "message": case.message,
                    "history": list(case.history),
                    "vision_context": case.vision_context,
                    "profile": case.profile,
                    "persona_id": case.persona_id,
                    "session_id": case.session_id,
                },
            )
            response.raise_for_status()
            payload = response.json()
            if not isinstance(payload, dict):
                # Raised as ValueError so the case is recorded as failed instead
                # of aborting the whole run.
                raise ValueError(
                    f"chat endpoint returned {type(payload).__name__}, expected a JSON object"
                )
            return payload

        return await run_chat_benchmark_with_responder(
            cases, responder=responder, concurrency=concurrency
        )
async def run_chat_benchmark_with_responder(
    cases: list[ChatBenchmarkCase],
    *,
    responder: Callable[[ChatBenchmarkCase], Awaitable[dict[str, Any]]],
    concurrency: int = 1,
) -> list[ChatBenchmarkResult]:
    """Run every case through ``responder`` with bounded concurrency.

    At most ``max(1, concurrency)`` cases run at once. Results come back in
    the same order as ``cases``.
    """
    slots = max(1, concurrency)
    limiter = asyncio.Semaphore(slots)

    async def bounded(item: ChatBenchmarkCase) -> ChatBenchmarkResult:
        # Wait for a free slot before executing the case.
        async with limiter:
            outcome = await _execute_case(responder, item)
        return outcome

    pending = [bounded(item) for item in cases]
    return await asyncio.gather(*pending)
def benchmark_result_to_dict(result: ChatBenchmarkResult) -> dict[str, Any]:
    """Convert a result into a plain dictionary.

    When an evaluation is present, its dictionary form also carries the
    evaluation's ``overall`` value.
    """
    payload = asdict(result)
    evaluation = result.eval
    if evaluation is None:
        return payload
    payload["eval"] = {**asdict(evaluation), "overall": evaluation.overall}
    return payload
def _normalize_optional_text(value: Any) -> str | None:
    """Return ``value`` as stripped text, or None when nothing is left.

    Any falsy value (None, "", 0, False) yields None.
    """
    text = str(value).strip() if value else ""
    return text if text else None
def _normalize_text_list(value: Any) -> tuple[str, ...]:
    """Convert a list or tuple of values into a tuple of non-empty strings.

    Each item is converted with ``str`` and stripped. ``None`` items and items
    that are empty after stripping are dropped. Any value that is neither a
    list nor a tuple yields an empty tuple.
    """
    if not isinstance(value, (list, tuple)):
        return ()
    # Skip None explicitly: str(None) would otherwise yield the text "None".
    stripped = (str(item).strip() for item in value if item is not None)
    return tuple(text for text in stripped if text)
def _normalize_optional_mapping(value: Any) -> dict[str, Any] | None:
    """Return a shallow copy of ``value`` when it is a dict, otherwise None."""
    return dict(value) if isinstance(value, dict) else None
async def _execute_case(
    responder: Callable[[ChatBenchmarkCase], Awaitable[dict[str, Any]]],
    case: ChatBenchmarkCase,
) -> ChatBenchmarkResult:
    """Run one case through ``responder`` and evaluate the response.

    The case passes when the code-execution check (if one ran and was
    available) passed and the grounding requirements were met. HTTP errors,
    ``ValueError`` and ``TypeError`` raised while obtaining or evaluating the
    response produce a failed result instead of propagating; that includes a
    responder payload that is not a dict.
    """
    started_at = perf_counter()
    # Fields copied from the case onto the result on both success and failure.
    case_metadata: dict[str, Any] = {
        "name": case.name,
        "grounding_required": _case_requires_grounding(case),
        "tags": case.tags,
        "level": case.level,
        "difficulty": case.difficulty,
        "category": case.category,
        "failure_bucket": case.failure_bucket,
        "risk_level": case.risk_level,
        "production_like": case.production_like,
        "execution_language": case.execution_language,
    }
    try:
        data = await responder(case)
        latency_ms = int((perf_counter() - started_at) * 1000)
        if not isinstance(data, dict):
            # Without this guard ``data.get`` raises AttributeError, which is not
            # handled below and would abort every other case in the run.
            raise TypeError(
                f"responder returned {type(data).__name__}, expected a dict payload"
            )
        response_text = str(data.get("response", "")).strip()
        # The execution check only runs for cases that name a language.
        execution_result = (
            evaluate_code_response(
                response_text,
                CodeExecutionSpec(
                    language=case.execution_language,
                    test_code=case.execution_test_code or "",
                    timeout_seconds=case.execution_timeout_seconds,
                    compile_only=case.execution_compile_only,
                ),
            )
            if case.execution_language
            else None
        )
        grounding_ok = _grounding_requirements_met(data, case)
        eval_result = evaluate_chat_case(
            ChatEvalCase(
                name=case.name,
                prompt=case.message,
                response=response_text,
                persona_title=str(data.get("persona_title") or "Core Assistant"),
                reference_answer=case.reference_answer or "",
                reference_facts=case.reference_facts,
                expected_terms=case.expected_terms,
                forbidden_terms=case.forbidden_terms,
                history_turns=len(case.history),
                expects_code=case.expects_code,
                level=case.level,
                difficulty=case.difficulty,
                category=case.category,
                failure_bucket=case.failure_bucket,
                risk_level=case.risk_level,
                production_like=case.production_like,
            )
        )
        # An unavailable execution environment does not count as a failure.
        execution_ok = (
            execution_result is None or not execution_result.available or execution_result.passed
        )
        return ChatBenchmarkResult(
            ok=execution_ok and grounding_ok,
            latency_ms=latency_ms,
            status_code=int(data.get("status_code", 200) or 200),
            response=response_text,
            model=str(data.get("model", "")).strip(),
            tokens_used=int(data.get("tokens_used", 0) or 0),
            eval=eval_result,
            execution=execution_result,
            grounding_ok=grounding_ok,
            error=_failure_summary(execution_result=execution_result, grounding_ok=grounding_ok),
            **case_metadata,
        )
    except (httpx.HTTPError, ValueError, TypeError) as exc:
        latency_ms = int((perf_counter() - started_at) * 1000)
        # Only exceptions that carry a response expose a status code.
        status_code = getattr(getattr(exc, "response", None), "status_code", None)
        return ChatBenchmarkResult(
            ok=False,
            latency_ms=latency_ms,
            status_code=status_code,
            response="",
            model="",
            tokens_used=0,
            eval=None,
            execution=None,
            grounding_ok=False,
            error=str(exc),
            **case_metadata,
        )
def _case_requires_grounding(case: ChatBenchmarkCase) -> bool:
    """Tell whether the case sets any grounding requirement.

    A requirement is a positive ``min_tool_steps``, a positive
    ``min_grounding_sources`` or a non-empty ``expected_grounding_terms``.
    """
    if case.min_tool_steps > 0:
        return True
    if case.min_grounding_sources > 0:
        return True
    return bool(case.expected_grounding_terms)
def _grounding_requirements_met(data: dict[str, Any], case: ChatBenchmarkCase) -> bool:
    """Check the response's ``tool_trace`` against the case's grounding needs.

    Returns True when the case has no grounding requirement. Otherwise the
    trace must be a dict, contain enough ``steps`` and ``grounding_sources``
    where minimums are set, and every expected term must appear
    (case-insensitively) in the label, URI or snippet of some source.
    """
    if not _case_requires_grounding(case):
        return True

    trace = data.get("tool_trace")
    if not isinstance(trace, dict):
        return False
    steps = trace.get("steps")
    sources = trace.get("grounding_sources")

    if case.min_tool_steps > 0:
        if not isinstance(steps, list) or len(steps) < case.min_tool_steps:
            return False
    if case.min_grounding_sources > 0:
        if not isinstance(sources, list) or len(sources) < case.min_grounding_sources:
            return False

    terms = case.expected_grounding_terms
    if not terms:
        return True

    # Only dict-shaped sources contribute searchable text.
    usable_sources = (
        [source for source in sources if isinstance(source, dict)]
        if isinstance(sources, list)
        else []
    )
    combined = "\n".join(
        str(source.get(field, "")).lower()
        for source in usable_sources
        for field in ("label", "uri", "snippet")
    )
    return all(term.lower() in combined for term in terms)
def _failure_summary(
    *, execution_result: CodeExecutionResult | None, grounding_ok: bool
) -> str | None:
    """Describe why a case failed, or return None when nothing failed.

    A failed code execution takes precedence over a grounding failure. An
    execution result that was not available is not treated as a failure.
    """
    execution_failed = (
        execution_result is not None
        and execution_result.available
        and not execution_result.passed
    )
    if execution_failed:
        return execution_result.summary
    return None if grounding_ok else "grounding requirements not met"
def _mean_eval_metric(evals: list[ChatEvalResult], field: str) -> float:
    """Average one attribute over all evals, rounded to three decimals.

    Evals without the attribute contribute 0.0; an empty list yields 0.0.
    """
    values = [float(getattr(item, field, 0.0)) for item in evals]
    return round(mean(values), 3) if values else 0.0
def _group_average_scores(
    results: list[ChatBenchmarkResult],
    *,
    key: Callable[[ChatBenchmarkResult], str],
) -> dict[str, float]:
    """Average the eval ``overall`` score per group.

    Results without an eval, and results whose stripped group name is empty,
    are ignored. Groups come back sorted by name, with averages rounded to
    three decimals.
    """
    grouped: dict[str, list[float]] = {}
    for item in results:
        if item.eval is not None:
            label = key(item).strip()
            if label:
                grouped.setdefault(label, []).append(item.eval.overall)
    averages: dict[str, float] = {}
    for label in sorted(grouped):
        averages[label] = round(mean(grouped[label]), 3)
    return averages
def _group_case_pass_rates(
    results: list[ChatBenchmarkResult],
    *,
    key: Callable[[ChatBenchmarkResult], str],
) -> dict[str, float]:
    """Compute the share of ``ok`` results per group.

    Results whose stripped group name is empty are ignored. Groups come back
    sorted by name, with rates rounded to three decimals.
    """
    outcomes: dict[str, list[float]] = {}
    for item in results:
        label = key(item).strip()
        if label:
            flag = 1.0 if item.ok else 0.0
            outcomes.setdefault(label, []).append(flag)
    rates: dict[str, float] = {}
    for label in sorted(outcomes):
        rates[label] = round(mean(outcomes[label]), 3)
    return rates
def _group_tag_average_scores(results: list[ChatBenchmarkResult]) -> dict[str, float]:
    """Average the eval ``overall`` score per tag.

    A result counts once for each of its non-empty, stripped tags; results
    without an eval are ignored. Tags come back sorted, with averages rounded
    to three decimals.
    """
    per_tag: dict[str, list[float]] = {}
    for item in results:
        if item.eval is None:
            continue
        labels = (str(tag).strip() for tag in item.tags)
        for label in labels:
            if label:
                per_tag.setdefault(label, []).append(item.eval.overall)
    averages: dict[str, float] = {}
    for label in sorted(per_tag):
        averages[label] = round(mean(per_tag[label]), 3)
    return averages
def _group_tag_pass_rates(results: list[ChatBenchmarkResult]) -> dict[str, float]:
    """Compute the share of ``ok`` results per tag.

    A result counts once for each of its non-empty, stripped tags. Tags come
    back sorted, with rates rounded to three decimals.
    """
    per_tag: dict[str, list[float]] = {}
    for item in results:
        labels = (str(tag).strip() for tag in item.tags)
        for label in labels:
            if label:
                per_tag.setdefault(label, []).append(1.0 if item.ok else 0.0)
    rates: dict[str, float] = {}
    for label in sorted(per_tag):
        rates[label] = round(mean(per_tag[label]), 3)
    return rates
def _group_execution_pass_rates(
    results: list[ChatBenchmarkResult],
    *,
    key: Callable[[ChatBenchmarkResult], str],
) -> dict[str, float]:
    """Compute the share of passed code executions per group.

    Only results whose execution result exists and was available are counted;
    results whose stripped group name is empty are ignored. Groups come back
    sorted by name, with rates rounded to three decimals.
    """
    outcomes: dict[str, list[float]] = {}
    for item in results:
        execution = item.execution
        if execution is not None and execution.available:
            label = key(item).strip()
            if label:
                outcomes.setdefault(label, []).append(1.0 if execution.passed else 0.0)
    rates: dict[str, float] = {}
    for label in sorted(outcomes):
        rates[label] = round(mean(outcomes[label]), 3)
    return rates
def _is_tool_use_result(result: ChatBenchmarkResult) -> bool:
    """Tell whether a result counts as a tool-use case.

    True when grounding was required, or when any stripped, lower-cased tag is
    one of ``TOOL_USE_TAGS``.
    """
    if result.grounding_required:
        return True
    tags = {str(tag).strip().lower() for tag in result.tags}
    return not tags.isdisjoint(TOOL_USE_TAGS)
def _is_multimodal_result(result: ChatBenchmarkResult) -> bool:
    """Tell whether a result counts as a multimodal case.

    True when the stripped, lower-cased category is one of
    ``MULTIMODAL_CATEGORIES``, or when any stripped, lower-cased tag is one of
    ``MULTIMODAL_TAGS``.
    """
    if result.category.strip().lower() in MULTIMODAL_CATEGORIES:
        return True
    tags = {str(tag).strip().lower() for tag in result.tags}
    return not tags.isdisjoint(MULTIMODAL_TAGS)
def _is_hallucination_incident(result: ChatBenchmarkResult) -> bool:
    """Tell whether a result counts as a hallucination incident.

    A result without an eval never counts. Otherwise it counts when the eval's
    factuality is below ``HALLUCINATION_FACTUALITY_THRESHOLD``, or when the
    category is grounding, factuality or multimodal and the judge's grounding
    check did not pass.
    """
    evaluation = result.eval
    if evaluation is None:
        return False
    if evaluation.factuality < HALLUCINATION_FACTUALITY_THRESHOLD:
        return True
    judge = evaluation.judge
    if judge is None:
        return False
    grounding_sensitive = result.category in {"grounding", "factuality", "multimodal"}
    return grounding_sensitive and not judge.grounding.passed
def _latency_percentile_ms(
    results: list[ChatBenchmarkResult],
    *,
    percentile: float,
) -> float:
    """Return the latency at the given percentile, in milliseconds.

    The sorted latencies of all results are indexed at the rounded position
    ``(count - 1) * percentile / 100``, clamped to the valid range. An empty
    list yields 0.0.
    """
    if not results:
        return 0.0
    ordered = sorted(item.latency_ms for item in results)
    last = len(ordered) - 1
    position = round(last * (percentile / 100.0))
    # Keep the position inside the list for percentiles outside 0..100.
    position = min(last, position)
    position = max(0, position)
    return round(float(ordered[position]), 1)
def _summarize_judge_results(evals: list[ChatEvalResult]) -> dict[str, Any]:
    """Aggregate the judge verdicts attached to the evals.

    Returns the mean judge ``overall``, the share of judges that passed, the
    mean score per judged dimension and a count per failure reason (sorted by
    reason). Evals without a judge are skipped; if none has one, every number
    is 0.0 and the failure-reason map is empty.
    """
    dimension_names = (
        "task_completion",
        "instruction_following",
        "grounding",
        "safety",
        "multi_turn_continuity",
        "code_quality",
        "regression_risk",
    )
    judges = [item.judge for item in evals if item.judge is not None]
    if not judges:
        return {
            "overall": 0.0,
            "pass_rate": 0.0,
            "dimension_scores": dict.fromkeys(dimension_names, 0.0),
            "failure_reasons": {},
        }

    reason_counts: dict[str, int] = {}
    for verdict in judges:
        for reason in verdict.failure_reasons:
            reason_counts[reason] = reason_counts.get(reason, 0) + 1

    passed_count = sum(1 for verdict in judges if verdict.passed)
    dimension_scores: dict[str, float] = {}
    for name in dimension_names:
        scores = [float(getattr(verdict, name).score) for verdict in judges]
        dimension_scores[name] = round(mean(scores), 3)

    return {
        "overall": round(mean(verdict.overall for verdict in judges), 3),
        "pass_rate": round(passed_count / len(judges), 3),
        "dimension_scores": dimension_scores,
        "failure_reasons": dict(sorted(reason_counts.items())),
    }
def _utc_timestamp() -> str:
return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def _normalize_metric_map(value: Any) -> dict[str, float]:
    """Coerce a mapping into ``{str: float}`` with values rounded to three decimals.

    Entries whose value cannot be converted to float are dropped. Anything
    that is not a dict yields an empty mapping.
    """
    metrics: dict[str, float] = {}
    if isinstance(value, dict):
        for name, raw in value.items():
            try:
                number = round(float(raw), 3)
            except (TypeError, ValueError):
                # Skip values that are not numeric.
                continue
            metrics[str(name)] = number
    return metrics
def _extract_benchmark_reference_metrics(reference: dict[str, Any] | None) -> dict[str, Any] | None:
if not isinstance(reference, dict):
return None
return {
"benchmark_name": str(reference.get("benchmark_name", "")).strip(),
"branch": str(reference.get("branch", "")).strip(),
"model": str(reference.get("model", "")).strip(),
"generated_at": str(reference.get("generated_at") or _utc_timestamp()),
"score_manifest": _normalize_metric_map(reference.get("score_manifest")),
"category_scores": _normalize_metric_map(reference.get("category_scores")),
"failure_bucket_scores": _normalize_metric_map(reference.get("failure_bucket_scores")),
"tag_scores": _normalize_metric_map(reference.get("tag_scores")),
"tag_pass_rates": _normalize_metric_map(reference.get("tag_pass_rates")),
"execution_language_pass_rates": _normalize_metric_map(
reference.get("execution_language_pass_rates")
),
"execution_language_scores": _normalize_metric_map(
reference.get("execution_language_scores")
),
"category_execution_pass_rates": _normalize_metric_map(
reference.get("category_execution_pass_rates")
),
"memory_quality_scores": _normalize_metric_map(reference.get("memory_quality_scores")),
"memory_quality_pass_rates": _normalize_metric_map(
reference.get("memory_quality_pass_rates")
),
"risk_level_scores": _normalize_metric_map(reference.get("risk_level_scores")),
"average_overall_score": float(reference.get("average_overall_score", 0.0) or 0.0),
"average_latency_ms": float(reference.get("average_latency_ms", 0.0) or 0.0),
"average_tokens_used": float(reference.get("average_tokens_used", 0.0) or 0.0),
"success_rate": float(reference.get("success_rate", 0.0) or 0.0),
"execution_pass_rate": float(reference.get("execution_pass_rate", 0.0) or 0.0),
"grounding_pass_rate": float(reference.get("grounding_pass_rate", 0.0) or 0.0),
"memory_retrieval_pass_rate": float(
reference.get("memory_retrieval_pass_rate", 0.0) or 0.0
),
"production_like_pass_rate": float(reference.get("production_like_pass_rate", 0.0) or 0.0),
"human_eval_cadence": str(reference.get("human_eval_cadence", "") or ""),
"total_cases": int(reference.get("total_cases", 0) or 0),
"failed_cases": int(reference.get("failed_cases", 0) or 0),
}
def _build_benchmark_history_run(manifest: dict[str, Any]) -> dict[str, Any]:
    """Build the history snapshot for a manifest.

    Raises:
        ValueError: If the manifest cannot be extracted (it is not a dict).
    """
    snapshot = _extract_benchmark_reference_metrics(manifest)
    if snapshot is not None:
        return snapshot
    raise ValueError("Benchmark manifest history snapshotam jābūt objektam.")
def _coerce_history_runs(history: dict[str, Any] | None) -> list[dict[str, Any]]:
if not isinstance(history, dict):
return []
runs = history.get("runs")
if not isinstance(runs, list):
return []
return [dict(item) for item in runs if isinstance(item, dict)]
def _same_benchmark_identity(left: dict[str, Any], right: dict[str, Any]) -> bool:
    """Tell whether two run records describe the same benchmark run.

    Two records match when ``benchmark_name``, ``branch``, ``model`` and
    ``generated_at`` are equal after string conversion and whitespace
    stripping; a missing field is treated as an empty string.
    """
    for field in ("benchmark_name", "branch", "model", "generated_at"):
        left_value = str(left.get(field, "")).strip()
        right_value = str(right.get(field, "")).strip()
        if left_value != right_value:
            return False
    return True
def _select_previous_history_baseline(
runs: list[dict[str, Any]], current_run: dict[str, Any]
) -> dict[str, Any] | None:
if not runs:
return None
if _same_benchmark_identity(runs[-1], current_run):
return runs[-2] if len(runs) > 1 else None
return runs[-1]
def _calculate_metric_deltas(
    current_metrics: dict[str, float],
    previous_metrics: dict[str, float],
) -> dict[str, dict[str, float]]:
    """Compare two metric maps over the keys they have in common.

    Returns a dict, ordered by key, whose entries carry the ``current`` value,
    the ``previous`` value and their ``delta`` (current minus previous), each
    rounded to three decimals. Keys present in only one map are ignored.
    """
    deltas: dict[str, dict[str, float]] = {}
    for key in sorted(set(current_metrics).intersection(previous_metrics)):
        now_value = current_metrics[key]
        before_value = previous_metrics[key]
        deltas[key] = {
            "current": round(now_value, 3),
            "previous": round(before_value, 3),
            "delta": round(now_value - before_value, 3),
        }
    return deltas
def _filter_negative_deltas(
    deltas: dict[str, dict[str, float]],
) -> dict[str, dict[str, float]]:
    """Keep only the entries whose ``delta`` is strictly below zero.

    A missing or falsy ``delta`` counts as ``0.0`` and is therefore dropped.
    The kept payload objects are passed through unchanged, not copied.
    """
    regressions: dict[str, dict[str, float]] = {}
    for key, payload in deltas.items():
        change = float(payload.get("delta", 0.0) or 0.0)
        if change < 0.0:
            regressions[key] = payload
    return regressions
def _resolve_human_eval_cadence(
*,
benchmark_name: str,
branch: str,
human_eval_summary: dict[str, Any] | None,
) -> str:
if isinstance(human_eval_summary, dict):
cadence = str(human_eval_summary.get("cadence", "") or "").strip()
if cadence:
return cadence
normalized_benchmark = benchmark_name.strip().lower()
normalized_branch = branch.strip().lower()
if "memory" in normalized_benchmark or normalized_branch == "master":
return "weekly + pre-release"
if normalized_branch in {"coder", "planner"}:
return "per release"
return "per release"
def _build_benchmark_trend_summary(runs: list[dict[str, Any]]) -> dict[str, dict[str, Any]]:
    """Summarize how each tracked metric moved across the given runs.

    For every name in ``TREND_TRACKED_METRICS`` that has at least one usable
    value, the summary holds the ``latest`` value (last run that has it), the
    ``baseline`` (first run that has it), their ``delta`` and up to the five
    most recent values. Values are rounded to three decimals. Metrics with no
    usable value are omitted; an empty ``runs`` list yields an empty dict.
    """
    trends: dict[str, dict[str, Any]] = {}
    if not runs:
        return trends

    for metric in TREND_TRACKED_METRICS:
        series: list[float] = []
        for run in runs:
            value = _history_metric_value(run, metric)
            if value is not None:
                series.append(round(value, 3))
        if not series:
            continue
        first_value, last_value = series[0], series[-1]
        trends[metric] = {
            "latest": last_value,
            "baseline": first_value,
            "delta": round(last_value - first_value, 3),
            "recent_values": series[-5:],
        }
    return trends
def _history_metric_value(run: dict[str, Any], metric: str) -> float | None:
if metric in run:
try:
return float(run[metric])
except (TypeError, ValueError):
return None
score_manifest = run.get("score_manifest")
if isinstance(score_manifest, dict) and metric in score_manifest:
try:
return float(score_manifest[metric])
except (TypeError, ValueError):
return None
return None