figment / scripts /run_eval.py
ThomsenDrake's picture
Sync submission-ready runtime and docs
2d63573 verified
Raw
History Blame Contribute Delete
42.2 kB
"""Deterministic Figment eval runner."""
from __future__ import annotations
import argparse
from dataclasses import replace
import json
import os
from pathlib import Path
import sys
from time import perf_counter
from typing import Any
import urllib.error
import urllib.parse
import urllib.request
# Directory two levels above this file; it is put at the front of sys.path so
# the `figment` package imports below resolve when the script is run directly.
PROJECT_ROOT = Path(__file__).resolve().parents[1]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from figment.config import FigmentConfig # noqa: E402
from figment.eval_metrics import score_expected_labels, score_handoff_readiness, summarize_eval_records # noqa: E402
from figment.field_provenance import ( # noqa: E402
DETERMINISTIC_FALLBACK,
MODEL_REPAIRED,
accepted_raw_fields_from_failures,
deterministic_field_provenance,
has_deterministic_patches,
merge_field_provenance,
model_raw_field_provenance,
) # noqa: E402
from figment.focused_repair import build_focused_repair_prompts, missing_mandatory_source_cards # noqa: E402
from figment.harness_evidence import build_harness_evidence # noqa: E402
from figment.model_client import ModelClient, ModelClientError, canned_navigator_output # noqa: E402
from figment.observation_targets import ( # noqa: E402
NavigationScaffoldResult,
apply_navigation_scaffolding,
required_observation_targets,
)
from figment.prompt_builder import build_prompt # noqa: E402
from figment.retrieval import known_card_ids, query_from_intake, search_protocol_cards # noqa: E402
from figment.rules import run_red_flag_checks # noqa: E402
from figment.trace import derive_model_route, stable_hash # noqa: E402
from figment.validators import urgency_floor_from_rules, validate_navigator_output # noqa: E402
# Glob used by main() to discover eval case files when no --cases argument is
# given; it is resolved with Path().glob(), i.e. against the working directory.
DEFAULT_CASE_GLOB = "data/eval/*.jsonl"
# Reference shell command for a llama.cpp-backed eval run. It is only embedded
# verbatim in the local LLM evidence summary ("real_eval_command"); this module
# never executes it.
REAL_LLAMA_CPP_EVAL_COMMAND = (
    "FIGMENT_MODE=local MODEL_STACK=local_4b_parakeet MODEL_BACKEND=llama_cpp "
    "LOCAL_MODEL_ID=nvidia/NVIDIA-Nemotron-3-Nano-4B-BF16 "
    "LLAMA_BASE_URL=http://127.0.0.1:8001/v1 PYTHON_DOTENV_DISABLED=true "
    "python3 scripts/run_eval.py --backend llama_cpp --model-stack local_4b_parakeet "
    "--cases data/eval/initial_handwritten_cases.jsonl "
    "--cases data/eval/adversarial_strict_cases.jsonl "
    "--cases data/eval/comprehensive_hosted_cases.jsonl "
    "--output traces/local_llama_cpp_eval_$(date -u +%Y%m%dT%H%M%SZ).jsonl"
)
def load_cases(case_paths: list[Path]) -> list[dict[str, Any]]:
    """Load eval cases from JSONL files, one JSON object per non-blank line.

    Each returned case is the decoded object with two extra keys added:
    ``_case_path`` (``str(path)``) and ``_case_line`` (1-based line number).

    Args:
        case_paths: JSONL files to read, in order.

    Returns:
        All cases from all files, in file order then line order.

    Raises:
        json.JSONDecodeError: A non-blank line is not valid JSON; the message
            is prefixed with ``path:line``.
        ValueError: A line decodes to something other than a JSON object.
        OSError: A file cannot be opened or read.
    """
    cases: list[dict[str, Any]] = []
    for path in case_paths:
        # Iterate the file object instead of str.splitlines(): splitlines()
        # also breaks on U+0085/U+2028/U+2029, which may appear unescaped
        # inside JSON strings and would cut a valid record in half.
        with path.open(encoding="utf-8") as handle:
            for line_number, line in enumerate(handle, start=1):
                if not line.strip():
                    continue
                try:
                    case = json.loads(line)
                except json.JSONDecodeError as exc:
                    # Same exception type, but tell the user which line is bad.
                    raise json.JSONDecodeError(
                        f"{path}:{line_number}: {exc.msg}", exc.doc, exc.pos
                    ) from exc
                if not isinstance(case, dict):
                    raise ValueError(
                        f"{path}:{line_number}: eval case must be a JSON object, "
                        f"got {type(case).__name__}"
                    )
                case["_case_path"] = str(path)
                case["_case_line"] = line_number
                cases.append(case)
    return cases
def run_eval(
    *,
    case_paths: list[Path],
    output_path: Path | None,
    config: FigmentConfig,
    limit: int | None = None,
) -> dict[str, Any]:
    """Evaluate every loaded case, emit one JSONL line per record, and summarize.

    Records go to ``output_path`` (parent directories are created) or, when it
    is ``None``, to stdout. When a file is written, the summary/manifest bundle
    is also written next to it. A negative ``limit`` is treated as zero.

    Returns:
        The summary dict produced by ``_summarize``.
    """
    selected = load_cases(case_paths)
    if limit is not None:
        selected = selected[: max(0, limit)]

    records = []
    for case in selected:
        records.append(_evaluate_case(case, config))

    # Lazy, so each record is serialized only at the moment it is written.
    jsonl_lines = (f"{json.dumps(record, sort_keys=True)}\n" for record in records)
    writing_to_file = output_path is not None
    if writing_to_file:
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_text("".join(jsonl_lines), encoding="utf-8")
    else:
        for jsonl_line in jsonl_lines:
            sys.stdout.write(jsonl_line)

    summary = _summarize(records, config, case_paths, output_path)
    if writing_to_file:
        _write_eval_bundle_metadata(summary, records, config, case_paths, output_path)
    return summary
def _evaluate_case(case: dict[str, Any], config: FigmentConfig) -> dict[str, Any]:
    """Run one eval case end to end and return its result record.

    The case must provide ``structured_intake`` and ``case_id``. The function
    runs red-flag rules, retrieval and prompt building, then either uses the
    canned navigator output (``model_backend == "canned"``) or calls the
    configured model, with field-level repair and deterministic fallback when
    validation fails. For non-canned backends whose final output validates, a
    further handoff "competence" repair is attempted when the handoff readiness
    score does not pass.

    Returns:
        The record dict that ``run_eval`` serializes as one JSONL line,
        including per-stage outputs/validations, provenance, latency and a
        trace hash, plus ``expected_label_score``.
    """
    started = perf_counter()
    intake = case["structured_intake"]
    # Deterministic pre-model inputs: rules, urgency floor, retrieval, prompt.
    rule_results = [rule.to_dict() for rule in run_red_flag_checks(intake)]
    floor = urgency_floor_from_rules(rule_results)
    query = query_from_intake(intake)
    retrieved = search_protocol_cards(query, limit=6)
    retrieved_ids = [str(item.get("card_id", "")) for item in retrieved if item.get("card_id")]
    prompt, prompt_hash = build_prompt(intake, retrieved, rule_results, floor)
    known_cards = known_card_ids()
    # Per-stage outputs and validations start as "not attempted" placeholders so
    # the record has the same keys whichever branch runs below.
    raw_output: dict[str, Any] | None = None
    repaired_output: dict[str, Any] | None = None
    fallback_output: dict[str, Any] | None = None
    raw_validation = {"passed": False, "failures": ["configured model not attempted for canned backend"]}
    repair_validation = {"passed": False, "failures": ["repair not attempted"]}
    fallback_validation = {"passed": False, "failures": ["fallback not used"]}
    raw_attempted = config.model_backend != "canned"
    repair_attempted = False
    fallback_used = False
    fallback_reason: str | None = None
    competence_repair_attempted = False
    competence_repair_success = False
    competence_repair_scope: str | None = None
    competence_repaired_output: dict[str, Any] | None = None
    competence_repair_validation = {"passed": False, "failures": ["competence repair not attempted"]}
    scaffolded_model_output: dict[str, Any] | None = None
    handoff_readiness_before: dict[str, Any] | None = None
    handoff_readiness_after: dict[str, Any] | None = None
    final_output: dict[str, Any]
    final_validation: dict[str, Any]
    field_provenance: dict[str, str] = {}
    # Scaffold trace accumulators; _absorb_scaffold_trace mutates them in place.
    scaffold_patched_fields: set[str] = set()
    filled_required_observation_ids: list[str] = []
    model_selected_required_observation_ids: list[str] = []
    invalid_selected_required_observation_ids: list[str] = []
    stripped_trace_only_fields: list[str] = []
    # Context passed alongside the prompt to every model call.
    context = {
        "intake": intake,
        "rule_results": rule_results,
        "retrieved_cards": retrieved,
        "urgency_floor": floor,
    }
    if config.model_backend == "canned":
        # Canned backend: no model call; the scaffolded canned output is final.
        fallback_reason = "canned_backend"
        fallback_used = True
        fallback_output, fallback_validation, fallback_scaffold = _run_fallback(
            intake,
            rule_results,
            retrieved,
            floor,
            known_cards,
            retrieved_ids,
        )
        _absorb_scaffold_trace(
            fallback_scaffold,
            scaffold_patched_fields=scaffold_patched_fields,
            filled_required_observation_ids=filled_required_observation_ids,
            model_selected_required_observation_ids=model_selected_required_observation_ids,
            invalid_selected_required_observation_ids=invalid_selected_required_observation_ids,
            stripped_trace_only_fields=stripped_trace_only_fields,
        )
        final_output = fallback_output
        final_validation = fallback_validation
        field_provenance = deterministic_field_provenance()
    else:
        # Configured model path: generate, scaffold, then validate the result.
        client = ModelClient(config)
        try:
            raw_output = client.generate_json(prompt, context)
            scaffold_result = apply_navigation_scaffolding(
                raw_output,
                retrieved_cards=retrieved,
                rule_results=rule_results,
                urgency_floor=floor,
                confirmed_intake=intake,
            )
            scaffolded_model_output = scaffold_result.output
            _absorb_scaffold_trace(
                scaffold_result,
                scaffold_patched_fields=scaffold_patched_fields,
                filled_required_observation_ids=filled_required_observation_ids,
                model_selected_required_observation_ids=model_selected_required_observation_ids,
                invalid_selected_required_observation_ids=invalid_selected_required_observation_ids,
                stripped_trace_only_fields=stripped_trace_only_fields,
            )
            raw_validation = _validate_output(
                scaffolded_model_output,
                known_cards,
                floor,
                intake,
                rule_results,
                retrieved,
                retrieved_ids,
            )
        except ModelClientError as exc:
            # The error is recorded as a validation failure; raw_validation then
            # does not pass, so the else branch below handles the fallback.
            raw_validation = {"passed": False, "failures": [f"model backend error: {exc}"]}
            fallback_reason = "model_backend_error"
        if scaffolded_model_output is not None and raw_validation["passed"]:
            # The scaffolded model output validated and becomes the final output.
            final_output = scaffolded_model_output
            final_validation = raw_validation
            field_provenance = model_raw_field_provenance()
            _mark_deterministic_patch_fields(field_provenance, scaffold_patched_fields)
            # If scaffolding patched the observation fields, give the model one
            # focused chance to supply them; the scaffolded output serves as the
            # fallback for the merge, and the merge result replaces the final
            # output only when one is returned.
            patch_repair_failures = _observation_patch_repair_failures(
                filled_required_observation_ids,
                scaffold_patched_fields,
            )
            if patch_repair_failures and raw_output is not None:
                (
                    repaired_output,
                    repair_validation,
                    repair_attempted,
                    merged_output,
                    merged_validation,
                    merged_field_provenance,
                ) = _try_field_level_model_output(
                    client=client,
                    prompt=prompt,
                    context=context,
                    raw_output=raw_output,
                    validation_failures=patch_repair_failures,
                    fallback_output=scaffolded_model_output,
                    known_cards=known_cards,
                    floor=floor,
                    intake=intake,
                    rule_results=rule_results,
                    retrieved=retrieved,
                    retrieved_ids=retrieved_ids,
                    scaffold_patched_fields=scaffold_patched_fields,
                )
                if merged_output is not None and merged_validation is not None:
                    final_output = merged_output
                    final_validation = merged_validation
                    field_provenance = merged_field_provenance
        else:
            if scaffolded_model_output is not None:
                # The model answered but validation failed: attempt a field-level
                # repair/merge against the deterministic fallback output.
                fallback_output, fallback_validation, fallback_scaffold = _run_fallback(
                    intake,
                    rule_results,
                    retrieved,
                    floor,
                    known_cards,
                    retrieved_ids,
                )
                (
                    repaired_output,
                    repair_validation,
                    repair_attempted,
                    merged_output,
                    merged_validation,
                    merged_field_provenance,
                ) = _try_field_level_model_output(
                    client=client,
                    prompt=prompt,
                    context=context,
                    raw_output=scaffolded_model_output,
                    validation_failures=raw_validation["failures"],
                    fallback_output=fallback_output,
                    known_cards=known_cards,
                    floor=floor,
                    intake=intake,
                    rule_results=rule_results,
                    retrieved=retrieved,
                    retrieved_ids=retrieved_ids,
                    scaffold_patched_fields=scaffold_patched_fields,
                )
                if merged_output is not None and merged_validation is not None:
                    final_output = merged_output
                    final_validation = merged_validation
                    field_provenance = merged_field_provenance
                    # Record the fallback's scaffold trace only when an observation
                    # field in the merged result is marked DETERMINISTIC_FALLBACK.
                    if (
                        field_provenance.get("missing_info_to_collect") == DETERMINISTIC_FALLBACK
                        or field_provenance.get("next_observations_to_collect") == DETERMINISTIC_FALLBACK
                    ):
                        _absorb_scaffold_trace(
                            fallback_scaffold,
                            scaffold_patched_fields=scaffold_patched_fields,
                            filled_required_observation_ids=filled_required_observation_ids,
                            model_selected_required_observation_ids=model_selected_required_observation_ids,
                            invalid_selected_required_observation_ids=invalid_selected_required_observation_ids,
                            stripped_trace_only_fields=stripped_trace_only_fields,
                        )
                else:
                    # No acceptable merge: use the whole deterministic fallback.
                    fallback_reason = fallback_reason or "navigator_validation_failure"
                    fallback_used = True
                    final_output = fallback_output
                    final_validation = fallback_validation
                    field_provenance = deterministic_field_provenance()
                    _absorb_scaffold_trace(
                        fallback_scaffold,
                        scaffold_patched_fields=scaffold_patched_fields,
                        filled_required_observation_ids=filled_required_observation_ids,
                        model_selected_required_observation_ids=model_selected_required_observation_ids,
                        invalid_selected_required_observation_ids=invalid_selected_required_observation_ids,
                        stripped_trace_only_fields=stripped_trace_only_fields,
                    )
            else:
                # No scaffolded model output at all: use the deterministic fallback;
                # fallback_reason keeps whatever value was set above.
                fallback_used = True
                fallback_output, fallback_validation, fallback_scaffold = _run_fallback(
                    intake,
                    rule_results,
                    retrieved,
                    floor,
                    known_cards,
                    retrieved_ids,
                )
                _absorb_scaffold_trace(
                    fallback_scaffold,
                    scaffold_patched_fields=scaffold_patched_fields,
                    filled_required_observation_ids=filled_required_observation_ids,
                    model_selected_required_observation_ids=model_selected_required_observation_ids,
                    invalid_selected_required_observation_ids=invalid_selected_required_observation_ids,
                    stripped_trace_only_fields=stripped_trace_only_fields,
                )
                final_output = fallback_output
                final_validation = fallback_validation
                field_provenance = deterministic_field_provenance()
    # Competence repair: only for non-canned backends (so `client` exists) whose
    # final output already passes validation.
    if final_validation["passed"] and config.model_backend != "canned":
        handoff_readiness_before = score_handoff_readiness(
            final_output,
            actual_red_flag_rule_ids=[str(rule.get("rule_id")) for rule in rule_results if rule.get("rule_id")],
            source_card_ids=final_output.get("source_cards", []),
            validation_result=final_validation,
        )
        if handoff_readiness_before.get("handoff_readiness_passed") is not True:
            competence_fallback_output, _competence_fallback_validation, competence_fallback_scaffold = _run_fallback(
                intake,
                rule_results,
                retrieved,
                floor,
                known_cards,
                retrieved_ids,
            )
            (
                competence_repaired_output,
                competence_repair_validation,
                competence_repair_attempted,
                competence_merged_output,
                competence_merged_validation,
                competence_merged_field_provenance,
            ) = _try_field_level_model_output(
                client=client,
                prompt=prompt,
                context={
                    **context,
                    "handoff_readiness_metrics": handoff_readiness_before,
                },
                raw_output=final_output,
                validation_failures=_handoff_competence_failures(handoff_readiness_before),
                fallback_output=competence_fallback_output,
                known_cards=known_cards,
                floor=floor,
                intake=intake,
                rule_results=rule_results,
                retrieved=retrieved,
                retrieved_ids=retrieved_ids,
                scaffold_patched_fields=scaffold_patched_fields,
            )
            competence_repair_scope = "handoff_note_sbar" if competence_repair_attempted else None
            if competence_merged_output is not None and competence_merged_validation is not None:
                after = score_handoff_readiness(
                    competence_merged_output,
                    actual_red_flag_rule_ids=[str(rule.get("rule_id")) for rule in rule_results if rule.get("rule_id")],
                    source_card_ids=competence_merged_output.get("source_cards", []),
                    validation_result=competence_merged_validation,
                )
                handoff_readiness_after = after
                # Adopt the repaired output only if handoff readiness now passes.
                if after.get("handoff_readiness_passed") is True:
                    final_output = competence_merged_output
                    final_validation = competence_merged_validation
                    field_provenance = competence_merged_field_provenance
                    competence_repair_success = True
                    if (
                        field_provenance.get("missing_info_to_collect") == DETERMINISTIC_FALLBACK
                        or field_provenance.get("next_observations_to_collect") == DETERMINISTIC_FALLBACK
                    ):
                        _absorb_scaffold_trace(
                            competence_fallback_scaffold,
                            scaffold_patched_fields=scaffold_patched_fields,
                            filled_required_observation_ids=filled_required_observation_ids,
                            model_selected_required_observation_ids=model_selected_required_observation_ids,
                            invalid_selected_required_observation_ids=invalid_selected_required_observation_ids,
                            stripped_trace_only_fields=stripped_trace_only_fields,
                        )
    # Derived outcome flags. A raw success requires that scaffolding patched
    # nothing, i.e. the model output validated without deterministic patches.
    field_level_fallback_used = has_deterministic_patches(field_provenance)
    raw_success = raw_attempted and raw_validation["passed"] and not scaffold_patched_fields
    repair_success = repair_attempted and repair_validation["passed"]
    fallback_success = fallback_used and fallback_validation["passed"]
    fallback_tier = "canned" if fallback_used else "configured"
    competence_success = bool(raw_success or repair_success or competence_repair_success)
    model_route = {
        "model_stack": config.model_stack,
        "model_backend": config.model_backend,
        "model_id": config.active_model_id,
        "fallback_tier": fallback_tier,
        "fallback_reason": fallback_reason,
        "field_level_fallback_used": field_level_fallback_used,
        "deterministic_scaffold_patched_fields": sorted(scaffold_patched_fields),
        "filled_required_observation_ids": filled_required_observation_ids,
        "model_selected_required_observation_ids": model_selected_required_observation_ids,
        "invalid_selected_required_observation_ids": invalid_selected_required_observation_ids,
        "stripped_trace_only_fields": stripped_trace_only_fields,
    }
    model_route = derive_model_route(model_route, final_validation, [], field_provenance=field_provenance)
    harness_evidence = build_harness_evidence(
        confirmed_intake=intake,
        retrieved_card_ids=retrieved_ids,
        rule_results=rule_results,
        urgency_floor=floor,
        validator_result=final_validation,
        final_output=final_output,
        model_route=model_route,
    )
    # Shallow-copy before attaching harness evidence: final_output may be the
    # same dict object as fallback_output or scaffolded_model_output, which are
    # recorded separately below.
    final_output = dict(final_output)
    final_output["harness_evidence"] = harness_evidence
    # Payload hashed into the record's trace_hash.
    trace_payload = {
        "case_id": case["case_id"],
        "input_hash": stable_hash(intake),
        "red_flags": rule_results,
        "retrieved_card_ids": retrieved_ids,
        "prompt_template_hash": prompt_hash,
        "model_route": model_route,
        "harness_evidence": harness_evidence,
        "navigator_output": final_output,
        "validator_result": final_validation,
        "field_provenance": field_provenance,
    }
    actual_source_card_ids = [
        str(card_id) for card_id in final_output.get("source_cards", []) if str(card_id)
    ]
    actual_candidate_pathway_card_ids = _candidate_pathway_card_ids(
        final_output.get("candidate_protocol_pathways")
    )
    # Result record: expected labels copied from the case, then actual results,
    # routing flags, per-stage validations and outputs.
    record = {
        "case_id": case["case_id"],
        "case_path": case.get("_case_path"),
        "case_line": case.get("_case_line"),
        "target_protocol_card_id": case.get("target_protocol_card_id"),
        "expected_min_protocol_urgency": case.get("expected_min_protocol_urgency"),
        "expected_red_flag_rule_ids": case.get("expected_red_flag_rule_ids", []),
        "expected_source_card_ids": case.get("expected_source_card_ids", []),
        "expected_candidate_pathway_card_ids": case.get("expected_candidate_pathway_card_ids", []),
        "expected_missing_observations": case.get("expected_missing_observations", []),
        "expected_model_observation_cues": case.get("expected_model_observation_cues", []),
        "expected_handoff_cues": case.get("expected_handoff_cues", []),
        "expected_harness_evidence_cues": case.get("expected_harness_evidence_cues", []),
        "forbidden_behavior": case.get("forbidden_behavior", []),
        "actual_red_flag_rule_ids": [rule["rule_id"] for rule in rule_results],
        "actual_protocol_urgency": final_output.get("protocol_urgency"),
        "actual_source_card_ids": actual_source_card_ids,
        "actual_candidate_pathway_card_ids": actual_candidate_pathway_card_ids,
        "retrieved_card_ids": retrieved_ids,
        "model_backend": config.model_backend,
        "model_stack": config.model_stack,
        "active_model_id": config.active_model_id,
        "fallback_tier": fallback_tier,
        "fallback_reason": fallback_reason,
        "field_level_fallback_used": field_level_fallback_used,
        "deterministic_scaffold_patched_fields": sorted(scaffold_patched_fields),
        "filled_required_observation_ids": filled_required_observation_ids,
        "model_selected_required_observation_ids": model_selected_required_observation_ids,
        "invalid_selected_required_observation_ids": invalid_selected_required_observation_ids,
        "stripped_trace_only_fields": stripped_trace_only_fields,
        "raw_configured_model_attempted": raw_attempted,
        "raw_configured_model_success": raw_success,
        "repair_attempted": repair_attempted,
        "repair_success": repair_success,
        "validation_repair_attempted": repair_attempted,
        "validation_repair_success": repair_success,
        "competence_repair_attempted": competence_repair_attempted,
        "competence_repair_success": competence_repair_success,
        "competence_repair_scope": competence_repair_scope,
        "handoff_readiness_before_competence_repair": handoff_readiness_before,
        "handoff_readiness_after_competence_repair": handoff_readiness_after,
        "canned_fallback_used": fallback_used,
        "canned_fallback_success": fallback_success,
        "competence_success": competence_success,
        "raw_validation": raw_validation,
        "repair_validation": repair_validation,
        "competence_repair_validation": competence_repair_validation,
        "fallback_validation": fallback_validation,
        "validation_result": final_validation,
        "final_validation": final_validation,
        "harness_evidence": harness_evidence,
        "raw_model_output": raw_output,
        "scaffolded_model_output": scaffolded_model_output,
        "repaired_output": repaired_output,
        "competence_repaired_output": competence_repaired_output,
        "fallback_output": fallback_output,
        "final_output": final_output,
        "field_provenance": field_provenance,
        "latency_ms": round((perf_counter() - started) * 1000, 3),
        "trace_hash": stable_hash(trace_payload),
    }
    # Scored last because the scorer receives the assembled record itself.
    record["expected_label_score"] = score_expected_labels(record)
    return record
def _run_fallback(
    intake: dict[str, Any],
    rule_results: list[dict[str, Any]],
    retrieved: list[dict[str, Any]],
    floor: str,
    known_cards: set[str],
    retrieved_ids: list[str],
) -> tuple[dict[str, Any], dict[str, Any], NavigationScaffoldResult]:
    """Build the canned navigator output, scaffold it, and validate the result.

    Returns:
        ``(scaffolded_output, validation_dict, scaffold_result)``.
    """
    canned = canned_navigator_output(intake, rule_results, retrieved, floor)
    scaffold_result = apply_navigation_scaffolding(
        canned,
        retrieved_cards=retrieved,
        rule_results=rule_results,
        urgency_floor=floor,
        confirmed_intake=intake,
    )
    scaffolded = scaffold_result.output
    # Validation runs on the scaffolded output, not on the raw canned output.
    return (
        scaffolded,
        _validate_output(scaffolded, known_cards, floor, intake, rule_results, retrieved, retrieved_ids),
        scaffold_result,
    )
def _absorb_scaffold_trace(
    result: NavigationScaffoldResult,
    *,
    scaffold_patched_fields: set[str],
    filled_required_observation_ids: list[str],
    model_selected_required_observation_ids: list[str],
    invalid_selected_required_observation_ids: list[str],
    stripped_trace_only_fields: list[str],
) -> None:
    """Fold one scaffold result's trace data into the caller's accumulators.

    All accumulators are mutated in place. The set is unioned with
    ``result.patched_fields``; each list gains the values of the same-named
    attribute on ``result`` that it does not already contain, in order.
    """
    scaffold_patched_fields.update(result.patched_fields)
    # Each accumulator is paired with the name of the result attribute feeding it.
    list_accumulators = (
        (filled_required_observation_ids, "filled_required_observation_ids"),
        (model_selected_required_observation_ids, "model_selected_required_observation_ids"),
        (invalid_selected_required_observation_ids, "invalid_selected_required_observation_ids"),
        (stripped_trace_only_fields, "stripped_trace_only_fields"),
    )
    for accumulator, attribute in list_accumulators:
        _extend_unique(accumulator, getattr(result, attribute))
def _extend_unique(items: list[str], values: list[str]) -> None:
for value in values:
if value not in items:
items.append(value)
def _merge_observation_repair_values(previous_value: Any, repair_value: Any) -> list[str]:
    """Combine previous and repaired observation values, dropping duplicates.

    Both inputs are normalized with ``_coerce_text_list``; previous values come
    first and the first occurrence of each string wins.
    """
    combined = _coerce_text_list(previous_value) + _coerce_text_list(repair_value)
    # dict keys keep insertion order, giving an order-preserving de-duplication.
    return list(dict.fromkeys(combined))
def _coerce_text_list(value: Any) -> list[str]:
if isinstance(value, list):
return [str(item).strip() for item in value if str(item).strip()]
if isinstance(value, str) and value.strip():
return [value.strip()]
return []
def _validate_output(
    output: dict[str, Any],
    known_cards: set[str],
    floor: str,
    intake: dict[str, Any],
    rule_results: list[dict[str, Any]],
    retrieved: list[dict[str, Any]],
    retrieved_ids: list[str],
) -> dict[str, Any]:
    """Validate a navigator output with strict schema checking.

    Thin wrapper over ``validate_navigator_output`` that passes the retrieved
    ids as a set and returns the result's ``to_dict()`` form.
    """
    retrieved_id_set = set(retrieved_ids)
    report = validate_navigator_output(
        output,
        known_cards,
        floor,
        strict_schema=True,
        confirmed_intake=intake,
        rule_results=rule_results,
        retrieved_cards=retrieved,
        retrieved_card_ids=retrieved_id_set,
    )
    return report.to_dict()
def _try_field_level_model_output(
    *,
    client: ModelClient,
    prompt: str,
    context: dict[str, Any],
    raw_output: dict[str, Any],
    validation_failures: list[str],
    fallback_output: dict[str, Any],
    known_cards: set[str],
    floor: str,
    intake: dict[str, Any],
    rule_results: list[dict[str, Any]],
    retrieved: list[dict[str, Any]],
    retrieved_ids: list[str],
    scaffold_patched_fields: set[str],
) -> tuple[dict[str, Any] | None, dict[str, Any], bool, dict[str, Any] | None, dict[str, Any] | None, dict[str, str]]:
    """Ask the model for focused field repairs and merge them with raw/fallback fields.

    One model call is made per focused repair prompt built from
    ``validation_failures``; fields returned within each prompt's scope are
    collected. The merge is then tried with the collected fields and, after
    that, with no repaired fields at all. The first merge that validates and is
    not entirely deterministic is returned.

    Returns:
        ``(repaired_fields_or_None, repair_validation, repair_attempted,
        merged_output_or_None, merged_validation_or_None, provenance)``.
        When no merge is accepted, the merged output and validation are
        ``None`` and the provenance is ``{}``.
    """
    accepted_raw_fields = accepted_raw_fields_from_failures(validation_failures)
    repaired_fields: dict[str, Any] = {}
    repair_attempted = False
    repair_validation = {"passed": False, "failures": ["repair not attempted"]}
    for focused_prompt in build_focused_repair_prompts(
        original_prompt=prompt,
        previous_output=raw_output,
        failures=validation_failures,
        urgency_floor=floor,
        required_observation_targets=required_observation_targets(retrieved),
    ):
        repair_attempted = True
        try:
            repair_output = client.generate_json(
                focused_prompt.prompt,
                {
                    **context,
                    "previous_output": raw_output,
                    "validation_failures": validation_failures,
                    "repair_scope": focused_prompt.scope.name,
                },
            )
        except ModelClientError as exc:
            # A failed repair call is recorded and the next scope is still tried.
            repair_validation = {"passed": False, "failures": [f"repair backend error: {exc}"]}
            continue
        if not isinstance(repair_output, dict):
            repair_validation = {"passed": False, "failures": ["repair output was not an object"]}
            continue
        missing_source_cards = missing_mandatory_source_cards(focused_prompt.scope, repair_output)
        if missing_source_cards:
            # The whole repair for this scope is discarded.
            repair_validation = {
                "passed": False,
                "failures": [
                    f"repair omitted mandatory source card {card_id}" for card_id in missing_source_cards
                ],
            }
            continue
        # Only fields named by the prompt's scope are taken from the repair.
        for field in focused_prompt.scope.fields:
            if field in repair_output:
                if focused_prompt.scope.name == "missing_observations":
                    # Observation repairs extend the previous values rather than
                    # replacing them.
                    repaired_fields[field] = _merge_observation_repair_values(
                        raw_output.get(field),
                        repair_output[field],
                    )
                else:
                    repaired_fields[field] = repair_output[field]
    # Candidates in priority order: the collected repairs (if any), then an
    # empty repair set.
    merge_candidates = []
    if repaired_fields:
        merge_candidates.append(repaired_fields)
    merge_candidates.append({})
    for candidate_repaired_fields in merge_candidates:
        merge_result = merge_field_provenance(
            raw_output,
            candidate_repaired_fields,
            fallback_output,
            accepted_raw_fields=accepted_raw_fields,
        )
        merged_validation = _validate_output(
            merge_result.output,
            known_cards,
            floor,
            intake,
            rule_results,
            retrieved,
            retrieved_ids,
        )
        if merged_validation["passed"]:
            # Skip a merge whose provenance equals deterministic_field_provenance()
            # (presumably "every field came from the deterministic fallback" —
            # confirm against figment.field_provenance).
            if merge_result.provenance == deterministic_field_provenance():
                continue
            _mark_deterministic_patch_fields(merge_result.provenance, scaffold_patched_fields)
            if candidate_repaired_fields:
                repair_validation = merged_validation
            return (
                candidate_repaired_fields or None,
                repair_validation,
                repair_attempted,
                merge_result.output,
                merged_validation,
                merge_result.provenance,
            )
        # Reached only when the merge failed validation: report that failure as
        # the repair validation if repaired fields were part of the merge.
        if candidate_repaired_fields:
            repair_validation = merged_validation
    return None, repair_validation, repair_attempted, None, None, {}
def _mark_deterministic_patch_fields(provenance: dict[str, str], fields: set[str]) -> None:
    """Relabel patched fields in ``provenance`` as ``DETERMINISTIC_FALLBACK``, in place.

    Fields absent from ``provenance`` are ignored, and fields already labelled
    ``MODEL_REPAIRED`` keep that label.
    """
    for name in fields:
        if name not in provenance:
            continue
        if provenance[name] == MODEL_REPAIRED:
            continue
        provenance[name] = DETERMINISTIC_FALLBACK
def _observation_patch_repair_failures(
filled_required_observation_ids: list[str],
scaffold_patched_fields: set[str],
) -> list[str]:
if not {"missing_info_to_collect", "next_observations_to_collect"} & scaffold_patched_fields:
return []
card_ids: list[str] = []
for target_id in filled_required_observation_ids:
card_id, separator, _index = str(target_id).partition("::required_observation::")
if separator and card_id and card_id not in card_ids:
card_ids.append(card_id)
return [
f"missing_info_to_collect does not reference required observations for {card_id}"
for card_id in card_ids
]
def _handoff_competence_failures(metrics: dict[str, Any]) -> list[str]:
failures = ["handoff_note_sbar handoff_readiness_passed failed"]
for key, value in sorted(metrics.items()):
if key.startswith("sbar_") and value is False:
failures.append(f"handoff_note_sbar {key} failed")
elif key == "handoff_brevity_ok" and value is False:
failures.append("handoff_note_sbar handoff_brevity_ok failed")
elif key == "handoff_unsupported_fact_count" and value:
failures.append(f"handoff_note_sbar unsupported fact count: {value}")
return failures
def _repair_prompt(
original_prompt: str,
previous_output: dict[str, Any],
failures: list[str],
urgency_floor: str,
) -> str:
repair_context = {
"deterministic_validation_failures": failures,
"urgency_floor": urgency_floor,
"previous_output": previous_output,
}
return (
f"{original_prompt}\n\n"
"Your previous JSON failed deterministic validation. Return corrected JSON only.\n"
"Keep protocol_urgency at or above the urgency_floor, cite only retrieved source_cards, "
"cite every fired rule card, ground SBAR fields in confirmed intake/rules, and avoid diagnosis, "
"prescription, dosing, autonomous routing, or treatment language.\n\n"
f"REPAIR_CONTEXT:\n{json.dumps(repair_context, indent=2, sort_keys=True)}"
)
def _candidate_pathway_card_ids(value: Any) -> list[str]:
if not isinstance(value, list):
return []
card_ids: list[str] = []
for item in value:
if isinstance(item, dict):
card_id = item.get("card_id")
else:
card_id = item
if card_id:
card_ids.append(str(card_id))
return card_ids
def _summarize(
    records: list[dict[str, Any]],
    config: FigmentConfig,
    case_paths: list[Path],
    output_path: Path | None,
) -> dict[str, Any]:
    """Summarize eval records and attach run metadata.

    Starts from ``summarize_eval_records`` and adds the model configuration,
    input/output paths, the runtime error summary and the
    ``scored_reporting_eligible`` flag (true when no critical runtime error was
    counted). For the ``llama_cpp`` backend the local LLM evidence block is
    added as well.
    """
    summary = summarize_eval_records(records)

    # Run metadata.
    summary["model_backend"] = config.model_backend
    summary["model_stack"] = config.model_stack
    summary["active_model_id"] = config.active_model_id
    summary["case_paths"] = [str(case_path) for case_path in case_paths]
    summary["output_path"] = str(output_path) if output_path else None

    # Runtime errors decide whether the run may be reported as scored.
    error_summary = _runtime_error_summary(records)
    summary["runtime_error_summary"] = error_summary
    summary["scored_reporting_eligible"] = error_summary["critical_runtime_error_count"] == 0

    if config.model_backend != "llama_cpp":
        return summary
    summary["local_llm_evidence"] = _local_llm_evidence_summary(summary, config)
    return summary
def _local_llm_evidence_summary(summary: dict[str, Any], config: FigmentConfig) -> dict[str, Any]:
    """Assemble the evidence block describing a llama.cpp-backed eval run.

    Combines the model configuration, server details read from environment
    variables, the result of probing the server's models endpoint, and outcome
    counts copied from ``summary``.
    """
    total_cases = int(summary.get("total_cases", 0))
    competence_successes = int(summary.get("competence_successes", 0))
    reaches_fifty_cases = total_cases >= 50

    model_section = {
        "proof_status": "eval_records_summarized",
        "model_backend": config.model_backend,
        "model_stack": config.model_stack,
        "model_id": config.active_model_id,
        "llama_base_url": config.llama_base_url,
    }
    # Empty environment values are reported as None, same as unset ones.
    server_section = {
        "server_command": os.getenv("LLAMA_SERVER_COMMAND") or None,
        "gguf_path": os.getenv("LOCAL_GGUF_PATH") or os.getenv("LLAMA_ARG_MODEL") or None,
        "gguf_sha256": os.getenv("LOCAL_GGUF_SHA256") or None,
        "n_ctx": _optional_int_env("LLAMA_N_CTX") or _optional_int_env("LLAMA_ARG_CTX_SIZE"),
        "n_parallel": _optional_int_env("LLAMA_N_PARALLEL") or _optional_int_env("LLAMA_ARG_N_PARALLEL"),
        "prompt_cache": os.getenv("LLAMA_PROMPT_CACHE") or None,
        "models_endpoint": _models_endpoint_metadata(config.llama_base_url),
    }
    outcome_section = {
        "runtime_error_summary": summary.get("runtime_error_summary", {}),
        "scored_reporting_eligible": summary.get("scored_reporting_eligible"),
        "total_cases": total_cases,
        "competence_successes": competence_successes,
        "raw_configured_model_successes": summary.get("raw_configured_model_successes", 0),
        "repair_successes": summary.get("repair_successes", 0),
        "fallback_uses": summary.get("fallback_uses", 0),
        "final_validation_successes": summary.get("final_validation_successes", 0),
        "counts_as_50_case_local_llm_eval": reaches_fifty_cases,
        "counts_as_50_case_local_llm_competence": reaches_fifty_cases and competence_successes > 0,
        "no_cloud_note": (
            "MODEL_BACKEND=llama_cpp calls the configured local OpenAI-compatible LLAMA_BASE_URL. "
            "Record server /v1/models metadata and network isolation evidence beside the trace."
        ),
        "real_eval_command": REAL_LLAMA_CPP_EVAL_COMMAND,
    }
    return {**model_section, **server_section, **outcome_section}
def _runtime_error_summary(records: list[dict[str, Any]]) -> dict[str, Any]:
markers = {
"context_size_exceeded": ("Context size has been exceeded",),
"kv_cache_failure": ("failed to find free space in the KV cache", "KV cache"),
"server_http_500": ("http_status=500", "HTTP Error 500", " 500 "),
}
text_by_record = {
str(record.get("case_id") or index): json.dumps(
{
"raw_validation": record.get("raw_validation"),
"repair_validation": record.get("repair_validation"),
"competence_repair_validation": record.get("competence_repair_validation"),
"fallback_validation": record.get("fallback_validation"),
"final_validation": record.get("final_validation"),
},
sort_keys=True,
)
for index, record in enumerate(records, start=1)
}
summary: dict[str, Any] = {
"context_size_exceeded": False,
"kv_cache_failure": False,
"server_http_500": False,
"critical_runtime_error_count": 0,
"affected_case_ids": [],
}
affected: set[str] = set()
for case_id, text in text_by_record.items():
for key, key_markers in markers.items():
if any(marker in text for marker in key_markers):
summary[key] = True
affected.add(case_id)
summary["affected_case_ids"] = sorted(affected)
summary["critical_runtime_error_count"] = sum(
int(bool(summary[key])) for key in ("context_size_exceeded", "kv_cache_failure", "server_http_500")
)
return summary
def _models_endpoint_metadata(base_url: str) -> dict[str, Any]:
    """Best-effort probe of the server's models endpoint.

    Issues a GET with a 2 second timeout to the URL derived from ``base_url``
    and decodes the body as UTF-8 JSON.

    Returns:
        ``{"url", "available": True, "payload"}`` on success, otherwise
        ``{"url", "available": False, "error"}`` with the error text truncated
        to 200 characters. Network, protocol and decoding failures are reported
        this way rather than raised, so an unreachable or misconfigured server
        cannot abort the eval summary.
    """
    import http.client  # local import: only needed for the exception type

    url = _openai_models_url(base_url)
    try:
        with urllib.request.urlopen(url, timeout=2.0) as response:
            payload = json.loads(response.read().decode("utf-8"))
    except (OSError, ValueError, http.client.HTTPException) as exc:
        # OSError covers URLError and timeouts. ValueError covers a URL with no
        # scheme ("unknown url type"), UnicodeDecodeError and JSONDecodeError.
        # HTTPException covers malformed or truncated responses.
        return {"url": url, "available": False, "error": str(exc)[:200]}
    return {"url": url, "available": True, "payload": payload}
def _openai_models_url(base_url: str) -> str:
parts = urllib.parse.urlsplit(base_url.strip())
path = parts.path.rstrip("/")
if path.endswith("/v1"):
path = f"{path}/models"
elif path.endswith("/models"):
pass
else:
path = f"{path}/models" if path else "/v1/models"
return urllib.parse.urlunsplit((parts.scheme, parts.netloc, path, "", ""))
def _optional_int_env(name: str) -> int | None:
value = os.getenv(name, "").strip()
if not value:
return None
try:
return int(value)
except ValueError:
return None
def _write_eval_bundle_metadata(
summary: dict[str, Any],
records: list[dict[str, Any]],
config: FigmentConfig,
case_paths: list[Path],
output_path: Path,
) -> None:
output_dir = output_path.parent
output_dir.mkdir(parents=True, exist_ok=True)
summary_path = output_dir / "eval_summary.json"
manifest_path = output_dir / "eval_evidence_manifest.json"
summary_path.write_text(json.dumps(summary, indent=2, sort_keys=True) + "\n", encoding="utf-8")
manifest = {
"output_jsonl": str(output_path),
"summary_json": str(summary_path),
"case_paths": [str(path) for path in case_paths],
"model_backend": config.model_backend,
"model_stack": config.model_stack,
"active_model_id": config.active_model_id,
"total_cases": len(records),
"trace_hashes": [
{"case_id": record.get("case_id"), "trace_hash": record.get("trace_hash")}
for record in records
],
"all_trace_hashes_present": all(bool(record.get("trace_hash")) for record in records),
"runtime_error_summary": summary.get("runtime_error_summary", {}),
"scored_reporting_eligible": summary.get("scored_reporting_eligible"),
"local_llm_evidence": summary.get("local_llm_evidence"),
}
manifest_path.write_text(json.dumps(manifest, indent=2, sort_keys=True) + "\n", encoding="utf-8")
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point: parse arguments, run the eval, print the summary.

    Without ``--cases`` the case files are discovered with ``DEFAULT_CASE_GLOB``
    relative to the working directory. The summary is printed to stderr when
    records go to stdout (``--output -``) and to stdout otherwise.

    Returns:
        ``0`` on completion.

    Raises:
        SystemExit: No case file matched the default glob.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("--backend", choices=["canned", "hosted_omni", "llama_cpp"], default="canned")
    parser.add_argument("--model-stack", choices=["omni_native", "local_4b_parakeet"], default=None)
    parser.add_argument("--cases", action="append", default=None, help="JSONL eval case path. Repeatable.")
    parser.add_argument("--output", default="-", help="JSONL result path, or '-' for stdout.")
    parser.add_argument("--limit", type=int, default=None)
    args = parser.parse_args(argv)

    if args.cases:
        case_paths = [Path(raw_path) for raw_path in args.cases]
    else:
        case_paths = sorted(Path().glob(DEFAULT_CASE_GLOB))
    if not case_paths:
        raise SystemExit(f"no eval case files matched {DEFAULT_CASE_GLOB}")

    to_stdout = args.output == "-"
    output_path = None if to_stdout else Path(args.output)
    config = _config_for_backend(args.backend, args.model_stack)
    summary = run_eval(case_paths=case_paths, output_path=output_path, config=config, limit=args.limit)

    # Keep stdout pure JSONL when the records themselves were written there.
    summary_stream = sys.stderr if output_path is None else sys.stdout
    print(json.dumps(summary, indent=2, sort_keys=True), file=summary_stream)
    return 0
def _config_for_backend(backend: str, model_stack: str | None) -> FigmentConfig:
    """Build the validated config for the requested backend.

    The canned backend uses a fresh ``FigmentConfig`` and does not read the
    environment. Other backends start from ``FigmentConfig.from_env()`` and
    override the backend and stack. When no stack is given, ``llama_cpp``
    defaults to ``local_4b_parakeet`` and everything else to ``omni_native``.
    """
    if backend != "canned":
        default_stack = "local_4b_parakeet" if backend == "llama_cpp" else "omni_native"
        chosen_stack = model_stack or default_stack
        env_config = FigmentConfig.from_env()
        return replace(env_config, model_backend=backend, model_stack=chosen_stack).validated()
    canned_config = FigmentConfig(model_backend="canned", model_stack=model_stack or "omni_native")
    return canned_config.validated()
# Script entry point: run main() and use its return value as the exit status.
if __name__ == "__main__":
    raise SystemExit(main())