Spaces:
Running
Running
| from __future__ import annotations | |
| import hashlib | |
| import os | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import threading | |
| from typing import Any | |
| import uuid | |
| import portalocker | |
| from data.benchmark import load_benchmark_payload | |
| from data.dataset_config import ( | |
| DEFAULT_DATASET_KEY, | |
| compute_dataset_signature, | |
| dataset_template_bytes, | |
| delete_uploaded_dataset_with_artifacts, | |
| discover_datasets, | |
| resolve_results_paths, | |
| save_uploaded_dataset, | |
| ) | |
| from model_identity import ( | |
| CLOUD_SOURCE, | |
| LOCAL_SOURCE, | |
| model_ref_from_record, | |
| resolve_model_host, | |
| split_model_ref, | |
| to_model_ref, | |
| ) | |
| from mode_selection import normalize_selected_models | |
| from runner import get_runner | |
| from scoring import evaluate_response, normalize_reason_text | |
| from storage import ( | |
| compute_model_metrics, | |
| load_results, | |
| prepare_results_excel, | |
| prepare_results_json, | |
| render_results_markdown, | |
| save_results, | |
| upsert_result, | |
| ) | |
| ROOT = Path(__file__).resolve().parent | |
| DATA_DIR = ROOT / "data" | |
| BENCHMARK_PATH = DATA_DIR / "benchmark.json" | |
| UPLOADED_DATASETS_DIR = DATA_DIR / "uploaded_datasets" | |
| LOCK_PATH = DATA_DIR / ".persistence.lock" | |
| TABLE_EXPORT_MODEL_LEADERBOARD = "model_leader_board" | |
| TABLE_EXPORT_CATEGORY_PERFORMANCE = "category_level_model_performance" | |
| TABLE_EXPORT_HARDNESS_PERFORMANCE = "hardness_level_model_performance" | |
| TABLE_EXPORT_QUESTION_PERFORMANCE = "question_level_model_performance" | |
| TABLE_EXPORT_RESPONSE_PERFORMANCE = "response_level_model_performance" | |
| _PERSISTED_RUN_ENTRY_KEYS: set[str] = set() | |
| _PERSISTED_RUN_ENTRY_KEYS_LOCK = threading.Lock() | |
| def _evaluation_label(status: str) -> str: | |
| normalized_status = str(status or "").strip() | |
| return { | |
| "success": "Successful", | |
| "fail": "Fail", | |
| "manual_review": "Needs Review", | |
| }.get(normalized_status, normalized_status or "Unknown") | |
| def _evaluation_method_label(auto_scored: Any) -> str: | |
| return "Automatic" if bool(auto_scored) else "Manual" | |
| def _normalized_result_row(record: dict[str, Any]) -> dict[str, Any]: | |
| normalized = dict(record) | |
| model_ref = model_ref_from_record(normalized) | |
| if not model_ref: | |
| return normalized | |
| model_name, source = split_model_ref(model_ref) | |
| normalized["model"] = model_ref | |
| normalized["model_source"] = source | |
| normalized["model_name"] = model_name | |
| if not str(normalized.get("model_host", "") or "").strip(): | |
| normalized["model_host"] = resolve_model_host(source) | |
| prompt_tokens = _optional_int(normalized.get("prompt_tokens")) | |
| generated_tokens = _optional_int(normalized.get("generated_tokens")) | |
| generated_tokens_estimated = normalized.get("generated_tokens_estimated") | |
| if generated_tokens is None: | |
| generated_tokens = _estimate_generated_tokens(str(normalized.get("response", "") or "")) | |
| normalized["generated_tokens_estimated"] = True | |
| elif isinstance(generated_tokens_estimated, bool): | |
| normalized["generated_tokens_estimated"] = generated_tokens_estimated | |
| else: | |
| normalized["generated_tokens_estimated"] = prompt_tokens is None | |
| normalized["generated_tokens"] = generated_tokens | |
| if prompt_tokens is not None: | |
| normalized["prompt_tokens"] = prompt_tokens | |
| normalized["evaluation"] = _evaluation_label(str(normalized.get("status", "") or "")) | |
| normalized["evaluation_method"] = _evaluation_method_label(normalized.get("auto_scored")) | |
| return normalized | |
| def _normalized_result_rows(rows: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| return [_normalized_result_row(row) for row in rows] | |
| def _persist_key_for_entry(snapshot: dict[str, Any], entry: dict[str, Any]) -> str: | |
| session_id = str(snapshot.get("session_id", "") or "").strip() | |
| run_id = int(snapshot.get("run_id", 0) or 0) | |
| question_id = str(snapshot.get("question_id", "") or "").strip() | |
| model_ref = model_ref_from_record({"model": entry.get("model", ""), "model_source": entry.get("source", "")}) | |
| return f"{session_id}:{run_id}:{question_id}:{model_ref}" | |
| def _is_entry_persisted(persist_key: str) -> bool: | |
| with _PERSISTED_RUN_ENTRY_KEYS_LOCK: | |
| return persist_key in _PERSISTED_RUN_ENTRY_KEYS | |
| def _mark_entries_persisted(persist_keys: list[str]) -> None: | |
| if not persist_keys: | |
| return | |
| with _PERSISTED_RUN_ENTRY_KEYS_LOCK: | |
| _PERSISTED_RUN_ENTRY_KEYS.update(persist_keys) | |
| def _verdict_for_entry(entry: dict[str, Any], expected_answer: str) -> dict[str, Any]: | |
| if bool(entry.get("interrupted")): | |
| return { | |
| "status": "manual_review", | |
| "score": None, | |
| "auto_scored": False, | |
| "reason": "Stopped by user.", | |
| } | |
| error_text = str(entry.get("error", "") or "").strip() | |
| if error_text: | |
| return { | |
| "status": "manual_review", | |
| "score": None, | |
| "auto_scored": False, | |
| "reason": f"Error: {error_text}", | |
| } | |
| return evaluate_response(expected_answer=expected_answer, response=str(entry.get("response", "") or "")) | |
| def _estimate_generated_tokens(response_text: str) -> int: | |
| trimmed = response_text.strip() | |
| if not trimmed: | |
| return 0 | |
| chars = len(trimmed) | |
| words = len([part for part in trimmed.split() if part]) | |
| return max(words, round(chars / 4)) | |
| def _optional_int(value: Any) -> int | None: | |
| if isinstance(value, bool): | |
| return None | |
| if isinstance(value, int): | |
| return value | |
| if isinstance(value, float) and value.is_integer(): | |
| return int(value) | |
| return None | |
| def _persist_completed_run_entries(snapshot: dict[str, Any]) -> None: | |
| run_id = int(snapshot.get("run_id", 0) or 0) | |
| if run_id <= 0: | |
| return | |
| dataset_key = str(snapshot.get("dataset_key", "") or "").strip() | |
| question_id = str(snapshot.get("question_id", "") or "").strip() | |
| session_id = str(snapshot.get("session_id", "") or "").strip() | |
| if not dataset_key or not question_id or not session_id: | |
| return | |
| dataset = _dataset_option_map().get(dataset_key) | |
| if dataset is None: | |
| return | |
| question = next((q for q in dataset["questions"] if str(q.get("id", "") or "") == question_id), None) | |
| if question is None: | |
| return | |
| prompt = str(question.get("prompt", "") or "") | |
| expected_answer = str(question.get("expected_answer", "") or "") | |
| prompt_hash = record_prompt_hash(prompt) if prompt else "" | |
| results_path, results_md_path = resolve_results_paths(dataset_key, DATA_DIR, ROOT) | |
| persisted_keys: list[str] = [] | |
| with portalocker.Lock(str(LOCK_PATH), timeout=10): | |
| rows = load_results(results_path) | |
| changed = False | |
| for entry_raw in snapshot.get("entries", []): | |
| if not isinstance(entry_raw, dict): | |
| continue | |
| if not bool(entry_raw.get("completed")): | |
| continue | |
| entry = dict(entry_raw) | |
| model_ref = model_ref_from_record({"model": entry.get("model", ""), "model_source": entry.get("source", "")}) | |
| if not model_ref: | |
| continue | |
| persist_key = _persist_key_for_entry(snapshot, entry) | |
| if _is_entry_persisted(persist_key): | |
| continue | |
| model_name, source = split_model_ref(model_ref) | |
| host = str(entry.get("host", "") or "").strip() or resolve_model_host(source) | |
| verdict = _verdict_for_entry(entry, expected_answer) | |
| response_text = str(entry.get("response", "") or "") | |
| exact_generated_tokens = _optional_int(entry.get("generated_tokens")) | |
| exact_prompt_tokens = _optional_int(entry.get("prompt_tokens")) | |
| record = { | |
| "dataset_key": dataset_key, | |
| "dataset_signature": dataset["signature"], | |
| "question_prompt_hash": prompt_hash, | |
| "question_id": question_id, | |
| "model": model_ref, | |
| "model_name": model_name, | |
| "model_source": source, | |
| "model_host": host, | |
| "response": response_text, | |
| "status": verdict["status"], | |
| "score": verdict["score"], | |
| "response_time_ms": round(float(entry.get("elapsed_ms", 0.0) or 0.0), 2), | |
| "generated_tokens": exact_generated_tokens if exact_generated_tokens is not None else _estimate_generated_tokens(response_text), | |
| "generated_tokens_estimated": exact_generated_tokens is None, | |
| "timestamp": datetime.now(timezone.utc).isoformat(), | |
| "interrupted": bool(entry.get("interrupted")), | |
| "auto_scored": bool(verdict.get("auto_scored")), | |
| "reason": normalize_reason_text(str(verdict.get("reason", "") or "")), | |
| "evaluation": _evaluation_label(str(verdict["status"])), | |
| "evaluation_method": _evaluation_method_label(verdict.get("auto_scored")), | |
| "run_id": run_id, | |
| "session_id": session_id, | |
| } | |
| if exact_prompt_tokens is not None: | |
| record["prompt_tokens"] = exact_prompt_tokens | |
| rows = upsert_result(rows, record) | |
| persisted_keys.append(persist_key) | |
| changed = True | |
| if changed: | |
| save_results(results_path, rows) | |
| render_results_markdown(dataset["questions"], rows, results_md_path) | |
| _mark_entries_persisted(persisted_keys) | |
| def _dataset_option_map() -> dict[str, dict[str, Any]]: | |
| options = discover_datasets(BENCHMARK_PATH, UPLOADED_DATASETS_DIR) | |
| option_map: dict[str, dict[str, Any]] = {} | |
| for option in options: | |
| path = Path(option["path"]) | |
| payload = load_benchmark_payload(path) | |
| option_map[option["key"]] = { | |
| "key": option["key"], | |
| "label": option["label"], | |
| "is_default": bool(option["is_default"]), | |
| "path": path, | |
| "signature": compute_dataset_signature(path), | |
| "instruction": payload.get("instruction", ""), | |
| "questions": payload.get("questions", []), | |
| } | |
| return option_map | |
| def get_health() -> dict[str, str]: | |
| return {"status": "ok", "version": "v1"} | |
| def get_ollama_auth_status() -> dict[str, bool]: | |
| return {"server_api_key_configured": bool(os.getenv("OLLAMA_API_KEY", "").strip())} | |
| def get_models(*, ollama_api_key: str = "") -> list[str]: | |
| from engine import get_cloud_client, get_local_client, list_models | |
| model_refs: set[str] = set() | |
| cloud_error: Exception | None = None | |
| explicit_cloud_key_supplied = bool(str(ollama_api_key or "").strip()) | |
| try: | |
| cloud_client = get_cloud_client(api_key=ollama_api_key) | |
| for model in list_models(cloud_client, source=CLOUD_SOURCE): | |
| model_ref = to_model_ref(model, CLOUD_SOURCE) | |
| if model_ref: | |
| model_refs.add(model_ref) | |
| except Exception as exc: # noqa: BLE001 | |
| cloud_error = exc | |
| try: | |
| local_client = get_local_client() | |
| for model in list_models(local_client, source=LOCAL_SOURCE): | |
| model_ref = to_model_ref(model, LOCAL_SOURCE) | |
| if model_ref: | |
| model_refs.add(model_ref) | |
| except Exception: | |
| # Local model discovery is best-effort and should not block cloud usage. | |
| pass | |
| if explicit_cloud_key_supplied and cloud_error is not None: | |
| raise RuntimeError(str(cloud_error)) | |
| if model_refs: | |
| return sorted(model_refs) | |
| if cloud_error is not None: | |
| raise RuntimeError(str(cloud_error)) | |
| raise RuntimeError("No models discovered from Ollama cloud/local providers.") | |
| def get_datasets() -> list[dict[str, Any]]: | |
| datasets = [] | |
| for item in _dataset_option_map().values(): | |
| datasets.append( | |
| { | |
| "key": item["key"], | |
| "label": item["label"], | |
| "is_default": item["is_default"], | |
| "signature": item["signature"], | |
| "question_count": len(item["questions"]), | |
| } | |
| ) | |
| datasets.sort(key=lambda row: (not row["is_default"], row["label"].lower())) | |
| return datasets | |
| def get_questions(dataset_key: str) -> dict[str, Any] | None: | |
| dataset = _dataset_option_map().get(dataset_key) | |
| if dataset is None: | |
| return None | |
| return { | |
| "dataset_key": dataset_key, | |
| "instruction": dataset["instruction"], | |
| "questions": dataset["questions"], | |
| } | |
| def get_results(dataset_key: str) -> dict[str, Any] | None: | |
| dataset = _dataset_option_map().get(dataset_key) | |
| if dataset is None: | |
| return None | |
| results_path, _ = resolve_results_paths(dataset_key, DATA_DIR, ROOT) | |
| with portalocker.Lock(str(LOCK_PATH), timeout=10): | |
| rows = _normalized_result_rows(load_results(results_path)) | |
| matrix = _build_matrix(dataset["questions"], rows) | |
| return { | |
| "dataset_key": dataset_key, | |
| "results": rows, | |
| "metrics": compute_model_metrics(rows), | |
| "matrix": matrix, | |
| } | |
| def get_dataset_template() -> bytes: | |
| return dataset_template_bytes() | |
| def upload_dataset(*, filename: str, content: bytes) -> dict[str, Any]: | |
| path = save_uploaded_dataset(UPLOADED_DATASETS_DIR, filename, content) | |
| options = discover_datasets(BENCHMARK_PATH, UPLOADED_DATASETS_DIR) | |
| option = next((item for item in options if Path(item["path"]) == path), None) | |
| if option is None: | |
| raise RuntimeError("Uploaded dataset could not be resolved.") | |
| payload = load_benchmark_payload(path) | |
| return { | |
| "key": option["key"], | |
| "label": option["label"], | |
| "is_default": bool(option["is_default"]), | |
| "signature": compute_dataset_signature(path), | |
| "question_count": len(payload.get("questions", [])), | |
| } | |
| def delete_dataset(dataset_key: str) -> tuple[str, dict[str, Any] | None]: | |
| options = _dataset_option_map() | |
| target = options.get(dataset_key) | |
| if target is None: | |
| return "not_found", None | |
| if target["is_default"] or dataset_key == DEFAULT_DATASET_KEY: | |
| return "default_forbidden", None | |
| summary = delete_uploaded_dataset_with_artifacts(target, DATA_DIR, ROOT) | |
| return "deleted", { | |
| "dataset_key": dataset_key, | |
| "target_count": summary["target_count"], | |
| "deleted_count": summary["deleted_count"], | |
| "missing_count": summary["missing_count"], | |
| } | |
| def export_results(dataset_key: str, export_format: str) -> tuple[bytes, str, str] | None: | |
| dataset = _dataset_option_map().get(dataset_key) | |
| if dataset is None: | |
| return None | |
| results_path, _ = resolve_results_paths(dataset_key, DATA_DIR, ROOT) | |
| with portalocker.Lock(str(LOCK_PATH), timeout=10): | |
| rows = _normalized_result_rows(load_results(results_path)) | |
| stem = "results" if dataset_key == DEFAULT_DATASET_KEY else f"results_{dataset_key}" | |
| if export_format == "json": | |
| return prepare_results_json(rows), "application/json", f"{stem}.json" | |
| if export_format == "xlsx": | |
| return ( | |
| prepare_results_excel(rows), | |
| "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", | |
| f"{stem}.xlsx", | |
| ) | |
| raise ValueError("Unsupported export format.") | |
| def export_results_table( | |
| dataset_key: str, | |
| table_key: str, | |
| export_format: str, | |
| ) -> tuple[str, tuple[bytes, str, str] | None]: | |
| dataset = _dataset_option_map().get(dataset_key) | |
| if dataset is None: | |
| return "dataset_not_found", None | |
| results_path, _ = resolve_results_paths(dataset_key, DATA_DIR, ROOT) | |
| with portalocker.Lock(str(LOCK_PATH), timeout=10): | |
| rows = _normalized_result_rows(load_results(results_path)) | |
| if table_key == TABLE_EXPORT_MODEL_LEADERBOARD: | |
| table_rows = _table_rows_model_leader_board(rows) | |
| elif table_key == TABLE_EXPORT_CATEGORY_PERFORMANCE: | |
| table_rows = _table_rows_group_performance(dataset["questions"], rows, group_key="category", fallback_value="GENEL") | |
| elif table_key == TABLE_EXPORT_HARDNESS_PERFORMANCE: | |
| table_rows = _table_rows_group_performance(dataset["questions"], rows, group_key="hardness_level", fallback_value="(missing)") | |
| elif table_key == TABLE_EXPORT_QUESTION_PERFORMANCE: | |
| table_rows = _table_rows_question_performance(dataset["questions"], rows) | |
| elif table_key == TABLE_EXPORT_RESPONSE_PERFORMANCE: | |
| table_rows = rows | |
| else: | |
| return "table_not_supported", None | |
| stem = "results" if dataset_key == DEFAULT_DATASET_KEY else f"results_{dataset_key}" | |
| filename_stem = f"{stem}_{table_key}" | |
| if export_format == "json": | |
| return "ok", (prepare_results_json(table_rows), "application/json", f"{filename_stem}.json") | |
| if export_format == "xlsx": | |
| return ( | |
| "ok", | |
| ( | |
| prepare_results_excel(table_rows), | |
| "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", | |
| f"{filename_stem}.xlsx", | |
| ), | |
| ) | |
| return "format_not_supported", None | |
| def _is_row_in_dataset_scope(row: dict[str, Any], *, dataset_key: str, dataset_signature: str) -> bool: | |
| row_dataset_key = str(row.get("dataset_key", "") or "").strip() | |
| if dataset_key == DEFAULT_DATASET_KEY: | |
| return row_dataset_key in {"", DEFAULT_DATASET_KEY} | |
| if row_dataset_key != dataset_key: | |
| return False | |
| return str(row.get("dataset_signature", "") or "").strip() == dataset_signature | |
| def delete_model_results(*, dataset_key: str, model: str) -> tuple[str, dict[str, Any] | None]: | |
| dataset = _dataset_option_map().get(dataset_key) | |
| if dataset is None: | |
| return "dataset_not_found", None | |
| selected_model_input = model.strip() | |
| selected_model_ref = to_model_ref(selected_model_input) | |
| if not selected_model_ref: | |
| return "invalid_model", None | |
| results_path, results_md_path = resolve_results_paths(dataset_key, DATA_DIR, ROOT) | |
| with portalocker.Lock(str(LOCK_PATH), timeout=10): | |
| rows = load_results(results_path) | |
| kept_rows: list[dict[str, Any]] = [] | |
| deleted_count = 0 | |
| for row in rows: | |
| row_model_ref = model_ref_from_record(row) | |
| if row_model_ref != selected_model_ref: | |
| kept_rows.append(row) | |
| continue | |
| if _is_row_in_dataset_scope(row, dataset_key=dataset_key, dataset_signature=str(dataset["signature"])): | |
| deleted_count += 1 | |
| continue | |
| kept_rows.append(row) | |
| if deleted_count == 0: | |
| return "model_not_found", None | |
| save_results(results_path, kept_rows) | |
| render_results_markdown(dataset["questions"], kept_rows, results_md_path) | |
| return "deleted", { | |
| "dataset_key": dataset_key, | |
| "model": selected_model_input, | |
| "deleted_count": deleted_count, | |
| "remaining_count": sum( | |
| 1 | |
| for row in kept_rows | |
| if model_ref_from_record(row) == selected_model_ref | |
| and _is_row_in_dataset_scope( | |
| row, | |
| dataset_key=dataset_key, | |
| dataset_signature=str(dataset["signature"]), | |
| ) | |
| ), | |
| } | |
| def apply_manual_result_override( | |
| *, | |
| dataset_key: str, | |
| question_id: str, | |
| model: str, | |
| status: str, | |
| reason: str, | |
| ) -> tuple[str, dict[str, Any] | None]: | |
| dataset = _dataset_option_map().get(dataset_key) | |
| if dataset is None: | |
| return "dataset_not_found", None | |
| override_defaults = { | |
| "success": {"score": 1, "reason": "User approval"}, | |
| "fail": {"score": 0, "reason": "User approval"}, | |
| "manual_review": {"score": None, "reason": "Marked by user for manual review"}, | |
| } | |
| selected = override_defaults.get(status) | |
| if selected is None: | |
| return "invalid_status", None | |
| selected_model_ref = to_model_ref(model) | |
| if not selected_model_ref: | |
| return "result_not_found", None | |
| selected_model_name, selected_source = split_model_ref(selected_model_ref) | |
| results_path, results_md_path = resolve_results_paths(dataset_key, DATA_DIR, ROOT) | |
| with portalocker.Lock(str(LOCK_PATH), timeout=10): | |
| rows = load_results(results_path) | |
| existing = next( | |
| ( | |
| row | |
| for row in rows | |
| if str(row.get("question_id", "")) == question_id and model_ref_from_record(row) == selected_model_ref | |
| ), | |
| None, | |
| ) | |
| if existing is None: | |
| return "result_not_found", None | |
| updated = dict(existing) | |
| updated["dataset_key"] = dataset_key | |
| updated["dataset_signature"] = dataset["signature"] | |
| updated["model"] = selected_model_ref | |
| updated["model_name"] = selected_model_name | |
| updated["model_source"] = selected_source | |
| if not str(updated.get("model_host", "") or "").strip(): | |
| updated["model_host"] = resolve_model_host(selected_source) | |
| updated["status"] = status | |
| updated["score"] = selected["score"] | |
| updated["auto_scored"] = False | |
| updated["interrupted"] = False | |
| updated["reason"] = normalize_reason_text(reason.strip() or selected["reason"]) | |
| updated["timestamp"] = datetime.now(timezone.utc).isoformat() | |
| updated["evaluation"] = _evaluation_label(status) | |
| updated["evaluation_method"] = _evaluation_method_label(False) | |
| if not str(updated.get("question_prompt_hash", "")).strip(): | |
| prompt = next( | |
| ( | |
| str(question.get("prompt", "")) | |
| for question in dataset["questions"] | |
| if str(question.get("id", "")) == question_id | |
| ), | |
| "", | |
| ) | |
| if prompt: | |
| updated["question_prompt_hash"] = record_prompt_hash(prompt) | |
| merged = upsert_result(rows, updated) | |
| save_results(results_path, merged) | |
| render_results_markdown(dataset["questions"], merged, results_md_path) | |
| return "updated", updated | |
| def start_run( | |
| *, | |
| session_id: str, | |
| dataset_key: str, | |
| question_id: str, | |
| models: list[str], | |
| system_prompt: str, | |
| ollama_api_key: str = "", | |
| ) -> tuple[int | None, str]: | |
| dataset = _dataset_option_map().get(dataset_key) | |
| if dataset is None: | |
| return None, "dataset_not_found" | |
| question = next((q for q in dataset["questions"] if str(q.get("id", "")) == question_id), None) | |
| if question is None: | |
| return None, "question_not_found" | |
| normalized_models = normalize_selected_models(*models) | |
| if not normalized_models: | |
| return None, "invalid_models" | |
| requires_cloud_access = any(split_model_ref(model_ref)[1] == CLOUD_SOURCE for model_ref in normalized_models) | |
| if requires_cloud_access: | |
| from engine import get_cloud_client | |
| try: | |
| get_cloud_client(api_key=ollama_api_key) | |
| except RuntimeError: | |
| return None, "missing_api_key" | |
| runner = get_runner(session_id) | |
| started = runner.start( | |
| models=normalized_models, | |
| question_id=question_id, | |
| prompt=str(question.get("prompt", "")), | |
| system_prompt=system_prompt, | |
| session_id=session_id, | |
| dataset_key=dataset_key, | |
| trace_id=uuid.uuid4().hex, | |
| ollama_api_key=ollama_api_key, | |
| ) | |
| if not started: | |
| snapshot = runner.snapshot() | |
| conflict_run_id = int(snapshot.get("run_id", 0)) or None | |
| return conflict_run_id, "conflict" | |
| snapshot = runner.snapshot() | |
| return int(snapshot.get("run_id", 0)), "started" | |
| def stop_run(*, session_id: str) -> None: | |
| runner = get_runner(session_id) | |
| runner.request_stop() | |
| def run_snapshot(*, session_id: str) -> dict[str, Any]: | |
| runner = get_runner(session_id) | |
| snapshot = runner.snapshot() | |
| _persist_completed_run_entries(snapshot) | |
| return snapshot | |
| def get_run_status(*, run_id: int, session_id: str) -> dict[str, Any] | None: | |
| snapshot = run_snapshot(session_id=session_id) | |
| if int(snapshot.get("run_id", 0)) != run_id: | |
| return None | |
| entries = snapshot.get("entries", []) | |
| interrupted = any(bool(item.get("interrupted")) for item in entries) | |
| error = next((str(item.get("error", "")) for item in entries if str(item.get("error", "")).strip()), "") | |
| status_entries: list[dict[str, Any]] = [] | |
| for item in entries: | |
| model_ref = model_ref_from_record({"model": item.get("model", ""), "model_source": item.get("source", "")}) | |
| _, source = split_model_ref(model_ref, str(item.get("source", "") or CLOUD_SOURCE)) | |
| host = str(item.get("host", "") or "").strip() or resolve_model_host(source) | |
| status_entry = { | |
| "model": model_ref, | |
| "source": source, | |
| "host": host, | |
| "running": bool(item.get("running")), | |
| "completed": bool(item.get("completed")), | |
| "interrupted": bool(item.get("interrupted")), | |
| "error": str(item.get("error", "")), | |
| "event": str(item.get("event", "")), | |
| "elapsed_ms": float(item.get("elapsed_ms", 0.0)), | |
| } | |
| generated_tokens = _optional_int(item.get("generated_tokens")) | |
| prompt_tokens = _optional_int(item.get("prompt_tokens")) | |
| if generated_tokens is not None: | |
| status_entry["generated_tokens"] = generated_tokens | |
| if prompt_tokens is not None: | |
| status_entry["prompt_tokens"] = prompt_tokens | |
| status_entries.append(status_entry) | |
| return { | |
| "run_id": run_id, | |
| "session_id": str(snapshot.get("session_id", "")), | |
| "dataset_key": str(snapshot.get("dataset_key", "")), | |
| "question_id": str(snapshot.get("question_id", "")), | |
| "running": bool(snapshot.get("running")), | |
| "completed": bool(snapshot.get("completed")), | |
| "interrupted": interrupted, | |
| "error": error, | |
| "entries": status_entries, | |
| } | |
| def _build_matrix(questions: list[dict[str, Any]], results: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| models = sorted({model_ref_from_record(row) for row in results if model_ref_from_record(row)}) | |
| indexed = { | |
| (str(row.get("question_id", "")), model_ref_from_record(row)): row | |
| for row in results | |
| if model_ref_from_record(row) | |
| } | |
| matrix: list[dict[str, Any]] = [] | |
| for question in questions: | |
| question_id = str(question.get("id", "")) | |
| row = {"question_id": question_id, "category": question.get("category", "GENEL"), "cells": {}} | |
| for model in models: | |
| row["cells"][model] = _format_matrix_cell(indexed.get((question_id, model))) | |
| matrix.append(row) | |
| return matrix | |
| def _table_rows_model_leader_board(results: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| metrics = compute_model_metrics(results) | |
| output: list[dict[str, Any]] = [] | |
| for row in metrics: | |
| median_ms = row.get("median_ms") | |
| median_seconds = round(float(median_ms) / 1000.0, 2) if median_ms is not None else None | |
| avg_generated_tokens = row.get("avg_generated_tokens") | |
| output.append( | |
| { | |
| "model": str(row.get("model", "")), | |
| "accuracy_percent": round(float(row.get("accuracy_percent", 0.0)), 1), | |
| "speed_score": round(float(row.get("latency_score", 0.0)), 1), | |
| "success_scored": f"{int(row.get('success_count', 0))}/{int(row.get('scored_count', 0))}", | |
| "median_seconds": median_seconds, | |
| "avg_generated_tokens": round(float(avg_generated_tokens), 1) if avg_generated_tokens is not None else None, | |
| } | |
| ) | |
| return output | |
| def _table_rows_group_performance( | |
| questions: list[dict[str, Any]], | |
| results: list[dict[str, Any]], | |
| *, | |
| group_key: str, | |
| fallback_value: str, | |
| ) -> list[dict[str, Any]]: | |
| question_to_group: dict[str, str] = {} | |
| group_counts: dict[str, int] = {} | |
| for question in questions: | |
| question_id = str(question.get("id", "")).strip() | |
| group_value = str(question.get(group_key, "")).strip() or fallback_value | |
| if question_id: | |
| question_to_group[question_id] = group_value | |
| group_counts[group_value] = group_counts.get(group_value, 0) + 1 | |
| models = sorted({model_ref_from_record(row) for row in results if model_ref_from_record(row)}) | |
| counters: dict[str, dict[str, dict[str, int]]] = {} | |
| for row in results: | |
| model = model_ref_from_record(row) | |
| question_id = str(row.get("question_id", "")).strip() | |
| status = str(row.get("status", "")).strip() | |
| if not model or not question_id: | |
| continue | |
| group_value = question_to_group.get(question_id, fallback_value) | |
| group_bucket = counters.setdefault(group_value, {}) | |
| model_bucket = group_bucket.setdefault(model, {"success": 0, "scored": 0}) | |
| if status in {"success", "fail"}: | |
| model_bucket["scored"] += 1 | |
| if status == "success": | |
| model_bucket["success"] += 1 | |
| output: list[dict[str, Any]] = [] | |
| for group_value in sorted(group_counts.keys()): | |
| row: dict[str, Any] = { | |
| group_key: group_value, | |
| "questions": int(group_counts.get(group_value, 0)), | |
| } | |
| for model in models: | |
| model_counter = counters.get(group_value, {}).get(model, {"success": 0, "scored": 0}) | |
| scored = int(model_counter.get("scored", 0)) | |
| if scored == 0: | |
| row[model] = None | |
| else: | |
| row[model] = round((100.0 * int(model_counter.get("success", 0))) / scored, 1) | |
| output.append(row) | |
| return output | |
| def _table_rows_question_performance(questions: list[dict[str, Any]], results: list[dict[str, Any]]) -> list[dict[str, Any]]: | |
| matrix = _build_matrix(questions, results) | |
| output: list[dict[str, Any]] = [] | |
| for row in matrix: | |
| item: dict[str, Any] = { | |
| "question_id": str(row.get("question_id", "")), | |
| "category": str(row.get("category", "")), | |
| } | |
| cells = row.get("cells", {}) | |
| if isinstance(cells, dict): | |
| for model, value in cells.items(): | |
| item[str(model)] = value | |
| output.append(item) | |
| return output | |
| def _format_matrix_cell(record: dict[str, Any] | None) -> str: | |
| if not record: | |
| return "-" | |
| status = str(record.get("status", "manual_review")) | |
| icon = {"success": "✅", "fail": "❌", "manual_review": "🟡"}.get(status, "🟡") | |
| latency = record.get("response_time_ms") | |
| generated_tokens = _optional_int(record.get("generated_tokens")) | |
| token_suffix = "" | |
| if generated_tokens is not None: | |
| token_suffix = f" | {generated_tokens} tok" | |
| if record.get("generated_tokens_estimated") is True: | |
| token_suffix += " (est.)" | |
| if latency is None: | |
| return f"{icon}{token_suffix}" if token_suffix else icon | |
| return f"{icon} {float(latency) / 1000.0:.2f}s{token_suffix}" | |
| def record_prompt_hash(prompt: str) -> str: | |
| return hashlib.sha256(prompt.encode("utf-8")).hexdigest()[:16] | |