Prompt_Squirrel_RAG / scripts /eval_pipeline.py
Food Desert
Switch Stage3 to explicit-only no-why selection, drop bear probe, and set k=1 defaults
06a3c46
"""End-to-end evaluation harness for the Prompt Squirrel RAG pipeline.
Measures per-stage and overall metrics using ground-truth tagged samples
from the e621 evaluation dataset.
Metrics computed:
- Stage 2 (Retrieval): Recall@k — what fraction of ground-truth tags
appear among the retrieved candidates
- Stage 3 (Selection): Precision, Recall, F1 — how well the final
selected tags match the ground truth
Usage:
# Full end-to-end (Stage 1 + 2 + 3), 20 random samples:
python scripts/eval_pipeline.py --n 20
# Reproducible run with specific seed:
python scripts/eval_pipeline.py --n 50 --seed 123
# Parallel processing with 4 workers (default):
python scripts/eval_pipeline.py --n 50 --workers 4
# Sequential mode (disable parallelism):
python scripts/eval_pipeline.py --n 20 --workers 1
# Skip Stage 1 LLM rewrite (cheaper, tests Stage 2+3 only):
python scripts/eval_pipeline.py --n 20 --skip-rewrite
# First N samples in file order (no shuffle):
python scripts/eval_pipeline.py --n 20 --no-shuffle
Results are always saved as JSONL to data/eval_results/ (auto-named by timestamp)
or to a custom path with -o.
Requires:
- OPENROUTER_API_KEY env var (for Stage 1 rewrite and Stage 3 selection)
- fluffyrock_3m.csv and other retrieval assets in the project root
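- data/eval_samples/e621_sfw_sample_1000_seed123_buffer10000_expanded.jsonl
  (preferred; falls back to e621_sfw_sample_1000_seed123_buffer10000.jsonl
  when the expanded file is missing and no --eval-path is given)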
"""
from __future__ import annotations
import argparse
import json
import os
import random
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Set, Tuple
# Repository root: two directory levels above this file (the parent of the
# directory that contains this script).
_REPO_ROOT = Path(__file__).resolve().parents[1]
# Make the repository root importable so the function-level `psq_rag` imports
# below resolve when this file is run directly as a script.
if str(_REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(_REPO_ROOT))
# NOTE: import-time side effect. The process working directory is switched to
# the repository root (the module docstring says the retrieval assets live in
# the project root — presumably they are opened via relative paths; confirm).
os.chdir(_REPO_ROOT)
def _ensure_utf8_stdio() -> None:
try:
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace")
if hasattr(sys.stderr, "reconfigure"):
sys.stderr.reconfigure(encoding="utf-8", errors="replace")
except Exception:
pass
# Default evaluation set. The "_expanded" file is preferred; run_eval() falls
# back to the raw file when the expanded one is missing and no explicit
# --eval-path was supplied.
EVAL_DATA_PATH = _REPO_ROOT / "data" / "eval_samples" / "e621_sfw_sample_1000_seed123_buffer10000_expanded.jsonl"
EVAL_DATA_PATH_RAW = _REPO_ROOT / "data" / "eval_samples" / "e621_sfw_sample_1000_seed123_buffer10000.jsonl"
# Tag type names that _classify_tags() buckets as "character" tags.
_CHARACTER_TYPES = {"character"}
# Tag type names that _classify_tags() drops from both buckets (copyright).
_COPYRIGHT_TYPES = {"copyright"}
# Tags excluded from evaluation metrics but NOT removed from the pipeline.
# These are tags that either: can't be inferred from a caption (resolution,
# art medium), describe structural properties better handled outside the
# retrieval pipeline (backgrounds), or are annotation artifacts.
# They are subtracted from ground truth, retrieved and selected tag sets
# before scoring.
_EVAL_EXCLUDED_TAGS = frozenset({
    # Annotation artifacts
    "invalid_tag", "invalid_background",
    # Resolution / file meta — not inferrable from caption
    "hi_res", "absurd_res", "low_res", "superabsurd_res",
    # Structural background tags — better recommended independently
    "simple_background", "abstract_background", "detailed_background",
    "gradient_background", "blurred_background", "textured_background",
    "transparent_background", "white_background",
})
def _classify_tags(tags: Set[str], get_type_fn) -> Tuple[Set[str], Set[str]]:
    """Partition ``tags`` into ``(character_tags, general_tags)``.

    ``get_type_fn`` maps a tag to its type name. Tags whose type is in
    ``_CHARACTER_TYPES`` land in the first set; tags whose type is in
    ``_COPYRIGHT_TYPES`` are dropped from both sets; every other tag
    (including tags with an unknown type) lands in the second set.
    """
    character_bucket: Set[str] = set()
    general_bucket: Set[str] = set()
    for name in tags:
        kind = get_type_fn(name)
        if kind in _CHARACTER_TYPES:
            character_bucket.add(name)
            continue
        if kind in _COPYRIGHT_TYPES:
            # Copyright tags are not scored in either bucket.
            continue
        general_bucket.add(name)
    return character_bucket, general_bucket
def _flatten_ground_truth_tags(tags_categorized_str: str) -> Set[str]:
"""Parse the categorized ground-truth JSON string into a flat set of tags."""
if not tags_categorized_str:
return set()
try:
cats = json.loads(tags_categorized_str)
except json.JSONDecodeError:
return set()
tags = set()
for tag_list in cats.values():
if isinstance(tag_list, list):
for t in tag_list:
tags.add(t.strip())
return tags
@dataclass
class SampleResult:
    """Per-sample record of pipeline outputs, metrics, timings and issues.

    Only ``sample_id``, ``caption`` and ``ground_truth_tags`` are required;
    every other field has a default and is filled in by
    ``_process_one_sample`` as each stage completes.
    """
    sample_id: Any
    caption: str
    ground_truth_tags: Set[str]
    # Stage 1: phrases fed to retrieval (LLM rewrite or caption split)
    rewrite_phrases: List[str] = field(default_factory=list)
    # Stage 2: retrieved candidate tags and |retrieved ∩ gt| / |gt|
    retrieved_tags: Set[str] = field(default_factory=set)
    retrieval_recall: float = 0.0
    # Stage 3 — overall
    # selected_tags is the final scored set (Stage 3 picks plus any
    # structural/probe/implied additions, minus eval-excluded tags);
    # stage3_selected_tags is a copy of the Stage 3 picks alone.
    selected_tags: Set[str] = field(default_factory=set)
    stage3_selected_tags: Set[str] = field(default_factory=set)
    stage3_selected_scores: Dict[str, float] = field(default_factory=dict)
    stage3_selected_ranks: Dict[str, int] = field(default_factory=dict)
    stage3_selected_phrase_ranks: Dict[str, int] = field(default_factory=dict)
    selection_precision: float = 0.0
    selection_recall: float = 0.0
    selection_f1: float = 0.0
    # Stage 2/3 — character tags only
    gt_character_tags: Set[str] = field(default_factory=set)
    selected_character_tags: Set[str] = field(default_factory=set)
    retrieved_character_tags: Set[str] = field(default_factory=set)
    char_retrieval_recall: float = 0.0
    char_precision: float = 0.0
    char_recall: float = 0.0
    char_f1: float = 0.0
    # Stage 3 — general tags only (non-character, non-copyright)
    gt_general_tags: Set[str] = field(default_factory=set)
    selected_general_tags: Set[str] = field(default_factory=set)
    general_precision: float = 0.0
    general_recall: float = 0.0
    general_f1: float = 0.0
    # Diagnostic metrics
    retrieval_precision: float = 0.0  # |retrieved ∩ gt| / |retrieved|
    selection_given_retrieval: float = 0.0  # |selected ∩ gt| / |retrieved ∩ gt|
    over_selection_ratio: float = 0.0  # |selected| / |gt|
    # Why distribution (from Stage 3 LLM): rationale label -> count
    why_counts: Dict[str, int] = field(default_factory=dict)
    # Diagnostics dict returned by the Stage 3 selection call
    stage3_diag: Dict[str, Any] = field(default_factory=dict)
    # Tag implications
    implied_tags: Set[str] = field(default_factory=set)  # tags added via implications (not LLM-selected)
    # Structural inference tags (solo/duo/male/female/anthro/biped etc.)
    structural_tags: List[str] = field(default_factory=list)
    # Simplified probe tags (reliability-gated fixed probe list)
    probe_tags: List[str] = field(default_factory=list)
    # Per-tag evidence: tag -> {"source": "stage3"|"structural"|"probe"|"implied", ...};
    # "stage3" entries also carry "why" and "retrieval_score".
    tag_evidence: Dict[str, Dict[str, Any]] = field(default_factory=dict)
    # Leaf-only metrics (strips implied ancestors from both sides)
    leaf_precision: float = 0.0
    leaf_recall: float = 0.0
    leaf_f1: float = 0.0
    leaf_selected_count: int = 0
    leaf_gt_count: int = 0
    # Timing, in seconds (stage3s = structural inference, stage3p = probe inference)
    stage1_time: float = 0.0
    stage2_time: float = 0.0
    stage3_time: float = 0.0
    stage3s_time: float = 0.0
    stage3p_time: float = 0.0
    # Categorized suggestions (for ranking metrics): category -> [(tag, score), ...]
    categorized_suggestions: Dict[str, List[Tuple[str, float]]] = field(default_factory=dict)
    # Fatal error message; None when the sample was processed successfully
    error: Optional[str] = None
    # Non-fatal issues/warnings captured from pipeline logs (fallbacks, retries, API errors)
    issues: List[str] = field(default_factory=list)
def _compute_metrics(predicted: Set[str], ground_truth: Set[str]) -> Tuple[float, float, float]:
"""Compute precision, recall, F1."""
if not predicted and not ground_truth:
return 1.0, 1.0, 1.0
if not predicted:
return 0.0, 0.0, 0.0
if not ground_truth:
return 0.0, 0.0, 0.0
tp = len(predicted & ground_truth)
precision = tp / len(predicted)
recall = tp / len(ground_truth)
f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0
return precision, recall, f1
def _build_selection_query(
caption: str,
rewrite_phrases: Sequence[str],
structural_tags: Sequence[str],
probe_tags: Sequence[str],
) -> str:
lines = [f"IMAGE DESCRIPTION: {caption.strip()}"]
if rewrite_phrases:
lines.append("REWRITE PHRASES: " + ", ".join(rewrite_phrases))
hint_tags = list(structural_tags) + list(probe_tags)
if hint_tags:
lines.append("INFERRED TAG HINTS (context only): " + ", ".join(sorted(set(hint_tags))))
return "\n".join(lines)
def _caption_to_phrases(caption: str) -> List[str]:
    """Split a raw caption into retrieval phrases without calling the LLM.

    Commas are tried first; if that yields at most one phrase, periods are
    treated as separators too.
    """
    phrases = [p.strip() for p in caption.split(",") if p.strip()]
    if len(phrases) <= 1:
        phrases = [p.strip() for p in caption.replace(".", ",").split(",") if p.strip()]
    return phrases


def _process_one_sample(
    sample: Dict[str, Any],
    index: int,
    total: int,
    skip_rewrite: bool,
    allow_nsfw: bool,
    mode: str,
    chunk_size: int,
    per_phrase_k: int,
    per_phrase_final_k: int,
    temperature: float,
    max_tokens: int,
    verbose: bool,
    print_lock: threading.Lock,
    min_why: Optional[str] = None,
    expand_implications: bool = False,
    infer_structural: bool = False,
    infer_probe: bool = False,
) -> SampleResult:
    """Run one eval sample through rewrite, retrieval and selection, then score it.

    Args:
        sample: Dict with keys ``"id"``, ``"caption"`` and ``"gt_tags"`` (a set).
        index: Zero-based position of the sample, used in console prefixes.
        total: Total number of samples, used in the progress line.
        skip_rewrite: If true, skip the Stage 1 LLM rewrite and split the
            caption into phrases directly.
        allow_nsfw: Forwarded to retrieval and categorized suggestions.
        mode, chunk_size, per_phrase_k, temperature, max_tokens, min_why:
            Forwarded to the Stage 3 selection call.
        per_phrase_final_k: Forwarded to Stage 2 retrieval.
        verbose: If true, pipeline log messages are printed.
        print_lock: Lock that serializes console output across workers.
        expand_implications: If true, expand the selected tags through the
            implication graph before scoring.
        infer_structural: If true, run structural tag inference and add its
            tags to the selection.
        infer_probe: If true, run probe tag inference and add its tags to
            the selection.

    Returns:
        A populated ``SampleResult``. Exceptions raised while processing are
        caught and recorded in ``result.error`` / ``result.issues`` instead
        of propagating.

    Stage 2 retrieval always runs. Previously it was skipped (yielding zero
    candidates) whenever no pre-retrieval LLM call was needed, i.e. with
    ``skip_rewrite=True`` and both inference flags off.
    """
    from psq_rag.llm.rewrite import llm_rewrite_prompt
    from psq_rag.retrieval.psq_retrieval import psq_candidates_from_rewrite_phrases
    from psq_rag.llm.select import llm_select_indices, llm_infer_structural_tags, llm_infer_probe_tags
    from psq_rag.retrieval.state import get_tag_type_name, expand_tags_via_implications, get_leaf_tags

    def log(msg: str) -> None:
        # Messages that look like problems are kept as non-fatal issues even
        # when verbose printing is off. ``result`` is bound below, before
        # the first call to log().
        msg_str = str(msg)
        msg_l = msg_str.lower()
        if any(k in msg_l for k in ("error", "fallback", "gave up", "warning", "filtered", "refusal")):
            result.issues.append(msg_str)
        if verbose:
            with print_lock:
                print(f" [{index+1}] {msg_str}")

    sid = sample["id"]
    caption = sample["caption"]
    gt_tags = sample["gt_tags"]
    result = SampleResult(
        sample_id=sid,
        caption=caption[:120] + ("..." if len(caption) > 120 else ""),
        ground_truth_tags=gt_tags,
    )
    with print_lock:
        print(f"[{index+1}/{total}] id={sid} gt_tags={len(gt_tags)}")

    try:
        # --- Stage 1x: independent LLM calls, run concurrently ---
        def _run_stage1_rewrite() -> Tuple[str, float]:
            t0 = time.time()
            rewritten_local = llm_rewrite_prompt(caption, log)
            return rewritten_local or "", (time.time() - t0)

        def _run_stage3_structural() -> Tuple[List[str], float]:
            t0s = time.time()
            structural_local = llm_infer_structural_tags(
                caption, log=log, temperature=temperature,
            )
            return structural_local, (time.time() - t0s)

        def _run_stage3_probe() -> Tuple[List[str], float]:
            t0p = time.time()
            probed_local = llm_infer_probe_tags(
                caption, log=log, temperature=temperature,
            )
            return probed_local, (time.time() - t0p)

        rewritten = ""
        structural: List[str] = []
        probed: List[str] = []
        pre_workers = (0 if skip_rewrite else 1) + int(infer_structural) + int(infer_probe)
        if pre_workers > 0:
            # ThreadPoolExecutor rejects max_workers=0, so the pool is only
            # created when at least one pre-retrieval call is enabled.
            with ThreadPoolExecutor(max_workers=pre_workers) as pre_ex:
                fut_rewrite = pre_ex.submit(_run_stage1_rewrite) if not skip_rewrite else None
                fut_struct = pre_ex.submit(_run_stage3_structural) if infer_structural else None
                fut_probe = pre_ex.submit(_run_stage3_probe) if infer_probe else None
                # All side-channel results are collected before retrieval so
                # their tags can be passed to Stage 2 as context.
                if fut_rewrite is not None:
                    rewritten, result.stage1_time = fut_rewrite.result()
                if fut_struct is not None:
                    structural, result.stage3s_time = fut_struct.result()
                if fut_probe is not None:
                    probed, result.stage3p_time = fut_probe.result()
        result.structural_tags = structural
        result.probe_tags = probed

        # --- Stage 1: phrases for retrieval ---
        if rewritten:
            result.rewrite_phrases = [p.strip() for p in rewritten.split(",") if p.strip()]
        else:
            # Rewrite skipped or returned nothing: split the caption itself.
            result.rewrite_phrases = _caption_to_phrases(caption)
        log(f"Phrases ({len(result.rewrite_phrases)}): {result.rewrite_phrases[:5]}")

        # --- Stage 2: Retrieval (always runs) ---
        t0 = time.time()
        retrieval_context_tags = list(dict.fromkeys(result.structural_tags + result.probe_tags))
        retrieval_result = psq_candidates_from_rewrite_phrases(
            rewrite_phrases=result.rewrite_phrases,
            allow_nsfw_tags=allow_nsfw,
            context_tags=retrieval_context_tags,
            per_phrase_final_k=per_phrase_final_k,
            global_k=300,
            return_phrase_ranks=True,
            verbose=False,
        )
        result.stage2_time = time.time() - t0

        # Retrieval may return either a bare candidate list or a tuple whose
        # first item is the candidate list (optionally followed by per-tag
        # phrase ranks).
        phrase_rank_by_tag = {}
        if isinstance(retrieval_result, tuple):
            if len(retrieval_result) == 2:
                candidates, phrase_rank_by_tag = retrieval_result
            else:
                candidates = retrieval_result[0]
        else:
            candidates = retrieval_result
        result.retrieved_tags = {c.tag for c in candidates}
        if gt_tags:
            result.retrieval_recall = len(result.retrieved_tags & gt_tags) / len(gt_tags)
        log(f"Retrieved {len(candidates)} candidates, recall={result.retrieval_recall:.3f}")

        # --- Stage 3: LLM Selection (uses rewrite + structural/probe context) ---
        t0 = time.time()
        selection_query = _build_selection_query(
            caption=caption,
            rewrite_phrases=result.rewrite_phrases,
            structural_tags=result.structural_tags,
            probe_tags=result.probe_tags,
        )
        picked_indices, tag_why, stage3_diag = llm_select_indices(
            query_text=selection_query,
            candidates=candidates,
            max_pick=0,
            log=log,
            mode=mode,
            chunk_size=chunk_size,
            per_phrase_k=per_phrase_k,
            temperature=temperature,
            max_tokens=max_tokens,
            return_metadata=True,
            return_diagnostics=True,
            min_why=min_why,
        )
        result.stage3_time = time.time() - t0
        result.stage3_diag = stage3_diag or {}
        result.selected_tags = {candidates[idx].tag for idx in picked_indices} if picked_indices else set()
        result.stage3_selected_tags = set(result.selected_tags)

        # Build per-tag evidence from Stage 3 selection
        rank_by_tag = {c.tag: i + 1 for i, c in enumerate(candidates)}
        for idx in picked_indices:
            tag = candidates[idx].tag
            result.stage3_selected_scores[tag] = round(candidates[idx].score_combined, 4)
            result.stage3_selected_ranks[tag] = rank_by_tag.get(tag, len(candidates) + 1)
            if phrase_rank_by_tag:
                result.stage3_selected_phrase_ranks[tag] = phrase_rank_by_tag.get(tag, len(candidates) + 1)
            result.tag_evidence[tag] = {
                "source": "stage3",
                "why": tag_why.get(tag, "unknown"),
                "retrieval_score": round(candidates[idx].score_combined, 4),
            }

        # Why distribution
        why_counts: Dict[str, int] = {}
        for w in tag_why.values():
            why_counts[w] = why_counts.get(w, 0) + 1
        result.why_counts = why_counts

        # Structural tag inference (solo/duo/male/female/anthro/biped etc.)
        if infer_structural:
            # Add structural tags not already selected
            for st in result.structural_tags:
                if st not in result.selected_tags:
                    result.tag_evidence[st] = {"source": "structural"}
                    result.selected_tags.add(st)
            log(f"Structural: {result.structural_tags}")
        if infer_probe:
            for pt in result.probe_tags:
                if pt not in result.selected_tags:
                    result.tag_evidence[pt] = {"source": "probe"}
                    result.selected_tags.add(pt)
            log(f"Probe: {result.probe_tags}")

        # Tag implication expansion (post-Stage 3)
        if expand_implications and result.selected_tags:
            expanded, implied_only = expand_tags_via_implications(result.selected_tags)
            result.implied_tags = implied_only
            for imp_tag in implied_only:
                result.tag_evidence[imp_tag] = {"source": "implied"}
            result.selected_tags = expanded
            log(f"Implications: +{len(implied_only)} tags")

        # Generate categorized suggestions (for ranking metrics). Failure
        # here is non-fatal: it is logged (and thus recorded as an issue).
        try:
            from psq_rag.tagging.categorized_suggestions import (
                generate_categorized_suggestions,
            )
            # Use selected tags to generate category-wise ranked suggestions
            categorized = generate_categorized_suggestions(
                selected_tags=list(result.selected_tags),
                allow_nsfw_tags=allow_nsfw,
                top_n_per_category=20,  # Get top 20 per category for eval
                top_n_other=50,
            )
            # Convert to simple dict format: category -> [(tag, score), ...]
            result.categorized_suggestions = {}
            for cat_name, cat_sugg in categorized.by_category.items():
                result.categorized_suggestions[cat_name] = cat_sugg.suggestions
            # Also store "other" suggestions
            result.categorized_suggestions['other'] = categorized.other_suggestions
            log(f"Categorized: {len(result.categorized_suggestions)} categories")
        except Exception as e:
            log(f"Warning: Failed to generate categorized suggestions: {e}")

        # Remove eval-excluded tags from predictions before scoring
        result.selected_tags -= _EVAL_EXCLUDED_TAGS
        result.retrieved_tags -= _EVAL_EXCLUDED_TAGS

        # Overall selection metrics
        p, r, f1 = _compute_metrics(result.selected_tags, gt_tags)
        result.selection_precision = p
        result.selection_recall = r
        result.selection_f1 = f1

        # Leaf-only metrics (strips implied ancestors from both sides)
        leaf_sel = get_leaf_tags(result.selected_tags)
        leaf_gt = get_leaf_tags(gt_tags)
        lp, lr, lf1 = _compute_metrics(leaf_sel, leaf_gt)
        result.leaf_precision = lp
        result.leaf_recall = lr
        result.leaf_f1 = lf1
        result.leaf_selected_count = len(leaf_sel)
        result.leaf_gt_count = len(leaf_gt)

        # Diagnostic metrics
        retrieved_and_gt = result.retrieved_tags & gt_tags
        selected_and_gt = result.selected_tags & gt_tags
        if result.retrieved_tags:
            result.retrieval_precision = len(retrieved_and_gt) / len(result.retrieved_tags)
        if retrieved_and_gt:
            result.selection_given_retrieval = len(selected_and_gt) / len(retrieved_and_gt)
        if gt_tags:
            result.over_selection_ratio = len(result.selected_tags) / len(gt_tags)

        # Split ground-truth and selected tags by type
        gt_char, gt_gen = _classify_tags(gt_tags, get_tag_type_name)
        sel_char, sel_gen = _classify_tags(result.selected_tags, get_tag_type_name)
        ret_char, _ = _classify_tags(result.retrieved_tags, get_tag_type_name)
        result.gt_character_tags = gt_char
        result.selected_character_tags = sel_char
        result.retrieved_character_tags = ret_char
        result.gt_general_tags = gt_gen
        result.selected_general_tags = sel_gen

        # Character-specific metrics (only when ground truth has characters)
        if gt_char:
            result.char_retrieval_recall = len(ret_char & gt_char) / len(gt_char)
            cp, cr, cf1 = _compute_metrics(sel_char, gt_char)
            result.char_precision = cp
            result.char_recall = cr
            result.char_f1 = cf1

        # General-tag metrics
        gp, gr, gf1 = _compute_metrics(sel_gen, gt_gen)
        result.general_precision = gp
        result.general_recall = gr
        result.general_f1 = gf1

        # Per-sample output line
        char_info = ""
        if gt_char:
            char_info = f" char[gt={len(gt_char)} sel={len(sel_char)} P={cp:.2f} R={cr:.2f}]"
        impl_info = f" (+{len(result.implied_tags)} implied)" if result.implied_tags else ""
        struct_info = f" (+{len(result.structural_tags)} structural)" if result.structural_tags else ""
        probe_info = f" (+{len(result.probe_tags)} probe)" if result.probe_tags else ""
        with print_lock:
            print(
                f" [{index+1}] retrieval_recall={result.retrieval_recall:.3f} "
                f"sel_P={p:.3f} sel_R={r:.3f} sel_F1={f1:.3f} "
                f"selected={len(result.selected_tags)}{impl_info}{struct_info}{probe_info}{char_info} "
                f"t1={result.stage1_time:.1f}s t2={result.stage2_time:.1f}s t3={result.stage3_time:.1f}s"
            )
    except Exception as e:
        # Top-level boundary for one sample: record the failure so the rest
        # of the evaluation run can continue.
        result.error = str(e)
        result.issues.append(f"fatal_exception: {e}")
        with print_lock:
            print(f" [{index+1}] ERROR: {e}")
    return result
def _prewarm_retrieval_assets() -> None:
    """Call each lazy retrieval-state accessor once, up front.

    Invoked before worker threads are spawned so that the first-use
    initialization of these assets happens on a single thread. Prints the
    elapsed load time.
    """
    from psq_rag.retrieval.state import (
        get_tfidf_components,
        get_tag2aliases,
        get_tag_type_name,
        get_tag_implications,
    )

    print("Pre-warming retrieval assets (TF-IDF, FastText, HNSW, aliases)...")
    started = time.time()
    # Same call order as before; the lookup with a dummy tag exists only to
    # trigger construction of the tag-type table.
    warmup_calls = (
        (get_tfidf_components, ()),
        (get_tag2aliases, ()),
        (get_tag_type_name, ("_warmup_",)),
        (get_tag_implications, ()),
    )
    for loader, loader_args in warmup_calls:
        loader(*loader_args)
    print(f" Assets loaded in {time.time() - started:.1f}s")
def run_eval(
    n_samples: int = 20,
    caption_field: str = "caption_cogvlm",
    skip_rewrite: bool = False,
    allow_nsfw: bool = False,
    mode: str = "chunked_map_union",
    chunk_size: int = 60,
    per_phrase_k: int = 2,
    per_phrase_final_k: int = 1,
    temperature: float = 0.0,
    max_tokens: int = 512,
    verbose: bool = False,
    shuffle: bool = True,
    seed: int = 42,
    workers: int = 1,
    min_why: Optional[str] = "strong_implied",
    eval_path: Optional[str] = None,
    expand_implications: bool = False,
    infer_structural: bool = False,
    infer_probe: bool = False,
) -> List[SampleResult]:
    """Load eval samples, run each through the pipeline, and return the results.

    Args:
        n_samples: Number of samples to evaluate (taken after the optional shuffle).
        caption_field: Row key whose text is used as the input caption.
        shuffle: If true, shuffle the loaded samples with ``seed`` first.
        seed: Seed for the shuffle.
        workers: Worker thread count; values <= 1 run sequentially.
        eval_path: Optional JSONL path (relative paths are resolved against
            the repository root). When omitted, the expanded default file is
            used, falling back to the raw default file.
        expand_implications: If true, ground truth is expanded through tag
            implications as well, and the flag is forwarded to each sample.
        Remaining arguments are forwarded to ``_process_one_sample``.

    Returns:
        One ``SampleResult`` per evaluated sample, in the same order as the
        selected samples regardless of worker completion order.

    Exits the process with status 1 when the eval file cannot be found.
    Rows with an empty caption or no ground-truth tags are skipped, as are
    blank lines in the JSONL file.
    """
    expand_gt = expand_implications
    if expand_gt:
        from psq_rag.retrieval.state import expand_tags_via_implications as _expand_gt_tags

    # Load eval samples — prefer expanded file, fall back to raw
    eval_path_obj = Path(eval_path) if eval_path else EVAL_DATA_PATH
    if not eval_path_obj.is_absolute():
        eval_path_obj = (_REPO_ROOT / eval_path_obj).resolve()
    if not eval_path_obj.is_file() and eval_path is None:
        eval_path_obj = EVAL_DATA_PATH_RAW
        if not eval_path_obj.is_file():
            print(f"ERROR: Eval data not found: {EVAL_DATA_PATH}")
            sys.exit(1)
        print(f"WARNING: Expanded eval data not found, falling back to raw: {eval_path_obj}")
        print(" Run: python scripts/preprocess_eval_data.py")
    elif not eval_path_obj.is_file():
        print(f"ERROR: Eval data not found: {eval_path_obj}")
        sys.exit(1)

    all_samples = []
    using_expanded = False
    with eval_path_obj.open("r", encoding="utf-8") as f:
        for line in f:
            # A blank or trailing empty line is not a record; json.loads
            # would raise on it.
            if not line.strip():
                continue
            row = json.loads(line)
            caption = row.get(caption_field, "")
            if not caption or not caption.strip():
                continue
            # Prefer pre-expanded GT; fall back to flattening categorized
            if "tags_ground_truth_expanded" in row:
                gt_tags = set(row["tags_ground_truth_expanded"])
                using_expanded = True
            else:
                gt_tags = _flatten_ground_truth_tags(row.get("tags_ground_truth_categorized", ""))
            if not gt_tags:
                continue
            # Remove eval-excluded tags from GT
            gt_tags -= _EVAL_EXCLUDED_TAGS
            if expand_gt:
                gt_tags, _ = _expand_gt_tags(gt_tags)
                # Expansion may re-introduce excluded tags.
                gt_tags -= _EVAL_EXCLUDED_TAGS
            all_samples.append({
                "id": row.get("id", row.get("row_id", len(all_samples))),
                "caption": caption.strip(),
                "gt_tags": gt_tags,
            })

    if using_expanded:
        print("Using implication-expanded ground truth")
    if shuffle:
        rng = random.Random(seed)
        rng.shuffle(all_samples)
    samples = all_samples[:n_samples]

    print(f"Loaded {len(samples)}/{len(all_samples)} samples (caption_field={caption_field})")
    print(f"eval_path={eval_path_obj}")
    print(f"shuffle={shuffle}, seed={seed}, skip_rewrite={skip_rewrite}, allow_nsfw={allow_nsfw}, mode={mode}")
    print(f"workers={workers}")
    print()

    # Pre-warm shared retrieval assets before spawning threads
    _prewarm_retrieval_assets()
    print()

    print_lock = threading.Lock()
    total = len(samples)

    def _run_sample(i: int, sample: Dict[str, Any]) -> SampleResult:
        # Single place that forwards the run configuration, shared by the
        # sequential and the parallel paths.
        return _process_one_sample(
            sample, i, total,
            skip_rewrite, allow_nsfw, mode, chunk_size,
            per_phrase_k, per_phrase_final_k, temperature, max_tokens, verbose,
            print_lock, min_why,
            expand_implications,
            infer_structural,
            infer_probe,
        )

    if workers <= 1:
        # Sequential mode
        results: List[SampleResult] = [_run_sample(i, sample) for i, sample in enumerate(samples)]
    else:
        # Parallel mode
        print(f"Processing {total} samples with {workers} parallel workers...")
        print()
        # Submit all samples; key results by index to preserve original ordering
        results_by_index: Dict[int, SampleResult] = {}
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = {
                executor.submit(_run_sample, i, sample): i
                for i, sample in enumerate(samples)
            }
            for future in as_completed(futures):
                idx = futures[future]
                try:
                    results_by_index[idx] = future.result()
                except Exception as e:
                    # Should not happen since _process_one_sample catches exceptions,
                    # but guard against unexpected errors
                    with print_lock:
                        print(f" [{idx+1}] WORKER ERROR: {e}")
                    results_by_index[idx] = SampleResult(
                        sample_id=samples[idx]["id"],
                        caption=samples[idx]["caption"][:120],
                        ground_truth_tags=samples[idx]["gt_tags"],
                        error=f"Worker error: {e}",
                    )
        # Reassemble in original order
        results = [results_by_index[i] for i in range(total)]
    return results
def _safe_avg(values: List[float]) -> float:
return sum(values) / len(values) if values else 0.0
def print_summary(results: List[SampleResult]) -> None:
    """Print aggregate metrics across all samples.

    Samples with ``error`` set are excluded from every average and listed
    (first five) at the end. When no sample succeeded, only a short notice
    and that error list are printed, so a fully failed run still shows why.

    The "why" distribution prints the known rationale labels first and then
    any other labels that occurred, so the rows account for every counted
    selection.
    """
    valid = [r for r in results if r.error is None]
    errored = [r for r in results if r.error is not None]

    def _print_errors() -> None:
        print(f"Errors ({len(errored)}):")
        for r in errored[:5]:
            print(f" id={r.sample_id}: {r.error}")

    if not valid:
        print("\nNo valid results to summarize.")
        if errored:
            _print_errors()
        return

    n = len(valid)
    avg_retrieval_recall = sum(r.retrieval_recall for r in valid) / n
    avg_sel_precision = sum(r.selection_precision for r in valid) / n
    avg_sel_recall = sum(r.selection_recall for r in valid) / n
    avg_sel_f1 = sum(r.selection_f1 for r in valid) / n
    avg_retrieved = sum(len(r.retrieved_tags) for r in valid) / n
    avg_selected = sum(len(r.selected_tags) for r in valid) / n
    avg_gt = sum(len(r.ground_truth_tags) for r in valid) / n
    avg_t1 = sum(r.stage1_time for r in valid) / n
    avg_t2 = sum(r.stage2_time for r in valid) / n
    avg_t3 = sum(r.stage3_time for r in valid) / n

    print()
    print("=" * 70)
    print(f"EVALUATION SUMMARY ({n} samples, {len(errored)} errors)")
    print("=" * 70)
    print()
    print("Stage 2 - Retrieval:")
    print(f" Avg recall@300: {avg_retrieval_recall:.4f}")
    print(f" Avg candidates: {avg_retrieved:.1f}")

    avg_retrieval_precision = _safe_avg([r.retrieval_precision for r in valid])
    # Only samples where retrieval found at least one ground-truth tag can
    # say anything about how Stage 3 treats retrieved ground truth.
    avg_sel_given_ret = _safe_avg([r.selection_given_retrieval for r in valid
                                   if (r.retrieved_tags & r.ground_truth_tags)])
    avg_over_sel = _safe_avg([r.over_selection_ratio for r in valid])
    avg_implied = sum(len(r.implied_tags) for r in valid) / n
    avg_structural = sum(len(r.structural_tags) for r in valid) / n
    avg_probe = sum(len(r.probe_tags) for r in valid) / n

    print()
    print("Stage 3 - Selection (ALL tags):")
    print(f" Avg precision: {avg_sel_precision:.4f}")
    print(f" Avg recall: {avg_sel_recall:.4f}")
    print(f" Avg F1: {avg_sel_f1:.4f}")
    print(f" Avg selected tags: {avg_selected:.1f}")
    if avg_implied > 0:
        print(f" Avg implied tags: {avg_implied:.1f} (added via tag implications)")
    if avg_structural > 0:
        print(f" Avg structural tags: {avg_structural:.1f} (inferred via statement agreement)")
    if avg_probe > 0:
        print(f" Avg probe tags: {avg_probe:.1f} (inferred via simplified probe query)")
    print(f" Avg ground-truth tags:{avg_gt:.1f}")

    # Leaf-only metrics
    avg_leaf_p = _safe_avg([r.leaf_precision for r in valid])
    avg_leaf_r = _safe_avg([r.leaf_recall for r in valid])
    avg_leaf_f1 = _safe_avg([r.leaf_f1 for r in valid])
    avg_leaf_sel = _safe_avg([r.leaf_selected_count for r in valid])
    avg_leaf_gt = _safe_avg([r.leaf_gt_count for r in valid])
    print()
    print("Stage 3 - Selection (LEAF tags only — implied ancestors stripped):")
    print(f" Avg precision: {avg_leaf_p:.4f}")
    print(f" Avg recall: {avg_leaf_r:.4f}")
    print(f" Avg F1: {avg_leaf_f1:.4f}")
    print(f" Avg leaf selected: {avg_leaf_sel:.1f}")
    print(f" Avg leaf ground-truth:{avg_leaf_gt:.1f}")

    print()
    print("Diagnostic Metrics:")
    print(f" Retrieval precision: {avg_retrieval_precision:.4f} (|ret∩gt|/|ret|, noise level fed to Stage 3)")
    print(f" Sel-given-retrieval: {avg_sel_given_ret:.4f} (of gt tags retrieved, fraction kept by Stage 3)")
    print(f" Over-selection ratio: {avg_over_sel:.2f}x (|selected|/|gt|, ideal ~1.0)")

    stage3_diag_rows = [r.stage3_diag for r in valid if r.stage3_diag]
    if stage3_diag_rows:
        calls_total = sum(int(d.get("calls_total", 0)) for d in stage3_diag_rows)
        calls_exhausted = sum(int(d.get("calls_exhausted_retries", 0)) for d in stage3_diag_rows)
        attempts_total = sum(int(d.get("attempts_total", 0)) for d in stage3_diag_rows)
        attempts_parse_fail = sum(int(d.get("attempt_parse_fail", 0)) for d in stage3_diag_rows)
        attempts_errors = sum(int(d.get("attempt_errors", 0)) for d in stage3_diag_rows)
        print()
        print("Stage 3 Structured Output Reliability:")
        print(f" Calls total: {calls_total}")
        print(f" Calls exhausted: {calls_exhausted} ({(100 * calls_exhausted / calls_total) if calls_total else 0:.1f}%)")
        print(f" Attempts total: {attempts_total}")
        print(f" Parse/schema failures:{attempts_parse_fail} ({(100 * attempts_parse_fail / attempts_total) if attempts_total else 0:.1f}%)")
        print(f" Call errors/exc: {attempts_errors} ({(100 * attempts_errors / attempts_total) if attempts_total else 0:.1f}%)")

        # Aggregate per-call-size attempt statistics across samples.
        by_n_agg: Dict[int, Dict[str, int]] = {}
        for d in stage3_diag_rows:
            for n_str, n_stats in d.get("attempts_by_n_local", {}).items():
                try:
                    n_local = int(n_str)
                except Exception:
                    continue
                cur = by_n_agg.setdefault(n_local, {"attempts": 0, "parse_fail": 0, "errors": 0})
                cur["attempts"] += int(n_stats.get("attempts", 0))
                cur["parse_fail"] += int(n_stats.get("parse_fail", 0))
                cur["errors"] += int(n_stats.get("errors", 0))
        if by_n_agg:
            print(" Failure by call size (N_local):")
            for n_local in sorted(by_n_agg.keys()):
                s = by_n_agg[n_local]
                fail = s["parse_fail"] + s["errors"]
                rate = (100 * fail / s["attempts"]) if s["attempts"] else 0.0
                print(
                    f" N={n_local:3d} attempts={s['attempts']:4d} "
                    f"fail={fail:4d} ({rate:5.1f}%)"
                )

    # Why distribution across all samples
    total_why: Dict[str, int] = {}
    for r in valid:
        for w, cnt in r.why_counts.items():
            total_why[w] = total_why.get(w, 0) + cnt
    if total_why:
        total_selections = sum(total_why.values())
        known_whys = ["explicit", "strong_implied", "weak_implied", "style_or_meta", "other"]
        # Labels outside the known list are part of the denominator, so they
        # are listed too; otherwise the rows would not add up to 100%.
        extra_whys = sorted((w for w in total_why if w not in known_whys), key=str)
        print()
        print("Why Distribution (Stage 3 LLM rationale):")
        for w in known_whys + extra_whys:
            cnt = total_why.get(w, 0)
            pct = 100 * cnt / total_selections if total_selections else 0
            print(f" {str(w):20s} {cnt:4d} ({pct:5.1f}%)")

    # --- Character tag breakdown ---
    # Only include samples that actually have character tags in ground truth
    samples_with_chars = [r for r in valid if r.gt_character_tags]
    print()
    print("-" * 70)
    print(f"CHARACTER TAGS ({len(samples_with_chars)}/{n} samples have character ground-truth)")
    print("-" * 70)
    if samples_with_chars:
        avg_char_retrieval_recall = _safe_avg([r.char_retrieval_recall for r in samples_with_chars])
        avg_char_p = _safe_avg([r.char_precision for r in samples_with_chars])
        avg_char_r = _safe_avg([r.char_recall for r in samples_with_chars])
        avg_char_f1 = _safe_avg([r.char_f1 for r in samples_with_chars])
        avg_gt_char = _safe_avg([len(r.gt_character_tags) for r in samples_with_chars])
        avg_sel_char = _safe_avg([len(r.selected_character_tags) for r in samples_with_chars])
        print(f" Retrieval recall: {avg_char_retrieval_recall:.4f}")
        print(f" Selection precision: {avg_char_p:.4f}")
        print(f" Selection recall: {avg_char_r:.4f}")
        print(f" Selection F1: {avg_char_f1:.4f}")
        print(f" Avg gt char tags: {avg_gt_char:.1f}")
        print(f" Avg selected chars: {avg_sel_char:.1f}")

        # Show character-specific failures
        char_misses = []
        char_false_pos = []
        for r in samples_with_chars:
            missed = r.gt_character_tags - r.selected_character_tags
            for m in missed:
                char_misses.append((r.sample_id, m))
            extra = r.selected_character_tags - r.gt_character_tags
            for e in extra:
                char_false_pos.append((r.sample_id, e))
        if char_misses:
            print(f"\n Missed characters ({len(char_misses)} total):")
            for sid, tag in char_misses[:10]:
                print(f" id={sid}: missed {tag}")
        if char_false_pos:
            print(f"\n False positive characters ({len(char_false_pos)} total):")
            for sid, tag in char_false_pos[:10]:
                print(f" id={sid}: wrongly selected {tag}")
    else:
        print(" (no samples had character tags in ground truth)")

    # False positive characters in samples WITHOUT character ground-truth
    no_char_gt_but_selected = [r for r in valid if not r.gt_character_tags and r.selected_character_tags]
    if no_char_gt_but_selected:
        print(f"\n Spurious character selections ({len(no_char_gt_but_selected)} samples):")
        print(" (These samples had NO character in ground truth but system selected one)")
        for r in no_char_gt_but_selected[:5]:
            print(f" id={r.sample_id}: selected {sorted(r.selected_character_tags)}")

    # --- General tag breakdown ---
    print()
    print("-" * 70)
    print("GENERAL TAGS (non-character, non-copyright)")
    print("-" * 70)
    avg_gen_p = _safe_avg([r.general_precision for r in valid])
    avg_gen_r = _safe_avg([r.general_recall for r in valid])
    avg_gen_f1 = _safe_avg([r.general_f1 for r in valid])
    avg_gt_gen = _safe_avg([len(r.gt_general_tags) for r in valid])
    avg_sel_gen = _safe_avg([len(r.selected_general_tags) for r in valid])
    print(f" Selection precision: {avg_gen_p:.4f}")
    print(f" Selection recall: {avg_gen_r:.4f}")
    print(f" Selection F1: {avg_gen_f1:.4f}")
    print(f" Avg gt general tags: {avg_gt_gen:.1f}")
    print(f" Avg selected general: {avg_sel_gen:.1f}")

    print()
    print("-" * 70)
    avg_t3s = sum(r.stage3s_time for r in valid) / n
    avg_t3p = sum(r.stage3p_time for r in valid) / n
    print("Timing (avg per sample):")
    print(f" Stage 1 (rewrite): {avg_t1:.2f}s")
    print(f" Stage 2 (retrieval): {avg_t2:.2f}s")
    print(f" Stage 3 (selection): {avg_t3:.2f}s")
    if avg_t3s > 0:
        print(f" Stage 3s (structural):{avg_t3s:.2f}s")
    if avg_t3p > 0:
        print(f" Stage 3p (probe): {avg_t3p:.2f}s")
    print(f" Total: {avg_t1 + avg_t2 + avg_t3 + avg_t3s + avg_t3p:.2f}s")
    print()

    # Show worst and best F1 samples
    by_f1 = sorted(valid, key=lambda r: r.selection_f1)
    print("Lowest F1 samples (overall):")
    for r in by_f1[:3]:
        print(f" id={r.sample_id} F1={r.selection_f1:.3f} P={r.selection_precision:.3f} R={r.selection_recall:.3f}")
        missed = r.ground_truth_tags - r.selected_tags
        extra = r.selected_tags - r.ground_truth_tags
        if missed:
            print(f" missed: {sorted(missed)[:10]}")
        if extra:
            print(f" extra: {sorted(extra)[:10]}")
    print()
    print("Highest F1 samples (overall):")
    for r in by_f1[-3:]:
        print(f" id={r.sample_id} F1={r.selection_f1:.3f} P={r.selection_precision:.3f} R={r.selection_recall:.3f}")

    if errored:
        print()
        _print_errors()
    print("=" * 70)
def _build_eval_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the evaluation harness."""
    ap = argparse.ArgumentParser(description="End-to-end pipeline evaluation")
    ap.add_argument("--n", type=int, default=20, help="Number of samples to evaluate")
    ap.add_argument("--caption-field", default="caption_cogvlm",
                    choices=["caption_cogvlm", "caption_llm_0", "caption_llm_1",
                             "caption_llm_2", "caption_llm_3", "caption_llm_4",
                             "caption_llm_5", "caption_llm_6", "caption_llm_7"],
                    help="Which caption field to use as input")
    ap.add_argument("--skip-rewrite", action="store_true",
                    help="Skip Stage 1 LLM rewrite; split caption directly into phrases")
    ap.add_argument("--allow-nsfw", action="store_true", help="Allow NSFW tags")
    ap.add_argument("--mode", default="chunked_map_union",
                    choices=["single_shot", "chunked_map_union"])
    ap.add_argument("--chunk-size", type=int, default=60)
    ap.add_argument("--per-phrase-k", type=int, default=2)
    ap.add_argument("--per-phrase-final-k", type=int, default=1,
                    help="Top-K candidates per phrase after scoring (retrieval cap)")
    ap.add_argument("--temperature", type=float, default=0.0)
    ap.add_argument("--max-tokens", type=int, default=512)
    ap.add_argument("--verbose", "-v", action="store_true", help="Show per-call Stage 3 logs")
    ap.add_argument("--output", "-o", type=str, default=None,
                    help="Save detailed results as JSONL (default: auto-generated in data/eval_results/)")
    # --shuffle / --no-shuffle share one destination; shuffling is on by default.
    ap.add_argument("--shuffle", action="store_true", default=True,
                    help="Randomly shuffle samples before selecting (default: True)")
    ap.add_argument("--no-shuffle", dest="shuffle", action="store_false",
                    help="Use samples in file order (first N)")
    ap.add_argument("--seed", type=int, default=42,
                    help="Random seed for shuffle (default: 42)")
    ap.add_argument("--workers", "-w", type=int, default=4,
                    help="Number of parallel workers (default: 4, use 1 for sequential)")
    ap.add_argument("--eval-path", type=str, default=None,
                    help="Optional path to eval JSONL (defaults to expanded 1000-sample set).")
    ap.add_argument("--min-why", default="strong_implied",
                    choices=["explicit", "strong_implied", "weak_implied", "style_or_meta", "other", "none"],
                    help="Minimum 'why' confidence to keep (default: strong_implied). Use 'none' to disable filtering.")
    ap.add_argument("--expand-implications", action="store_true", default=False,
                    help="Expand selected tags via tag implication chains (e.g. fox→canine→canid→mammal)")
    ap.add_argument("--infer-structural", action="store_true", default=False,
                    help="Infer structural tags (solo/duo/male/female/anthro/biped) via LLM statement agreement")
    # --infer-probe / --no-infer-probe share one destination; probing is on by default.
    ap.add_argument("--infer-probe", action="store_true", default=True,
                    help="Infer simplified reliability-gated probe tags via LLM (default: on)")
    ap.add_argument("--no-infer-probe", dest="infer_probe", action="store_false",
                    help="Disable simplified probe inference")
    return ap


def _compact_result_row(r) -> Dict[str, Any]:
    """Build the compact JSONL record (metrics, counts, diffs) for one result."""
    missed_tags = sorted(r.ground_truth_tags - r.selected_tags)
    extra_tags = sorted(r.selected_tags - r.ground_truth_tags)
    return {
        "id": r.sample_id,
        # Counts (not full lists)
        "n_gt": len(r.ground_truth_tags),
        "n_retrieved": len(r.retrieved_tags),
        "n_selected": len(r.selected_tags),
        "n_implied": len(r.implied_tags),
        "n_structural": len(r.structural_tags),
        "n_probe": len(r.probe_tags),
        # Overall metrics
        "ret_R": round(r.retrieval_recall, 4),
        "P": round(r.selection_precision, 4),
        "R": round(r.selection_recall, 4),
        "F1": round(r.selection_f1, 4),
        # Leaf metrics
        "leaf_P": round(r.leaf_precision, 4),
        "leaf_R": round(r.leaf_recall, 4),
        "leaf_F1": round(r.leaf_f1, 4),
        "n_leaf_sel": r.leaf_selected_count,
        "n_leaf_gt": r.leaf_gt_count,
        # Diagnostic
        "ret_P": round(r.retrieval_precision, 4),
        "sel_given_ret": round(r.selection_given_retrieval, 4),
        "over_sel": round(r.over_selection_ratio, 2),
        "why": r.why_counts,
        "stage3_diag": r.stage3_diag,
        # Character metrics (compact)
        "n_gt_char": len(r.gt_character_tags),
        "n_sel_char": len(r.selected_character_tags),
        "char_F1": round(r.char_f1, 4),
        # General metrics (compact)
        "gen_P": round(r.general_precision, 4),
        "gen_R": round(r.general_recall, 4),
        "gen_F1": round(r.general_f1, 4),
        # Diff sets (small — only the errors, not the full lists)
        "missed": missed_tags,
        "extra": extra_tags,
        # Full tag lists (needed for categorized evaluation)
        "ground_truth_tags": sorted(r.ground_truth_tags),
        "selected_tags": sorted(r.selected_tags),
        "stage3_selected": sorted(r.stage3_selected_tags),
        "stage3_selected_scores": r.stage3_selected_scores,
        "stage3_selected_ranks": r.stage3_selected_ranks,
        "stage3_selected_phrase_ranks": r.stage3_selected_phrase_ranks,
        # Evidence for extra tags (why did these false positives get through?)
        "extra_evidence": {t: r.tag_evidence.get(t, {}) for t in extra_tags},
        # Structural tags inferred
        "structural": r.structural_tags,
        "probe": r.probe_tags,
        # Timing
        "t1": round(r.stage1_time, 2),
        "t2": round(r.stage2_time, 2),
        "t3": round(r.stage3_time, 2),
        "t3s": round(r.stage3s_time, 2),
        "t3p": round(r.stage3p_time, 2),
        "err": r.error,
        "issues": r.issues,
    }


def _detail_result_row(r) -> Dict[str, Any]:
    """Build the full-detail JSONL record (complete tag lists) for one result."""
    return {
        "sample_id": r.sample_id,
        "caption": r.caption,
        "ground_truth_tags": sorted(r.ground_truth_tags),
        "rewrite_phrases": r.rewrite_phrases,
        "retrieved_tags": sorted(r.retrieved_tags),
        "selected_tags": sorted(r.selected_tags),
        "stage3_selected": sorted(r.stage3_selected_tags),
        "stage3_selected_scores": r.stage3_selected_scores,
        "stage3_selected_ranks": r.stage3_selected_ranks,
        "stage3_selected_phrase_ranks": r.stage3_selected_phrase_ranks,
        "implied_tags": sorted(r.implied_tags),
        "structural_tags": r.structural_tags,
        "probe_tags": r.probe_tags,
        "categorized_suggestions": r.categorized_suggestions,
        "why_counts": r.why_counts,
        "stage3_diag": r.stage3_diag,
        "tag_evidence": r.tag_evidence,
        "gt_character_tags": sorted(r.gt_character_tags),
        "selected_character_tags": sorted(r.selected_character_tags),
        "gt_general_tags": sorted(r.gt_general_tags),
        "selected_general_tags": sorted(r.selected_general_tags),
        "error": r.error,
        "issues": r.issues,
    }


def _write_jsonl(path: Path, meta: Dict[str, Any], rows) -> None:
    """Write *meta* as the first line of *path*, then one JSON line per row.

    *rows* is any iterable of JSON-serializable dicts; it is consumed lazily
    so records are written as they are produced.
    """
    with path.open("w", encoding="utf-8") as f:
        f.write(json.dumps(meta, ensure_ascii=False) + "\n")
        for row in rows:
            f.write(json.dumps(row, ensure_ascii=False) + "\n")


def main(argv=None) -> int:
    """Command-line entry point: run the evaluation, print a summary, save results.

    Args:
        argv: Optional argument list to parse instead of ``sys.argv[1:]``.

    Returns:
        Always ``0``; per-sample failures are reported through the printed
        summary and the ``err`` / ``error`` fields of the saved records.

    Two JSONL files are written, each starting with a ``_meta`` header line:
    a compact metrics file (at ``--output`` if given, otherwise auto-named
    under ``data/eval_results/``) and a full-detail file that is always
    auto-named under ``data/eval_results/``.
    """
    _ensure_utf8_stdio()
    args = _build_eval_arg_parser().parse_args(list(argv) if argv is not None else None)

    # Convert "none" string to actual None for disabling the filter
    min_why_val = None if args.min_why == "none" else args.min_why

    results = run_eval(
        n_samples=args.n,
        caption_field=args.caption_field,
        skip_rewrite=args.skip_rewrite,
        allow_nsfw=args.allow_nsfw,
        mode=args.mode,
        chunk_size=args.chunk_size,
        per_phrase_k=args.per_phrase_k,
        per_phrase_final_k=args.per_phrase_final_k,
        temperature=args.temperature,
        max_tokens=args.max_tokens,
        verbose=args.verbose,
        shuffle=args.shuffle,
        seed=args.seed,
        workers=args.workers,
        min_why=min_why_val,
        eval_path=args.eval_path,
        expand_implications=args.expand_implications,
        infer_structural=args.infer_structural,
        infer_probe=args.infer_probe,
    )
    print_summary(results)

    # Save results in two formats:
    # 1. Compact metrics JSONL (small, for git / LLM reading)
    # 2. Full detail JSONL (large, for analysis scripts, gitignored)
    results_dir = _REPO_ROOT / "data" / "eval_results"
    results_dir.mkdir(parents=True, exist_ok=True)

    # One instant for both the file names and the metadata timestamp, so the
    # header line always matches the file it lives in.
    now = datetime.now()
    base_name = f"eval_{args.caption_field}_n{args.n}_seed{args.seed}_{now.strftime('%Y%m%d_%H%M%S')}"
    out_path = Path(args.output) if args.output else results_dir / f"{base_name}.jsonl"
    detail_path = results_dir / f"{base_name}_detail.jsonl"
    out_path.parent.mkdir(parents=True, exist_ok=True)

    # Run metadata, written as the first line of both files.
    meta = {
        "_meta": True,
        "timestamp": now.isoformat(),
        "n_samples": len(results),
        "caption_field": args.caption_field,
        "skip_rewrite": args.skip_rewrite,
        "allow_nsfw": args.allow_nsfw,
        "mode": args.mode,
        "chunk_size": args.chunk_size,
        "eval_path": args.eval_path,
        "per_phrase_k": args.per_phrase_k,
        "per_phrase_final_k": args.per_phrase_final_k,
        "temperature": args.temperature,
        # Recorded so a saved run lists every generation setting it was given.
        "max_tokens": args.max_tokens,
        "shuffle": args.shuffle,
        "seed": args.seed,
        "workers": args.workers,
        "min_why": args.min_why,
        "expand_implications": args.expand_implications,
        "infer_structural": args.infer_structural,
        "infer_probe": args.infer_probe,
        "n_errors": sum(1 for r in results if r.error),
        "n_issue_samples": sum(1 for r in results if r.issues),
        "n_issues_total": sum(len(r.issues) for r in results),
    }

    # Compact format: metrics + counts + small diff sets
    _write_jsonl(out_path, meta, (_compact_result_row(r) for r in results))
    print(f"\nCompact results saved to: {out_path}")

    # Write full detail file (for analysis scripts)
    _write_jsonl(detail_path, meta, (_detail_result_row(r) for r in results))
    print(f"Detail results saved to: {detail_path}")
    return 0
# Script entry point: run the CLI and use main()'s return value as the
# process exit status. Nothing runs when the module is merely imported.
if __name__ == "__main__":
    raise SystemExit(main())