| from __future__ import annotations | |
| import argparse | |
| import json | |
| import logging | |
| import math | |
| import os | |
| import pickle | |
| import random | |
| import sys | |
| import time | |
| from pathlib import Path | |
| from typing import Dict, List, Optional, Tuple | |
| import numpy as np | |
# Resolve the directory layout relative to this script: the script's own
# directory, its parent, and the grandparent, which is treated as the
# project root and contains a "src" directory.
_THIS_FILE = os.path.abspath(__file__)
_EXP_DIR = os.path.split(_THIS_FILE)[0]
_EXPERIMENTS = os.path.split(_EXP_DIR)[0]
_PROJECT_ROOT = os.path.split(_EXPERIMENTS)[0]
_SRC_DIR = os.path.join(_PROJECT_ROOT, "src")

# Make the project importable when the script is run directly. Each missing
# entry is inserted at index 0 in turn, so when both are new the project root
# ends up ahead of the src directory on sys.path.
for _p in (_SRC_DIR, _PROJECT_ROOT):
    if _p in sys.path:
        continue
    sys.path.insert(0, _p)
# Root logging configuration for this script: INFO level, time-only
# timestamps, and a fixed "[pairwise]" tag on every record.
logging.basicConfig(
    datefmt="%H:%M:%S",
    format="%(asctime)s %(levelname)s [pairwise] %(message)s",
    level=logging.INFO,
)

# Module-wide logger used by every function in this file.
logger = logging.getLogger("pairwise_llm")
# Seconds to sleep between consecutive LLM calls, keyed by provider name.
_PROVIDER_SLEEP: Dict[str, float] = dict(
    groq=2.1,
    anthropic=0.1,
    ollama=0.5,
    cerebras=0.5,
)

# (input, output) price per million tokens, keyed by provider name.
# A zero input price is reported as "FREE" in the run banner.
_PROVIDER_PRICE: Dict[str, Tuple[float, float]] = dict(
    groq=(0.0, 0.0),
    anthropic=(3.0, 15.0),
    ollama=(0.0, 0.0),
    cerebras=(0.0, 0.0),
)

# Model used for each provider when --model is not given on the command line.
_DEFAULT_MODELS: Dict[str, str] = dict(
    groq="llama-3.1-8b-instant",
    anthropic="claude-sonnet-4-6",
    ollama="gemma3:4b",
    cerebras="llama3.1-8b",
)
def build_jd_summary(jd_config) -> str:
    """Render the job description as a fixed-layout plain-text summary.

    Args:
        jd_config: Object exposing ``hard_requirements`` (an iterable) and
            ``preferred_requirements`` (a dict, whose keys are used, or any
            other iterable).

    Returns:
        Newline-joined text: a title line, every hard requirement, at most
        the first six preferred requirements, and a set of hard-coded
        disqualifier / location / experience / notice-period lines.
    """
    hard_bullets = [f" - {req}" for req in jd_config.hard_requirements]

    preferred = jd_config.preferred_requirements
    if isinstance(preferred, dict):
        preferred_names = list(preferred.keys())
    else:
        preferred_names = list(preferred)[:6]
    # Only the first six preferred requirements are shown, whatever the source type.
    preferred_bullets = [f" - {req}" for req in preferred_names[:6]]

    sections = [
        "JOB: Senior AI/ML Engineer — Retrieval & Ranking Systems",
        "HARD REQUIREMENTS (must have):",
        *hard_bullets,
        "PREFERRED (good to have):",
        *preferred_bullets,
        "EXPLICIT DISQUALIFIERS:",
        " - Entire career at IT-services/consulting firms (TCS, Infosys, Wipro, etc.)",
        " - AI experience is only LangChain/OpenAI API with no pre-LLM IR or ML foundation",
        " - CV/speech-only ML background with no NLP/IR experience",
        " - Title-chaser: avg tenure < 15 months across 3+ jobs",
        "LOCATION PREFERENCE: Noida or Pune strongly preferred; other India acceptable; "
        "outside India only if willing to relocate (no visa sponsorship)",
        "EXPERIENCE: 5-9 years preferred",
        "NOTICE PERIOD: Sub-30 days preferred; 30+ days raises the bar",
    ]
    return "\n".join(sections)
def build_candidate_summary(candidate: dict) -> str:
    """Render one candidate record as a compact multi-line text summary.

    The summary lists the candidate ID, current title/company, years of
    experience, location, the five skills with the longest duration (with
    an assessment score when one exists for that skill name), up to three
    career-history roles with a 60-character description excerpt, and a
    handful of engagement signals.

    Args:
        candidate: Candidate record. Missing or null ``profile``,
            ``redrob_signals``, ``skills`` and ``career_history`` entries
            are treated as empty.

    Returns:
        The newline-joined summary text.
    """
    profile = candidate.get("profile", {}) or {}
    signals = candidate.get("redrob_signals", {}) or {}
    assessments = signals.get("skill_assessment_scores", {}) or {}

    def _months(skill: dict):
        # A present-but-null duration would otherwise reach the sort as None
        # and raise TypeError when compared with an int; treat it as 0.
        return skill.get("duration_months") or 0

    lines = [
        f"ID: {candidate.get('candidate_id', 'unknown')}",
        f"Title: {profile.get('current_title', 'unknown')} "
        f"at {profile.get('current_company', 'unknown')}",
        f"YOE: {profile.get('years_of_experience', 0)}",
        f"Location: {profile.get('location', 'unknown')}, "
        f"{profile.get('country', 'unknown')}",
    ]

    # Longest-held skills first; only the top five are shown.
    top_skills = sorted(
        candidate.get("skills", []) or [],
        key=_months,
        reverse=True,
    )[:5]
    skill_parts = []
    for skill in top_skills:
        name = skill.get("name", "")
        prof = skill.get("proficiency", "")
        months = _months(skill)
        score = assessments.get(name)
        if score is not None:
            skill_parts.append(f"{name} ({prof}, {months}mo, assessed: {score}/100)")
        else:
            skill_parts.append(f"{name} ({prof}, {months}mo, unverified)")
    lines.append(f"Skills: {'; '.join(skill_parts)}")

    for i, role in enumerate((candidate.get("career_history", []) or [])[:3]):
        # Keep the excerpt on a single line so each role stays one summary line.
        desc = (role.get("description") or "")[:60].replace("\n", " ")
        lines.append(
            f"Role {i+1}: {role.get('title')} @ {role.get('company')} "
            f"({role.get('industry')}, {role.get('company_size')}, "
            f"{role.get('duration_months')}mo) — {desc}..."
        )

    lines.append(f"Notice: {signals.get('notice_period_days', 'unknown')} days")
    lines.append(f"Last active: {signals.get('last_active_date', 'unknown')}")
    lines.append(f"GitHub score: {signals.get('github_activity_score', -1)}")
    lines.append(f"Response rate: {signals.get('recruiter_response_rate', 'unknown')}")
    lines.append(f"Willing to relocate: {signals.get('willing_to_relocate', 'unknown')}")
    return "\n".join(lines)
def _call_groq(client, model: str, prompt: str) -> Tuple[str, int, int]:
    """Send a single-turn prompt through a Groq-style chat-completions client.

    Args:
        client: Object exposing ``chat.completions.create``.
        model: Model identifier passed through to the API.
        prompt: Full user prompt.

    Returns:
        ``(text, prompt_tokens, completion_tokens)`` where ``text`` is the
        first choice's message content, stripped and upper-cased.
    """
    request = {
        "model": model,
        "max_tokens": 10,  # the expected reply is a single short label
        "messages": [{"role": "user", "content": prompt}],
    }
    completion = client.chat.completions.create(**request)
    label = completion.choices[0].message.content.strip().upper()
    usage = completion.usage
    return label, usage.prompt_tokens, usage.completion_tokens
def _call_anthropic(client, model: str, prompt: str) -> Tuple[str, int, int]:
    """Send a single-turn prompt through an Anthropic-style messages client.

    Args:
        client: Object exposing ``messages.create``.
        model: Model identifier passed through to the API.
        prompt: Full user prompt.

    Returns:
        ``(text, input_tokens, output_tokens)`` where ``text`` is the first
        content block's text, stripped and upper-cased.
    """
    user_turn = {"role": "user", "content": prompt}
    reply = client.messages.create(
        model=model,
        max_tokens=10,  # the expected reply is a single short label
        messages=[user_turn],
    )
    label = reply.content[0].text.strip().upper()
    usage = reply.usage
    return label, usage.input_tokens, usage.output_tokens
def _call_cerebras(client, model: str, prompt: str) -> Tuple[str, int, int]:
    """Send a single-turn prompt through a Cerebras-style chat-completions client.

    Args:
        client: Object exposing ``chat.completions.create``.
        model: Model identifier passed through to the API.
        prompt: Full user prompt.

    Returns:
        ``(text, prompt_tokens, completion_tokens)`` where ``text`` is the
        first choice's message content, stripped and upper-cased.
    """
    conversation = [{"role": "user", "content": prompt}]
    # max_tokens is small because the expected reply is a single short label.
    result = client.chat.completions.create(
        model=model, max_tokens=10, messages=conversation
    )
    first_choice = result.choices[0]
    label = first_choice.message.content.strip().upper()
    return label, result.usage.prompt_tokens, result.usage.completion_tokens
def _call_ollama(
    model: str,
    prompt: str,
    base_url: str = "http://localhost:11434",
) -> Tuple[str, int, int]:
    """Ask a local Ollama server for a pairwise verdict.

    Unlike the hosted-provider helpers, this function normalizes the reply
    itself: any reply containing ``CANDIDATE_A`` maps to that label, else
    any reply containing ``CANDIDATE_B`` maps to that label, and everything
    else maps to ``TIE``.

    Args:
        model: Ollama model tag.
        prompt: Full prompt text.
        base_url: Root URL of the Ollama server. Defaults to the previously
            hard-coded local address, so existing two-argument calls are
            unaffected.

    Returns:
        ``(verdict, 0, 0)``. Token counts are always reported as zero.

    Raises:
        RuntimeError: If the server cannot be reached or the call fails for
            any other reason. The original exception is attached as the
            cause.
    """
    import requests as _req

    root = base_url.rstrip("/")
    # Host part only, so the default error text still reads "localhost:11434".
    host = root.split("://", 1)[-1]
    payload = {
        "model": model,
        "prompt": prompt,
        "stream": False,
        "options": {
            "temperature": 0,
            "num_predict": 10,
            "num_ctx": 2048,
            "num_gpu": 99,
            "stop": ["\n", ".", " \n"],
        },
    }
    try:
        response = _req.post(f"{root}/api/generate", json=payload, timeout=120)
        response.raise_for_status()
        raw = response.json()["response"].strip().upper()
    except _req.exceptions.ConnectionError as exc:
        raise RuntimeError(
            f"Cannot connect to Ollama at {host}. "
            "It starts automatically on Windows after install. "
            "Verify with: ollama list"
        ) from exc
    except Exception as exc:
        raise RuntimeError(f"Ollama call failed: {exc}") from exc

    # Substring match tolerates extra tokens around the label.
    if "CANDIDATE_A" in raw:
        return "CANDIDATE_A", 0, 0
    if "CANDIDATE_B" in raw:
        return "CANDIDATE_B", 0, 0
    return "TIE", 0, 0
def get_pairwise_judgment(
    client,
    provider: str,
    model: str,
    jd_summary: str,
    summary_a: str,
    summary_b: str,
    pair_idx: int,
) -> Tuple[str, int, int]:
    """Ask the configured LLM which of two candidates better fits the job.

    The call is attempted once; on any exception it is retried a single
    time after a 5-second pause. If the retry also fails the pair is
    recorded as a tie with zero token usage.

    Args:
        client: Provider SDK client (unused for ``"ollama"``).
        provider: One of ``"groq"``, ``"anthropic"``, ``"cerebras"``,
            ``"ollama"``.
        model: Model identifier for the provider.
        jd_summary: Job description text placed at the top of the prompt.
        summary_a: Text summary of candidate A.
        summary_b: Text summary of candidate B.
        pair_idx: Index of the pair, used only in log messages.

    Returns:
        ``(verdict, input_tokens, output_tokens)`` where ``verdict`` is
        ``"CANDIDATE_A"``, ``"CANDIDATE_B"`` or ``"TIE"``. Any other model
        output is logged and mapped to ``"TIE"``.

    Raises:
        ValueError: If ``provider`` is not one of the supported names.
    """
    # Fail fast on a misconfigured provider. Raising from inside the retry
    # block below would be swallowed, costing a 5 s sleep per pair and
    # silently turning every verdict into a TIE.
    if provider not in ("groq", "anthropic", "cerebras", "ollama"):
        raise ValueError(f"Unknown provider: {provider}")

    prompt = f"""You are an expert technical recruiter. Read the job requirements and both candidate profiles carefully, then judge which candidate is the stronger fit.
{jd_summary}
--- CANDIDATE A ---
{summary_a}
--- CANDIDATE B ---
{summary_b}
Which candidate is a better fit for this specific role?
Respond with EXACTLY one of these three strings and nothing else:
CANDIDATE_A
CANDIDATE_B
TIE
No explanation. No punctuation. Just the label."""

    def _dispatch():
        if provider == "groq":
            return _call_groq(client, model, prompt)
        if provider == "anthropic":
            return _call_anthropic(client, model, prompt)
        if provider == "cerebras":
            return _call_cerebras(client, model, prompt)
        return _call_ollama(model, prompt)

    try:
        text, inp, out = _dispatch()
    except Exception as e:
        logger.warning("API error on pair %d: %s", pair_idx, e)
        time.sleep(5)
        try:
            text, inp, out = _dispatch()
        except Exception as e2:
            logger.warning("Retry failed on pair %d: %s — defaulting to TIE", pair_idx, e2)
            return "TIE", 0, 0

    if text in ("CANDIDATE_A", "CANDIDATE_B", "TIE"):
        return text, inp, out
    logger.warning("Pair %d: unexpected output %r — defaulting to TIE", pair_idx, text)
    return "TIE", inp, out
def compute_elo_scores(
    annotations: List[dict],
    candidate_ids: List[str],
) -> Dict[str, float]:
    """Convert pairwise verdicts into an Elo-scaled score per candidate.

    This is not a sequential Elo update. Each candidate's wins and losses
    are tallied (a tie counts as half a win and half a loss for both
    sides), the win rate is smoothed as ``(wins + 0.5) / (total + 1)``,
    and its log-odds are mapped onto the Elo scale via
    ``400 * log10(p / (1 - p)) + 1500``.

    Args:
        annotations: Records with ``candidate_a``, ``candidate_b`` and
            ``verdict`` keys. Any verdict other than ``"CANDIDATE_A"`` or
            ``"CANDIDATE_B"`` is treated as a tie.
        candidate_ids: IDs to score. Annotation sides whose ID is not in
            this list are ignored instead of raising ``KeyError``; the
            known side of such a pair is still credited.

    Returns:
        Mapping from each ID in ``candidate_ids`` to its score. Candidates
        with no comparisons get exactly 1500.0.
    """
    wins: Dict[str, float] = {cid: 0.0 for cid in candidate_ids}
    losses: Dict[str, float] = {cid: 0.0 for cid in candidate_ids}

    def _credit(cid: str, won: float, lost: float) -> None:
        # Annotations loaded from an earlier run may mention candidates that
        # are not part of the current sample; skip those sides.
        if cid in wins:
            wins[cid] += won
            losses[cid] += lost

    for ann in annotations:
        a, b, verdict = ann["candidate_a"], ann["candidate_b"], ann["verdict"]
        if verdict == "CANDIDATE_A":
            _credit(a, 1.0, 0.0)
            _credit(b, 0.0, 1.0)
        elif verdict == "CANDIDATE_B":
            _credit(b, 1.0, 0.0)
            _credit(a, 0.0, 1.0)
        else:
            _credit(a, 0.5, 0.5)
            _credit(b, 0.5, 0.5)

    elo: Dict[str, float] = {}
    for cid in candidate_ids:
        total = wins[cid] + losses[cid]
        if total == 0:
            elo[cid] = 1500.0
            continue
        # The +0.5 / +1 smoothing keeps the rate strictly inside (0, 1),
        # so the log-odds below are always finite.
        win_rate = (wins[cid] + 0.5) / (total + 1)
        elo[cid] = 400 * math.log10(win_rate / (1 - win_rate)) + 1500
    return elo
def elo_to_labels(elo_scores: Dict[str, float]) -> Dict[str, int]:
    """Bucket scores into 0-3 relevance labels by quartile.

    Thresholds are the values at positions ``int(0.25 * n)``,
    ``int(0.50 * n)`` and ``int(0.75 * n)`` of the ascending-sorted scores.
    A score at or above the upper threshold gets 3, at or above the middle
    threshold 2, at or above the lower threshold 1, otherwise 0.

    Args:
        elo_scores: Mapping from candidate ID to score.

    Returns:
        Mapping from candidate ID to a label in ``{0, 1, 2, 3}``. An empty
        input yields an empty mapping instead of raising ``IndexError``.
    """
    if not elo_scores:
        return {}

    ordered = sorted(elo_scores.values())
    n = len(ordered)
    q25 = ordered[int(0.25 * n)]
    q50 = ordered[int(0.50 * n)]
    q75 = ordered[int(0.75 * n)]

    labels: Dict[str, int] = {}
    for cid, score in elo_scores.items():
        if score >= q75:
            labels[cid] = 3
        elif score >= q50:
            labels[cid] = 2
        elif score >= q25:
            labels[cid] = 1
        else:
            labels[cid] = 0
    return labels
def _get_top_skill(candidate: dict) -> str:
    """Return the name of the candidate's longest-held skill.

    Args:
        candidate: Candidate record; a missing or null ``skills`` entry is
            treated as empty.

    Returns:
        The ``name`` of the skill with the largest ``duration_months``
        (the earliest such skill on ties), or ``"N/A"`` when there are no
        skills or the chosen skill has no ``name`` key.
    """
    skills = candidate.get("skills", []) or []
    if not skills:
        return "N/A"
    # A null duration counts as 0 so it cannot raise TypeError when compared
    # with an int. max() returns the first maximal element, which matches a
    # stable descending sort followed by taking element 0.
    longest = max(skills, key=lambda s: s.get("duration_months") or 0)
    return longest.get("name", "N/A")
def _spearman(
    candidate_ids: List[str],
    ranks_a: Dict[str, int],
    ranks_b: Dict[str, int],
) -> float:
    """Spearman rank correlation between two rank maps over shared IDs.

    Args:
        candidate_ids: IDs to consider, in any order.
        ranks_a: First mapping from ID to rank.
        ranks_b: Second mapping from ID to rank.

    Returns:
        The correlation over IDs present in both maps. Returns 0.0 when
        fewer than two IDs are shared, or when either rank list is
        constant (the coefficient is undefined there and SciPy would
        report NaN).
    """
    from scipy.stats import spearmanr

    common = [cid for cid in candidate_ids if cid in ranks_a and cid in ranks_b]
    if len(common) < 2:
        return 0.0

    ra = [ranks_a[cid] for cid in common]
    rb = [ranks_b[cid] for cid in common]
    # A constant input has zero variance; short-circuit instead of letting
    # NaN propagate into threshold comparisons, where NaN is never > or <=.
    if len(set(ra)) < 2 or len(set(rb)) < 2:
        return 0.0

    rho, _ = spearmanr(ra, rb)
    rho = float(rho)
    return 0.0 if math.isnan(rho) else rho
def print_model_comparison(
    stage1_candidates: Dict[str, dict],
    stage1_ids: List[str],
    bm25_scores: Dict[str, float],
    stage1_bm25_median: float,
    jd_config,
    old_model,
    new_model,
    feature_columns: List[str],
) -> None:
    """Print a report comparing the current ranker with the LLM-label ranker.

    Feature vectors are built for every Stage 1 candidate that has a record,
    both models score the same matrix, and the report shows each model's
    top-10, their overlap, a Spearman correlation over the old model's
    top-100, the largest rank movers (20+ positions), and a "honeypot"
    check counting low-consistency candidates in the new top-100.

    The old model's predictions are ranked as-is; the new model's
    predictions are multiplied by each candidate's ``consistency_score``
    before ranking.

    Args:
        stage1_candidates: Candidate records keyed by ID.
        stage1_ids: IDs to score, in Stage 1 order; IDs without a record
            are skipped.
        bm25_scores: BM25 score per ID (0.0 when missing).
        stage1_bm25_median: Median BM25 score, forwarded to feature
            extraction.
        jd_config: Parsed job description, forwarded to feature extraction.
        old_model: Current model; must expose ``predict``.
        new_model: Candidate replacement model; must expose ``predict``.
        feature_columns: Ordered feature names defining the matrix columns.
    """
    from features import build_feature_vector

    def _rank_descending(scores):
        # Highest score first; ties broken by ascending candidate ID.
        ordering = sorted(scores.items(), key=lambda item: (-item[1], item[0]))
        positions = {cid: pos for pos, (cid, _) in enumerate(ordering, 1)}
        return ordering, positions

    def _record_and_profile(cid):
        record = stage1_candidates.get(cid, {})
        return record, (record.get("profile", {}) or {})

    def _print_movers(heading, movers):
        print(heading)
        for cid, was, now in movers[:10]:
            record, prof = _record_and_profile(cid)
            print(f" - {cid}: old={was}, new={now} | "
                  f"{prof.get('current_title','N/A')}, {_get_top_skill(record)}")

    # ---- feature matrix -------------------------------------------------
    logger.info("Building full feature matrix for comparison report...")
    matrix_rows = []
    scored_ids = []
    consistency_map: Dict[str, float] = {}
    for cid in stage1_ids:
        record = stage1_candidates.get(cid)
        if record is None:
            continue
        bm25 = bm25_scores.get(cid, 0.0)
        try:
            features = build_feature_vector(
                record, jd_config,
                bm25_score=bm25,
                stage1_bm25_median=stage1_bm25_median,
            )
            row = [features[col] for col in feature_columns]
            consistency_map[cid] = float(features.get("consistency_score", 1.0))
        except Exception as e:
            # Best-effort: a failed extraction becomes an all-zero row with
            # a neutral consistency multiplier.
            logger.warning("Feature extraction failed for %s: %s", cid, e)
            row = [0.0] * len(feature_columns)
            consistency_map[cid] = 1.0
        matrix_rows.append(row)
        scored_ids.append(cid)
    X_full = np.array(matrix_rows, dtype=np.float32)
    logger.info("Comparison feature matrix: shape=%s", X_full.shape)

    # ---- rankings -------------------------------------------------------
    old_raw = old_model.predict(X_full)
    old_scores = {cid: float(s) for cid, s in zip(scored_ids, old_raw)}
    old_ranked, old_rank_map = _rank_descending(old_scores)

    new_raw = new_model.predict(X_full)
    new_scores = {
        cid: float(s) * consistency_map.get(cid, 1.0)
        for cid, s in zip(scored_ids, new_raw)
    }
    new_ranked, new_rank_map = _rank_descending(new_scores)

    old_top10 = [cid for cid, _ in old_ranked[:10]]
    new_top10 = [cid for cid, _ in new_ranked[:10]]
    overlap = len(set(old_top10).intersection(new_top10))
    rho = _spearman([cid for cid, _ in old_ranked[:100]], old_rank_map, new_rank_map)

    # ---- movers (positive delta = moved up in the new ranking) ----------
    shifts = [
        (cid, old_rank_map.get(cid, 9999), new_rank_map.get(cid, 9999))
        for cid in scored_ids
    ]
    moved_up: List[Tuple[str, int, int]] = sorted(
        (s for s in shifts if s[1] - s[2] >= 20),
        key=lambda s: s[1] - s[2],
        reverse=True,
    )
    moved_down: List[Tuple[str, int, int]] = sorted(
        (s for s in shifts if s[1] - s[2] <= -20),
        key=lambda s: s[2] - s[1],
        reverse=True,
    )

    # ---- honeypot audit -------------------------------------------------
    low_cons_count = len(
        [cid for cid, _ in new_ranked[:100] if consistency_map.get(cid, 1.0) < 0.25]
    )
    honeypot_pass = low_cons_count < 10

    # ---- report ---------------------------------------------------------
    rule = "=" * 60
    print("\n" + rule)
    print("=== MODEL COMPARISON REPORT ===")
    print(rule)

    print("\nCurrent model (heuristic labels) top-10:")
    for position, cid in enumerate(old_top10, 1):
        record, prof = _record_and_profile(cid)
        print(f" {position:2d}. {cid} — {prof.get('current_title','N/A')}, "
              f"{prof.get('years_of_experience',0)}y, {_get_top_skill(record)}")

    print("\nNew model (LLM pairwise labels + consistency multiplier) top-10:")
    for position, cid in enumerate(new_top10, 1):
        record, prof = _record_and_profile(cid)
        cons = consistency_map.get(cid, 1.0)
        print(f" {position:2d}. {cid} — {prof.get('current_title','N/A')}, "
              f"{prof.get('years_of_experience',0)}y, {_get_top_skill(record)}, "
              f"cons={cons:.2f}")

    print(f"\nOverlap: {overlap} of 10 top-10 candidates appear in both rankings")
    print(f"Spearman correlation (top-100): {rho:.3f} "
          f"[range: -1.0 to +1.0, higher = more agreement]")

    _print_movers("\nCandidates that MOVED UP 20+ positions in new model:", moved_up)
    _print_movers("\nCandidates that MOVED DOWN 20+ positions in new model:", moved_down)

    print("\nConsistency check — low-consistency (< 0.25) in new top-100:")
    print(f" Count: {low_cons_count} (must be < 10 to pass honeypot audit)")
    print(" NOTE: consistency multiplier applied — this number should now be 0.")

    print("\n" + rule)
    print("=== VERDICT ===")
    print(f"Honeypot audit: {'PASS ✓' if honeypot_pass else 'FAIL ✗'}")
    print(f"Top-10 overlap with current: {overlap}/10")
    print(f"Spearman correlation: {rho:.3f}")

    # Default covers a failed audit and also a rho that satisfies neither
    # comparison below.
    rec = "RISKY — honeypot audit fails; do not swap without further investigation"
    if honeypot_pass:
        if rho > 0.4:
            rec = "PROMISING — consider swapping model"
        elif rho <= 0.4:
            rec = "MIXED — honeypot passes but ranking diverges significantly; review movers"
    print(f"Recommendation: {rec}")
    print(rule + "\n")

    if honeypot_pass:
        logger.info("Honeypot audit PASSED: %d low-consistency in new top 100.", low_cons_count)
    else:
        logger.error(
            "HONEYPOT AUDIT FAILED: %d low-consistency candidates in new top-100. "
            "The consistency multiplier should have fixed this — check that "
            "consistency_score is being computed correctly for these candidates.",
            low_cons_count,
        )
| def main() -> None: | |
| parser = argparse.ArgumentParser( | |
| description=( | |
| "Offline pairwise LLM annotation experiment. " | |
| "If lgbm_model_llm.pkl already exists, runs Step 11 comparison only. " | |
| "NEVER imported by rank.py or any production module." | |
| ), | |
| formatter_class=argparse.ArgumentDefaultsHelpFormatter, | |
| ) | |
| parser.add_argument("--candidates", required=True) | |
| parser.add_argument("--base-dir", required=True) | |
| parser.add_argument( | |
| "--provider", | |
| default="ollama", | |
| choices=["groq", "anthropic", "ollama", "cerebras"], | |
| ) | |
| parser.add_argument("--model", default=None) | |
| parser.add_argument("--api-key", default=None) | |
| parser.add_argument("--ollama-url", default="http://localhost:11434") | |
| args = parser.parse_args() | |
| provider = args.provider | |
| model = args.model or _DEFAULT_MODELS[provider] | |
| call_sleep = _PROVIDER_SLEEP[provider] | |
| price_in, price_out = _PROVIDER_PRICE[provider] | |
| if provider in ("groq", "anthropic", "cerebras") and not args.api_key: | |
| logger.error("--api-key is required when --provider is %s", provider) | |
| sys.exit(1) | |
| if provider == "ollama" and args.api_key: | |
| logger.info("--api-key ignored for ollama provider") | |
| base_dir = os.path.abspath(args.base_dir) | |
| candidates_path = os.path.abspath(args.candidates) | |
| precomputed_dir = os.path.join(base_dir, "precomputed") | |
| data_dir = os.path.join(base_dir, "data") | |
| annotations_path = os.path.join(_EXP_DIR, "annotations.jsonl") | |
| new_model_path = os.path.join(precomputed_dir, "lgbm_model_llm.pkl") | |
| old_model_path = os.path.join(precomputed_dir, "lgbm_model.pkl") | |
| logger.info("=" * 60) | |
| logger.info("PAIRWISE LLM ANNOTATION EXPERIMENT") | |
| logger.info("Provider: %s | Model: %s", provider, model) | |
| logger.info("Rate limit sleep: %.1fs between calls", call_sleep) | |
| logger.info("Cost: %s", "FREE" if price_in == 0 else f"${price_in}/M input, ${price_out}/M output") | |
| logger.info("Base dir: %s", base_dir) | |
| logger.info("Annotations: %s", annotations_path) | |
| logger.info("New model → %s", new_model_path) | |
| logger.info("Old model %s (will NOT be touched)", old_model_path) | |
| logger.info("=" * 60) | |
| if provider == "ollama": | |
| import requests as _req | |
| try: | |
| r = _req.get("http://localhost:11434/api/tags", timeout=5) | |
| r.raise_for_status() | |
| available = [m["name"] for m in r.json().get("models", [])] | |
| found = ( | |
| model in available | |
| or model.split(":")[0] in [m.split(":")[0] for m in available] | |
| ) | |
| if not found: | |
| logger.error( | |
| "Model '%s' not in Ollama. Available: %s. " | |
| "Pull it: ollama pull %s", model, available, model | |
| ) | |
| sys.exit(1) | |
| logger.info("Ollama reachable. Model '%s' available.", model) | |
| except _req.exceptions.ConnectionError: | |
| logger.error( | |
| "Ollama not running at localhost:11434. " | |
| "On Windows it auto-starts after install — " | |
| "check Task Manager for 'ollama' process." | |
| ) | |
| sys.exit(1) | |
| from features import build_feature_vector, FEATURE_COLUMNS | |
| from features import ( | |
| c1_timeline_impossibility, c2_signup_anomaly, | |
| c3_salary_inversion, c4_assessment_contradiction, | |
| c5_engagement_mismatch, | |
| ) | |
| from jd_parser import parse_jd | |
| from retrieval import load_numpy_bm25_artifacts, run_dual_pass_retrieval | |
| logger.info("STEP 1: Loading Stage 1 candidate pool...") | |
| bm25 = load_numpy_bm25_artifacts(precomputed_dir) | |
| if bm25 is None: | |
| bm25_path = os.path.join(precomputed_dir, "bm25_index.pkl") | |
| if not os.path.isfile(bm25_path): | |
| logger.error("Missing bm25_index.pkl — run precompute.py first.") | |
| sys.exit(1) | |
| with open(bm25_path, "rb") as f: | |
| bm25 = pickle.load(f) | |
| logger.info("Loaded legacy BM25Okapi") | |
| else: | |
| logger.info("Loaded NumpyBM25 (fast path)") | |
| ids_path = os.path.join(precomputed_dir, "candidate_ids.pkl") | |
| with open(ids_path, "rb") as f: | |
| all_candidate_ids = pickle.load(f) | |
| aliases_path = os.path.join(data_dir, "skill_aliases.json") | |
| jd_config = parse_jd(aliases_path) | |
| logger.info( | |
| "JD config: %d hard reqs, %d preferred reqs", | |
| len(jd_config.hard_requirements), len(jd_config.preferred_requirements), | |
| ) | |
| stage1_ids, bm25_scores = run_dual_pass_retrieval(bm25, all_candidate_ids, jd_config) | |
| stage1_bm25_median = float(np.median(list(bm25_scores.values()))) | |
| logger.info("Stage 1 pool: %d candidates, median BM25=%.4f", len(stage1_ids), stage1_bm25_median) | |
| offsets_path = os.path.join(precomputed_dir, "candidate_offsets.pkl") | |
| stage1_candidate_list: List[dict] = [] | |
| if os.path.isfile(offsets_path): | |
| with open(offsets_path, "rb") as f: | |
| candidate_offsets = pickle.load(f) | |
| logger.info("Loading Stage 1 records via byte-offset index...") | |
| with open(candidates_path, "rb") as f: | |
| for cid in stage1_ids: | |
| offset = candidate_offsets.get(cid) | |
| if offset is None: | |
| continue | |
| f.seek(offset) | |
| raw = f.readline() | |
| try: | |
| c = json.loads(raw.decode("utf-8", errors="ignore").strip()) | |
| stage1_candidate_list.append(c) | |
| except json.JSONDecodeError: | |
| pass | |
| else: | |
| logger.info("No offset index — streaming JSONL (slow)...") | |
| stage1_id_set = set(stage1_ids) | |
| found: Dict[str, dict] = {} | |
| with open(candidates_path, "r", encoding="utf-8") as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| c = json.loads(line) | |
| except json.JSONDecodeError: | |
| continue | |
| cid = c.get("candidate_id") | |
| if cid and cid in stage1_id_set: | |
| found[cid] = c | |
| if len(found) == len(stage1_id_set): | |
| break | |
| stage1_candidate_list = [found[cid] for cid in stage1_ids if cid in found] | |
| stage1_candidates: Dict[str, dict] = { | |
| c.get("candidate_id"): c | |
| for c in stage1_candidate_list | |
| if c.get("candidate_id") | |
| } | |
| logger.info("Stage 1 records loaded: %d candidates", len(stage1_candidates)) | |
| model_already_exists = os.path.isfile(new_model_path) | |
| annots_already_exist = os.path.isfile(annotations_path) | |
| if model_already_exists and annots_already_exist: | |
| logger.info( | |
| "lgbm_model_llm.pkl and annotations.jsonl both exist — " | |
| "skipping Steps 2-10, running Step 11 comparison only." | |
| ) | |
| with open(old_model_path, "rb") as f: | |
| old_model = pickle.load(f) | |
| with open(new_model_path, "rb") as f: | |
| new_model = pickle.load(f) | |
| logger.info("STEP 11: Generating model comparison report...") | |
| print_model_comparison( | |
| stage1_candidates=stage1_candidates, | |
| stage1_ids=stage1_ids, | |
| bm25_scores=bm25_scores, | |
| stage1_bm25_median=stage1_bm25_median, | |
| jd_config=jd_config, | |
| old_model=old_model, | |
| new_model=new_model, | |
| feature_columns=FEATURE_COLUMNS, | |
| ) | |
| logger.info("=" * 60) | |
| logger.info("EXPERIMENT COMPLETE") | |
| logger.info("New model: %s", new_model_path) | |
| logger.info( | |
| "To swap into production: copy %s %s", | |
| new_model_path, old_model_path, | |
| ) | |
| logger.info("(Manual, deliberate action only — verify top-10 first)") | |
| logger.info("=" * 60) | |
| return | |
| logger.info("STEP 2: Stratified sampling of 500 candidates...") | |
| random.seed(42) | |
| from features import build_feature_vector | |
| import lightgbm as lgb | |
| with open(old_model_path, "rb") as f: | |
| old_model_for_ranking = pickle.load(f) | |
| logger.info("Computing feature vectors for all Stage 1 candidates...") | |
| all_feature_rows = [] | |
| all_fv_ids = [] | |
| consistency_scores_all: Dict[str, float] = {} | |
| for cid in stage1_ids: | |
| candidate = stage1_candidates.get(cid) | |
| if candidate is None: | |
| continue | |
| bs = bm25_scores.get(cid, 0.0) | |
| try: | |
| fv = build_feature_vector(candidate, jd_config, bm25_score=bs, stage1_bm25_median=stage1_bm25_median) | |
| row = [fv[col] for col in FEATURE_COLUMNS] | |
| consistency_scores_all[cid] = float(fv.get("consistency_score", 1.0)) | |
| except Exception: | |
| row = [0.0] * len(FEATURE_COLUMNS) | |
| consistency_scores_all[cid] = 1.0 | |
| all_feature_rows.append(row) | |
| all_fv_ids.append(cid) | |
| X_all = np.array(all_feature_rows, dtype=np.float32) | |
| logger.info("Feature matrix (Stage 1): shape=%s", X_all.shape) | |
| raw_scores = old_model_for_ranking.predict(X_all) | |
| lgbm_ranked = sorted(zip(all_fv_ids, raw_scores), key=lambda x: -x[1]) | |
| lgbm_rank_map = {cid: rank for rank, (cid, _) in enumerate(lgbm_ranked, 1)} | |
| TOTAL = 500 | |
| N_A, N_B, N_C = 75, 100, 325 | |
| MIN_LOW_CONS = 25 | |
| top100_cids = [cid for cid, _ in lgbm_ranked[:100]] | |
| ranks_101_300 = [cid for cid, _ in lgbm_ranked[100:300]] | |
| ranks_301_plus = [cid for cid, _ in lgbm_ranked[300:]] | |
| stratum_a = random.sample(top100_cids, min(N_A, len(top100_cids))) | |
| stratum_b = random.sample(ranks_101_300, min(N_B, len(ranks_101_300))) | |
| low_cons_pool = [cid for cid in ranks_301_plus if consistency_scores_all.get(cid, 1.0) < 0.5] | |
| guaranteed_low = random.sample(low_cons_pool, min(MIN_LOW_CONS, len(low_cons_pool))) | |
| remaining_c = [cid for cid in ranks_301_plus if cid not in guaranteed_low] | |
| fill_c = random.sample(remaining_c, max(0, N_C - len(guaranteed_low))) | |
| stratum_c = guaranteed_low + fill_c | |
| sample_ids = list(dict.fromkeys(stratum_a + stratum_b + stratum_c))[:TOTAL] | |
| logger.info( | |
| "Stratum sizes: A=%d (top-50 + 25 from 51-150), B=%d (51-150), C=%d (151+)", | |
| len(stratum_a), len(stratum_b), len(stratum_c), | |
| ) | |
| logger.info("Low-consistency guaranteed in Stratum C: %d (target: ≥%d)", | |
| len(guaranteed_low), MIN_LOW_CONS) | |
| logger.info("Total sample pool: %d candidates", len(sample_ids)) | |
| logger.info("STEP 3: Generating pairwise matchups (5 opponents per candidate)...") | |
| N_OPPONENTS = 5 | |
| seen_pairs: set = set() | |
| pairs: List[Tuple[str, str]] = [] | |
| for cid_a in sample_ids: | |
| pool = [c for c in sample_ids if c != cid_a] | |
| random.shuffle(pool) | |
| count = 0 | |
| for cid_b in pool: | |
| key = frozenset({cid_a, cid_b}) | |
| if key not in seen_pairs and count < N_OPPONENTS: | |
| seen_pairs.add(key) | |
| pairs.append((cid_a, cid_b)) | |
| count += 1 | |
| logger.info("Unique pairs generated: %d", len(pairs)) | |
| logger.info("STEP 6: Annotating pairs with %s (%s)...", provider, model) | |
| existing_annotations: List[dict] = [] | |
| existing_pair_keys: set = set() | |
| if os.path.isfile(annotations_path): | |
| logger.info("Found existing annotations file — loading for resumability...") | |
| with open(annotations_path, "r", encoding="utf-8") as f: | |
| for line in f: | |
| line = line.strip() | |
| if not line: | |
| continue | |
| try: | |
| ann = json.loads(line) | |
| existing_annotations.append(ann) | |
| existing_pair_keys.add(frozenset({ann["candidate_a"], ann["candidate_b"]})) | |
| except json.JSONDecodeError: | |
| pass | |
| logger.info("Loaded %d existing annotations (will skip these pairs)", len(existing_annotations)) | |
| remaining_pairs = [(a, b) for a, b in pairs if frozenset({a, b}) not in existing_pair_keys] | |
| logger.info("Pairs remaining to annotate: %d of %d", len(remaining_pairs), len(pairs)) | |
| jd_summary = build_jd_summary(jd_config) | |
| client = None | |
| if provider == "groq": | |
| from groq import Groq | |
| client = Groq(api_key=args.api_key) | |
| logger.info("Groq client initialized (model: %s)", model) | |
| elif provider == "anthropic": | |
| import anthropic as _anthropic | |
| client = _anthropic.Anthropic(api_key=args.api_key) | |
| logger.info("Anthropic client initialized (model: %s)", model) | |
| elif provider == "cerebras": | |
| from cerebras.cloud.sdk import Cerebras | |
| client = Cerebras(api_key=args.api_key) | |
| logger.info("Cerebras client initialized (model: %s)", model) | |
| else: | |
| logger.info("Ollama provider: calls go directly to localhost:11434 via requests") | |
| logger.info("Running 5-call timing probe for Ollama...") | |
| probe_pairs = remaining_pairs[:5] if len(remaining_pairs) >= 5 else pairs[:5] | |
| probe_times = [] | |
| probe_inp = [] | |
| probe_out = [] | |
| for i, (a, b) in enumerate(probe_pairs): | |
| sa = build_candidate_summary(stage1_candidates.get(a, {"candidate_id": a})) | |
| sb = build_candidate_summary(stage1_candidates.get(b, {"candidate_id": b})) | |
| t0 = time.time() | |
| _, inp, out = get_pairwise_judgment(client, provider, model, jd_summary, sa, sb, i) | |
| elapsed = time.time() - t0 | |
| probe_times.append(elapsed) | |
| probe_inp.append(inp) | |
| probe_out.append(out) | |
| time.sleep(call_sleep) | |
| avg_secs = sum(probe_times) / len(probe_times) | |
| avg_inp = sum(probe_inp) / len(probe_inp) | |
| avg_out = sum(probe_out) / len(probe_out) | |
| est_min = (avg_secs + call_sleep) * len(remaining_pairs) / 60 | |
| est_cost = (avg_inp * len(remaining_pairs) / 1e6 * price_in + | |
| avg_out * len(remaining_pairs) / 1e6 * price_out) | |
| print("\n" + "=" * 50) | |
| print("=== RUN ESTIMATE ===") | |
| print(f"Provider: {provider} ({model})") | |
| print(f"Pairs to annotate: {len(remaining_pairs)}") | |
| if provider in ("groq", "anthropic", "cerebras"): | |
| print(f"Avg input tokens per call: {avg_inp:.0f}") | |
| else: | |
| print(f"Avg seconds per call: {avg_secs:.1f}s") | |
| print(f"Estimated cost: {'FREE' if est_cost == 0 else f'${est_cost:.2f}'}") | |
| print(f"Estimated time: ~{est_min:.0f} min ({est_min/60:.1f} hrs)") | |
| if provider == "ollama": | |
| print(f"GPU acceleration: {'YES' if avg_secs < 2.0 else 'NO — running on CPU (slow)'}") | |
| print("=" * 50) | |
| confirm = input("Proceed with full run? (yes/no): ").strip().lower() | |
| if confirm != "yes": | |
| logger.info("User declined — exiting. Run again to resume.") | |
| sys.exit(0) | |
| logger.info("Starting full annotation run (%d pairs remaining)...", len(remaining_pairs)) | |
| total_inp = 0 | |
| total_out = 0 | |
| annot_file = open(annotations_path, "a", encoding="utf-8") | |
| try: | |
| for idx, (cid_a, cid_b) in enumerate(remaining_pairs): | |
| sa = build_candidate_summary(stage1_candidates.get(cid_a, {"candidate_id": cid_a})) | |
| sb = build_candidate_summary(stage1_candidates.get(cid_b, {"candidate_id": cid_b})) | |
| verdict, inp, out = get_pairwise_judgment( | |
| client, provider, model, jd_summary, sa, sb, idx | |
| ) | |
| total_inp += inp | |
| total_out += out | |
| record = { | |
| "pair_id": idx, | |
| "candidate_a": cid_a, | |
| "candidate_b": cid_b, | |
| "verdict": verdict, | |
| "input_tokens": inp, | |
| "output_tokens": out, | |
| } | |
| annot_file.write(json.dumps(record) + "\n") | |
| annot_file.flush() | |
| existing_annotations.append(record) | |
| time.sleep(call_sleep) | |
| if (idx + 1) % 100 == 0: | |
| cost_so_far = (total_inp / 1e6 * price_in + total_out / 1e6 * price_out) | |
| logger.info( | |
| "Progress: %d/%d pairs | cost: $%.2f | elapsed: ~%d min", | |
| idx + 1, len(remaining_pairs), cost_so_far, int((idx+1)*(avg_secs+call_sleep)/60) | |
| ) | |
| except KeyboardInterrupt: | |
| logger.info("") | |
| logger.info("=" * 60) | |
| logger.info("INTERRUPTED by user (Ctrl+C) — progress saved cleanly.") | |
| logger.info("Pairs completed so far: %d", len(existing_annotations)) | |
| logger.info("Annotations file: %s", annotations_path) | |
| logger.info("Re-run the same command to resume from pair %d.", len(existing_annotations)) | |
| logger.info("=" * 60) | |
| annot_file.close() | |
| sys.exit(0) | |
| finally: | |
| annot_file.close() | |
| actual_cost = total_inp / 1e6 * price_in + total_out / 1e6 * price_out | |
| logger.info( | |
| "Annotation complete. Total tokens: %d input / %d output. " | |
| "Actual total cost: $%.2f", | |
| total_inp, total_out, actual_cost, | |
| ) | |
| logger.info("STEP 7: Computing Elo scores from pairwise verdicts...") | |
| elo_scores = compute_elo_scores(existing_annotations, sample_ids) | |
| elo_vals = list(elo_scores.values()) | |
| logger.info( | |
| "Elo distribution: min=%.1f, max=%.1f, mean=%.1f, std=%.1f", | |
| min(elo_vals), max(elo_vals), | |
| sum(elo_vals)/len(elo_vals), | |
| float(np.std(elo_vals)), | |
| ) | |
| winners = sum(1 for v in elo_vals if v > 1500) | |
| logger.info("Elo above 1500 (winners): %d | at/below 1500 (losers): %d", | |
| winners, len(elo_vals) - winners) | |
| logger.info("STEP 8: Converting Elo scores to 0-3 relevance labels...") | |
| labels = elo_to_labels(elo_scores) | |
| dist = {0: 0, 1: 0, 2: 0, 3: 0} | |
| for v in labels.values(): | |
| dist[v] += 1 | |
| logger.info("Label distribution: 0=%d, 1=%d, 2=%d, 3=%d", | |
| dist[0], dist[1], dist[2], dist[3]) | |
| if dist[3] < 30: | |
| logger.warning( | |
| "Only %d candidates with label 3 — training signal may be sparse.", dist[3] | |
| ) | |
| logger.info("STEP 9: Extracting feature matrix for %d annotated candidates...", len(sample_ids)) | |
| train_rows = [] | |
| train_ids = [] | |
| for cid in sample_ids: | |
| candidate = stage1_candidates.get(cid) | |
| if candidate is None: | |
| continue | |
| bs = bm25_scores.get(cid, 0.0) | |
| try: | |
| fv = build_feature_vector(candidate, jd_config, bm25_score=bs, stage1_bm25_median=stage1_bm25_median) | |
| row = [fv[col] for col in FEATURE_COLUMNS] | |
| except Exception as e: | |
| logger.warning("Feature extraction failed for %s: %s", cid, e) | |
| row = [0.0] * len(FEATURE_COLUMNS) | |
| train_rows.append(row) | |
| train_ids.append(cid) | |
| X_train_full = np.array(train_rows, dtype=np.float32) | |
| y_full = np.array([labels.get(cid, 0) for cid in train_ids], dtype=np.int32) | |
| logger.info("Feature matrix (%d): shape=%s", len(sample_ids), X_train_full.shape) | |
| logger.info("STEP 10: Training LightGBM on LLM pairwise labels...") | |
| random.seed(42) | |
| n_val = int(len(train_ids) * 0.2) | |
| perm = list(range(len(train_ids))) | |
| random.shuffle(perm) | |
| val_idx = perm[:n_val] | |
| train_idx = perm[n_val:] | |
| X_tr = X_train_full[train_idx] | |
| y_tr = y_full[train_idx] | |
| X_vl = X_train_full[val_idx] | |
| y_vl = y_full[val_idx] | |
| logger.info("Train/val split: %d train, %d val", len(train_idx), len(val_idx)) | |
| dist_tr = {k: int((y_tr == k).sum()) for k in [0,1,2,3]} | |
| logger.info("Train label distribution: 0=%d, 1=%d, 2=%d, 3=%d", *[dist_tr[k] for k in [0,1,2,3]]) | |
| train_ds = lgb.Dataset(X_tr, label=y_tr, group=[len(train_idx)], feature_name=FEATURE_COLUMNS) | |
| val_ds = lgb.Dataset(X_vl, label=y_vl, group=[len(val_idx)], feature_name=FEATURE_COLUMNS, reference=train_ds) | |
| params = { | |
| "objective": "lambdarank", | |
| "metric": "ndcg", | |
| "eval_at": [5, 10, 50], | |
| "num_leaves": 63, | |
| "learning_rate": 0.05, | |
| "min_child_samples": 20, | |
| "subsample": 0.8, | |
| "colsample_bytree": 0.8, | |
| "random_state": 42, | |
| "n_jobs": -1, | |
| "verbose": -1, | |
| } | |
| t0 = time.time() | |
| new_model = lgb.train( | |
| params, | |
| train_ds, | |
| num_boost_round=300, | |
| valid_sets=[val_ds], | |
| callbacks=[ | |
| lgb.early_stopping(stopping_rounds=30, verbose=False), | |
| lgb.log_evaluation(period=50), | |
| ], | |
| ) | |
| logger.info("LightGBM training complete in %.1fs", time.time() - t0) | |
| importances = sorted( | |
| zip(FEATURE_COLUMNS, new_model.feature_importance(importance_type="gain")), | |
| key=lambda x: x[1], reverse=True, | |
| ) | |
| logger.info("Top 5 feature importances (gain):") | |
| for fname, imp in importances[:5]: | |
| logger.info(" %s: %.2f", fname, imp) | |
| with open(new_model_path, "wb") as f: | |
| pickle.dump(new_model, f) | |
| logger.info("New model saved to: %s", new_model_path) | |
| logger.info("lgbm_model.pkl untouched: %s", old_model_path) | |
| logger.info("STEP 11: Generating model comparison report...") | |
| with open(old_model_path, "rb") as f: | |
| old_model_final = pickle.load(f) | |
| print_model_comparison( | |
| stage1_candidates=stage1_candidates, | |
| stage1_ids=stage1_ids, | |
| bm25_scores=bm25_scores, | |
| stage1_bm25_median=stage1_bm25_median, | |
| jd_config=jd_config, | |
| old_model=old_model_final, | |
| new_model=new_model, | |
| feature_columns=FEATURE_COLUMNS, | |
| ) | |
| logger.info("=" * 60) | |
| logger.info("EXPERIMENT COMPLETE") | |
| logger.info("Annotations: %s", annotations_path) | |
| logger.info("New model: %s", new_model_path) | |
| logger.info( | |
| "To swap into production: copy %s %s (manual, deliberate action only)", | |
| new_model_path, old_model_path, | |
| ) | |
| logger.info("=" * 60) | |
# Script entry point: run the pipeline only when this file is executed
# directly; importing the module must not trigger main().
if __name__ == "__main__":
    main()