from __future__ import annotations

# Offline pairwise-LLM annotation experiment.
#
# Flow implemented by main(): load the Stage 1 retrieval pool, draw a
# stratified sample using the existing LightGBM model's ranking, ask an LLM to
# judge candidate pairs, turn the verdicts into per-candidate scores and 0-3
# labels, train a new LightGBM ranker on those labels, and print a comparison
# report against the existing model.

import argparse
import json
import logging
import math
import os
import pickle
import random
import sys
import time
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple

import numpy as np

_THIS_FILE = os.path.abspath(__file__)
_EXP_DIR = os.path.dirname(_THIS_FILE)
_EXPERIMENTS = os.path.dirname(_EXP_DIR)
_PROJECT_ROOT = os.path.dirname(_EXPERIMENTS)
_SRC_DIR = os.path.join(_PROJECT_ROOT, "src")
# Make the project's modules (features, jd_parser, retrieval) importable.
for _p in [_SRC_DIR, _PROJECT_ROOT]:
    if _p not in sys.path:
        sys.path.insert(0, _p)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [pairwise] %(message)s",
    datefmt="%H:%M:%S",
)
logger = logging.getLogger("pairwise_llm")

# Seconds to sleep between consecutive LLM calls, per provider.
_PROVIDER_SLEEP: Dict[str, float] = {
    "groq": 2.1,
    "anthropic": 0.1,
    "ollama": 0.5,
    "cerebras": 0.5,
}

# (input, output) price per million tokens, per provider.
_PROVIDER_PRICE: Dict[str, Tuple[float, float]] = {
    "groq": (0.0, 0.0),
    "anthropic": (3.0, 15.0),
    "ollama": (0.0, 0.0),
    "cerebras": (0.0, 0.0),
}

_DEFAULT_MODELS: Dict[str, str] = {
    "groq": "llama-3.1-8b-instant",
    "anthropic": "claude-sonnet-4-6",
    "ollama": "gemma3:4b",
    "cerebras": "llama3.1-8b",
}

_VALID_VERDICTS: Tuple[str, ...] = ("CANDIDATE_A", "CANDIDATE_B", "TIE")
_DEFAULT_OLLAMA_URL = "http://localhost:11434"


def _duration_months(skill: dict) -> float:
    """Return a skill's ``duration_months``, treating missing/None as 0."""
    return skill.get("duration_months") or 0


def build_jd_summary(jd_config) -> str:
    """Render the job description as a plain-text block for the LLM prompt.

    Args:
        jd_config: Object exposing ``hard_requirements`` (iterable) and
            ``preferred_requirements`` (dict or iterable; may be None).

    Returns:
        A newline-joined summary. At most six preferred requirements are
        listed; the disqualifier, location, experience and notice-period
        lines are fixed text.
    """
    lines = ["JOB: Senior AI/ML Engineer — Retrieval & Ranking Systems"]
    lines.append("HARD REQUIREMENTS (must have):")
    lines.extend(f" - {req}" for req in jd_config.hard_requirements)

    lines.append("PREFERRED (good to have):")
    # Iterating a dict yields its keys, so list() covers both dict and
    # sequence inputs; `or []` tolerates a missing/None value.
    preferred = list(jd_config.preferred_requirements or [])
    lines.extend(f" - {req}" for req in preferred[:6])

    lines.append("EXPLICIT DISQUALIFIERS:")
    lines.append(" - Entire career at IT-services/consulting firms (TCS, Infosys, Wipro, etc.)")
    lines.append(" - AI experience is only LangChain/OpenAI API with no pre-LLM IR or ML foundation")
    lines.append(" - CV/speech-only ML background with no NLP/IR experience")
    lines.append(" - Title-chaser: avg tenure < 15 months across 3+ jobs")
    lines.append(
        "LOCATION PREFERENCE: Noida or Pune strongly preferred; other India acceptable; "
        "outside India only if willing to relocate (no visa sponsorship)"
    )
    lines.append("EXPERIENCE: 5-9 years preferred")
    lines.append("NOTICE PERIOD: Sub-30 days preferred; 30+ days raises the bar")
    return "\n".join(lines)


def build_candidate_summary(candidate: dict) -> str:
    """Render one candidate record as a compact plain-text profile.

    Reads ``candidate_id``, ``profile``, ``redrob_signals``, ``skills`` and
    ``career_history`` from *candidate*; every field is optional and falls
    back to a placeholder. Only the five longest-held skills and the first
    three career entries are included.
    """
    profile = candidate.get("profile", {}) or {}
    signals = candidate.get("redrob_signals", {}) or {}

    lines = [
        f"ID: {candidate.get('candidate_id', 'unknown')}",
        (
            f"Title: {profile.get('current_title', 'unknown')} "
            f"at {profile.get('current_company', 'unknown')}"
        ),
        f"YOE: {profile.get('years_of_experience', 0)}",
        (
            f"Location: {profile.get('location', 'unknown')}, "
            f"{profile.get('country', 'unknown')}"
        ),
    ]

    # A None duration would make the sort raise TypeError, so it counts as 0.
    skills = sorted(
        candidate.get("skills", []) or [],
        key=_duration_months,
        reverse=True,
    )[:5]
    assessments = signals.get("skill_assessment_scores", {}) or {}
    skill_lines = []
    for skill in skills:
        name = skill.get("name", "")
        prof = skill.get("proficiency", "")
        dur = _duration_months(skill)
        score = assessments.get(name)
        if score is not None:
            skill_lines.append(f"{name} ({prof}, {dur}mo, assessed: {score}/100)")
        else:
            skill_lines.append(f"{name} ({prof}, {dur}mo, unverified)")
    lines.append(f"Skills: {'; '.join(skill_lines)}")

    for i, role in enumerate((candidate.get("career_history", []) or [])[:3]):
        desc = (role.get("description") or "")[:60].replace("\n", " ")
        lines.append(
            f"Role {i+1}: {role.get('title')} @ {role.get('company')} "
            f"({role.get('industry')}, {role.get('company_size')}, "
            f"{role.get('duration_months')}mo) — {desc}..."
        )

    lines.append(f"Notice: {signals.get('notice_period_days', 'unknown')} days")
    lines.append(f"Last active: {signals.get('last_active_date', 'unknown')}")
    lines.append(f"GitHub score: {signals.get('github_activity_score', -1)}")
    lines.append(f"Response rate: {signals.get('recruiter_response_rate', 'unknown')}")
    lines.append(f"Willing to relocate: {signals.get('willing_to_relocate', 'unknown')}")
    return "\n".join(lines)


def _call_chat_completions(client, model: str, prompt: str) -> Tuple[str, int, int]:
    """Send *prompt* through a ``client.chat.completions.create`` style API.

    Returns:
        ``(upper-cased reply text, prompt tokens, completion tokens)``.
    """
    response = client.chat.completions.create(
        model=model,
        max_tokens=10,
        messages=[{"role": "user", "content": prompt}],
    )
    # The content field can be None; treat that as an empty reply.
    text = (response.choices[0].message.content or "").strip().upper()
    return text, response.usage.prompt_tokens, response.usage.completion_tokens


def _call_groq(client, model: str, prompt: str) -> Tuple[str, int, int]:
    """Query Groq; returns ``(reply text, input tokens, output tokens)``."""
    return _call_chat_completions(client, model, prompt)


def _call_anthropic(client, model: str, prompt: str) -> Tuple[str, int, int]:
    """Query Anthropic; returns ``(reply text, input tokens, output tokens)``."""
    response = client.messages.create(
        model=model,
        max_tokens=10,
        messages=[{"role": "user", "content": prompt}],
    )
    # An empty content list would raise IndexError; treat it as an empty reply.
    text = response.content[0].text.strip().upper() if response.content else ""
    return text, response.usage.input_tokens, response.usage.output_tokens


def _call_cerebras(client, model: str, prompt: str) -> Tuple[str, int, int]:
    """Query Cerebras; returns ``(reply text, input tokens, output tokens)``."""
    return _call_chat_completions(client, model, prompt)


def _call_ollama(
    model: str,
    prompt: str,
    base_url: str = _DEFAULT_OLLAMA_URL,
) -> Tuple[str, int, int]:
    """Query a local Ollama server through its ``/api/generate`` endpoint.

    Args:
        model: Ollama model tag.
        prompt: Full prompt text.
        base_url: Server root URL (no trailing path).

    Returns:
        ``(upper-cased reply text, 0, 0)`` — token counts are not tracked for
        this provider. Verdict normalisation is left to the caller so that
        all providers are parsed the same way.

    Raises:
        RuntimeError: If the server is unreachable or the call fails.
    """
    import requests as _req

    try:
        response = _req.post(
            f"{base_url.rstrip('/')}/api/generate",
            json={
                "model": model,
                "prompt": prompt,
                "stream": False,
                "options": {
                    "temperature": 0,
                    "num_predict": 10,
                    "num_ctx": 2048,
                    "num_gpu": 99,
                    "stop": ["\n", ".", " \n"],
                },
            },
            timeout=120,
        )
        response.raise_for_status()
        raw = response.json()["response"].strip().upper()
    except _req.exceptions.ConnectionError as e:
        raise RuntimeError(
            f"Cannot connect to Ollama at {base_url}. "
            "It starts automatically on Windows after install. "
            "Verify with: ollama list"
        ) from e
    except Exception as e:
        raise RuntimeError(f"Ollama call failed: {e}") from e
    return raw, 0, 0


def _parse_verdict(text: Optional[str]) -> Optional[str]:
    """Map raw model output to one of ``_VALID_VERDICTS``, or None.

    Tolerates surrounding punctuation/whitespace (e.g. ``"CANDIDATE_A."``).
    Output naming both candidates is ambiguous and yields None.
    """
    cleaned = (text or "").strip().upper()
    has_a = "CANDIDATE_A" in cleaned
    has_b = "CANDIDATE_B" in cleaned
    if has_a and not has_b:
        return "CANDIDATE_A"
    if has_b and not has_a:
        return "CANDIDATE_B"
    if not has_a and not has_b and cleaned.strip(" .,:;!\"'`*") == "TIE":
        return "TIE"
    return None


def get_pairwise_judgment(
    client,
    provider: str,
    model: str,
    jd_summary: str,
    summary_a: str,
    summary_b: str,
    pair_idx: int,
    ollama_url: str = _DEFAULT_OLLAMA_URL,
) -> Tuple[str, int, int]:
    """Ask the LLM which of two candidates fits the job better.

    Args:
        client: Provider SDK client (unused for ``ollama``).
        provider: One of ``groq``, ``anthropic``, ``cerebras``, ``ollama``.
        model: Model identifier passed to the provider.
        jd_summary: Job description text from :func:`build_jd_summary`.
        summary_a: Profile text for candidate A.
        summary_b: Profile text for candidate B.
        pair_idx: Pair number, used only in log messages.
        ollama_url: Ollama server root URL (used only for ``ollama``).

    Returns:
        ``(verdict, input_tokens, output_tokens)`` where verdict is one of
        ``CANDIDATE_A``, ``CANDIDATE_B``, ``TIE``. A failed call is retried
        once after 5 seconds; a second failure or unparseable output yields
        ``TIE`` (with zero token counts in the failure case).
    """
    prompt = f"""You are an expert technical recruiter. Read the job requirements and both candidate profiles carefully, then judge which candidate is the stronger fit.

{jd_summary}

--- CANDIDATE A ---
{summary_a}

--- CANDIDATE B ---
{summary_b}

Which candidate is a better fit for this specific role?

Respond with EXACTLY one of these three strings and nothing else:
CANDIDATE_A
CANDIDATE_B
TIE

No explanation. No punctuation. Just the label."""

    def _dispatch() -> Tuple[str, int, int]:
        if provider == "groq":
            return _call_groq(client, model, prompt)
        if provider == "anthropic":
            return _call_anthropic(client, model, prompt)
        if provider == "cerebras":
            return _call_cerebras(client, model, prompt)
        if provider == "ollama":
            return _call_ollama(model, prompt, ollama_url)
        raise ValueError(f"Unknown provider: {provider}")

    try:
        text, inp, out = _dispatch()
    except Exception as e:
        logger.warning("API error on pair %d: %s", pair_idx, e)
        time.sleep(5)
        try:
            text, inp, out = _dispatch()
        except Exception as e2:
            # NOTE(review): a failed call is recorded as TIE by the caller,
            # which is indistinguishable from a genuine tie in the data.
            logger.warning("Retry failed on pair %d: %s — defaulting to TIE", pair_idx, e2)
            return "TIE", 0, 0

    verdict = _parse_verdict(text)
    if verdict is None:
        logger.warning("Pair %d: unexpected output %r — defaulting to TIE", pair_idx, text)
        verdict = "TIE"
    return verdict, inp, out


def compute_elo_scores(
    annotations: List[dict],
    candidate_ids: List[str],
) -> Dict[str, float]:
    """Convert pairwise verdicts into an Elo-scaled score per candidate.

    This is not an iterative Elo update: each candidate's score is
    ``400 * log10(p / (1 - p)) + 1500`` where ``p`` is the smoothed win rate
    ``(wins + 0.5) / (games + 1)``. A tie counts as half a win and half a
    loss for both sides. Candidates with no games get 1500.

    Args:
        annotations: Records with ``candidate_a``, ``candidate_b``, ``verdict``.
        candidate_ids: Candidates to score. Annotations naming any candidate
            outside this list are skipped (this happens when a resumed
            annotations file contains pairs from a different sample).

    Returns:
        Mapping of every id in *candidate_ids* to its score.
    """
    wins: Dict[str, float] = {cid: 0.0 for cid in candidate_ids}
    losses: Dict[str, float] = {cid: 0.0 for cid in candidate_ids}

    skipped = 0
    for ann in annotations:
        a, b, verdict = ann["candidate_a"], ann["candidate_b"], ann["verdict"]
        if a not in wins or b not in wins:
            skipped += 1
            continue
        if verdict == "CANDIDATE_A":
            wins[a] += 1.0
            losses[b] += 1.0
        elif verdict == "CANDIDATE_B":
            wins[b] += 1.0
            losses[a] += 1.0
        else:
            wins[a] += 0.5
            losses[a] += 0.5
            wins[b] += 0.5
            losses[b] += 0.5
    if skipped:
        logger.warning(
            "Skipped %d annotations referencing candidates outside the sample", skipped
        )

    elo: Dict[str, float] = {}
    for cid in candidate_ids:
        total = wins[cid] + losses[cid]
        if total == 0:
            elo[cid] = 1500.0
        else:
            win_rate = (wins[cid] + 0.5) / (total + 1)
            elo[cid] = 400 * math.log10(win_rate / (1 - win_rate)) + 1500
    return elo


def elo_to_labels(elo_scores: Dict[str, float]) -> Dict[str, int]:
    """Bucket scores into relevance labels 0-3 by quartile.

    Thresholds are the sorted values at indices ``int(0.25 * n)``,
    ``int(0.50 * n)`` and ``int(0.75 * n)``; a score at or above a threshold
    gets the higher label. Returns an empty dict for empty input.
    """
    if not elo_scores:
        return {}

    values = sorted(elo_scores.values())
    n = len(values)
    q75 = values[int(0.75 * n)]
    q50 = values[int(0.50 * n)]
    q25 = values[int(0.25 * n)]

    labels: Dict[str, int] = {}
    for cid, elo in elo_scores.items():
        if elo >= q75:
            labels[cid] = 3
        elif elo >= q50:
            labels[cid] = 2
        elif elo >= q25:
            labels[cid] = 1
        else:
            labels[cid] = 0
    return labels


def _get_top_skill(candidate: dict) -> str:
    """Return the name of the candidate's longest-held skill, or ``"N/A"``."""
    skills = candidate.get("skills", []) or []
    if not skills:
        return "N/A"
    # max() returns the first maximal item, matching a stable descending sort.
    return max(skills, key=_duration_months).get("name", "N/A")


def _spearman(
    candidate_ids: List[str],
    ranks_a: Dict[str, int],
    ranks_b: Dict[str, int],
) -> float:
    """Spearman correlation of two rankings over the ids present in both.

    Returns 0.0 when fewer than two ids are shared or when the correlation
    is undefined (NaN), so callers can compare the result safely.
    """
    from scipy.stats import spearmanr

    common = [cid for cid in candidate_ids if cid in ranks_a and cid in ranks_b]
    if len(common) < 2:
        return 0.0
    ra = [ranks_a[cid] for cid in common]
    rb = [ranks_b[cid] for cid in common]
    rho, _ = spearmanr(ra, rb)
    rho = float(rho)
    return 0.0 if math.isnan(rho) else rho


def _build_feature_matrix(
    candidate_ids: Iterable[str],
    candidates: Dict[str, dict],
    bm25_scores: Dict[str, float],
    stage1_bm25_median: float,
    jd_config,
    feature_columns: List[str],
    build_feature_vector: Callable[..., Any],
) -> Tuple[List[str], np.ndarray, Dict[str, float]]:
    """Build a float32 feature matrix for the ids that have a loaded record.

    Ids missing from *candidates* are skipped. If feature extraction raises,
    the failure is logged and the row falls back to zeros with a consistency
    score of 1.0 (best-effort, so one bad record does not abort the run).

    Returns:
        ``(kept_ids, matrix, consistency_by_id)``; the matrix always has
        shape ``(len(kept_ids), len(feature_columns))``.
    """
    rows: List[list] = []
    kept_ids: List[str] = []
    consistency: Dict[str, float] = {}
    for cid in candidate_ids:
        candidate = candidates.get(cid)
        if candidate is None:
            continue
        try:
            fv = build_feature_vector(
                candidate,
                jd_config,
                bm25_score=bm25_scores.get(cid, 0.0),
                stage1_bm25_median=stage1_bm25_median,
            )
            row = [fv[col] for col in feature_columns]
            cons = float(fv.get("consistency_score", 1.0))
        except Exception as e:
            logger.warning("Feature extraction failed for %s: %s", cid, e)
            row = [0.0] * len(feature_columns)
            cons = 1.0
        rows.append(row)
        kept_ids.append(cid)
        consistency[cid] = cons
    # reshape keeps the matrix 2-D even when no rows were produced.
    matrix = np.array(rows, dtype=np.float32).reshape(len(rows), len(feature_columns))
    return kept_ids, matrix, consistency


def print_model_comparison(
    stage1_candidates: Dict[str, dict],
    stage1_ids: List[str],
    bm25_scores: Dict[str, float],
    stage1_bm25_median: float,
    jd_config,
    old_model,
    new_model,
    feature_columns: List[str],
) -> None:
    """Print a side-by-side report of the old and new rankers.

    Both models score the full Stage 1 pool. New-model scores are multiplied
    by each candidate's ``consistency_score`` feature; old-model scores are
    used as-is. The report shows both top-10 lists, their overlap, the
    Spearman correlation over the old top-100, candidates that moved 20+
    ranks, and a "honeypot audit" that passes when fewer than 10 of the new
    top-100 have consistency below 0.25.

    Args:
        stage1_candidates: Candidate records by id.
        stage1_ids: Ids of the Stage 1 pool, in retrieval order.
        bm25_scores: BM25 score by id.
        stage1_bm25_median: Median BM25 score of the pool.
        jd_config: Parsed job description passed to the feature builder.
        old_model: Existing ranker exposing ``predict``.
        new_model: Newly trained ranker exposing ``predict``.
        feature_columns: Ordered feature names for the matrix columns.
    """
    from features import build_feature_vector

    logger.info("Building full feature matrix for comparison report...")
    ordered_ids, X_full, consistency_map = _build_feature_matrix(
        stage1_ids,
        stage1_candidates,
        bm25_scores,
        stage1_bm25_median,
        jd_config,
        feature_columns,
        build_feature_vector,
    )
    logger.info("Comparison feature matrix: shape=%s", X_full.shape)
    if not ordered_ids:
        logger.error("No Stage 1 candidates with loaded records — nothing to compare.")
        return

    old_raw = old_model.predict(X_full)
    old_scores = {cid: float(s) for cid, s in zip(ordered_ids, old_raw)}
    # Ties are broken by id so the ranking is deterministic.
    old_ranked = sorted(old_scores.items(), key=lambda x: (-x[1], x[0]))
    old_rank_map = {cid: rank for rank, (cid, _) in enumerate(old_ranked, 1)}

    new_raw = new_model.predict(X_full)
    new_scores = {
        cid: float(s) * consistency_map.get(cid, 1.0)
        for cid, s in zip(ordered_ids, new_raw)
    }
    new_ranked = sorted(new_scores.items(), key=lambda x: (-x[1], x[0]))
    new_rank_map = {cid: rank for rank, (cid, _) in enumerate(new_ranked, 1)}

    old_top10 = [cid for cid, _ in old_ranked[:10]]
    new_top10 = [cid for cid, _ in new_ranked[:10]]
    overlap = len(set(old_top10) & set(new_top10))

    top100_old = [cid for cid, _ in old_ranked[:100]]
    rho = _spearman(top100_old, old_rank_map, new_rank_map)

    moved_up: List[Tuple[str, int, int]] = []
    moved_down: List[Tuple[str, int, int]] = []
    for cid in ordered_ids:
        old_r = old_rank_map.get(cid, 9999)
        new_r = new_rank_map.get(cid, 9999)
        delta = old_r - new_r
        if delta >= 20:
            moved_up.append((cid, old_r, new_r))
        elif delta <= -20:
            moved_down.append((cid, old_r, new_r))
    moved_up.sort(key=lambda x: x[1] - x[2], reverse=True)
    moved_down.sort(key=lambda x: x[2] - x[1], reverse=True)

    new_top100 = [cid for cid, _ in new_ranked[:100]]
    low_cons_count = sum(1 for cid in new_top100 if consistency_map.get(cid, 1.0) < 0.25)
    honeypot_pass = low_cons_count < 10

    def _profile(cid: str) -> Tuple[dict, dict]:
        candidate = stage1_candidates.get(cid, {})
        return candidate, candidate.get("profile", {}) or {}

    print("\n" + "=" * 60)
    print("=== MODEL COMPARISON REPORT ===")
    print("=" * 60)

    print("\nCurrent model (heuristic labels) top-10:")
    for rank, cid in enumerate(old_top10, 1):
        c, p = _profile(cid)
        print(f" {rank:2d}. {cid} — {p.get('current_title','N/A')}, "
              f"{p.get('years_of_experience',0)}y, {_get_top_skill(c)}")

    print("\nNew model (LLM pairwise labels + consistency multiplier) top-10:")
    for rank, cid in enumerate(new_top10, 1):
        c, p = _profile(cid)
        cons = consistency_map.get(cid, 1.0)
        print(f" {rank:2d}. {cid} — {p.get('current_title','N/A')}, "
              f"{p.get('years_of_experience',0)}y, {_get_top_skill(c)}, "
              f"cons={cons:.2f}")

    print(f"\nOverlap: {overlap} of 10 top-10 candidates appear in both rankings")
    print(f"Spearman correlation (top-100): {rho:.3f} "
          f"[range: -1.0 to +1.0, higher = more agreement]")

    print("\nCandidates that MOVED UP 20+ positions in new model:")
    for cid, old_r, new_r in moved_up[:10]:
        c, p = _profile(cid)
        print(f" - {cid}: old={old_r}, new={new_r} | "
              f"{p.get('current_title','N/A')}, {_get_top_skill(c)}")

    print("\nCandidates that MOVED DOWN 20+ positions in new model:")
    for cid, old_r, new_r in moved_down[:10]:
        c, p = _profile(cid)
        print(f" - {cid}: old={old_r}, new={new_r} | "
              f"{p.get('current_title','N/A')}, {_get_top_skill(c)}")

    print("\nConsistency check — low-consistency (< 0.25) in new top-100:")
    print(f" Count: {low_cons_count} (must be < 10 to pass honeypot audit)")
    print(" NOTE: consistency multiplier applied — this number should now be 0.")

    print("\n" + "=" * 60)
    print("=== VERDICT ===")
    print(f"Honeypot audit: {'PASS ✓' if honeypot_pass else 'FAIL ✗'}")
    print(f"Top-10 overlap with current: {overlap}/10")
    print(f"Spearman correlation: {rho:.3f}")

    # Branch on the audit first so an undefined/low correlation can never be
    # reported as an audit failure.
    if not honeypot_pass:
        rec = "RISKY — honeypot audit fails; do not swap without further investigation"
    elif rho > 0.4:
        rec = "PROMISING — consider swapping model"
    else:
        rec = "MIXED — honeypot passes but ranking diverges significantly; review movers"
    print(f"Recommendation: {rec}")
    print("=" * 60 + "\n")

    if not honeypot_pass:
        logger.error(
            "HONEYPOT AUDIT FAILED: %d low-consistency candidates in new top-100. "
            "The consistency multiplier should have fixed this — check that "
            "consistency_score is being computed correctly for these candidates.",
            low_cons_count,
        )
    else:
        logger.info("Honeypot audit PASSED: %d low-consistency in new top 100.", low_cons_count)


def _load_pickle(path: str) -> Any:
    """Load a pickled artifact from *path*.

    SECURITY: unpickling executes arbitrary code; only point this at
    artifacts produced by this project.
    """
    with open(path, "rb") as f:
        return pickle.load(f)


def _ensure_ollama_model(base_url: str, model: str) -> None:
    """Exit the process unless Ollama is reachable and offers *model*."""
    import requests as _req

    try:
        r = _req.get(f"{base_url}/api/tags", timeout=5)
        r.raise_for_status()
        available = [m["name"] for m in r.json().get("models", [])]
    except _req.exceptions.ConnectionError:
        logger.error(
            "Ollama not running at %s. "
            "On Windows it auto-starts after install — "
            "check Task Manager for 'ollama' process.",
            base_url,
        )
        sys.exit(1)
    except _req.exceptions.RequestException as e:
        logger.error("Ollama preflight request to %s failed: %s", base_url, e)
        sys.exit(1)

    # Accept an exact tag match or a match on the name before the ':' tag.
    available_bases = {name.split(":")[0] for name in available}
    if model not in available and model.split(":")[0] not in available_bases:
        logger.error(
            "Model '%s' not in Ollama. Available: %s. "
            "Pull it: ollama pull %s", model, available, model
        )
        sys.exit(1)
    logger.info("Ollama reachable. Model '%s' available.", model)


def _load_stage1_candidates(
    candidates_path: str,
    offsets_path: str,
    stage1_ids: List[str],
) -> List[dict]:
    """Load the Stage 1 candidate records from the candidates JSONL file.

    Uses the pickled byte-offset index when it exists, otherwise streams the
    whole file. Records are returned in *stage1_ids* order; ids without a
    readable record are omitted.
    """
    records: List[dict] = []
    if os.path.isfile(offsets_path):
        candidate_offsets = _load_pickle(offsets_path)
        logger.info("Loading Stage 1 records via byte-offset index...")
        undecodable = 0
        with open(candidates_path, "rb") as f:
            for cid in stage1_ids:
                offset = candidate_offsets.get(cid)
                if offset is None:
                    continue
                f.seek(offset)
                raw = f.readline()
                try:
                    records.append(json.loads(raw.decode("utf-8", errors="ignore").strip()))
                except json.JSONDecodeError:
                    undecodable += 1
        if undecodable:
            logger.warning("Skipped %d Stage 1 records with invalid JSON", undecodable)
        return records

    logger.info("No offset index — streaming JSONL (slow)...")
    wanted = set(stage1_ids)
    by_id: Dict[str, dict] = {}
    with open(candidates_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            cid = record.get("candidate_id")
            if cid and cid in wanted:
                by_id[cid] = record
                if len(by_id) == len(wanted):
                    break
    return [by_id[cid] for cid in stage1_ids if cid in by_id]


def _generate_pairs(sample_ids: List[str], n_opponents: int) -> List[Tuple[str, str]]:
    """Give each candidate up to *n_opponents* new, not-yet-seen opponents.

    Consumes the global ``random`` state: exactly one ``random.shuffle`` per
    candidate, in *sample_ids* order, so the result is reproducible for a
    given seed.
    """
    seen_pairs: set = set()
    pairs: List[Tuple[str, str]] = []
    for cid_a in sample_ids:
        pool = [c for c in sample_ids if c != cid_a]
        random.shuffle(pool)
        count = 0
        for cid_b in pool:
            if count >= n_opponents:
                break
            key = frozenset({cid_a, cid_b})
            if key not in seen_pairs:
                seen_pairs.add(key)
                pairs.append((cid_a, cid_b))
                count += 1
    return pairs


def _load_existing_annotations(annotations_path: str) -> Tuple[List[dict], set]:
    """Read previously written annotations for resumability.

    Returns:
        ``(records, pair_keys)`` where each key is a ``frozenset`` of the two
        candidate ids. Lines that are not valid JSON or lack the candidate
        fields are skipped.
    """
    records: List[dict] = []
    pair_keys: set = set()
    if not os.path.isfile(annotations_path):
        return records, pair_keys

    logger.info("Found existing annotations file — loading for resumability...")
    with open(annotations_path, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                ann = json.loads(line)
                # Build the key before appending so a malformed record is
                # dropped entirely rather than half-registered.
                key = frozenset({ann["candidate_a"], ann["candidate_b"]})
            except (json.JSONDecodeError, KeyError, TypeError):
                continue
            records.append(ann)
            pair_keys.add(key)
    logger.info("Loaded %d existing annotations (will skip these pairs)", len(records))
    return records, pair_keys


def _make_client(provider: str, api_key: Optional[str], model: str, ollama_url: str):
    """Create the SDK client for *provider*; returns None for ``ollama``."""
    if provider == "groq":
        from groq import Groq
        client = Groq(api_key=api_key)
        logger.info("Groq client initialized (model: %s)", model)
        return client
    if provider == "anthropic":
        import anthropic as _anthropic
        client = _anthropic.Anthropic(api_key=api_key)
        logger.info("Anthropic client initialized (model: %s)", model)
        return client
    if provider == "cerebras":
        from cerebras.cloud.sdk import Cerebras
        client = Cerebras(api_key=api_key)
        logger.info("Cerebras client initialized (model: %s)", model)
        return client
    logger.info("Ollama provider: calls go directly to %s via requests", ollama_url)
    return None


def main() -> None:
    """Run the experiment end to end (or only the comparison report).

    When both ``lgbm_model_llm.pkl`` and ``annotations.jsonl`` already exist,
    only the Step 11 comparison is run and no LLM provider is contacted.
    Otherwise the full sample → annotate → label → train → compare pipeline
    runs; annotation is resumable through ``annotations.jsonl``.
    """
    parser = argparse.ArgumentParser(
        description=(
            "Offline pairwise LLM annotation experiment. "
            "If lgbm_model_llm.pkl already exists, runs Step 11 comparison only. "
            "NEVER imported by rank.py or any production module."
        ),
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument("--candidates", required=True)
    parser.add_argument("--base-dir", required=True)
    parser.add_argument(
        "--provider",
        default="ollama",
        choices=["groq", "anthropic", "ollama", "cerebras"],
    )
    parser.add_argument("--model", default=None)
    parser.add_argument("--api-key", default=None)
    parser.add_argument("--ollama-url", default="http://localhost:11434")
    args = parser.parse_args()

    provider = args.provider
    model = args.model or _DEFAULT_MODELS[provider]
    call_sleep = _PROVIDER_SLEEP[provider]
    price_in, price_out = _PROVIDER_PRICE[provider]
    ollama_url = args.ollama_url.rstrip("/")

    base_dir = os.path.abspath(args.base_dir)
    candidates_path = os.path.abspath(args.candidates)
    precomputed_dir = os.path.join(base_dir, "precomputed")
    data_dir = os.path.join(base_dir, "data")
    annotations_path = os.path.join(_EXP_DIR, "annotations.jsonl")
    new_model_path = os.path.join(precomputed_dir, "lgbm_model_llm.pkl")
    old_model_path = os.path.join(precomputed_dir, "lgbm_model.pkl")

    # Comparison-only runs never call the LLM, so provider checks are skipped.
    comparison_only = os.path.isfile(new_model_path) and os.path.isfile(annotations_path)

    if not comparison_only and provider in ("groq", "anthropic", "cerebras") and not args.api_key:
        logger.error("--api-key is required when --provider is %s", provider)
        sys.exit(1)
    if provider == "ollama" and args.api_key:
        logger.info("--api-key ignored for ollama provider")

    logger.info("=" * 60)
    logger.info("PAIRWISE LLM ANNOTATION EXPERIMENT")
    logger.info("Provider: %s | Model: %s", provider, model)
    logger.info("Rate limit sleep: %.1fs between calls", call_sleep)
    logger.info("Cost: %s", "FREE" if price_in == 0 else f"${price_in}/M input, ${price_out}/M output")
    logger.info("Base dir: %s", base_dir)
    logger.info("Annotations: %s", annotations_path)
    logger.info("New model → %s", new_model_path)
    logger.info("Old model %s (will NOT be touched)", old_model_path)
    logger.info("=" * 60)

    if provider == "ollama" and not comparison_only:
        _ensure_ollama_model(ollama_url, model)

    from features import build_feature_vector, FEATURE_COLUMNS
    # NOTE(review): c1..c5 are imported but not used below; presumably kept as
    # an import-time sanity check on the features module — confirm.
    from features import (
        c1_timeline_impossibility,
        c2_signup_anomaly,
        c3_salary_inversion,
        c4_assessment_contradiction,
        c5_engagement_mismatch,
    )
    from jd_parser import parse_jd
    from retrieval import load_numpy_bm25_artifacts, run_dual_pass_retrieval

    # ------------------------------------------------------------------ STEP 1
    logger.info("STEP 1: Loading Stage 1 candidate pool...")
    bm25 = load_numpy_bm25_artifacts(precomputed_dir)
    if bm25 is None:
        bm25_path = os.path.join(precomputed_dir, "bm25_index.pkl")
        if not os.path.isfile(bm25_path):
            logger.error("Missing bm25_index.pkl — run precompute.py first.")
            sys.exit(1)
        bm25 = _load_pickle(bm25_path)
        logger.info("Loaded legacy BM25Okapi")
    else:
        logger.info("Loaded NumpyBM25 (fast path)")

    all_candidate_ids = _load_pickle(os.path.join(precomputed_dir, "candidate_ids.pkl"))

    aliases_path = os.path.join(data_dir, "skill_aliases.json")
    jd_config = parse_jd(aliases_path)
    logger.info(
        "JD config: %d hard reqs, %d preferred reqs",
        len(jd_config.hard_requirements),
        len(jd_config.preferred_requirements),
    )

    stage1_ids, bm25_scores = run_dual_pass_retrieval(bm25, all_candidate_ids, jd_config)
    if not bm25_scores:
        logger.error("Stage 1 retrieval returned no candidates — nothing to do.")
        sys.exit(1)
    stage1_bm25_median = float(np.median(list(bm25_scores.values())))
    logger.info("Stage 1 pool: %d candidates, median BM25=%.4f",
                len(stage1_ids), stage1_bm25_median)

    offsets_path = os.path.join(precomputed_dir, "candidate_offsets.pkl")
    stage1_candidate_list = _load_stage1_candidates(candidates_path, offsets_path, stage1_ids)
    stage1_candidates: Dict[str, dict] = {
        c.get("candidate_id"): c
        for c in stage1_candidate_list
        if c.get("candidate_id")
    }
    logger.info("Stage 1 records loaded: %d candidates", len(stage1_candidates))

    # ------------------------------------------------- comparison-only shortcut
    if comparison_only:
        logger.info(
            "lgbm_model_llm.pkl and annotations.jsonl both exist — "
            "skipping Steps 2-10, running Step 11 comparison only."
        )
        old_model = _load_pickle(old_model_path)
        new_model = _load_pickle(new_model_path)

        logger.info("STEP 11: Generating model comparison report...")
        print_model_comparison(
            stage1_candidates=stage1_candidates,
            stage1_ids=stage1_ids,
            bm25_scores=bm25_scores,
            stage1_bm25_median=stage1_bm25_median,
            jd_config=jd_config,
            old_model=old_model,
            new_model=new_model,
            feature_columns=FEATURE_COLUMNS,
        )
        logger.info("=" * 60)
        logger.info("EXPERIMENT COMPLETE")
        logger.info("New model: %s", new_model_path)
        logger.info(
            "To swap into production: copy %s %s",
            new_model_path, old_model_path,
        )
        logger.info("(Manual, deliberate action only — verify top-10 first)")
        logger.info("=" * 60)
        return

    # ------------------------------------------------------------------ STEP 2
    logger.info("STEP 2: Stratified sampling of 500 candidates...")
    # The seed plus the exact order of random.* calls below determines the
    # sample and the pairs; keep that order stable so an existing
    # annotations.jsonl still matches the regenerated pairs on resume.
    random.seed(42)

    import lightgbm as lgb

    old_model_for_ranking = _load_pickle(old_model_path)

    logger.info("Computing feature vectors for all Stage 1 candidates...")
    all_fv_ids, X_all, consistency_scores_all = _build_feature_matrix(
        stage1_ids,
        stage1_candidates,
        bm25_scores,
        stage1_bm25_median,
        jd_config,
        FEATURE_COLUMNS,
        build_feature_vector,
    )
    logger.info("Feature matrix (Stage 1): shape=%s", X_all.shape)
    if not all_fv_ids:
        logger.error("No Stage 1 candidate records could be loaded — cannot sample.")
        sys.exit(1)

    raw_scores = old_model_for_ranking.predict(X_all)
    lgbm_ranked = sorted(zip(all_fv_ids, raw_scores), key=lambda x: -x[1])

    TOTAL = 500
    N_A, N_B, N_C = 75, 100, 325
    MIN_LOW_CONS = 25

    top100_cids = [cid for cid, _ in lgbm_ranked[:100]]
    ranks_101_300 = [cid for cid, _ in lgbm_ranked[100:300]]
    ranks_301_plus = [cid for cid, _ in lgbm_ranked[300:]]

    stratum_a = random.sample(top100_cids, min(N_A, len(top100_cids)))
    stratum_b = random.sample(ranks_101_300, min(N_B, len(ranks_101_300)))

    low_cons_pool = [cid for cid in ranks_301_plus
                     if consistency_scores_all.get(cid, 1.0) < 0.5]
    guaranteed_low = random.sample(low_cons_pool, min(MIN_LOW_CONS, len(low_cons_pool)))
    guaranteed_low_set = set(guaranteed_low)
    remaining_c = [cid for cid in ranks_301_plus if cid not in guaranteed_low_set]
    # Cap at the pool size: random.sample raises ValueError when asked for
    # more items than the population holds.
    n_fill = min(len(remaining_c), max(0, N_C - len(guaranteed_low)))
    fill_c = random.sample(remaining_c, n_fill)
    stratum_c = guaranteed_low + fill_c

    sample_ids = list(dict.fromkeys(stratum_a + stratum_b + stratum_c))[:TOTAL]

    logger.info(
        "Stratum sizes: A=%d (from top-100), B=%d (ranks 101-300), C=%d (ranks 301+)",
        len(stratum_a), len(stratum_b), len(stratum_c),
    )
    logger.info("Low-consistency guaranteed in Stratum C: %d (target: ≥%d)",
                len(guaranteed_low), MIN_LOW_CONS)
    logger.info("Total sample pool: %d candidates", len(sample_ids))

    # ------------------------------------------------------------------ STEP 3
    logger.info("STEP 3: Generating pairwise matchups (5 opponents per candidate)...")
    N_OPPONENTS = 5
    pairs = _generate_pairs(sample_ids, N_OPPONENTS)
    logger.info("Unique pairs generated: %d", len(pairs))

    # ------------------------------------------------------------------ STEP 6
    logger.info("STEP 6: Annotating pairs with %s (%s)...", provider, model)
    existing_annotations, existing_pair_keys = _load_existing_annotations(annotations_path)

    remaining_pairs = [(a, b) for a, b in pairs
                       if frozenset({a, b}) not in existing_pair_keys]
    logger.info("Pairs remaining to annotate: %d of %d", len(remaining_pairs), len(pairs))

    jd_summary = build_jd_summary(jd_config)

    def _summary_for(cid: str) -> str:
        return build_candidate_summary(stage1_candidates.get(cid, {"candidate_id": cid}))

    total_inp = 0
    total_out = 0

    if not remaining_pairs:
        # Nothing left to annotate: skip client setup, the probe (which would
        # make billable calls) and the confirmation prompt.
        logger.info("All pairs already annotated — skipping LLM calls.")
    else:
        client = _make_client(provider, args.api_key, model, ollama_url)

        # Timing/cost probe. Its verdicts are discarded; the probed pairs are
        # annotated again in the full run below.
        probe_pairs = remaining_pairs[:5]
        logger.info("Running %d-call timing probe for %s...", len(probe_pairs), provider)
        probe_times = []
        probe_inp = []
        probe_out = []
        for i, (a, b) in enumerate(probe_pairs):
            t0 = time.time()
            _, inp, out = get_pairwise_judgment(
                client, provider, model, jd_summary,
                _summary_for(a), _summary_for(b), i, ollama_url=ollama_url,
            )
            probe_times.append(time.time() - t0)
            probe_inp.append(inp)
            probe_out.append(out)
            time.sleep(call_sleep)

        avg_secs = sum(probe_times) / len(probe_times)
        avg_inp = sum(probe_inp) / len(probe_inp)
        avg_out = sum(probe_out) / len(probe_out)
        est_min = (avg_secs + call_sleep) * len(remaining_pairs) / 60
        est_cost = (avg_inp * len(remaining_pairs) / 1e6 * price_in +
                    avg_out * len(remaining_pairs) / 1e6 * price_out)
        cost_text = "FREE" if est_cost == 0 else f"${est_cost:.2f}"

        print("\n" + "=" * 50)
        print("=== RUN ESTIMATE ===")
        print(f"Provider: {provider} ({model})")
        print(f"Pairs to annotate: {len(remaining_pairs)}")
        if provider in ("groq", "anthropic", "cerebras"):
            print(f"Avg input tokens per call: {avg_inp:.0f}")
        else:
            print(f"Avg seconds per call: {avg_secs:.1f}s")
        print(f"Estimated cost: {cost_text}")
        print(f"Estimated time: ~{est_min:.0f} min ({est_min/60:.1f} hrs)")
        if provider == "ollama":
            print(f"GPU acceleration: {'YES' if avg_secs < 2.0 else 'NO — running on CPU (slow)'}")
        print("=" * 50)

        confirm = input("Proceed with full run? (yes/no): ").strip().lower()
        if confirm != "yes":
            logger.info("User declined — exiting. Run again to resume.")
            sys.exit(0)

        logger.info("Starting full annotation run (%d pairs remaining)...", len(remaining_pairs))
        # Continue numbering after the records already on disk so pair_id
        # stays unique across resumed runs.
        pair_id_offset = len(existing_annotations)
        run_start = time.monotonic()

        try:
            with open(annotations_path, "a", encoding="utf-8") as annot_file:
                for idx, (cid_a, cid_b) in enumerate(remaining_pairs):
                    pair_id = pair_id_offset + idx
                    verdict, inp, out = get_pairwise_judgment(
                        client, provider, model, jd_summary,
                        _summary_for(cid_a), _summary_for(cid_b), pair_id,
                        ollama_url=ollama_url,
                    )
                    total_inp += inp
                    total_out += out

                    record = {
                        "pair_id": pair_id,
                        "candidate_a": cid_a,
                        "candidate_b": cid_b,
                        "verdict": verdict,
                        "input_tokens": inp,
                        "output_tokens": out,
                    }
                    annot_file.write(json.dumps(record) + "\n")
                    # Flush per record so an interrupt loses at most one pair.
                    annot_file.flush()
                    existing_annotations.append(record)

                    time.sleep(call_sleep)

                    if (idx + 1) % 100 == 0:
                        cost_so_far = (total_inp / 1e6 * price_in +
                                       total_out / 1e6 * price_out)
                        logger.info(
                            "Progress: %d/%d pairs | cost: $%.2f | elapsed: ~%d min",
                            idx + 1, len(remaining_pairs), cost_so_far,
                            int((time.monotonic() - run_start) / 60),
                        )
        except KeyboardInterrupt:
            logger.info("")
            logger.info("=" * 60)
            logger.info("INTERRUPTED by user (Ctrl+C) — progress saved cleanly.")
            logger.info("Pairs completed so far: %d", len(existing_annotations))
            logger.info("Annotations file: %s", annotations_path)
            logger.info("Re-run the same command to resume from pair %d.",
                        len(existing_annotations))
            logger.info("=" * 60)
            sys.exit(0)

    actual_cost = total_inp / 1e6 * price_in + total_out / 1e6 * price_out
    logger.info(
        "Annotation complete. Total tokens: %d input / %d output. "
        "Actual total cost: $%.2f",
        total_inp, total_out, actual_cost,
    )

    # ------------------------------------------------------------------ STEP 7
    logger.info("STEP 7: Computing Elo scores from pairwise verdicts...")
    elo_scores = compute_elo_scores(existing_annotations, sample_ids)
    elo_vals = list(elo_scores.values())
    logger.info(
        "Elo distribution: min=%.1f, max=%.1f, mean=%.1f, std=%.1f",
        min(elo_vals), max(elo_vals),
        sum(elo_vals) / len(elo_vals),
        float(np.std(elo_vals)),
    )
    winners = sum(1 for v in elo_vals if v > 1500)
    logger.info("Elo above 1500 (winners): %d | at/below 1500 (losers): %d",
                winners, len(elo_vals) - winners)

    # ------------------------------------------------------------------ STEP 8
    logger.info("STEP 8: Converting Elo scores to 0-3 relevance labels...")
    labels = elo_to_labels(elo_scores)
    dist = {0: 0, 1: 0, 2: 0, 3: 0}
    for v in labels.values():
        dist[v] += 1
    logger.info("Label distribution: 0=%d, 1=%d, 2=%d, 3=%d",
                dist[0], dist[1], dist[2], dist[3])
    if dist[3] < 30:
        logger.warning(
            "Only %d candidates with label 3 — training signal may be sparse.", dist[3]
        )

    # ------------------------------------------------------------------ STEP 9
    logger.info("STEP 9: Extracting feature matrix for %d annotated candidates...",
                len(sample_ids))
    train_ids, X_train_full, _ = _build_feature_matrix(
        sample_ids,
        stage1_candidates,
        bm25_scores,
        stage1_bm25_median,
        jd_config,
        FEATURE_COLUMNS,
        build_feature_vector,
    )
    y_full = np.array([labels.get(cid, 0) for cid in train_ids], dtype=np.int32)
    logger.info("Feature matrix (%d): shape=%s", len(sample_ids), X_train_full.shape)

    # ----------------------------------------------------------------- STEP 10
    logger.info("STEP 10: Training LightGBM on LLM pairwise labels...")
    random.seed(42)
    n_val = int(len(train_ids) * 0.2)
    perm = list(range(len(train_ids)))
    random.shuffle(perm)
    val_idx = perm[:n_val]
    train_idx = perm[n_val:]
    if not val_idx or not train_idx:
        logger.error(
            "Not enough annotated candidates (%d) for a train/validation split.",
            len(train_ids),
        )
        sys.exit(1)

    X_tr = X_train_full[train_idx]
    y_tr = y_full[train_idx]
    X_vl = X_train_full[val_idx]
    y_vl = y_full[val_idx]

    logger.info("Train/val split: %d train, %d val", len(train_idx), len(val_idx))
    dist_tr = {k: int((y_tr == k).sum()) for k in [0, 1, 2, 3]}
    logger.info("Train label distribution: 0=%d, 1=%d, 2=%d, 3=%d",
                *[dist_tr[k] for k in [0, 1, 2, 3]])

    # Each split is passed as a single query group (one job description).
    train_ds = lgb.Dataset(X_tr, label=y_tr, group=[len(train_idx)],
                           feature_name=FEATURE_COLUMNS)
    val_ds = lgb.Dataset(X_vl, label=y_vl, group=[len(val_idx)],
                         feature_name=FEATURE_COLUMNS, reference=train_ds)

    params = {
        "objective": "lambdarank",
        "metric": "ndcg",
        "eval_at": [5, 10, 50],
        "num_leaves": 63,
        "learning_rate": 0.05,
        "min_child_samples": 20,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "random_state": 42,
        "n_jobs": -1,
        "verbose": -1,
    }

    t0 = time.time()
    new_model = lgb.train(
        params,
        train_ds,
        num_boost_round=300,
        valid_sets=[val_ds],
        callbacks=[
            lgb.early_stopping(stopping_rounds=30, verbose=False),
            lgb.log_evaluation(period=50),
        ],
    )
    logger.info("LightGBM training complete in %.1fs", time.time() - t0)

    importances = sorted(
        zip(FEATURE_COLUMNS, new_model.feature_importance(importance_type="gain")),
        key=lambda x: x[1],
        reverse=True,
    )
    logger.info("Top 5 feature importances (gain):")
    for fname, imp in importances[:5]:
        logger.info(" %s: %.2f", fname, imp)

    with open(new_model_path, "wb") as f:
        pickle.dump(new_model, f)
    logger.info("New model saved to: %s", new_model_path)
    logger.info("lgbm_model.pkl untouched: %s", old_model_path)

    # ----------------------------------------------------------------- STEP 11
    logger.info("STEP 11: Generating model comparison report...")
    print_model_comparison(
        stage1_candidates=stage1_candidates,
        stage1_ids=stage1_ids,
        bm25_scores=bm25_scores,
        stage1_bm25_median=stage1_bm25_median,
        jd_config=jd_config,
        old_model=old_model_for_ranking,
        new_model=new_model,
        feature_columns=FEATURE_COLUMNS,
    )

    logger.info("=" * 60)
    logger.info("EXPERIMENT COMPLETE")
    logger.info("Annotations: %s", annotations_path)
    logger.info("New model: %s", new_model_path)
    logger.info(
        "To swap into production: copy %s %s (manual, deliberate action only)",
        new_model_path, old_model_path,
    )
    logger.info("=" * 60)


if __name__ == "__main__":
    main()