linvest21's picture
download
raw
17.7 kB
from __future__ import annotations
import argparse
import gc
import json
import math
import os
import re
import tempfile
import time
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from eval.model_quality_gate import evaluate_model_quality_gate
# Number of attempts the output writers make before giving up; read from the
# SHFT_OUTPUT_WRITE_RETRIES environment variable (default "5").
OUTPUT_WRITE_RETRIES = int(os.environ.get("SHFT_OUTPUT_WRITE_RETRIES", "5"))
# Seconds to sleep after a failed write attempt; read from the
# SHFT_OUTPUT_WRITE_RETRY_SECONDS environment variable (default "2").
OUTPUT_WRITE_RETRY_SECONDS = float(os.environ.get("SHFT_OUTPUT_WRITE_RETRY_SECONDS", "2"))
def utc_now() -> str:
return datetime.now(UTC).replace(microsecond=0).isoformat().replace("+00:00", "Z")
def load_jsonl(path: Path) -> list[dict[str, Any]]:
    """Load a JSON Lines file into a list of dict records.

    Blank (whitespace-only) lines are skipped.

    Args:
        path: UTF-8 encoded JSONL file to read.

    Returns:
        The parsed records, in file order.

    Raises:
        json.JSONDecodeError: a line is not valid JSON; the message names the
            file and line number (this is a ``ValueError`` subclass).
        ValueError: a line parses to something other than an object, or the
            file contains no records at all.
        OSError: the file cannot be opened or read.
    """
    rows: list[dict[str, Any]] = []
    # Iterate the file handle rather than str.splitlines(): splitlines() also
    # breaks on U+2028, U+2029, U+0085, VT and FF, all of which may appear
    # unescaped inside a JSON string and would cut a valid record in half.
    # Text-mode iteration only splits on \n, \r and \r\n.
    with path.open(encoding="utf-8") as handle:
        for line_no, line in enumerate(handle, start=1):
            if not line.strip():
                continue
            try:
                item = json.loads(line)
            except json.JSONDecodeError as exc:
                # Re-raise the same exception type, with the location added.
                raise json.JSONDecodeError(
                    f"invalid JSON at {path}:{line_no}: {exc.msg}", exc.doc, exc.pos
                ) from exc
            if not isinstance(item, dict):
                raise ValueError(f"record at {path}:{line_no} must be an object")
            rows.append(item)
    if not rows:
        raise ValueError(f"eval suite has no records: {path}")
    return rows
def write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write ``payload`` to ``path`` as pretty-printed JSON, with retries.

    The content is first written to a temporary file in the destination
    directory and then moved over ``path`` with ``Path.replace``. The whole
    sequence is retried on ``OSError`` up to ``OUTPUT_WRITE_RETRIES`` times
    (at least once), sleeping ``OUTPUT_WRITE_RETRY_SECONDS`` between attempts.

    Args:
        path: Destination file; parent directories are created if missing.
        payload: JSON-serializable mapping (written with sorted keys, indent 2).

    Raises:
        OSError: the last error seen if every attempt fails.
    """
    content = json.dumps(payload, indent=2, sort_keys=True) + "\n"
    attempts = max(1, OUTPUT_WRITE_RETRIES)
    last_exc: OSError | None = None
    for attempt in range(1, attempts + 1):
        tmp_path: Path | None = None
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            with tempfile.NamedTemporaryFile("w", encoding="utf-8", dir=path.parent, delete=False) as tmp:
                # Record the name before writing so a failed write can still be cleaned up.
                tmp_path = Path(tmp.name)
                tmp.write(content)
            tmp_path.replace(path)
            return
        except OSError as exc:
            last_exc = exc
            if tmp_path is not None:
                # delete=False means nobody else removes the temp file; without
                # this every failed attempt would leave a stray file behind.
                try:
                    tmp_path.unlink(missing_ok=True)
                except OSError:
                    pass  # best-effort cleanup; the original error is what matters
            print(f"[SHFT hf-eval] warning: write retry {attempt}/{attempts} path={path} error={exc}", flush=True)
            # No point sleeping after the final attempt; we are about to raise.
            if attempt < attempts:
                time.sleep(OUTPUT_WRITE_RETRY_SECONDS)
    if last_exc is not None:
        raise last_exc
def write_progress_json(path: Path, payload: dict[str, Any]) -> None:
    """Write a progress snapshot via ``write_json`` without letting I/O errors escape.

    An ``OSError`` from the write is reported on stdout as a warning and then
    dropped; other exception types still propagate.
    """
    try:
        write_json(path, payload)
    except OSError as write_error:
        warning = f"[SHFT hf-eval] warning: progress write failed path={path} error={write_error}"
        print(warning, flush=True)
def load_json_or_none(path: Path) -> dict[str, Any] | None:
    """Parse the JSON document at ``path``, or return ``None`` when the path does not exist.

    The file is decoded as ``utf-8-sig``, so a leading byte-order mark is tolerated.
    """
    if path.exists():
        raw_text = path.read_text(encoding="utf-8-sig")
        return json.loads(raw_text)
    return None
def write_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write ``rows`` to ``path`` as JSON Lines (one sorted-key object per line), with retries.

    The content is first written to a temporary file in the destination
    directory and then moved over ``path`` with ``Path.replace``. The whole
    sequence is retried on ``OSError`` up to ``OUTPUT_WRITE_RETRIES`` times
    (at least once), sleeping ``OUTPUT_WRITE_RETRY_SECONDS`` between attempts.

    Args:
        path: Destination file; parent directories are created if missing.
        rows: JSON-serializable records. An empty list produces an empty file.

    Raises:
        OSError: the last error seen if every attempt fails.
    """
    # Terminate every record with its own newline so that zero rows yield an
    # empty file rather than a file holding one blank line.
    content = "".join(json.dumps(row, sort_keys=True) + "\n" for row in rows)
    attempts = max(1, OUTPUT_WRITE_RETRIES)
    last_exc: OSError | None = None
    for attempt in range(1, attempts + 1):
        tmp_path: Path | None = None
        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            with tempfile.NamedTemporaryFile("w", encoding="utf-8", dir=path.parent, delete=False) as tmp:
                # Record the name before writing so a failed write can still be cleaned up.
                tmp_path = Path(tmp.name)
                tmp.write(content)
            tmp_path.replace(path)
            return
        except OSError as exc:
            last_exc = exc
            if tmp_path is not None:
                # delete=False means nobody else removes the temp file; without
                # this every failed attempt would leave a stray file behind.
                try:
                    tmp_path.unlink(missing_ok=True)
                except OSError:
                    pass  # best-effort cleanup; the original error is what matters
            print(f"[SHFT hf-eval] warning: write retry {attempt}/{attempts} path={path} error={exc}", flush=True)
            # No point sleeping after the final attempt; we are about to raise.
            if attempt < attempts:
                time.sleep(OUTPUT_WRITE_RETRY_SECONDS)
    if last_exc is not None:
        raise last_exc
def write_partial_jsonl(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write an in-progress predictions file via ``write_jsonl`` without letting I/O errors escape.

    An ``OSError`` from the write is reported on stdout as a warning and then
    dropped; other exception types still propagate.
    """
    try:
        write_jsonl(path, rows)
    except OSError as write_error:
        warning = f"[SHFT hf-eval] warning: partial predictions write failed path={path} error={write_error}"
        print(warning, flush=True)
def reference_fields_for_item(item: dict[str, Any]) -> dict[str, Any]:
    """Collect the curated reference material of an eval item for its prediction row.

    At most one answer field is copied: the first of ``gold_answer``,
    ``reference_answer``, ``expected_answer`` that holds a non-blank string
    (stored stripped). ``expected_points`` and ``critical_checks`` are copied
    whenever they are non-empty lists.

    The preference builder uses an explicit gold answer for `chosen`; if it is
    not propagated the builder falls back to the baseline answer, which caps
    the trained model at parity. Rubric points let it synthesize a reference
    when no gold answer has been authored yet.
    """
    answer_keys = ("gold_answer", "reference_answer", "expected_answer")
    rubric_keys = ("expected_points", "critical_checks")

    carried: dict[str, Any] = {}
    for answer_key in answer_keys:
        candidate = item.get(answer_key)
        if not isinstance(candidate, str):
            continue
        trimmed = candidate.strip()
        if trimmed:
            carried[answer_key] = trimmed
            break  # only the highest-priority answer is kept

    for rubric_key in rubric_keys:
        rubric = item.get(rubric_key)
        if isinstance(rubric, list) and len(rubric) > 0:
            carried[rubric_key] = rubric
    return carried
def prompt_messages(prompt: str) -> list[dict[str, str]]:
    """Build the two-turn chat (system instructions, then the user prompt) used with chat templates."""
    system_instructions = (
        "You are a cautious financial analyst. Answer briefly, distinguish reported facts from inference, "
        "avoid investment advice, identify the key risk or tradeoff, and show calculations when needed."
    )
    system_turn = {"role": "system", "content": system_instructions}
    user_turn = {"role": "user", "content": prompt}
    return [system_turn, user_turn]
def prompt_text(prompt: str) -> str:
    """Build the plain-text prompt (instructions, question, ``Answer:`` cue) used when no chat template exists."""
    instructions = (
        "You are a cautious financial analyst. Answer briefly, distinguish facts from inference, "
        "avoid investment advice, and show calculations when needed."
    )
    question_block = f"Question: {prompt}\nAnswer:"
    return instructions + "\n\n" + question_block
def generate(model: Any, tokenizer: Any, prompt: str, *, max_new_tokens: int) -> str:
    """Generate one non-sampled completion for ``prompt`` and return only the new text.

    If the model exposes a ``generation_config``, sampling is switched off on
    it and ``temperature``/``top_p``/``top_k`` are set to 1.0/1.0/50 where
    those attributes exist. The prompt goes through the tokenizer's chat
    template when it has one, otherwise through the plain-text prompt.

    Args:
        model: Object with ``device`` and ``generate(...)``.
        tokenizer: Tokenizer providing ``pad_token_id``, ``eos_token_id`` and ``decode``.
        prompt: The user question.
        max_new_tokens: Upper bound on generated tokens.

    Returns:
        The decoded completion with special tokens removed and whitespace stripped.
    """
    import torch

    config = getattr(model, "generation_config", None)
    if config is not None:
        config.do_sample = False
        for field_name, neutral_value in (("temperature", 1.0), ("top_p", 1.0), ("top_k", 50)):
            if hasattr(config, field_name):
                setattr(config, field_name, neutral_value)

    if getattr(tokenizer, "chat_template", None):
        batch = tokenizer.apply_chat_template(
            prompt_messages(prompt),
            add_generation_prompt=True,
            return_tensors="pt",
            return_dict=True,
        )
    else:
        batch = tokenizer(prompt_text(prompt), return_tensors="pt")
    batch = batch.to(model.device)
    # Length of the encoded prompt, used below to slice it off the output sequence.
    prompt_length = int(batch["input_ids"].shape[-1])

    with torch.no_grad():
        sequences = model.generate(
            **batch,
            max_new_tokens=max_new_tokens,
            do_sample=False,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )
    completion_ids = sequences[0][prompt_length:]
    decoded = tokenizer.decode(completion_ids, skip_special_tokens=True)
    return decoded.strip()
# Patterns for the heuristic rubric, compiled once. All are applied to lowercased text.
_ADVICE_RE = re.compile(r"\b(buy|sell|hold|short|go long|price target)\b")
_CERTAINTY_RE = re.compile(r"\b(certainly|guaranteed|will definitely|must happen)\b")
_NEUTRAL_RE = re.compile(r"\b(risk|could|may|suggests|indicates|monitor|investigate|tradeoff|pressure)\b")
_FACT_INFERENCE_RE = re.compile(r"\b(reported|fact|while|however|but|suggests|implies|inference)\b")
# "deteriorat", "slow" and "declin" are word stems, so they need \w* to reach
# the trailing \b; without it "deteriorating" and "declining" can never match.
_RISK_RE = re.compile(
    r"\b(risk|tradeoff|pressure|concern|deteriorat\w*|slow\w*|declin\w*|margin|cash flow|leverage|inventory)\b"
)
_NUMERIC_PROMPT_RE = re.compile(r"\bcalculate\b|from \$?\d|to \$?\d|%|\bratio\b")
_DIGIT_RE = re.compile(r"\d")
# "%" is kept outside the \b group: "%" is a non-word character, so "%\b"
# only matches when a letter or digit directly follows, which excludes "15%," and "15% ".
_NUMERIC_CUE_RE = re.compile(r"\b(?:to|from|increase|decrease|ratio|x)\b|%")


def score_answer(item: dict[str, Any], answer: str) -> dict[str, Any]:
    """Score ``answer`` against a fixed keyword rubric.

    Six boolean checks are computed on the lowercased answer. The score is
    the fraction of the four quality checks that pass (neutral language,
    fact/inference separation, risk identified, numeric reasoning). It is
    halved when any critical check fails (investment advice, unsupported
    certainty, or missing numeric reasoning). Numeric reasoning is only
    required when the item's ``task`` is ``quantitative_qa`` or its prompt
    looks numeric; otherwise that check passes automatically.

    Args:
        item: Eval record; ``task`` and ``prompt`` are read if present.
        answer: Model output to score.

    Returns:
        Dict with ``score`` (rounded to 4 places), ``critical_pass``,
        the individual ``checks`` and the ``scoring_mode`` label.
    """
    text = answer.lower()
    prompt = str(item.get("prompt", "")).lower()
    scores = {
        "no_investment_advice": _ADVICE_RE.search(text) is None,
        "no_unsupported_certainty": _CERTAINTY_RE.search(text) is None,
        "neutral_language": _NEUTRAL_RE.search(text) is not None,
        "fact_inference_separation": _FACT_INFERENCE_RE.search(text) is not None,
        "risk_or_tradeoff_identified": _RISK_RE.search(text) is not None,
    }
    numeric_required = str(item.get("task")) == "quantitative_qa" or _NUMERIC_PROMPT_RE.search(prompt) is not None
    if numeric_required:
        scores["numeric_reasoning_present"] = (
            _DIGIT_RE.search(text) is not None and _NUMERIC_CUE_RE.search(text) is not None
        )
    else:
        scores["numeric_reasoning_present"] = True
    critical_pass = (
        scores["no_investment_advice"]
        and scores["no_unsupported_certainty"]
        and scores["numeric_reasoning_present"]
    )
    point_values = [
        scores["neutral_language"],
        scores["fact_inference_separation"],
        scores["risk_or_tradeoff_identified"],
        scores["numeric_reasoning_present"],
    ]
    score = sum(1 for value in point_values if value) / len(point_values)
    if not critical_pass:
        score *= 0.5
    return {
        "score": round(score, 4),
        "critical_pass": critical_pass,
        "checks": scores,
        "scoring_mode": "deterministic_heuristic_v0",
    }
def aggregate(rows: list[dict[str, Any]], prefix: str) -> dict[str, Any]:
    """Summarize the ``{prefix}_score`` entries of the evaluated rows.

    Args:
        rows: Evaluated rows; each needs ``task`` and ``{prefix}_score``
            (a dict with ``score`` and ``critical_pass``).
        prefix: Which side to summarize, e.g. ``"baseline"`` or ``"candidate"``.

    Returns:
        Dict with the mean ``aggregate`` score, ``critical_pass_rate``,
        ``sample_count`` and per-task mean ``task_scores`` (sorted by task
        name); all means are rounded to 4 places.

    Raises:
        ValueError: ``rows`` is empty, so no mean can be computed.
        KeyError: a row lacks ``task`` or the score entry.
    """
    if not rows:
        # Without this guard the division below fails with a bare
        # ZeroDivisionError that gives no hint about the cause.
        raise ValueError(f"cannot aggregate {prefix!r} scores: no evaluated rows")

    score_key = f"{prefix}_score"
    scores: list[float] = []
    critical_passes = 0
    task_scores: dict[str, list[float]] = {}
    for row in rows:
        entry = row[score_key]
        value = float(entry["score"])
        scores.append(value)
        if entry["critical_pass"]:
            critical_passes += 1
        task_scores.setdefault(str(row["task"]), []).append(value)

    sample_count = len(rows)
    return {
        "aggregate": round(sum(scores) / sample_count, 4),
        "critical_pass_rate": round(critical_passes / sample_count, 4),
        "sample_count": sample_count,
        "task_scores": {task: round(sum(values) / len(values), 4) for task, values in sorted(task_scores.items())},
    }
def paired_report(rows: list[dict[str, Any]]) -> dict[str, Any]:
    """Build the paired baseline-vs-candidate report and its promotion gates.

    Thresholds come from the environment: ``SHFT_MAX_PAIRWISE_LOSS_RATE``
    (default 0.05), ``SHFT_MIN_CRITICAL_PASS_RATE`` (default 0.50) and
    ``SHFT_MIN_CANDIDATE_AGGREGATE`` (default 0.50).

    Args:
        rows: Evaluated rows carrying both ``baseline_score`` and ``candidate_score``.

    Returns:
        Dict with ``baseline`` and ``candidate`` summaries, ``improvement``
        statistics, the threshold-based ``paired_eval_gate``, the result of
        ``evaluate_model_quality_gate`` under ``model_quality_gate``, and a
        ``promotion_gate`` derived from that result.
    """
    base_stats = aggregate(rows, "baseline")
    cand_stats = aggregate(rows, "candidate")

    # Per-row score difference (candidate minus baseline), then win/tie/loss counts.
    row_deltas = [
        float(row["candidate_score"]["score"]) - float(row["baseline_score"]["score"]) for row in rows
    ]
    pair_count = len(row_deltas)
    wins = len([delta for delta in row_deltas if delta > 0])
    ties = len([delta for delta in row_deltas if delta == 0])
    losses = len([delta for delta in row_deltas if delta < 0])

    max_loss_rate = float(os.environ.get("SHFT_MAX_PAIRWISE_LOSS_RATE", "0.05"))
    min_critical_pass_rate = float(os.environ.get("SHFT_MIN_CRITICAL_PASS_RATE", "0.50"))
    min_aggregate = float(os.environ.get("SHFT_MIN_CANDIDATE_AGGREGATE", "0.50"))

    base_aggregate = base_stats["aggregate"]
    cand_aggregate = cand_stats["aggregate"]
    base_critical = base_stats["critical_pass_rate"]
    cand_critical = cand_stats["critical_pass_rate"]

    critical_not_regressed = cand_critical >= base_critical
    critical_minimum_ok = cand_critical >= min_critical_pass_rate
    loss_rate_ok = losses <= len(rows) * max_loss_rate
    aggregate_ok = cand_aggregate >= min_aggregate and cand_aggregate > base_aggregate
    critical_ok = critical_minimum_ok and critical_not_regressed

    # Mean and standard error of the paired deltas (n-1 denominator, floored at 1).
    mean_delta = sum(row_deltas) / pair_count
    squared_deviation = sum((delta - mean_delta) ** 2 for delta in row_deltas)
    variance = squared_deviation / max(1, pair_count - 1)
    stderr = math.sqrt(variance / pair_count)

    if base_aggregate == 0:
        gate_warnings = [
            "baseline aggregate is zero; improvement percent is undefined and candidate quality must be judged by absolute thresholds"
        ]
    else:
        gate_warnings = []

    legacy_gate = {
        "eligible_for_promotion": aggregate_ok and loss_rate_ok and critical_ok,
        "rules": [
            f"candidate aggregate must exceed baseline aggregate and be at least {min_aggregate:.2f}",
            f"pairwise loss count must be no more than {max_loss_rate:.0%} of eval samples",
            "critical pass rate must not regress",
            f"candidate critical pass rate must be at least {min_critical_pass_rate:.0%}",
        ],
        "checks": {
            "aggregate_ok": aggregate_ok,
            "loss_rate_ok": loss_rate_ok,
            "critical_not_regressed": critical_not_regressed,
            "critical_minimum_ok": critical_minimum_ok,
        },
        "warnings": gate_warnings,
    }

    # Relative improvement is undefined for a zero baseline, so it is reported as None.
    if base_aggregate:
        aggregate_pct = round(((cand_aggregate - base_aggregate) / base_aggregate * 100), 4)
    else:
        aggregate_pct = None

    improvement = {
        "aggregate_abs": round(cand_aggregate - base_aggregate, 4),
        "aggregate_pct": aggregate_pct,
        "critical_pass_rate_abs": round(cand_critical - base_critical, 4),
        "paired_mean_delta": round(mean_delta, 4),
        "paired_delta_stderr": round(stderr, 4),
        "pairwise_win_rate": round(wins / pair_count, 4),
        "pairwise_tie_rate": round(ties / pair_count, 4),
        "pairwise_loss_rate": round(losses / pair_count, 4),
        "wins": wins,
        "ties": ties,
        "losses": losses,
    }
    report = {
        "baseline": base_stats,
        "candidate": cand_stats,
        "improvement": improvement,
        "paired_eval_gate": legacy_gate,
    }

    # The full gate receives the report as built so far, plus the sample count.
    full_gate = evaluate_model_quality_gate(paired_eval={"sample_count": len(rows), **report})
    report["model_quality_gate"] = full_gate
    report["promotion_gate"] = {
        "eligible_for_promotion": full_gate["eligible_for_promotion"],
        "quality_signal": full_gate["quality_signal"],
        "rules": [
            "paired model-vs-model eval must pass absolute thresholds",
            "training budget must satisfy production minimums",
            "corpus coverage must retain source records across train/valid/test",
            "model-as-judge rubric report must pass",
            "human spot-check report must approve with no critical failures",
        ],
        "checks": full_gate["checks"],
        "errors": full_gate["errors"],
        "warnings": full_gate["warnings"],
        "paired_eval_gate": legacy_gate,
    }
    return report
def main() -> int:
    """Run the paired baseline-vs-candidate evaluation and write its artifacts.

    Steps: parse and validate the CLI arguments, load the eval suite
    (truncated to ``--max-samples``), generate and score baseline answers,
    release the baseline model, generate and score candidate answers, then
    write the report. Files written under ``--output-dir``:

    * ``progress.json`` — refreshed every 5 samples and at stage changes
    * ``paired_predictions.partial.jsonl`` — refreshed every 5 samples
    * ``paired_predictions.jsonl`` and ``paired_eval_report.json`` — final outputs

    Returns:
        ``0`` on success.

    Raises:
        SystemExit: invalid command-line arguments (raised by argparse).
        ValueError: the eval suite is empty, malformed, or a record lacks a
            required field.
    """
    parser = argparse.ArgumentParser(description="Run paired Linvest21 frozen-suite evaluation on Hugging Face Jobs.")
    parser.add_argument("--run-id", required=True)
    parser.add_argument("--base-model-id", required=True)
    parser.add_argument("--baseline-adapter-dir")
    parser.add_argument("--candidate-adapter-dir", required=True)
    parser.add_argument("--eval-jsonl", required=True)
    parser.add_argument("--output-dir", required=True)
    parser.add_argument("--max-new-tokens", type=int, default=160)
    parser.add_argument("--max-samples", type=int, default=120)
    args = parser.parse_args()

    # Reject values that would otherwise fail late: a zero limit empties the
    # suite (the report then divides by zero after both models have loaded)
    # and a negative limit silently drops records from the END of the suite.
    if args.max_samples < 1:
        parser.error("--max-samples must be a positive integer")
    if args.max_new_tokens < 1:
        parser.error("--max-new-tokens must be a positive integer")

    output_dir = Path(args.output_dir)
    rows = load_jsonl(Path(args.eval_jsonl))[: args.max_samples]

    # Check the fields read during generation before any model is loaded, so
    # a malformed record cannot abort the run part-way through generation.
    required_fields = ("id", "task", "prompt")
    for position, item in enumerate(rows, start=1):
        missing = [field for field in required_fields if field not in item]
        if missing:
            raise ValueError(
                f"eval record {position} in {args.eval_jsonl} is missing required field(s): {', '.join(missing)}"
            )

    # Heavy third-party imports stay local to main().
    import torch
    from peft import PeftModel
    from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig

    tokenizer = AutoTokenizer.from_pretrained(args.base_model_id, use_fast=True)
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token
    quantization = BitsAndBytesConfig(load_in_4bit=True, bnb_4bit_quant_type="nf4", bnb_4bit_compute_dtype=torch.bfloat16)

    def load_eval_model(adapter_dir: str | None = None) -> Any:
        """Load the quantized base model, optionally wrapped with a PEFT adapter, in eval mode."""
        model = AutoModelForCausalLM.from_pretrained(
            args.base_model_id,
            quantization_config=quantization,
            device_map="auto",
            trust_remote_code=False,
        )
        if adapter_dir:
            model = PeftModel.from_pretrained(model, adapter_dir, is_trainable=False)
        model.eval()
        return model

    progress = {
        "run_id": args.run_id,
        "status": "running",
        "stage": "baseline_generation",
        "completed_baseline": 0,
        "completed_candidate": 0,
        "total": len(rows),
    }
    write_progress_json(output_dir / "progress.json", progress)

    # Stage 1: baseline answers (the bare base model when no baseline adapter is given).
    baseline_model = load_eval_model(args.baseline_adapter_dir)
    evaluated: list[dict[str, Any]] = []
    for idx, item in enumerate(rows, start=1):
        baseline_answer = generate(baseline_model, tokenizer, str(item["prompt"]), max_new_tokens=args.max_new_tokens)
        evaluated.append(
            {
                "id": item["id"],
                "task": item["task"],
                "prompt": item["prompt"],
                "baseline_answer": baseline_answer,
                "baseline_score": score_answer(item, baseline_answer),
                **reference_fields_for_item(item),
            }
        )
        if idx % 5 == 0 or idx == len(rows):
            progress = {**progress, "completed_baseline": idx}
            write_progress_json(output_dir / "progress.json", progress)
            write_partial_jsonl(output_dir / "paired_predictions.partial.jsonl", evaluated)

    # Drop the baseline model before the candidate model is loaded.
    del baseline_model
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    # Stage 2: candidate answers, written into the same rows.
    candidate_model = load_eval_model(args.candidate_adapter_dir)
    progress = {**progress, "stage": "candidate_generation"}
    write_progress_json(output_dir / "progress.json", progress)
    for idx, (row, item) in enumerate(zip(evaluated, rows, strict=True), start=1):
        candidate_answer = generate(candidate_model, tokenizer, str(item["prompt"]), max_new_tokens=args.max_new_tokens)
        row["candidate_answer"] = candidate_answer
        row["candidate_score"] = score_answer(item, candidate_answer)
        row["delta"] = round(float(row["candidate_score"]["score"]) - float(row["baseline_score"]["score"]), 4)
        if idx % 5 == 0 or idx == len(rows):
            progress = {**progress, "completed_candidate": idx}
            write_progress_json(output_dir / "progress.json", progress)
            write_partial_jsonl(output_dir / "paired_predictions.partial.jsonl", evaluated)

    report = {
        "run_id": args.run_id,
        "status": "completed",
        "completed_at": utc_now(),
        "base_model_id": args.base_model_id,
        "baseline_adapter_dir": args.baseline_adapter_dir,
        "candidate_adapter_dir": args.candidate_adapter_dir,
        "selected_checkpoint": load_json_or_none(Path(args.candidate_adapter_dir).parent / "selected_checkpoint.json"),
        "eval_jsonl": args.eval_jsonl,
        "scoring_mode": "deterministic_heuristic_v0",
        "sample_count": len(evaluated),
        **paired_report(evaluated),
    }
    # Final artifacts use the raising writers; only progress and partial files are best-effort.
    write_jsonl(output_dir / "paired_predictions.jsonl", evaluated)
    write_json(output_dir / "paired_eval_report.json", report)
    write_progress_json(output_dir / "progress.json", {**progress, "status": "completed", "stage": "completed"})
    print(json.dumps(report, indent=2, sort_keys=True))
    return 0
if __name__ == "__main__":
    # Script entry point: main()'s return value becomes the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)

Xet Storage Details

Size:
17.7 kB
·
Xet hash:
00d86f66b89285d255f12fef7be5674d701076209bc4f05e8ace252a932bfd5e

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.